server.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. package hysteria2
  2. import (
  3. "context"
  4. "io"
  5. "time"
  6. hyProtocol "github.com/v2fly/hysteria/core/v2/international/protocol"
  7. core "github.com/v2fly/v2ray-core/v5"
  8. "github.com/v2fly/v2ray-core/v5/common"
  9. "github.com/v2fly/v2ray-core/v5/common/buf"
  10. "github.com/v2fly/v2ray-core/v5/common/errors"
  11. "github.com/v2fly/v2ray-core/v5/common/log"
  12. "github.com/v2fly/v2ray-core/v5/common/net"
  13. "github.com/v2fly/v2ray-core/v5/common/net/packetaddr"
  14. udp_proto "github.com/v2fly/v2ray-core/v5/common/protocol/udp"
  15. "github.com/v2fly/v2ray-core/v5/common/session"
  16. "github.com/v2fly/v2ray-core/v5/common/signal"
  17. "github.com/v2fly/v2ray-core/v5/common/task"
  18. "github.com/v2fly/v2ray-core/v5/features/policy"
  19. "github.com/v2fly/v2ray-core/v5/features/routing"
  20. "github.com/v2fly/v2ray-core/v5/transport/internet"
  21. hyTransport "github.com/v2fly/v2ray-core/v5/transport/internet/hysteria2"
  22. "github.com/v2fly/v2ray-core/v5/transport/internet/udp"
  23. )
  24. func init() {
  25. common.Must(common.RegisterConfig((*ServerConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  26. return NewServer(ctx, config.(*ServerConfig))
  27. }))
  28. }
  29. // Server is an inbound connection handler that handles messages in protocol.
  30. type Server struct {
  31. policyManager policy.Manager
  32. packetEncoding packetaddr.PacketAddrType
  33. }
  34. // NewServer creates a new inbound handler.
  35. func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) {
  36. v := core.MustFromContext(ctx)
  37. server := &Server{
  38. policyManager: v.GetFeature(policy.ManagerType()).(policy.Manager),
  39. packetEncoding: config.PacketEncoding,
  40. }
  41. return server, nil
  42. }
  43. // Network implements proxy.Inbound.Network().
  44. func (s *Server) Network() []net.Network {
  45. return []net.Network{net.Network_TCP, net.Network_UNIX}
  46. }
  47. // Process implements proxy.Inbound.Process().
  48. func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher routing.Dispatcher) error {
  49. sid := session.ExportIDToError(ctx)
  50. iConn := conn
  51. if statConn, ok := conn.(*internet.StatCouterConnection); ok {
  52. iConn = statConn.Connection // will not count the UDP traffic.
  53. }
  54. hyConn, IsHy2Transport := iConn.(*hyTransport.HyConn)
  55. if IsHy2Transport && hyConn.IsUDPExtension {
  56. network = net.Network_UDP
  57. }
  58. if !IsHy2Transport && network == net.Network_UDP {
  59. return newError(hyTransport.CanNotUseUDPExtension)
  60. }
  61. sessionPolicy := s.policyManager.ForLevel(0)
  62. if err := conn.SetReadDeadline(time.Now().Add(sessionPolicy.Timeouts.Handshake)); err != nil {
  63. return newError("unable to set read deadline").Base(err).AtWarning()
  64. }
  65. bufferedReader := &buf.BufferedReader{
  66. Reader: buf.NewReader(conn),
  67. }
  68. clientReader := &ConnReader{Reader: bufferedReader}
  69. if err := conn.SetReadDeadline(time.Time{}); err != nil {
  70. return newError("unable to set read deadline").Base(err).AtWarning()
  71. }
  72. if network == net.Network_UDP { // handle udp request
  73. return s.handleUDPPayload(ctx,
  74. &PacketReader{Reader: clientReader, HyConn: hyConn},
  75. &PacketWriter{Writer: conn, HyConn: hyConn}, dispatcher)
  76. }
  77. var reqAddr string
  78. var err error
  79. reqAddr, err = hyProtocol.ReadTCPRequest(conn)
  80. if err != nil {
  81. return newError("failed to parse header").Base(err)
  82. }
  83. err = hyProtocol.WriteTCPResponse(conn, true, "")
  84. if err != nil {
  85. return newError("failed to send response").Base(err)
  86. }
  87. address, stringPort, err := net.SplitHostPort(reqAddr)
  88. if err != nil {
  89. return err
  90. }
  91. port, err := net.PortFromString(stringPort)
  92. if err != nil {
  93. return err
  94. }
  95. destination := net.Destination{Network: network, Address: net.ParseAddress(address), Port: port}
  96. inbound := session.InboundFromContext(ctx)
  97. if inbound == nil {
  98. panic("no inbound metadata")
  99. }
  100. sessionPolicy = s.policyManager.ForLevel(0)
  101. ctx = log.ContextWithAccessMessage(ctx, &log.AccessMessage{
  102. From: conn.RemoteAddr(),
  103. To: destination,
  104. Status: log.AccessAccepted,
  105. Reason: "",
  106. })
  107. newError("received request for ", destination).WriteToLog(sid)
  108. return s.handleConnection(ctx, sessionPolicy, destination, clientReader, buf.NewWriter(conn), dispatcher)
  109. }
  110. func (s *Server) handleConnection(ctx context.Context, sessionPolicy policy.Session,
  111. destination net.Destination,
  112. clientReader buf.Reader,
  113. clientWriter buf.Writer, dispatcher routing.Dispatcher,
  114. ) error {
  115. ctx, cancel := context.WithCancel(ctx)
  116. timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle)
  117. ctx = policy.ContextWithBufferPolicy(ctx, sessionPolicy.Buffer)
  118. link, err := dispatcher.Dispatch(ctx, destination)
  119. if err != nil {
  120. return newError("failed to dispatch request to ", destination).Base(err)
  121. }
  122. requestDone := func() error {
  123. defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
  124. if err := buf.Copy(clientReader, link.Writer, buf.UpdateActivity(timer)); err != nil {
  125. return newError("failed to transfer request").Base(err)
  126. }
  127. return nil
  128. }
  129. responseDone := func() error {
  130. defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly)
  131. if err := buf.Copy(link.Reader, clientWriter, buf.UpdateActivity(timer)); err != nil {
  132. return newError("failed to write response").Base(err)
  133. }
  134. return nil
  135. }
  136. requestDonePost := task.OnSuccess(requestDone, task.Close(link.Writer))
  137. if err := task.Run(ctx, requestDonePost, responseDone); err != nil {
  138. common.Must(common.Interrupt(link.Reader))
  139. common.Must(common.Interrupt(link.Writer))
  140. return newError("connection ends").Base(err)
  141. }
  142. return nil
  143. }
  144. func (s *Server) handleUDPPayload(ctx context.Context, clientReader *PacketReader, clientWriter *PacketWriter, dispatcher routing.Dispatcher) error {
  145. udpDispatcherConstructor := udp.NewSplitDispatcher
  146. switch s.packetEncoding {
  147. case packetaddr.PacketAddrType_None:
  148. case packetaddr.PacketAddrType_Packet:
  149. packetAddrDispatcherFactory := udp.NewPacketAddrDispatcherCreator(ctx)
  150. udpDispatcherConstructor = packetAddrDispatcherFactory.NewPacketAddrDispatcher
  151. }
  152. udpServer := udpDispatcherConstructor(dispatcher, func(ctx context.Context, packet *udp_proto.Packet) {
  153. if err := clientWriter.WriteMultiBufferWithMetadata(buf.MultiBuffer{packet.Payload}, packet.Source); err != nil {
  154. newError("failed to write response").Base(err).AtWarning().WriteToLog(session.ExportIDToError(ctx))
  155. }
  156. })
  157. inbound := session.InboundFromContext(ctx)
  158. for {
  159. select {
  160. case <-ctx.Done():
  161. return nil
  162. default:
  163. p, err := clientReader.ReadMultiBufferWithMetadata()
  164. if err != nil {
  165. if errors.Cause(err) != io.EOF {
  166. return newError("unexpected EOF").Base(err)
  167. }
  168. return nil
  169. }
  170. currentPacketCtx := ctx
  171. currentPacketCtx = log.ContextWithAccessMessage(currentPacketCtx, &log.AccessMessage{
  172. From: inbound.Source,
  173. To: p.Target,
  174. Status: log.AccessAccepted,
  175. Reason: "",
  176. })
  177. newError("tunnelling request to ", p.Target).WriteToLog(session.ExportIDToError(ctx))
  178. for _, b := range p.Buffer {
  179. udpServer.Dispatch(currentPacketCtx, p.Target, b)
  180. }
  181. }
  182. }
  183. }