inbound.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. package inbound
  2. import (
  3. "io"
  4. "sync"
  5. "v2ray.com/core/app"
  6. "v2ray.com/core/app/dispatcher"
  7. "v2ray.com/core/app/proxyman"
  8. "v2ray.com/core/common"
  9. "v2ray.com/core/common/buf"
  10. "v2ray.com/core/common/bufio"
  11. "v2ray.com/core/common/errors"
  12. "v2ray.com/core/common/log"
  13. v2net "v2ray.com/core/common/net"
  14. "v2ray.com/core/common/protocol"
  15. "v2ray.com/core/common/serial"
  16. "v2ray.com/core/common/uuid"
  17. "v2ray.com/core/proxy"
  18. "v2ray.com/core/proxy/vmess"
  19. "v2ray.com/core/proxy/vmess/encoding"
  20. "v2ray.com/core/transport/internet"
  21. )
  22. type userByEmail struct {
  23. sync.RWMutex
  24. cache map[string]*protocol.User
  25. defaultLevel uint32
  26. defaultAlterIDs uint16
  27. }
  28. func NewUserByEmail(users []*protocol.User, config *DefaultConfig) *userByEmail {
  29. cache := make(map[string]*protocol.User)
  30. for _, user := range users {
  31. cache[user.Email] = user
  32. }
  33. return &userByEmail{
  34. cache: cache,
  35. defaultLevel: config.Level,
  36. defaultAlterIDs: uint16(config.AlterId),
  37. }
  38. }
  39. func (v *userByEmail) Get(email string) (*protocol.User, bool) {
  40. var user *protocol.User
  41. var found bool
  42. v.RLock()
  43. user, found = v.cache[email]
  44. v.RUnlock()
  45. if !found {
  46. v.Lock()
  47. user, found = v.cache[email]
  48. if !found {
  49. account := &vmess.Account{
  50. Id: uuid.New().String(),
  51. AlterId: uint32(v.defaultAlterIDs),
  52. }
  53. user = &protocol.User{
  54. Level: v.defaultLevel,
  55. Email: email,
  56. Account: serial.ToTypedMessage(account),
  57. }
  58. v.cache[email] = user
  59. }
  60. v.Unlock()
  61. }
  62. return user, found
  63. }
  64. // Inbound connection handler that handles messages in VMess format.
  65. type VMessInboundHandler struct {
  66. sync.RWMutex
  67. packetDispatcher dispatcher.PacketDispatcher
  68. inboundHandlerManager proxyman.InboundHandlerManager
  69. clients protocol.UserValidator
  70. usersByEmail *userByEmail
  71. accepting bool
  72. listener *internet.TCPHub
  73. detours *DetourConfig
  74. meta *proxy.InboundHandlerMeta
  75. }
  76. func (v *VMessInboundHandler) Port() v2net.Port {
  77. return v.meta.Port
  78. }
  79. func (v *VMessInboundHandler) Close() {
  80. v.accepting = false
  81. if v.listener != nil {
  82. v.Lock()
  83. v.listener.Close()
  84. v.listener = nil
  85. v.clients.Release()
  86. v.clients = nil
  87. v.Unlock()
  88. }
  89. }
  90. func (v *VMessInboundHandler) GetUser(email string) *protocol.User {
  91. v.RLock()
  92. defer v.RUnlock()
  93. if !v.accepting {
  94. return nil
  95. }
  96. user, existing := v.usersByEmail.Get(email)
  97. if !existing {
  98. v.clients.Add(user)
  99. }
  100. return user
  101. }
  102. func (v *VMessInboundHandler) Start() error {
  103. if v.accepting {
  104. return nil
  105. }
  106. tcpListener, err := internet.ListenTCP(v.meta.Address, v.meta.Port, v.HandleConnection, v.meta.StreamSettings)
  107. if err != nil {
  108. log.Error("VMess|Inbound: Unable to listen tcp ", v.meta.Address, ":", v.meta.Port, ": ", err)
  109. return err
  110. }
  111. v.accepting = true
  112. v.Lock()
  113. v.listener = tcpListener
  114. v.Unlock()
  115. return nil
  116. }
  117. func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) {
  118. defer connection.Close()
  119. if !v.accepting {
  120. return
  121. }
  122. connReader := v2net.NewTimeOutReader(8, connection)
  123. defer connReader.Release()
  124. reader := bufio.NewReader(connReader)
  125. defer reader.Release()
  126. v.RLock()
  127. if !v.accepting {
  128. v.RUnlock()
  129. return
  130. }
  131. session := encoding.NewServerSession(v.clients)
  132. defer session.Release()
  133. request, err := session.DecodeRequestHeader(reader)
  134. v.RUnlock()
  135. if err != nil {
  136. if errors.Cause(err) != io.EOF {
  137. log.Access(connection.RemoteAddr(), "", log.AccessRejected, err)
  138. log.Info("VMessIn: Invalid request from ", connection.RemoteAddr(), ": ", err)
  139. }
  140. connection.SetReusable(false)
  141. return
  142. }
  143. log.Access(connection.RemoteAddr(), request.Destination(), log.AccessAccepted, "")
  144. log.Info("VMessIn: Received request for ", request.Destination())
  145. connection.SetReusable(request.Option.Has(protocol.RequestOptionConnectionReuse))
  146. ray := v.packetDispatcher.DispatchToOutbound(&proxy.SessionInfo{
  147. Source: v2net.DestinationFromAddr(connection.RemoteAddr()),
  148. Destination: request.Destination(),
  149. User: request.User,
  150. Inbound: v.meta,
  151. })
  152. input := ray.InboundInput()
  153. output := ray.InboundOutput()
  154. defer input.Close()
  155. defer output.Release()
  156. var readFinish sync.Mutex
  157. readFinish.Lock()
  158. userSettings := request.User.GetSettings()
  159. connReader.SetTimeOut(userSettings.PayloadReadTimeout)
  160. reader.SetBuffered(false)
  161. go func() {
  162. bodyReader := session.DecodeRequestBody(request, reader)
  163. if err := buf.PipeUntilEOF(bodyReader, input); err != nil {
  164. log.Debug("VMess|Inbound: Error when sending data to outbound: ", err)
  165. connection.SetReusable(false)
  166. }
  167. bodyReader.Release()
  168. input.Close()
  169. readFinish.Unlock()
  170. }()
  171. writer := bufio.NewWriter(connection)
  172. defer writer.Release()
  173. response := &protocol.ResponseHeader{
  174. Command: v.generateCommand(request),
  175. }
  176. if connection.Reusable() {
  177. response.Option.Set(protocol.ResponseOptionConnectionReuse)
  178. }
  179. session.EncodeResponseHeader(response, writer)
  180. bodyWriter := session.EncodeResponseBody(request, writer)
  181. // Optimize for small response packet
  182. if data, err := output.Read(); err == nil {
  183. if err := bodyWriter.Write(data); err != nil {
  184. connection.SetReusable(false)
  185. }
  186. writer.SetBuffered(false)
  187. if err := buf.PipeUntilEOF(output, bodyWriter); err != nil {
  188. log.Debug("VMess|Inbound: Error when sending data to downstream: ", err)
  189. connection.SetReusable(false)
  190. }
  191. }
  192. output.Release()
  193. if request.Option.Has(protocol.RequestOptionChunkStream) {
  194. if err := bodyWriter.Write(buf.NewLocal(8)); err != nil {
  195. connection.SetReusable(false)
  196. }
  197. }
  198. writer.Flush()
  199. bodyWriter.Release()
  200. readFinish.Lock()
  201. }
  202. type Factory struct{}
  203. func (v *Factory) StreamCapability() v2net.NetworkList {
  204. return v2net.NetworkList{
  205. Network: []v2net.Network{v2net.Network_TCP, v2net.Network_KCP, v2net.Network_WebSocket},
  206. }
  207. }
  208. func (v *Factory) Create(space app.Space, rawConfig interface{}, meta *proxy.InboundHandlerMeta) (proxy.InboundHandler, error) {
  209. if !space.HasApp(dispatcher.APP_ID) {
  210. return nil, common.ErrBadConfiguration
  211. }
  212. config := rawConfig.(*Config)
  213. allowedClients := vmess.NewTimedUserValidator(protocol.DefaultIDHash)
  214. for _, user := range config.User {
  215. allowedClients.Add(user)
  216. }
  217. handler := &VMessInboundHandler{
  218. packetDispatcher: space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher),
  219. clients: allowedClients,
  220. detours: config.Detour,
  221. usersByEmail: NewUserByEmail(config.User, config.GetDefaultValue()),
  222. meta: meta,
  223. }
  224. if space.HasApp(proxyman.APP_ID_INBOUND_MANAGER) {
  225. handler.inboundHandlerManager = space.GetApp(proxyman.APP_ID_INBOUND_MANAGER).(proxyman.InboundHandlerManager)
  226. }
  227. return handler, nil
  228. }
  229. func init() {
  230. proxy.MustRegisterInboundHandlerCreator(serial.GetMessageType(new(Config)), new(Factory))
  231. }