handler.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
  1. package outbound
  2. import (
  3. "context"
  4. core "github.com/v2fly/v2ray-core/v5"
  5. "github.com/v2fly/v2ray-core/v5/app/proxyman"
  6. "github.com/v2fly/v2ray-core/v5/common"
  7. "github.com/v2fly/v2ray-core/v5/common/dice"
  8. "github.com/v2fly/v2ray-core/v5/common/mux"
  9. "github.com/v2fly/v2ray-core/v5/common/net"
  10. "github.com/v2fly/v2ray-core/v5/common/net/packetaddr"
  11. "github.com/v2fly/v2ray-core/v5/common/serial"
  12. "github.com/v2fly/v2ray-core/v5/common/session"
  13. "github.com/v2fly/v2ray-core/v5/features/dns"
  14. "github.com/v2fly/v2ray-core/v5/features/outbound"
  15. "github.com/v2fly/v2ray-core/v5/features/policy"
  16. "github.com/v2fly/v2ray-core/v5/features/stats"
  17. "github.com/v2fly/v2ray-core/v5/proxy"
  18. "github.com/v2fly/v2ray-core/v5/transport"
  19. "github.com/v2fly/v2ray-core/v5/transport/internet"
  20. "github.com/v2fly/v2ray-core/v5/transport/internet/security"
  21. "github.com/v2fly/v2ray-core/v5/transport/pipe"
  22. )
  23. func getStatCounter(v *core.Instance, tag string) (stats.Counter, stats.Counter) {
  24. var uplinkCounter stats.Counter
  25. var downlinkCounter stats.Counter
  26. policy := v.GetFeature(policy.ManagerType()).(policy.Manager)
  27. if len(tag) > 0 && policy.ForSystem().Stats.OutboundUplink {
  28. statsManager := v.GetFeature(stats.ManagerType()).(stats.Manager)
  29. name := "outbound>>>" + tag + ">>>traffic>>>uplink"
  30. c, _ := stats.GetOrRegisterCounter(statsManager, name)
  31. if c != nil {
  32. uplinkCounter = c
  33. }
  34. }
  35. if len(tag) > 0 && policy.ForSystem().Stats.OutboundDownlink {
  36. statsManager := v.GetFeature(stats.ManagerType()).(stats.Manager)
  37. name := "outbound>>>" + tag + ">>>traffic>>>downlink"
  38. c, _ := stats.GetOrRegisterCounter(statsManager, name)
  39. if c != nil {
  40. downlinkCounter = c
  41. }
  42. }
  43. return uplinkCounter, downlinkCounter
  44. }
  45. // Handler is an implements of outbound.Handler.
  46. type Handler struct {
  47. tag string
  48. senderSettings *proxyman.SenderConfig
  49. streamSettings *internet.MemoryStreamConfig
  50. proxy proxy.Outbound
  51. outboundManager outbound.Manager
  52. mux *mux.ClientManager
  53. uplinkCounter stats.Counter
  54. downlinkCounter stats.Counter
  55. dns dns.Client
  56. }
  57. // NewHandler create a new Handler based on the given configuration.
  58. func NewHandler(ctx context.Context, config *core.OutboundHandlerConfig) (outbound.Handler, error) {
  59. v := core.MustFromContext(ctx)
  60. uplinkCounter, downlinkCounter := getStatCounter(v, config.Tag)
  61. h := &Handler{
  62. tag: config.Tag,
  63. outboundManager: v.GetFeature(outbound.ManagerType()).(outbound.Manager),
  64. uplinkCounter: uplinkCounter,
  65. downlinkCounter: downlinkCounter,
  66. }
  67. if config.SenderSettings != nil {
  68. senderSettings, err := serial.GetInstanceOf(config.SenderSettings)
  69. if err != nil {
  70. return nil, err
  71. }
  72. switch s := senderSettings.(type) {
  73. case *proxyman.SenderConfig:
  74. h.senderSettings = s
  75. mss, err := internet.ToMemoryStreamConfig(s.StreamSettings)
  76. if err != nil {
  77. return nil, newError("failed to parse stream settings").Base(err).AtWarning()
  78. }
  79. h.streamSettings = mss
  80. default:
  81. return nil, newError("settings is not SenderConfig")
  82. }
  83. }
  84. proxyConfig, err := serial.GetInstanceOf(config.ProxySettings)
  85. if err != nil {
  86. return nil, err
  87. }
  88. rawProxyHandler, err := common.CreateObject(ctx, proxyConfig)
  89. if err != nil {
  90. return nil, err
  91. }
  92. proxyHandler, ok := rawProxyHandler.(proxy.Outbound)
  93. if !ok {
  94. return nil, newError("not an outbound handler")
  95. }
  96. if h.senderSettings != nil && h.senderSettings.MultiplexSettings != nil {
  97. config := h.senderSettings.MultiplexSettings
  98. if config.Concurrency < 1 || config.Concurrency > 1024 {
  99. return nil, newError("invalid mux concurrency: ", config.Concurrency).AtWarning()
  100. }
  101. h.mux = &mux.ClientManager{
  102. Enabled: h.senderSettings.MultiplexSettings.Enabled,
  103. Picker: &mux.IncrementalWorkerPicker{
  104. Factory: mux.NewDialingWorkerFactory(
  105. ctx,
  106. proxyHandler,
  107. h,
  108. mux.ClientStrategy{
  109. MaxConcurrency: config.Concurrency,
  110. MaxConnection: 128,
  111. },
  112. ),
  113. },
  114. }
  115. }
  116. if h.senderSettings != nil && h.senderSettings.DomainStrategy != proxyman.SenderConfig_AS_IS {
  117. err := core.RequireFeatures(ctx, func(d dns.Client) error {
  118. h.dns = d
  119. return nil
  120. })
  121. if err != nil {
  122. return nil, err
  123. }
  124. }
  125. h.proxy = proxyHandler
  126. return h, nil
  127. }
  128. // Tag implements outbound.Handler.
  129. func (h *Handler) Tag() string {
  130. return h.tag
  131. }
  132. // Dispatch implements proxy.Outbound.Dispatch.
  133. func (h *Handler) Dispatch(ctx context.Context, link *transport.Link) {
  134. if h.mux != nil && (h.mux.Enabled || session.MuxPreferedFromContext(ctx)) {
  135. if err := h.mux.Dispatch(ctx, link); err != nil {
  136. err := newError("failed to process mux outbound traffic").Base(err)
  137. session.SubmitOutboundErrorToOriginator(ctx, err)
  138. err.WriteToLog(session.ExportIDToError(ctx))
  139. common.Interrupt(link.Writer)
  140. }
  141. } else {
  142. if err := h.proxy.Process(ctx, link, h); err != nil {
  143. // Ensure outbound ray is properly closed.
  144. err := newError("failed to process outbound traffic").Base(err)
  145. session.SubmitOutboundErrorToOriginator(ctx, err)
  146. err.WriteToLog(session.ExportIDToError(ctx))
  147. common.Interrupt(link.Writer)
  148. } else {
  149. common.Must(common.Close(link.Writer))
  150. }
  151. common.Interrupt(link.Reader)
  152. }
  153. }
  154. // Address implements internet.Dialer.
  155. func (h *Handler) Address() net.Address {
  156. if h.senderSettings == nil || h.senderSettings.Via == nil {
  157. return nil
  158. }
  159. return h.senderSettings.Via.AsAddress()
  160. }
  161. // Dial implements internet.Dialer.
  162. func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Connection, error) {
  163. if h.senderSettings != nil {
  164. if h.senderSettings.ProxySettings.HasTag() && !h.senderSettings.ProxySettings.TransportLayerProxy {
  165. tag := h.senderSettings.ProxySettings.Tag
  166. handler := h.outboundManager.GetHandler(tag)
  167. if handler != nil {
  168. newError("proxying to ", tag, " for dest ", dest).AtDebug().WriteToLog(session.ExportIDToError(ctx))
  169. ctx = session.ContextWithOutbound(ctx, &session.Outbound{
  170. Target: dest,
  171. })
  172. opts := pipe.OptionsFromContext(ctx)
  173. uplinkReader, uplinkWriter := pipe.New(opts...)
  174. downlinkReader, downlinkWriter := pipe.New(opts...)
  175. go handler.Dispatch(ctx, &transport.Link{Reader: uplinkReader, Writer: downlinkWriter})
  176. conn := net.NewConnection(net.ConnectionInputMulti(uplinkWriter), net.ConnectionOutputMulti(downlinkReader))
  177. securityEngine, err := security.CreateSecurityEngineFromSettings(ctx, h.streamSettings)
  178. if err != nil {
  179. return nil, newError("unable to create security engine").Base(err)
  180. }
  181. if securityEngine != nil {
  182. conn, err = securityEngine.Client(conn, security.OptionWithDestination{Dest: dest})
  183. if err != nil {
  184. return nil, newError("unable to create security protocol client from security engine").Base(err)
  185. }
  186. }
  187. return h.getStatCouterConnection(conn), nil
  188. }
  189. newError("failed to get outbound handler with tag: ", tag).AtWarning().WriteToLog(session.ExportIDToError(ctx))
  190. }
  191. if h.senderSettings.Via != nil {
  192. outbound := session.OutboundFromContext(ctx)
  193. if outbound == nil {
  194. outbound = new(session.Outbound)
  195. ctx = session.ContextWithOutbound(ctx, outbound)
  196. }
  197. outbound.Gateway = h.senderSettings.Via.AsAddress()
  198. }
  199. if h.senderSettings.DomainStrategy != proxyman.SenderConfig_AS_IS {
  200. outbound := session.OutboundFromContext(ctx)
  201. if outbound == nil {
  202. outbound = new(session.Outbound)
  203. ctx = session.ContextWithOutbound(ctx, outbound)
  204. }
  205. outbound.Resolver = func(ctx context.Context, domain string) net.Address {
  206. return h.resolveIP(ctx, domain, h.Address())
  207. }
  208. }
  209. }
  210. enablePacketAddrCapture := true
  211. if h.senderSettings != nil && h.senderSettings.ProxySettings != nil && h.senderSettings.ProxySettings.HasTag() && h.senderSettings.ProxySettings.TransportLayerProxy {
  212. tag := h.senderSettings.ProxySettings.Tag
  213. newError("transport layer proxying to ", tag, " for dest ", dest).AtDebug().WriteToLog(session.ExportIDToError(ctx))
  214. ctx = session.SetTransportLayerProxyTagToContext(ctx, tag)
  215. enablePacketAddrCapture = false
  216. }
  217. if isStream, err := packetaddr.GetDestinationSubsetOf(dest); err == nil && enablePacketAddrCapture {
  218. packetConn, err := internet.ListenSystemPacket(ctx, &net.UDPAddr{IP: net.AnyIP.IP(), Port: 0}, h.streamSettings.SocketSettings)
  219. if err != nil {
  220. return nil, newError("unable to listen socket").Base(err)
  221. }
  222. conn := packetaddr.ToPacketAddrConnWrapper(packetConn, isStream)
  223. return h.getStatCouterConnection(conn), nil
  224. }
  225. conn, err := internet.Dial(ctx, dest, h.streamSettings)
  226. return h.getStatCouterConnection(conn), err
  227. }
  228. func (h *Handler) resolveIP(ctx context.Context, domain string, localAddr net.Address) net.Address {
  229. strategy := h.senderSettings.DomainStrategy
  230. ips, err := dns.LookupIPWithOption(h.dns, domain, dns.IPOption{
  231. IPv4Enable: strategy == proxyman.SenderConfig_USE_IP || strategy == proxyman.SenderConfig_USE_IP4 || (localAddr != nil && localAddr.Family().IsIPv4()),
  232. IPv6Enable: strategy == proxyman.SenderConfig_USE_IP || strategy == proxyman.SenderConfig_USE_IP6 || (localAddr != nil && localAddr.Family().IsIPv6()),
  233. FakeEnable: false,
  234. })
  235. if err != nil {
  236. newError("failed to get IP address for domain ", domain).Base(err).WriteToLog(session.ExportIDToError(ctx))
  237. }
  238. if len(ips) == 0 {
  239. return nil
  240. }
  241. return net.IPAddress(ips[dice.Roll(len(ips))])
  242. }
  243. func (h *Handler) getStatCouterConnection(conn internet.Connection) internet.Connection {
  244. if h.uplinkCounter != nil || h.downlinkCounter != nil {
  245. return &internet.StatCouterConnection{
  246. Connection: conn,
  247. ReadCounter: h.downlinkCounter,
  248. WriteCounter: h.uplinkCounter,
  249. }
  250. }
  251. return conn
  252. }
  253. // GetOutbound implements proxy.GetOutbound.
  254. func (h *Handler) GetOutbound() proxy.Outbound {
  255. return h.proxy
  256. }
  257. // Start implements common.Runnable.
  258. func (h *Handler) Start() error {
  259. return nil
  260. }
  261. // Close implements common.Closable.
  262. func (h *Handler) Close() error {
  263. common.Close(h.mux)
  264. return nil
  265. }