httprt.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. package httprt
  2. //go:generate go run github.com/v2fly/v2ray-core/v5/common/errors/errorgen
  3. import (
  4. "bytes"
  5. "context"
  6. "encoding/base64"
  7. "io"
  8. gonet "net"
  9. "net/http"
  10. "time"
  11. "github.com/v2fly/v2ray-core/v5/transport/internet/transportcommon"
  12. "github.com/v2fly/v2ray-core/v5/common"
  13. "github.com/v2fly/v2ray-core/v5/common/net"
  14. "github.com/v2fly/v2ray-core/v5/transport/internet/request"
  15. )
  16. func newHTTPRoundTripperClient(ctx context.Context, config *ClientConfig) request.RoundTripperClient {
  17. _ = ctx
  18. return &httpTripperClient{config: config}
  19. }
  20. type httpTripperClient struct {
  21. httpRTT http.RoundTripper
  22. config *ClientConfig
  23. assembly request.TransportClientAssembly
  24. }
  25. type unimplementedBackDrop struct{}
  26. func (u unimplementedBackDrop) RoundTrip(r *http.Request) (*http.Response, error) {
  27. return nil, newError("unimplemented")
  28. }
  29. func (h *httpTripperClient) OnTransportClientAssemblyReady(assembly request.TransportClientAssembly) {
  30. h.assembly = assembly
  31. }
  32. func (h *httpTripperClient) RoundTrip(ctx context.Context, req request.Request, opts ...request.RoundTripperOption) (resp request.Response, err error) {
  33. var streamingWriter io.Writer
  34. for _, v := range opts {
  35. if streamingResp, ok := v.(request.OptionSupportsStreamingResponse); ok {
  36. streamingWriter = streamingResp.GetResponseWriter()
  37. }
  38. }
  39. if h.httpRTT == nil {
  40. var backDrop http.RoundTripper = unimplementedBackDrop{}
  41. if h.config.AllowHttp {
  42. backDrop = &http.Transport{
  43. DialContext: func(_ context.Context, network, addr string) (gonet.Conn, error) {
  44. return h.assembly.AutoImplDialer().Dial(ctx)
  45. },
  46. DialTLSContext: func(_ context.Context, network, addr string) (gonet.Conn, error) {
  47. return nil, newError("unexpected dial of TLS")
  48. },
  49. }
  50. }
  51. h.httpRTT = transportcommon.NewALPNAwareHTTPRoundTripperWithH2Pool(ctx, func(ctx context.Context, addr string) (gonet.Conn, error) {
  52. return h.assembly.AutoImplDialer().Dial(ctx)
  53. }, backDrop, int(h.config.H2PoolSize))
  54. }
  55. connectionTagStr := base64.RawURLEncoding.EncodeToString(req.ConnectionTag)
  56. httpRequest, err := http.NewRequest("POST", h.config.Http.UrlPrefix+h.config.Http.Path, bytes.NewReader(req.Data))
  57. if err != nil {
  58. return resp, err
  59. }
  60. httpRequest.Header.Set("X-Session-ID", connectionTagStr)
  61. httpResp, err := h.httpRTT.RoundTrip(httpRequest)
  62. if err != nil {
  63. return resp, err
  64. }
  65. defer httpResp.Body.Close()
  66. if streamingWriter == nil {
  67. result, err := io.ReadAll(httpResp.Body)
  68. if err != nil {
  69. return request.Response{}, err
  70. }
  71. return request.Response{Data: result}, err
  72. }
  73. _, err = io.Copy(streamingWriter, httpResp.Body)
  74. if err != nil {
  75. return request.Response{}, newError("unable to copy response").Base(err)
  76. }
  77. return request.Response{}, nil
  78. }
  79. func newHTTPRoundTripperServer(ctx context.Context, config *ServerConfig) request.RoundTripperServer {
  80. return &httpTripperServer{ctx: ctx, config: config}
  81. }
  82. type httpTripperServer struct {
  83. ctx context.Context
  84. listener net.Listener
  85. assembly request.TransportServerAssembly
  86. config *ServerConfig
  87. }
  88. func (h *httpTripperServer) OnTransportServerAssemblyReady(assembly request.TransportServerAssembly) {
  89. h.assembly = assembly
  90. }
  91. func (h *httpTripperServer) ServeHTTP(writer http.ResponseWriter, r *http.Request) {
  92. h.onRequest(writer, r)
  93. }
  94. type httpRespStreamWriting struct {
  95. resp http.ResponseWriter
  96. used bool
  97. }
  98. func (h *httpRespStreamWriting) RoundTripperOption() {
  99. }
  100. func (h *httpRespStreamWriting) GetResponseWriter() io.Writer {
  101. h.used = true
  102. return h.resp
  103. }
  104. func (h *httpRespStreamWriting) Flush() {
  105. if f, ok := h.resp.(http.Flusher); ok {
  106. f.Flush()
  107. }
  108. }
  109. func (h *httpTripperServer) onRequest(resp http.ResponseWriter, req *http.Request) {
  110. tail := req.Header.Get("X-Session-ID")
  111. data := []byte(tail)
  112. if !h.config.NoDecodingSessionTag {
  113. decodedData, err := base64.RawURLEncoding.DecodeString(tail)
  114. if err != nil {
  115. newError("unable to decode tag").Base(err).AtInfo().WriteToLog()
  116. return
  117. }
  118. data = decodedData
  119. }
  120. body, err := io.ReadAll(req.Body)
  121. req.Body.Close()
  122. if err != nil {
  123. newError("unable to read body").Base(err).AtInfo().WriteToLog()
  124. }
  125. streamingRespOption := &httpRespStreamWriting{resp: resp}
  126. recvResp, err := h.assembly.TripperReceiver().OnRoundTrip(h.ctx, request.Request{Data: body, ConnectionTag: data},
  127. streamingRespOption)
  128. if err != nil {
  129. newError("unable to process roundtrip").Base(err).AtInfo().WriteToLog()
  130. }
  131. if streamingRespOption.used {
  132. return
  133. }
  134. _, err = io.Copy(resp, bytes.NewReader(recvResp.Data))
  135. if err != nil {
  136. newError("unable to send response").Base(err).AtInfo().WriteToLog()
  137. }
  138. }
  139. func (h *httpTripperServer) Start() error {
  140. listener, err := h.assembly.AutoImplListener().Listen(h.ctx)
  141. if err != nil {
  142. return newError("unable to create a listener for http tripper server").Base(err)
  143. }
  144. h.listener = listener
  145. go func() {
  146. httpServer := http.Server{
  147. ReadHeaderTimeout: 15 * time.Second,
  148. ReadTimeout: 15 * time.Second,
  149. WriteTimeout: 10 * time.Second,
  150. IdleTimeout: 30 * time.Second,
  151. }
  152. httpServer.Handler = h
  153. err := httpServer.Serve(h.listener)
  154. if err != nil {
  155. newError("unable to serve listener for http tripper server").Base(err).WriteToLog()
  156. }
  157. }()
  158. return nil
  159. }
  160. func (h *httpTripperServer) Close() error {
  161. return h.listener.Close()
  162. }
  163. func init() {
  164. common.Must(common.RegisterConfig((*ClientConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  165. clientConfig, ok := config.(*ClientConfig)
  166. if !ok {
  167. return nil, newError("not a ClientConfig")
  168. }
  169. return newHTTPRoundTripperClient(ctx, clientConfig), nil
  170. }))
  171. common.Must(common.RegisterConfig((*ServerConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  172. serverConfig, ok := config.(*ServerConfig)
  173. if !ok {
  174. return nil, newError("not a ServerConfig")
  175. }
  176. return newHTTPRoundTripperServer(ctx, serverConfig), nil
  177. }))
  178. }