inbound.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package inbound
  2. import (
  3. "crypto/md5"
  4. "io"
  5. "net"
  6. "sync"
  7. "github.com/v2ray/v2ray-core/app"
  8. "github.com/v2ray/v2ray-core/common/alloc"
  9. v2crypto "github.com/v2ray/v2ray-core/common/crypto"
  10. "github.com/v2ray/v2ray-core/common/log"
  11. v2net "github.com/v2ray/v2ray-core/common/net"
  12. "github.com/v2ray/v2ray-core/common/retry"
  13. "github.com/v2ray/v2ray-core/proxy"
  14. "github.com/v2ray/v2ray-core/proxy/internal"
  15. "github.com/v2ray/v2ray-core/proxy/vmess"
  16. "github.com/v2ray/v2ray-core/proxy/vmess/protocol"
  17. "github.com/v2ray/v2ray-core/proxy/vmess/protocol/user"
  18. )
  19. // Inbound connection handler that handles messages in VMess format.
  20. type VMessInboundHandler struct {
  21. sync.Mutex
  22. space app.Space
  23. clients user.UserSet
  24. accepting bool
  25. listener *net.TCPListener
  26. }
  27. func NewVMessInboundHandler(space app.Space, clients user.UserSet) *VMessInboundHandler {
  28. return &VMessInboundHandler{
  29. space: space,
  30. clients: clients,
  31. }
  32. }
  33. func (this *VMessInboundHandler) Close() {
  34. this.accepting = false
  35. if this.listener != nil {
  36. this.listener.Close()
  37. this.Lock()
  38. this.listener = nil
  39. this.Unlock()
  40. }
  41. }
  42. func (this *VMessInboundHandler) Listen(port v2net.Port) error {
  43. listener, err := net.ListenTCP("tcp", &net.TCPAddr{
  44. IP: []byte{0, 0, 0, 0},
  45. Port: int(port),
  46. Zone: "",
  47. })
  48. if err != nil {
  49. log.Error("Unable to listen tcp port %d: %v", port, err)
  50. return err
  51. }
  52. this.accepting = true
  53. this.listener = listener
  54. go this.AcceptConnections()
  55. return nil
  56. }
  57. func (this *VMessInboundHandler) AcceptConnections() error {
  58. for this.accepting {
  59. retry.Timed(100 /* times */, 100 /* ms */).On(func() error {
  60. if !this.accepting {
  61. return nil
  62. }
  63. this.Lock()
  64. defer this.Unlock()
  65. if this.listener != nil {
  66. connection, err := this.listener.AcceptTCP()
  67. if err != nil {
  68. log.Error("Failed to accpet connection: %s", err.Error())
  69. return err
  70. }
  71. go this.HandleConnection(connection)
  72. }
  73. return nil
  74. })
  75. }
  76. return nil
  77. }
  78. func (this *VMessInboundHandler) HandleConnection(connection *net.TCPConn) error {
  79. defer connection.Close()
  80. connReader := v2net.NewTimeOutReader(16, connection)
  81. requestReader := protocol.NewVMessRequestReader(this.clients)
  82. request, err := requestReader.Read(connReader)
  83. if err != nil {
  84. log.Access(connection.RemoteAddr().String(), "", log.AccessRejected, err.Error())
  85. log.Warning("VMessIn: Invalid request from (%s): %v", connection.RemoteAddr().String(), err)
  86. return err
  87. }
  88. log.Access(connection.RemoteAddr().String(), request.Address.String(), log.AccessAccepted, "")
  89. log.Debug("VMessIn: Received request for %s", request.Address.String())
  90. ray := this.space.PacketDispatcher().DispatchToOutbound(v2net.NewPacket(request.Destination(), nil, true))
  91. input := ray.InboundInput()
  92. output := ray.InboundOutput()
  93. var readFinish, writeFinish sync.Mutex
  94. readFinish.Lock()
  95. writeFinish.Lock()
  96. userSettings := vmess.GetUserSettings(request.User.Level())
  97. connReader.SetTimeOut(userSettings.PayloadReadTimeout)
  98. go handleInput(request, connReader, input, &readFinish)
  99. responseKey := md5.Sum(request.RequestKey)
  100. responseIV := md5.Sum(request.RequestIV)
  101. aesStream, err := v2crypto.NewAesEncryptionStream(responseKey[:], responseIV[:])
  102. if err != nil {
  103. log.Error("VMessIn: Failed to create AES decryption stream: %v", err)
  104. return err
  105. }
  106. responseWriter := v2crypto.NewCryptionWriter(aesStream, connection)
  107. // Optimize for small response packet
  108. buffer := alloc.NewLargeBuffer().Clear()
  109. buffer.AppendBytes(request.ResponseHeader[0] ^ request.ResponseHeader[1])
  110. buffer.AppendBytes(request.ResponseHeader[2] ^ request.ResponseHeader[3])
  111. buffer.AppendBytes(byte(0), byte(0))
  112. if data, open := <-output; open {
  113. buffer.Append(data.Value)
  114. data.Release()
  115. responseWriter.Write(buffer.Value)
  116. buffer.Release()
  117. go handleOutput(request, responseWriter, output, &writeFinish)
  118. writeFinish.Lock()
  119. }
  120. connection.CloseWrite()
  121. readFinish.Lock()
  122. return nil
  123. }
  124. func handleInput(request *protocol.VMessRequest, reader io.Reader, input chan<- *alloc.Buffer, finish *sync.Mutex) {
  125. defer close(input)
  126. defer finish.Unlock()
  127. aesStream, err := v2crypto.NewAesDecryptionStream(request.RequestKey, request.RequestIV)
  128. if err != nil {
  129. log.Error("VMessIn: Failed to create AES decryption stream: %v", err)
  130. return
  131. }
  132. requestReader := v2crypto.NewCryptionReader(aesStream, reader)
  133. v2net.ReaderToChan(input, requestReader)
  134. }
  135. func handleOutput(request *protocol.VMessRequest, writer io.Writer, output <-chan *alloc.Buffer, finish *sync.Mutex) {
  136. v2net.ChanToWriter(writer, output)
  137. finish.Unlock()
  138. }
  139. func init() {
  140. if err := internal.RegisterInboundConnectionHandlerFactory("vmess", func(space app.Space, rawConfig interface{}) (proxy.InboundConnectionHandler, error) {
  141. config := rawConfig.(Config)
  142. allowedClients := user.NewTimedUserSet()
  143. for _, user := range config.AllowedUsers() {
  144. allowedClients.AddUser(user)
  145. }
  146. return NewVMessInboundHandler(space, allowedClients), nil
  147. }); err != nil {
  148. panic(err)
  149. }
  150. }