always.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package inbound
  2. import (
  3. "context"
  4. "v2ray.com/core"
  5. "v2ray.com/core/app/proxyman"
  6. "v2ray.com/core/app/proxyman/mux"
  7. "v2ray.com/core/common"
  8. "v2ray.com/core/common/dice"
  9. "v2ray.com/core/common/net"
  10. "v2ray.com/core/common/serial"
  11. "v2ray.com/core/proxy"
  12. )
  13. func getStatCounter(v *core.Instance, tag string) (core.StatCounter, core.StatCounter) {
  14. var uplinkCounter core.StatCounter
  15. var downlinkCounter core.StatCounter
  16. policy := v.PolicyManager()
  17. stats := v.Stats()
  18. if len(tag) > 0 && policy.ForSystem().Stats.InboundUplink {
  19. name := "inbound>>>" + tag + ">>>traffic>>>uplink"
  20. c, _ := core.GetOrRegisterStatCounter(stats, name)
  21. if c != nil {
  22. uplinkCounter = c
  23. }
  24. }
  25. if len(tag) > 0 && policy.ForSystem().Stats.InboundDownlink {
  26. name := "inbound>>>" + tag + ">>>traffic>>>downlink"
  27. c, _ := core.GetOrRegisterStatCounter(stats, name)
  28. if c != nil {
  29. downlinkCounter = c
  30. }
  31. }
  32. return uplinkCounter, downlinkCounter
  33. }
  34. type AlwaysOnInboundHandler struct {
  35. proxy proxy.Inbound
  36. workers []worker
  37. mux *mux.Server
  38. tag string
  39. }
  40. func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig *proxyman.ReceiverConfig, proxyConfig interface{}) (*AlwaysOnInboundHandler, error) {
  41. rawProxy, err := common.CreateObject(ctx, proxyConfig)
  42. if err != nil {
  43. return nil, err
  44. }
  45. p, ok := rawProxy.(proxy.Inbound)
  46. if !ok {
  47. return nil, newError("not an inbound proxy.")
  48. }
  49. h := &AlwaysOnInboundHandler{
  50. proxy: p,
  51. mux: mux.NewServer(ctx),
  52. tag: tag,
  53. }
  54. uplinkCounter, downlinkCounter := getStatCounter(core.MustFromContext(ctx), tag)
  55. nl := p.Network()
  56. pr := receiverConfig.PortRange
  57. address := receiverConfig.Listen.AsAddress()
  58. if address == nil {
  59. address = net.AnyIP
  60. }
  61. for port := pr.From; port <= pr.To; port++ {
  62. if nl.HasNetwork(net.Network_TCP) {
  63. newError("creating stream worker on ", address, ":", port).AtDebug().WriteToLog()
  64. worker := &tcpWorker{
  65. address: address,
  66. port: net.Port(port),
  67. proxy: p,
  68. stream: receiverConfig.StreamSettings,
  69. recvOrigDest: receiverConfig.ReceiveOriginalDestination,
  70. tag: tag,
  71. dispatcher: h.mux,
  72. sniffingConfig: receiverConfig.GetEffectiveSniffingSettings(),
  73. uplinkCounter: uplinkCounter,
  74. downlinkCounter: downlinkCounter,
  75. }
  76. h.workers = append(h.workers, worker)
  77. }
  78. if nl.HasNetwork(net.Network_UDP) {
  79. worker := &udpWorker{
  80. tag: tag,
  81. proxy: p,
  82. address: address,
  83. port: net.Port(port),
  84. recvOrigDest: receiverConfig.ReceiveOriginalDestination,
  85. dispatcher: h.mux,
  86. uplinkCounter: uplinkCounter,
  87. downlinkCounter: downlinkCounter,
  88. }
  89. h.workers = append(h.workers, worker)
  90. }
  91. }
  92. return h, nil
  93. }
  94. // Start implements common.Runnable.
  95. func (h *AlwaysOnInboundHandler) Start() error {
  96. for _, worker := range h.workers {
  97. if err := worker.Start(); err != nil {
  98. return err
  99. }
  100. }
  101. return nil
  102. }
  103. // Close implements common.Closable.
  104. func (h *AlwaysOnInboundHandler) Close() error {
  105. var errors []interface{}
  106. for _, worker := range h.workers {
  107. if err := worker.Close(); err != nil {
  108. errors = append(errors, err)
  109. }
  110. }
  111. if err := h.mux.Close(); err != nil {
  112. errors = append(errors, err)
  113. }
  114. if len(errors) > 0 {
  115. return newError("failed to close all resources").Base(newError(serial.Concat(errors...)))
  116. }
  117. return nil
  118. }
  119. func (h *AlwaysOnInboundHandler) GetRandomInboundProxy() (interface{}, net.Port, int) {
  120. if len(h.workers) == 0 {
  121. return nil, 0, 0
  122. }
  123. w := h.workers[dice.Roll(len(h.workers))]
  124. return w.Proxy(), w.Port(), 9999
  125. }
  126. func (h *AlwaysOnInboundHandler) Tag() string {
  127. return h.tag
  128. }
  129. func (h *AlwaysOnInboundHandler) GetInbound() proxy.Inbound {
  130. return h.proxy
  131. }