inbound.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. // +build !confonly
  2. package inbound
  3. //go:generate errorgen
  4. import (
  5. "context"
  6. "io"
  7. "strconv"
  8. "time"
  9. "v2ray.com/core"
  10. "v2ray.com/core/common"
  11. "v2ray.com/core/common/buf"
  12. "v2ray.com/core/common/errors"
  13. "v2ray.com/core/common/log"
  14. "v2ray.com/core/common/net"
  15. "v2ray.com/core/common/protocol"
  16. "v2ray.com/core/common/retry"
  17. "v2ray.com/core/common/session"
  18. "v2ray.com/core/common/signal"
  19. "v2ray.com/core/common/task"
  20. "v2ray.com/core/features/dns"
  21. feature_inbound "v2ray.com/core/features/inbound"
  22. "v2ray.com/core/features/policy"
  23. "v2ray.com/core/features/routing"
  24. "v2ray.com/core/proxy/vless"
  25. "v2ray.com/core/proxy/vless/encoding"
  26. "v2ray.com/core/transport/internet"
  27. )
  28. func init() {
  29. common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  30. var dc dns.Client
  31. if err := core.RequireFeatures(ctx, func(d dns.Client) error {
  32. dc = d
  33. return nil
  34. }); err != nil {
  35. return nil, err
  36. }
  37. return New(ctx, config.(*Config), dc)
  38. }))
  39. }
  40. // Handler is an inbound connection handler that handles messages in VLess protocol.
  41. type Handler struct {
  42. inboundHandlerManager feature_inbound.Manager
  43. policyManager policy.Manager
  44. validator *vless.Validator
  45. dns dns.Client
  46. fallback *Fallback // or nil
  47. addrport string
  48. }
  49. // New creates a new VLess inbound handler.
  50. func New(ctx context.Context, config *Config, dc dns.Client) (*Handler, error) {
  51. v := core.MustFromContext(ctx)
  52. handler := &Handler{
  53. inboundHandlerManager: v.GetFeature(feature_inbound.ManagerType()).(feature_inbound.Manager),
  54. policyManager: v.GetFeature(policy.ManagerType()).(policy.Manager),
  55. validator: new(vless.Validator),
  56. dns: dc,
  57. }
  58. for _, user := range config.User {
  59. u, err := user.ToMemoryUser()
  60. if err != nil {
  61. return nil, newError("failed to get VLESS user").Base(err).AtError()
  62. }
  63. if err := handler.AddUser(ctx, u); err != nil {
  64. return nil, newError("failed to initiate user").Base(err).AtError()
  65. }
  66. }
  67. if config.Fallback != nil {
  68. handler.fallback = config.Fallback
  69. handler.addrport = handler.fallback.Addr.AsAddress().String() + ":" + strconv.Itoa(int(handler.fallback.Port))
  70. }
  71. return handler, nil
  72. }
  73. // Close implements common.Closable.Close().
  74. func (h *Handler) Close() error {
  75. return errors.Combine(common.Close(h.validator))
  76. }
  77. // AddUser implements proxy.UserManager.AddUser().
  78. func (h *Handler) AddUser(ctx context.Context, u *protocol.MemoryUser) error {
  79. return h.validator.Add(u)
  80. }
  81. // RemoveUser implements proxy.UserManager.RemoveUser().
  82. func (h *Handler) RemoveUser(ctx context.Context, e string) error {
  83. return h.validator.Del(e)
  84. }
  85. // Network implements proxy.Inbound.Network().
  86. func (*Handler) Network() []net.Network {
  87. return []net.Network{net.Network_TCP}
  88. }
  89. // Process implements proxy.Inbound.Process().
  90. func (h *Handler) Process(ctx context.Context, network net.Network, connection internet.Connection, dispatcher routing.Dispatcher) error {
  91. sessionPolicy := h.policyManager.ForLevel(0)
  92. if err := connection.SetReadDeadline(time.Now().Add(sessionPolicy.Timeouts.Handshake)); err != nil {
  93. return newError("unable to set read deadline").Base(err).AtWarning()
  94. }
  95. first := buf.New()
  96. first.ReadFrom(connection)
  97. sid := session.ExportIDToError(ctx)
  98. newError("firstLen = ", first.Len()).AtInfo().WriteToLog(sid)
  99. reader := &buf.BufferedReader{
  100. Reader: buf.NewReader(connection),
  101. Buffer: buf.MultiBuffer{first},
  102. }
  103. var request *protocol.RequestHeader
  104. var requestAddons *encoding.Addons
  105. var err error
  106. var pre *buf.Buffer
  107. if h.fallback != nil && first.Len() < 18 {
  108. err = newError("fallback directly")
  109. pre = buf.New()
  110. } else {
  111. request, requestAddons, err, pre = encoding.DecodeRequestHeader(reader, h.validator)
  112. }
  113. if err != nil {
  114. if h.fallback != nil && pre != nil {
  115. newError("fallback starts").AtInfo().WriteToLog(sid)
  116. var conn net.Conn
  117. if err := retry.ExponentialBackoff(5, 100).On(func() error {
  118. var dialer net.Dialer
  119. var err error
  120. if h.fallback.Unix != "" {
  121. conn, err = dialer.DialContext(ctx, "unix", h.fallback.Unix)
  122. } else {
  123. conn, err = dialer.DialContext(ctx, "tcp", h.addrport)
  124. }
  125. if err != nil {
  126. return err
  127. }
  128. return nil
  129. }); err != nil {
  130. return newError("failed to fallback connection").Base(err).AtWarning()
  131. }
  132. defer conn.Close() // nolint: errcheck
  133. ctx, cancel := context.WithCancel(ctx)
  134. timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle)
  135. writer := buf.NewWriter(connection)
  136. serverReader := buf.NewReader(conn)
  137. serverWriter := buf.NewWriter(conn)
  138. postRequest := func() error {
  139. defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
  140. if pre.Len() > 0 {
  141. if err := serverWriter.WriteMultiBuffer(buf.MultiBuffer{pre}); err != nil {
  142. return newError("failed to fallback request pre").Base(err).AtWarning()
  143. }
  144. }
  145. if err := buf.Copy(reader, serverWriter, buf.UpdateActivity(timer)); err != nil {
  146. return err // ...
  147. }
  148. return nil
  149. }
  150. getResponse := func() error {
  151. defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly)
  152. if err := buf.Copy(serverReader, writer, buf.UpdateActivity(timer)); err != nil {
  153. return err // ...
  154. }
  155. return nil
  156. }
  157. if err := task.Run(ctx, task.OnSuccess(postRequest, task.Close(serverWriter)), task.OnSuccess(getResponse, task.Close(writer))); err != nil {
  158. common.Interrupt(serverReader)
  159. common.Interrupt(serverWriter)
  160. return newError("fallback ends").Base(err).AtInfo()
  161. }
  162. return nil
  163. }
  164. if errors.Cause(err) != io.EOF {
  165. log.Record(&log.AccessMessage{
  166. From: connection.RemoteAddr(),
  167. To: "",
  168. Status: log.AccessRejected,
  169. Reason: err,
  170. })
  171. err = newError("invalid request from ", connection.RemoteAddr()).Base(err).AtWarning()
  172. }
  173. return err
  174. }
  175. if request.Command != protocol.RequestCommandMux {
  176. ctx = log.ContextWithAccessMessage(ctx, &log.AccessMessage{
  177. From: connection.RemoteAddr(),
  178. To: request.Destination(),
  179. Status: log.AccessAccepted,
  180. Reason: "",
  181. Email: request.User.Email,
  182. })
  183. }
  184. newError("received request for ", request.Destination()).AtInfo().WriteToLog(sid)
  185. if err := connection.SetReadDeadline(time.Time{}); err != nil {
  186. newError("unable to set back read deadline").Base(err).AtWarning().WriteToLog(sid)
  187. }
  188. inbound := session.InboundFromContext(ctx)
  189. if inbound == nil {
  190. panic("no inbound metadata")
  191. }
  192. inbound.User = request.User
  193. sessionPolicy = h.policyManager.ForLevel(request.User.Level)
  194. ctx, cancel := context.WithCancel(ctx)
  195. timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle)
  196. ctx = policy.ContextWithBufferPolicy(ctx, sessionPolicy.Buffer)
  197. link, err := dispatcher.Dispatch(ctx, request.Destination())
  198. if err != nil {
  199. return newError("failed to dispatch request to ", request.Destination()).Base(err).AtWarning()
  200. }
  201. serverReader := link.Reader
  202. serverWriter := link.Writer
  203. postRequest := func() error {
  204. defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
  205. // default: clientReader := reader
  206. clientReader := encoding.DecodeBodyAddons(reader, request, requestAddons)
  207. // from clientReader.ReadMultiBuffer to serverWriter.WriteMultiBufer
  208. if err := buf.Copy(clientReader, serverWriter, buf.UpdateActivity(timer)); err != nil {
  209. return newError("failed to transfer request payload").Base(err).AtInfo()
  210. }
  211. return nil
  212. }
  213. getResponse := func() error {
  214. defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly)
  215. responseAddons := &encoding.Addons{
  216. Scheduler: requestAddons.Scheduler,
  217. }
  218. bufferWriter := buf.NewBufferedWriter(buf.NewWriter(connection))
  219. if err := encoding.EncodeResponseHeader(bufferWriter, request, responseAddons); err != nil {
  220. return newError("failed to encode response header").Base(err).AtWarning()
  221. }
  222. // default: clientWriter := bufferWriter
  223. clientWriter := encoding.EncodeBodyAddons(bufferWriter, request, responseAddons)
  224. {
  225. multiBuffer, err := serverReader.ReadMultiBuffer()
  226. if err != nil {
  227. return err // ...
  228. }
  229. if err := clientWriter.WriteMultiBuffer(multiBuffer); err != nil {
  230. return err // ...
  231. }
  232. }
  233. // Flush; bufferWriter.WriteMultiBufer now is bufferWriter.writer.WriteMultiBuffer
  234. if err := bufferWriter.SetBuffered(false); err != nil {
  235. return newError("failed to write A response payload").Base(err).AtWarning()
  236. }
  237. // from serverReader.ReadMultiBuffer to clientWriter.WriteMultiBufer
  238. if err := buf.Copy(serverReader, clientWriter, buf.UpdateActivity(timer)); err != nil {
  239. return newError("failed to transfer response payload").Base(err).AtInfo()
  240. }
  241. // Indicates the end of response payload.
  242. switch responseAddons.Scheduler {
  243. default:
  244. }
  245. return nil
  246. }
  247. if err := task.Run(ctx, task.OnSuccess(postRequest, task.Close(serverWriter)), getResponse); err != nil {
  248. common.Interrupt(serverReader)
  249. common.Interrupt(serverWriter)
  250. return newError("connection ends").Base(err).AtInfo()
  251. }
  252. return nil
  253. }