outbound.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package outbound
  2. import (
  3. "sync"
  4. "github.com/v2ray/v2ray-core/app"
  5. "github.com/v2ray/v2ray-core/common/alloc"
  6. v2io "github.com/v2ray/v2ray-core/common/io"
  7. "github.com/v2ray/v2ray-core/common/log"
  8. v2net "github.com/v2ray/v2ray-core/common/net"
  9. "github.com/v2ray/v2ray-core/common/protocol"
  10. "github.com/v2ray/v2ray-core/common/protocol/raw"
  11. "github.com/v2ray/v2ray-core/proxy"
  12. "github.com/v2ray/v2ray-core/proxy/internal"
  13. vmessio "github.com/v2ray/v2ray-core/proxy/vmess/io"
  14. "github.com/v2ray/v2ray-core/transport/hub"
  15. "github.com/v2ray/v2ray-core/transport/ray"
  16. )
  17. type VMessOutboundHandler struct {
  18. receiverManager *ReceiverManager
  19. }
  20. func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *alloc.Buffer, ray ray.OutboundRay) error {
  21. defer ray.OutboundInput().Release()
  22. defer ray.OutboundOutput().Close()
  23. destination, vNextUser := this.receiverManager.PickReceiver()
  24. command := protocol.RequestCommandTCP
  25. if target.IsUDP() {
  26. command = protocol.RequestCommandUDP
  27. }
  28. request := &protocol.RequestHeader{
  29. Version: raw.Version,
  30. User: vNextUser,
  31. Command: command,
  32. Address: target.Address(),
  33. Port: target.Port(),
  34. Option: protocol.RequestOptionChunkStream,
  35. }
  36. conn, err := hub.Dial(destination)
  37. if err != nil {
  38. log.Error("Failed to open ", destination, ": ", err)
  39. return err
  40. }
  41. log.Info("VMessOut: Tunneling request to ", request.Address, " via ", destination)
  42. defer conn.Close()
  43. if request.Option.IsChunkStream() {
  44. conn.SetReusable(true)
  45. }
  46. input := ray.OutboundInput()
  47. output := ray.OutboundOutput()
  48. var requestFinish, responseFinish sync.Mutex
  49. requestFinish.Lock()
  50. responseFinish.Lock()
  51. session := raw.NewClientSession(protocol.DefaultIDHash)
  52. go this.handleRequest(session, conn, request, payload, input, &requestFinish)
  53. go this.handleResponse(session, conn, request, destination, output, &responseFinish)
  54. requestFinish.Lock()
  55. responseFinish.Lock()
  56. return nil
  57. }
  58. func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn *hub.Connection, request *protocol.RequestHeader, payload *alloc.Buffer, input v2io.Reader, finish *sync.Mutex) {
  59. defer finish.Unlock()
  60. writer := v2io.NewBufferedWriter(conn)
  61. defer writer.Release()
  62. session.EncodeRequestHeader(request, writer)
  63. bodyWriter := session.EncodeRequestBody(writer)
  64. var streamWriter v2io.Writer = v2io.NewAdaptiveWriter(bodyWriter)
  65. if request.Option.IsChunkStream() {
  66. streamWriter = vmessio.NewAuthChunkWriter(streamWriter)
  67. }
  68. streamWriter.Write(payload)
  69. writer.SetCached(false)
  70. err := v2io.Pipe(input, streamWriter)
  71. if err != vmessio.ErrorStreamCompleted {
  72. conn.SetReusable(false)
  73. }
  74. if request.Option.IsChunkStream() {
  75. streamWriter.Write(alloc.NewSmallBuffer().Clear())
  76. }
  77. streamWriter.Release()
  78. return
  79. }
  80. func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, conn *hub.Connection, request *protocol.RequestHeader, dest v2net.Destination, output v2io.Writer, finish *sync.Mutex) {
  81. defer finish.Unlock()
  82. reader := v2io.NewBufferedReader(conn)
  83. defer reader.Release()
  84. header, err := session.DecodeResponseHeader(reader)
  85. if err != nil {
  86. log.Warning("VMessOut: Failed to read response: ", err)
  87. return
  88. }
  89. go this.handleCommand(dest, header.Command)
  90. reader.SetCached(false)
  91. decryptReader := session.DecodeResponseBody(reader)
  92. var bodyReader v2io.Reader
  93. if request.Option.IsChunkStream() {
  94. bodyReader = vmessio.NewAuthChunkReader(decryptReader)
  95. } else {
  96. bodyReader = v2io.NewAdaptiveReader(decryptReader)
  97. }
  98. err = v2io.Pipe(bodyReader, output)
  99. if err != vmessio.ErrorStreamCompleted {
  100. conn.SetReusable(false)
  101. }
  102. bodyReader.Release()
  103. return
  104. }
  105. func init() {
  106. internal.MustRegisterOutboundHandlerCreator("vmess",
  107. func(space app.Space, rawConfig interface{}) (proxy.OutboundHandler, error) {
  108. vOutConfig := rawConfig.(*Config)
  109. return &VMessOutboundHandler{
  110. receiverManager: NewReceiverManager(vOutConfig.Receivers),
  111. }, nil
  112. })
  113. }