always.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package inbound
  2. import (
  3. "context"
  4. "v2ray.com/core"
  5. "v2ray.com/core/app/proxyman"
  6. "v2ray.com/core/app/proxyman/mux"
  7. "v2ray.com/core/common"
  8. "v2ray.com/core/common/dice"
  9. "v2ray.com/core/common/net"
  10. "v2ray.com/core/proxy"
  11. )
  12. func getStatCounter(v *core.Instance, tag string) (core.StatCounter, core.StatCounter) {
  13. var uplinkCounter core.StatCounter
  14. var downlinkCounter core.StatCounter
  15. policy := v.PolicyManager()
  16. stats := v.Stats()
  17. if len(tag) > 0 && policy.ForSystem().Stats.InboundUplink {
  18. name := "inbound>>>" + tag + ">>>traffic>>>uplink"
  19. c, _ := core.GetOrRegisterStatCounter(stats, name)
  20. if c != nil {
  21. uplinkCounter = c
  22. }
  23. }
  24. if len(tag) > 0 && policy.ForSystem().Stats.InboundDownlink {
  25. name := "inbound>>>" + tag + ">>>traffic>>>downlink"
  26. c, _ := core.GetOrRegisterStatCounter(stats, name)
  27. if c != nil {
  28. downlinkCounter = c
  29. }
  30. }
  31. return uplinkCounter, downlinkCounter
  32. }
  33. type AlwaysOnInboundHandler struct {
  34. proxy proxy.Inbound
  35. workers []worker
  36. mux *mux.Server
  37. tag string
  38. }
  39. func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig *proxyman.ReceiverConfig, proxyConfig interface{}) (*AlwaysOnInboundHandler, error) {
  40. rawProxy, err := common.CreateObject(ctx, proxyConfig)
  41. if err != nil {
  42. return nil, err
  43. }
  44. p, ok := rawProxy.(proxy.Inbound)
  45. if !ok {
  46. return nil, newError("not an inbound proxy.")
  47. }
  48. h := &AlwaysOnInboundHandler{
  49. proxy: p,
  50. mux: mux.NewServer(ctx),
  51. tag: tag,
  52. }
  53. uplinkCounter, downlinkCounter := getStatCounter(core.MustFromContext(ctx), tag)
  54. nl := p.Network()
  55. pr := receiverConfig.PortRange
  56. address := receiverConfig.Listen.AsAddress()
  57. if address == nil {
  58. address = net.AnyIP
  59. }
  60. for port := pr.From; port <= pr.To; port++ {
  61. if nl.HasNetwork(net.Network_TCP) {
  62. newError("creating stream worker on ", address, ":", port).AtDebug().WriteToLog()
  63. worker := &tcpWorker{
  64. address: address,
  65. port: net.Port(port),
  66. proxy: p,
  67. stream: receiverConfig.StreamSettings,
  68. recvOrigDest: receiverConfig.ReceiveOriginalDestination,
  69. tag: tag,
  70. dispatcher: h.mux,
  71. sniffers: receiverConfig.DomainOverride,
  72. uplinkCounter: uplinkCounter,
  73. downlinkCounter: downlinkCounter,
  74. }
  75. h.workers = append(h.workers, worker)
  76. }
  77. if nl.HasNetwork(net.Network_UDP) {
  78. worker := &udpWorker{
  79. tag: tag,
  80. proxy: p,
  81. address: address,
  82. port: net.Port(port),
  83. recvOrigDest: receiverConfig.ReceiveOriginalDestination,
  84. dispatcher: h.mux,
  85. uplinkCounter: uplinkCounter,
  86. downlinkCounter: downlinkCounter,
  87. }
  88. h.workers = append(h.workers, worker)
  89. }
  90. }
  91. return h, nil
  92. }
  93. func (h *AlwaysOnInboundHandler) Start() error {
  94. for _, worker := range h.workers {
  95. if err := worker.Start(); err != nil {
  96. return err
  97. }
  98. }
  99. return nil
  100. }
  101. func (h *AlwaysOnInboundHandler) Close() error {
  102. for _, worker := range h.workers {
  103. worker.Close()
  104. }
  105. h.mux.Close()
  106. return nil
  107. }
  108. func (h *AlwaysOnInboundHandler) GetRandomInboundProxy() (interface{}, net.Port, int) {
  109. if len(h.workers) == 0 {
  110. return nil, 0, 0
  111. }
  112. w := h.workers[dice.Roll(len(h.workers))]
  113. return w.Proxy(), w.Port(), 9999
  114. }
  115. func (h *AlwaysOnInboundHandler) Tag() string {
  116. return h.tag
  117. }
  118. func (h *AlwaysOnInboundHandler) GetInbound() proxy.Inbound {
  119. return h.proxy
  120. }