server.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. package hysteria2
  2. import (
  3. "context"
  4. "io"
  5. "time"
  6. hyProtocol "github.com/apernet/hysteria/core/v2/international/protocol"
  7. core "github.com/v2fly/v2ray-core/v5"
  8. "github.com/v2fly/v2ray-core/v5/common"
  9. "github.com/v2fly/v2ray-core/v5/common/buf"
  10. "github.com/v2fly/v2ray-core/v5/common/errors"
  11. "github.com/v2fly/v2ray-core/v5/common/log"
  12. "github.com/v2fly/v2ray-core/v5/common/net"
  13. "github.com/v2fly/v2ray-core/v5/common/net/packetaddr"
  14. udp_proto "github.com/v2fly/v2ray-core/v5/common/protocol/udp"
  15. "github.com/v2fly/v2ray-core/v5/common/session"
  16. "github.com/v2fly/v2ray-core/v5/common/signal"
  17. "github.com/v2fly/v2ray-core/v5/common/task"
  18. "github.com/v2fly/v2ray-core/v5/features/policy"
  19. "github.com/v2fly/v2ray-core/v5/features/routing"
  20. "github.com/v2fly/v2ray-core/v5/transport/internet"
  21. hyTransport "github.com/v2fly/v2ray-core/v5/transport/internet/hysteria2"
  22. "github.com/v2fly/v2ray-core/v5/transport/internet/udp"
  23. )
  24. func init() {
  25. common.Must(common.RegisterConfig((*ServerConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  26. return NewServer(ctx, config.(*ServerConfig))
  27. }))
  28. }
  29. // Server is an inbound connection handler that handles messages in protocol.
  30. type Server struct {
  31. policyManager policy.Manager
  32. packetEncoding packetaddr.PacketAddrType
  33. }
  34. // NewServer creates a new inbound handler.
  35. func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) {
  36. v := core.MustFromContext(ctx)
  37. server := &Server{
  38. policyManager: v.GetFeature(policy.ManagerType()).(policy.Manager),
  39. }
  40. return server, nil
  41. }
  42. // Network implements proxy.Inbound.Network().
  43. func (s *Server) Network() []net.Network {
  44. return []net.Network{net.Network_TCP, net.Network_UNIX}
  45. }
  46. // Process implements proxy.Inbound.Process().
  47. func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher routing.Dispatcher) error {
  48. sid := session.ExportIDToError(ctx)
  49. iConn := conn
  50. if statConn, ok := conn.(*internet.StatCouterConnection); ok {
  51. iConn = statConn.Connection // will not count the UDP traffic.
  52. }
  53. hyConn, IsHy2Transport := iConn.(*hyTransport.HyConn)
  54. if IsHy2Transport && hyConn.IsUDPExtension {
  55. network = net.Network_UDP
  56. }
  57. if !IsHy2Transport && network == net.Network_UDP {
  58. return newError(hyTransport.CanNotUseUdpExtension)
  59. }
  60. sessionPolicy := s.policyManager.ForLevel(0)
  61. if err := conn.SetReadDeadline(time.Now().Add(sessionPolicy.Timeouts.Handshake)); err != nil {
  62. return newError("unable to set read deadline").Base(err).AtWarning()
  63. }
  64. bufferedReader := &buf.BufferedReader{
  65. Reader: buf.NewReader(conn),
  66. }
  67. clientReader := &ConnReader{Reader: bufferedReader}
  68. if err := conn.SetReadDeadline(time.Time{}); err != nil {
  69. return newError("unable to set read deadline").Base(err).AtWarning()
  70. }
  71. if network == net.Network_UDP { // handle udp request
  72. return s.handleUDPPayload(ctx,
  73. &PacketReader{Reader: clientReader, HyConn: hyConn},
  74. &PacketWriter{Writer: conn, HyConn: hyConn}, dispatcher)
  75. }
  76. var reqAddr string
  77. var err error
  78. reqAddr, err = hyProtocol.ReadTCPRequest(conn)
  79. if err != nil {
  80. return newError("failed to parse header").Base(err)
  81. }
  82. err = hyProtocol.WriteTCPResponse(conn, true, "")
  83. if err != nil {
  84. return newError("failed to send response").Base(err)
  85. }
  86. address, stringPort, err := net.SplitHostPort(reqAddr)
  87. if err != nil {
  88. return err
  89. }
  90. port, err := net.PortFromString(stringPort)
  91. if err != nil {
  92. return err
  93. }
  94. destination := net.Destination{Network: network, Address: net.ParseAddress(address), Port: port}
  95. inbound := session.InboundFromContext(ctx)
  96. if inbound == nil {
  97. panic("no inbound metadata")
  98. }
  99. sessionPolicy = s.policyManager.ForLevel(0)
  100. ctx = log.ContextWithAccessMessage(ctx, &log.AccessMessage{
  101. From: conn.RemoteAddr(),
  102. To: destination,
  103. Status: log.AccessAccepted,
  104. Reason: "",
  105. })
  106. newError("received request for ", destination).WriteToLog(sid)
  107. return s.handleConnection(ctx, sessionPolicy, destination, clientReader, buf.NewWriter(conn), dispatcher)
  108. }
  109. func (s *Server) handleConnection(ctx context.Context, sessionPolicy policy.Session,
  110. destination net.Destination,
  111. clientReader buf.Reader,
  112. clientWriter buf.Writer, dispatcher routing.Dispatcher,
  113. ) error {
  114. ctx, cancel := context.WithCancel(ctx)
  115. timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle)
  116. ctx = policy.ContextWithBufferPolicy(ctx, sessionPolicy.Buffer)
  117. link, err := dispatcher.Dispatch(ctx, destination)
  118. if err != nil {
  119. return newError("failed to dispatch request to ", destination).Base(err)
  120. }
  121. requestDone := func() error {
  122. defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
  123. if err := buf.Copy(clientReader, link.Writer, buf.UpdateActivity(timer)); err != nil {
  124. return newError("failed to transfer request").Base(err)
  125. }
  126. return nil
  127. }
  128. responseDone := func() error {
  129. defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly)
  130. if err := buf.Copy(link.Reader, clientWriter, buf.UpdateActivity(timer)); err != nil {
  131. return newError("failed to write response").Base(err)
  132. }
  133. return nil
  134. }
  135. requestDonePost := task.OnSuccess(requestDone, task.Close(link.Writer))
  136. if err := task.Run(ctx, requestDonePost, responseDone); err != nil {
  137. common.Must(common.Interrupt(link.Reader))
  138. common.Must(common.Interrupt(link.Writer))
  139. return newError("connection ends").Base(err)
  140. }
  141. return nil
  142. }
  143. func (s *Server) handleUDPPayload(ctx context.Context, clientReader *PacketReader, clientWriter *PacketWriter, dispatcher routing.Dispatcher) error { // {{{
  144. udpDispatcherConstructor := udp.NewSplitDispatcher
  145. switch s.packetEncoding {
  146. case packetaddr.PacketAddrType_None:
  147. case packetaddr.PacketAddrType_Packet:
  148. packetAddrDispatcherFactory := udp.NewPacketAddrDispatcherCreator(ctx)
  149. udpDispatcherConstructor = packetAddrDispatcherFactory.NewPacketAddrDispatcher
  150. }
  151. udpServer := udpDispatcherConstructor(dispatcher, func(ctx context.Context, packet *udp_proto.Packet) {
  152. if err := clientWriter.WriteMultiBufferWithMetadata(buf.MultiBuffer{packet.Payload}, packet.Source); err != nil {
  153. newError("failed to write response").Base(err).AtWarning().WriteToLog(session.ExportIDToError(ctx))
  154. }
  155. })
  156. inbound := session.InboundFromContext(ctx)
  157. // user := inbound.User
  158. for {
  159. select {
  160. case <-ctx.Done():
  161. return nil
  162. default:
  163. p, err := clientReader.ReadMultiBufferWithMetadata()
  164. if err != nil {
  165. if errors.Cause(err) != io.EOF {
  166. return newError("unexpected EOF").Base(err)
  167. }
  168. return nil
  169. }
  170. currentPacketCtx := ctx
  171. currentPacketCtx = log.ContextWithAccessMessage(currentPacketCtx, &log.AccessMessage{
  172. From: inbound.Source,
  173. To: p.Target,
  174. Status: log.AccessAccepted,
  175. Reason: "",
  176. })
  177. newError("tunnelling request to ", p.Target).WriteToLog(session.ExportIDToError(ctx))
  178. for _, b := range p.Buffer {
  179. udpServer.Dispatch(currentPacketCtx, p.Target, b)
  180. }
  181. }
  182. }
  183. } // }}}