handler_tcp.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package tun
  2. import (
  3. "context"
  4. "time"
  5. "gvisor.dev/gvisor/pkg/tcpip"
  6. "gvisor.dev/gvisor/pkg/tcpip/adapters/gonet"
  7. "gvisor.dev/gvisor/pkg/tcpip/header"
  8. "gvisor.dev/gvisor/pkg/tcpip/stack"
  9. "gvisor.dev/gvisor/pkg/tcpip/transport/tcp"
  10. "gvisor.dev/gvisor/pkg/waiter"
  11. tun_net "github.com/v2fly/v2ray-core/v5/app/tun/net"
  12. "github.com/v2fly/v2ray-core/v5/common"
  13. "github.com/v2fly/v2ray-core/v5/common/buf"
  14. "github.com/v2fly/v2ray-core/v5/common/log"
  15. "github.com/v2fly/v2ray-core/v5/common/net"
  16. "github.com/v2fly/v2ray-core/v5/common/session"
  17. "github.com/v2fly/v2ray-core/v5/common/signal"
  18. "github.com/v2fly/v2ray-core/v5/common/task"
  19. "github.com/v2fly/v2ray-core/v5/features/policy"
  20. "github.com/v2fly/v2ray-core/v5/features/routing"
  21. internet "github.com/v2fly/v2ray-core/v5/transport/internet"
  22. )
  23. const (
  24. rcvWnd = 0 // default settings
  25. maxInFlight = 2 << 10
  26. )
  27. type tcpConn struct {
  28. *gonet.TCPConn
  29. id stack.TransportEndpointID
  30. }
  31. func (c *tcpConn) ID() *stack.TransportEndpointID {
  32. return &c.id
  33. }
  34. type TCPHandler struct {
  35. ctx context.Context
  36. dispatcher routing.Dispatcher
  37. policyManager policy.Manager
  38. config *Config
  39. }
  40. func SetTCPHandler(ctx context.Context, dispatcher routing.Dispatcher, policyManager policy.Manager, config *Config) StackOption {
  41. return func(s *stack.Stack) error {
  42. tcpForwarder := tcp.NewForwarder(s, rcvWnd, maxInFlight, func(r *tcp.ForwarderRequest) {
  43. wg := new(waiter.Queue)
  44. linkedEndpoint, err := r.CreateEndpoint(wg)
  45. if err != nil {
  46. r.Complete(true)
  47. return
  48. }
  49. defer r.Complete(false)
  50. if config.SocketSettings != nil {
  51. if err := applySocketOptions(s, linkedEndpoint, config.SocketSettings); err != nil {
  52. newError("failed to apply socket options: ", err).WriteToLog(session.ExportIDToError(ctx))
  53. }
  54. }
  55. conn := &tcpConn{
  56. TCPConn: gonet.NewTCPConn(wg, linkedEndpoint),
  57. id: r.ID(),
  58. }
  59. handler := &TCPHandler{
  60. ctx: ctx,
  61. dispatcher: dispatcher,
  62. policyManager: policyManager,
  63. config: config,
  64. }
  65. go handler.Handle(conn)
  66. })
  67. s.SetTransportProtocolHandler(tcp.ProtocolNumber, tcpForwarder.HandlePacket)
  68. return nil
  69. }
  70. }
  71. func (h *TCPHandler) Handle(conn tun_net.TCPConn) error {
  72. defer conn.Close()
  73. id := conn.ID()
  74. ctx := session.ContextWithInbound(h.ctx, &session.Inbound{Tag: h.config.Tag})
  75. sessionPolicy := h.policyManager.ForLevel(h.config.UserLevel)
  76. dest := net.TCPDestination(tun_net.AddressFromTCPIPAddr(id.LocalAddress), net.Port(id.LocalPort))
  77. src := net.TCPDestination(tun_net.AddressFromTCPIPAddr(id.RemoteAddress), net.Port(id.RemotePort))
  78. ctx = log.ContextWithAccessMessage(ctx, &log.AccessMessage{
  79. From: src,
  80. To: dest,
  81. Status: log.AccessAccepted,
  82. Reason: "",
  83. })
  84. content := new(session.Content)
  85. if h.config.SniffingSettings != nil {
  86. content.SniffingRequest.Enabled = h.config.SniffingSettings.Enabled
  87. content.SniffingRequest.OverrideDestinationForProtocol = h.config.SniffingSettings.DestinationOverride
  88. content.SniffingRequest.MetadataOnly = h.config.SniffingSettings.MetadataOnly
  89. }
  90. ctx = session.ContextWithContent(ctx, content)
  91. ctx, cancel := context.WithCancel(ctx)
  92. timer := signal.CancelAfterInactivity(ctx, cancel, sessionPolicy.Timeouts.ConnectionIdle)
  93. link, err := h.dispatcher.Dispatch(ctx, dest)
  94. if err != nil {
  95. return newError("failed to dispatch").Base(err)
  96. }
  97. responseDone := func() error {
  98. defer timer.SetTimeout(sessionPolicy.Timeouts.UplinkOnly)
  99. if err := buf.Copy(link.Reader, buf.NewWriter(conn), buf.UpdateActivity(timer)); err != nil {
  100. return newError("failed to transport all TCP response").Base(err)
  101. }
  102. return nil
  103. }
  104. requestDone := func() error {
  105. defer timer.SetTimeout(sessionPolicy.Timeouts.DownlinkOnly)
  106. if err := buf.Copy(buf.NewReader(conn), link.Writer, buf.UpdateActivity(timer)); err != nil {
  107. return newError("failed to transport all TCP request").Base(err)
  108. }
  109. return nil
  110. }
  111. requestDoneAndCloseWriter := task.OnSuccess(requestDone, task.Close(link.Writer))
  112. if err := task.Run(h.ctx, requestDoneAndCloseWriter, responseDone); err != nil {
  113. common.Interrupt(link.Reader)
  114. common.Interrupt(link.Writer)
  115. return newError("connection ends").Base(err)
  116. }
  117. return nil
  118. }
  119. func applySocketOptions(s *stack.Stack, endpoint tcpip.Endpoint, config *internet.SocketConfig) tcpip.Error {
  120. if config.TcpKeepAliveInterval > 0 {
  121. interval := tcpip.KeepaliveIntervalOption(time.Duration(config.TcpKeepAliveInterval) * time.Second)
  122. if err := endpoint.SetSockOpt(&interval); err != nil {
  123. return err
  124. }
  125. }
  126. if config.TcpKeepAliveIdle > 0 {
  127. idle := tcpip.KeepaliveIdleOption(time.Duration(config.TcpKeepAliveIdle) * time.Second)
  128. if err := endpoint.SetSockOpt(&idle); err != nil {
  129. return err
  130. }
  131. }
  132. if config.TcpKeepAliveInterval > 0 || config.TcpKeepAliveIdle > 0 {
  133. endpoint.SocketOptions().SetKeepAlive(true)
  134. }
  135. {
  136. var sendBufferSizeRangeOption tcpip.TCPSendBufferSizeRangeOption
  137. if err := s.TransportProtocolOption(header.TCPProtocolNumber, &sendBufferSizeRangeOption); err == nil {
  138. endpoint.SocketOptions().SetReceiveBufferSize(int64(sendBufferSizeRangeOption.Default), false)
  139. }
  140. var receiveBufferSizeRangeOption tcpip.TCPReceiveBufferSizeRangeOption
  141. if err := s.TransportProtocolOption(header.TCPProtocolNumber, &receiveBufferSizeRangeOption); err == nil {
  142. endpoint.SocketOptions().SetSendBufferSize(int64(receiveBufferSizeRangeOption.Default), false)
  143. }
  144. }
  145. return nil
  146. }