always.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package inbound
  2. import (
  3. "context"
  4. "v2ray.com/core"
  5. "v2ray.com/core/app/proxyman"
  6. "v2ray.com/core/app/proxyman/mux"
  7. "v2ray.com/core/common"
  8. "v2ray.com/core/common/dice"
  9. "v2ray.com/core/common/net"
  10. "v2ray.com/core/common/serial"
  11. "v2ray.com/core/features/stats"
  12. "v2ray.com/core/proxy"
  13. "v2ray.com/core/transport/internet"
  14. )
  15. func getStatCounter(v *core.Instance, tag string) (stats.Counter, stats.Counter) {
  16. var uplinkCounter stats.Counter
  17. var downlinkCounter stats.Counter
  18. policy := v.PolicyManager()
  19. statsManager := v.Stats()
  20. if len(tag) > 0 && policy.ForSystem().Stats.InboundUplink {
  21. name := "inbound>>>" + tag + ">>>traffic>>>uplink"
  22. c, _ := stats.GetOrRegisterCounter(statsManager, name)
  23. if c != nil {
  24. uplinkCounter = c
  25. }
  26. }
  27. if len(tag) > 0 && policy.ForSystem().Stats.InboundDownlink {
  28. name := "inbound>>>" + tag + ">>>traffic>>>downlink"
  29. c, _ := stats.GetOrRegisterCounter(statsManager, name)
  30. if c != nil {
  31. downlinkCounter = c
  32. }
  33. }
  34. return uplinkCounter, downlinkCounter
  35. }
  // AlwaysOnInboundHandler is an inbound handler that owns a fixed set of
  // workers, all created up front by NewAlwaysOnInboundHandler and all sharing
  // the same inbound proxy and mux server.
  type AlwaysOnInboundHandler struct {
  	// proxy is the inbound proxy handed to every worker and returned by GetInbound.
  	proxy proxy.Inbound
  	// workers holds the TCP and UDP workers built by the constructor; Start and
  	// Close iterate over them.
  	workers []worker
  	// mux is the mux server passed to each worker as its dispatcher; it is
  	// closed in Close.
  	mux *mux.Server
  	// tag is the handler's tag, returned by Tag and passed to every worker.
  	tag string
  }
  42. func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig *proxyman.ReceiverConfig, proxyConfig interface{}) (*AlwaysOnInboundHandler, error) {
  43. rawProxy, err := common.CreateObject(ctx, proxyConfig)
  44. if err != nil {
  45. return nil, err
  46. }
  47. p, ok := rawProxy.(proxy.Inbound)
  48. if !ok {
  49. return nil, newError("not an inbound proxy.")
  50. }
  51. h := &AlwaysOnInboundHandler{
  52. proxy: p,
  53. mux: mux.NewServer(ctx),
  54. tag: tag,
  55. }
  56. uplinkCounter, downlinkCounter := getStatCounter(core.MustFromContext(ctx), tag)
  57. nl := p.Network()
  58. pr := receiverConfig.PortRange
  59. address := receiverConfig.Listen.AsAddress()
  60. if address == nil {
  61. address = net.AnyIP
  62. }
  63. mss, err := internet.ToMemoryStreamConfig(receiverConfig.StreamSettings)
  64. if err != nil {
  65. return nil, newError("failed to parse stream config").Base(err).AtWarning()
  66. }
  67. if receiverConfig.ReceiveOriginalDestination {
  68. if mss.SocketSettings == nil {
  69. mss.SocketSettings = &internet.SocketConfig{}
  70. }
  71. if mss.SocketSettings.Tproxy == internet.SocketConfig_Off {
  72. mss.SocketSettings.Tproxy = internet.SocketConfig_Redirect
  73. }
  74. mss.SocketSettings.ReceiveOriginalDestAddress = true
  75. }
  76. for port := pr.From; port <= pr.To; port++ {
  77. if nl.HasNetwork(net.Network_TCP) {
  78. newError("creating stream worker on ", address, ":", port).AtDebug().WriteToLog()
  79. worker := &tcpWorker{
  80. address: address,
  81. port: net.Port(port),
  82. proxy: p,
  83. stream: mss,
  84. recvOrigDest: receiverConfig.ReceiveOriginalDestination,
  85. tag: tag,
  86. dispatcher: h.mux,
  87. sniffingConfig: receiverConfig.GetEffectiveSniffingSettings(),
  88. uplinkCounter: uplinkCounter,
  89. downlinkCounter: downlinkCounter,
  90. }
  91. h.workers = append(h.workers, worker)
  92. }
  93. if nl.HasNetwork(net.Network_UDP) {
  94. worker := &udpWorker{
  95. tag: tag,
  96. proxy: p,
  97. address: address,
  98. port: net.Port(port),
  99. dispatcher: h.mux,
  100. uplinkCounter: uplinkCounter,
  101. downlinkCounter: downlinkCounter,
  102. stream: mss,
  103. }
  104. h.workers = append(h.workers, worker)
  105. }
  106. }
  107. return h, nil
  108. }
  109. // Start implements common.Runnable.
  110. func (h *AlwaysOnInboundHandler) Start() error {
  111. for _, worker := range h.workers {
  112. if err := worker.Start(); err != nil {
  113. return err
  114. }
  115. }
  116. return nil
  117. }
  118. // Close implements common.Closable.
  119. func (h *AlwaysOnInboundHandler) Close() error {
  120. var errors []interface{}
  121. for _, worker := range h.workers {
  122. if err := worker.Close(); err != nil {
  123. errors = append(errors, err)
  124. }
  125. }
  126. if err := h.mux.Close(); err != nil {
  127. errors = append(errors, err)
  128. }
  129. if len(errors) > 0 {
  130. return newError("failed to close all resources").Base(newError(serial.Concat(errors...)))
  131. }
  132. return nil
  133. }
  134. func (h *AlwaysOnInboundHandler) GetRandomInboundProxy() (interface{}, net.Port, int) {
  135. if len(h.workers) == 0 {
  136. return nil, 0, 0
  137. }
  138. w := h.workers[dice.Roll(len(h.workers))]
  139. return w.Proxy(), w.Port(), 9999
  140. }
  // Tag returns the tag this handler was created with.
  func (h *AlwaysOnInboundHandler) Tag() string {
  	return h.tag
  }
  // GetInbound returns the inbound proxy shared by all of this handler's workers.
  func (h *AlwaysOnInboundHandler) GetInbound() proxy.Inbound {
  	return h.proxy
  }