always.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package inbound
  2. import (
  3. "context"
  4. "v2ray.com/core"
  5. "v2ray.com/core/app/proxyman"
  6. "v2ray.com/core/app/proxyman/mux"
  7. "v2ray.com/core/common"
  8. "v2ray.com/core/common/dice"
  9. "v2ray.com/core/common/net"
  10. "v2ray.com/core/common/serial"
  11. "v2ray.com/core/proxy"
  12. "v2ray.com/core/transport/internet"
  13. )
  14. func getStatCounter(v *core.Instance, tag string) (core.StatCounter, core.StatCounter) {
  15. var uplinkCounter core.StatCounter
  16. var downlinkCounter core.StatCounter
  17. policy := v.PolicyManager()
  18. stats := v.Stats()
  19. if len(tag) > 0 && policy.ForSystem().Stats.InboundUplink {
  20. name := "inbound>>>" + tag + ">>>traffic>>>uplink"
  21. c, _ := core.GetOrRegisterStatCounter(stats, name)
  22. if c != nil {
  23. uplinkCounter = c
  24. }
  25. }
  26. if len(tag) > 0 && policy.ForSystem().Stats.InboundDownlink {
  27. name := "inbound>>>" + tag + ">>>traffic>>>downlink"
  28. c, _ := core.GetOrRegisterStatCounter(stats, name)
  29. if c != nil {
  30. downlinkCounter = c
  31. }
  32. }
  33. return uplinkCounter, downlinkCounter
  34. }
  35. type AlwaysOnInboundHandler struct {
  36. proxy proxy.Inbound
  37. workers []worker
  38. mux *mux.Server
  39. tag string
  40. }
  41. func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig *proxyman.ReceiverConfig, proxyConfig interface{}) (*AlwaysOnInboundHandler, error) {
  42. rawProxy, err := common.CreateObject(ctx, proxyConfig)
  43. if err != nil {
  44. return nil, err
  45. }
  46. p, ok := rawProxy.(proxy.Inbound)
  47. if !ok {
  48. return nil, newError("not an inbound proxy.")
  49. }
  50. h := &AlwaysOnInboundHandler{
  51. proxy: p,
  52. mux: mux.NewServer(ctx),
  53. tag: tag,
  54. }
  55. uplinkCounter, downlinkCounter := getStatCounter(core.MustFromContext(ctx), tag)
  56. nl := p.Network()
  57. pr := receiverConfig.PortRange
  58. address := receiverConfig.Listen.AsAddress()
  59. if address == nil {
  60. address = net.AnyIP
  61. }
  62. mss, err := internet.ToMemoryStreamConfig(receiverConfig.StreamSettings)
  63. if err != nil {
  64. return nil, newError("failed to parse stream config").Base(err).AtWarning()
  65. }
  66. if receiverConfig.ReceiveOriginalDestination {
  67. if mss.SocketSettings == nil {
  68. mss.SocketSettings = &internet.SocketConfig{}
  69. }
  70. if mss.SocketSettings.Tproxy == internet.SocketConfig_Off {
  71. mss.SocketSettings.Tproxy = internet.SocketConfig_Redirect
  72. }
  73. mss.SocketSettings.ReceiveOriginalDestAddress = true
  74. }
  75. for port := pr.From; port <= pr.To; port++ {
  76. if nl.HasNetwork(net.Network_TCP) {
  77. newError("creating stream worker on ", address, ":", port).AtDebug().WriteToLog()
  78. worker := &tcpWorker{
  79. address: address,
  80. port: net.Port(port),
  81. proxy: p,
  82. stream: mss,
  83. recvOrigDest: receiverConfig.ReceiveOriginalDestination,
  84. tag: tag,
  85. dispatcher: h.mux,
  86. sniffingConfig: receiverConfig.GetEffectiveSniffingSettings(),
  87. uplinkCounter: uplinkCounter,
  88. downlinkCounter: downlinkCounter,
  89. }
  90. h.workers = append(h.workers, worker)
  91. }
  92. if nl.HasNetwork(net.Network_UDP) {
  93. worker := &udpWorker{
  94. tag: tag,
  95. proxy: p,
  96. address: address,
  97. port: net.Port(port),
  98. dispatcher: h.mux,
  99. uplinkCounter: uplinkCounter,
  100. downlinkCounter: downlinkCounter,
  101. stream: mss,
  102. }
  103. h.workers = append(h.workers, worker)
  104. }
  105. }
  106. return h, nil
  107. }
  108. // Start implements common.Runnable.
  109. func (h *AlwaysOnInboundHandler) Start() error {
  110. for _, worker := range h.workers {
  111. if err := worker.Start(); err != nil {
  112. return err
  113. }
  114. }
  115. return nil
  116. }
  117. // Close implements common.Closable.
  118. func (h *AlwaysOnInboundHandler) Close() error {
  119. var errors []interface{}
  120. for _, worker := range h.workers {
  121. if err := worker.Close(); err != nil {
  122. errors = append(errors, err)
  123. }
  124. }
  125. if err := h.mux.Close(); err != nil {
  126. errors = append(errors, err)
  127. }
  128. if len(errors) > 0 {
  129. return newError("failed to close all resources").Base(newError(serial.Concat(errors...)))
  130. }
  131. return nil
  132. }
  133. func (h *AlwaysOnInboundHandler) GetRandomInboundProxy() (interface{}, net.Port, int) {
  134. if len(h.workers) == 0 {
  135. return nil, 0, 0
  136. }
  137. w := h.workers[dice.Roll(len(h.workers))]
  138. return w.Proxy(), w.Port(), 9999
  139. }
  140. func (h *AlwaysOnInboundHandler) Tag() string {
  141. return h.tag
  142. }
  143. func (h *AlwaysOnInboundHandler) GetInbound() proxy.Inbound {
  144. return h.proxy
  145. }