always.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package inbound
  2. import (
  3. "context"
  4. "v2ray.com/core"
  5. "v2ray.com/core/app/proxyman"
  6. "v2ray.com/core/common"
  7. "v2ray.com/core/common/dice"
  8. "v2ray.com/core/common/mux"
  9. "v2ray.com/core/common/net"
  10. "v2ray.com/core/common/serial"
  11. "v2ray.com/core/features/policy"
  12. "v2ray.com/core/features/stats"
  13. "v2ray.com/core/proxy"
  14. "v2ray.com/core/transport/internet"
  15. )
  16. func getStatCounter(v *core.Instance, tag string) (stats.Counter, stats.Counter) {
  17. var uplinkCounter stats.Counter
  18. var downlinkCounter stats.Counter
  19. policy := v.GetFeature(policy.ManagerType()).(policy.Manager)
  20. if len(tag) > 0 && policy.ForSystem().Stats.InboundUplink {
  21. statsManager := v.GetFeature(stats.ManagerType()).(stats.Manager)
  22. name := "inbound>>>" + tag + ">>>traffic>>>uplink"
  23. c, _ := stats.GetOrRegisterCounter(statsManager, name)
  24. if c != nil {
  25. uplinkCounter = c
  26. }
  27. }
  28. if len(tag) > 0 && policy.ForSystem().Stats.InboundDownlink {
  29. statsManager := v.GetFeature(stats.ManagerType()).(stats.Manager)
  30. name := "inbound>>>" + tag + ">>>traffic>>>downlink"
  31. c, _ := stats.GetOrRegisterCounter(statsManager, name)
  32. if c != nil {
  33. downlinkCounter = c
  34. }
  35. }
  36. return uplinkCounter, downlinkCounter
  37. }
  38. type AlwaysOnInboundHandler struct {
  39. proxy proxy.Inbound
  40. workers []worker
  41. mux *mux.Server
  42. tag string
  43. }
  44. func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig *proxyman.ReceiverConfig, proxyConfig interface{}) (*AlwaysOnInboundHandler, error) {
  45. rawProxy, err := common.CreateObject(ctx, proxyConfig)
  46. if err != nil {
  47. return nil, err
  48. }
  49. p, ok := rawProxy.(proxy.Inbound)
  50. if !ok {
  51. return nil, newError("not an inbound proxy.")
  52. }
  53. h := &AlwaysOnInboundHandler{
  54. proxy: p,
  55. mux: mux.NewServer(ctx),
  56. tag: tag,
  57. }
  58. uplinkCounter, downlinkCounter := getStatCounter(core.MustFromContext(ctx), tag)
  59. nl := p.Network()
  60. pr := receiverConfig.PortRange
  61. address := receiverConfig.Listen.AsAddress()
  62. if address == nil {
  63. address = net.AnyIP
  64. }
  65. mss, err := internet.ToMemoryStreamConfig(receiverConfig.StreamSettings)
  66. if err != nil {
  67. return nil, newError("failed to parse stream config").Base(err).AtWarning()
  68. }
  69. if receiverConfig.ReceiveOriginalDestination {
  70. if mss.SocketSettings == nil {
  71. mss.SocketSettings = &internet.SocketConfig{}
  72. }
  73. if mss.SocketSettings.Tproxy == internet.SocketConfig_Off {
  74. mss.SocketSettings.Tproxy = internet.SocketConfig_Redirect
  75. }
  76. mss.SocketSettings.ReceiveOriginalDestAddress = true
  77. }
  78. for port := pr.From; port <= pr.To; port++ {
  79. if nl.HasNetwork(net.Network_TCP) {
  80. newError("creating stream worker on ", address, ":", port).AtDebug().WriteToLog()
  81. worker := &tcpWorker{
  82. address: address,
  83. port: net.Port(port),
  84. proxy: p,
  85. stream: mss,
  86. recvOrigDest: receiverConfig.ReceiveOriginalDestination,
  87. tag: tag,
  88. dispatcher: h.mux,
  89. sniffingConfig: receiverConfig.GetEffectiveSniffingSettings(),
  90. uplinkCounter: uplinkCounter,
  91. downlinkCounter: downlinkCounter,
  92. }
  93. h.workers = append(h.workers, worker)
  94. }
  95. if nl.HasNetwork(net.Network_UDP) {
  96. worker := &udpWorker{
  97. tag: tag,
  98. proxy: p,
  99. address: address,
  100. port: net.Port(port),
  101. dispatcher: h.mux,
  102. uplinkCounter: uplinkCounter,
  103. downlinkCounter: downlinkCounter,
  104. stream: mss,
  105. }
  106. h.workers = append(h.workers, worker)
  107. }
  108. }
  109. return h, nil
  110. }
  111. // Start implements common.Runnable.
  112. func (h *AlwaysOnInboundHandler) Start() error {
  113. for _, worker := range h.workers {
  114. if err := worker.Start(); err != nil {
  115. return err
  116. }
  117. }
  118. return nil
  119. }
  120. // Close implements common.Closable.
  121. func (h *AlwaysOnInboundHandler) Close() error {
  122. var errors []interface{}
  123. for _, worker := range h.workers {
  124. if err := worker.Close(); err != nil {
  125. errors = append(errors, err)
  126. }
  127. }
  128. if err := h.mux.Close(); err != nil {
  129. errors = append(errors, err)
  130. }
  131. if len(errors) > 0 {
  132. return newError("failed to close all resources").Base(newError(serial.Concat(errors...)))
  133. }
  134. return nil
  135. }
  136. func (h *AlwaysOnInboundHandler) GetRandomInboundProxy() (interface{}, net.Port, int) {
  137. if len(h.workers) == 0 {
  138. return nil, 0, 0
  139. }
  140. w := h.workers[dice.Roll(len(h.workers))]
  141. return w.Proxy(), w.Port(), 9999
  142. }
  143. func (h *AlwaysOnInboundHandler) Tag() string {
  144. return h.tag
  145. }
  146. func (h *AlwaysOnInboundHandler) GetInbound() proxy.Inbound {
  147. return h.proxy
  148. }