inbound_detour_dynamic.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package core
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "v2ray.com/core/app"
  7. "v2ray.com/core/common/dice"
  8. "v2ray.com/core/common/log"
  9. "v2ray.com/core/common/net"
  10. "v2ray.com/core/common/retry"
  11. "v2ray.com/core/proxy"
  12. )
  13. type InboundDetourHandlerDynamic struct {
  14. sync.RWMutex
  15. space app.Space
  16. config *InboundConnectionConfig
  17. portsInUse map[net.Port]bool
  18. ichs []proxy.InboundHandler
  19. ich2Recyle []proxy.InboundHandler
  20. lastRefresh time.Time
  21. ctx context.Context
  22. }
  23. func NewInboundDetourHandlerDynamic(ctx context.Context, config *InboundConnectionConfig) (*InboundDetourHandlerDynamic, error) {
  24. space := app.SpaceFromContext(ctx)
  25. handler := &InboundDetourHandlerDynamic{
  26. space: space,
  27. config: config,
  28. portsInUse: make(map[net.Port]bool),
  29. ctx: ctx,
  30. }
  31. handler.ichs = make([]proxy.InboundHandler, config.GetAllocationStrategyValue().GetConcurrencyValue())
  32. // To test configuration
  33. ichConfig, err := config.GetTypedSettings()
  34. if err != nil {
  35. return nil, err
  36. }
  37. ich, err := proxy.CreateInboundHandler(proxy.ContextWithInboundMeta(ctx, &proxy.InboundHandlerMeta{
  38. Address: config.GetListenOnValue(),
  39. Port: 0,
  40. Tag: config.Tag,
  41. StreamSettings: config.StreamSettings,
  42. AllowPassiveConnection: config.AllowPassiveConnection,
  43. }), ichConfig)
  44. if err != nil {
  45. log.Error("Point: Failed to create inbound connection handler: ", err)
  46. return nil, err
  47. }
  48. ich.Close()
  49. return handler, nil
  50. }
  51. func (v *InboundDetourHandlerDynamic) pickUnusedPort() net.Port {
  52. delta := int(v.config.PortRange.To) - int(v.config.PortRange.From) + 1
  53. for {
  54. r := dice.Roll(delta)
  55. port := v.config.PortRange.FromPort() + net.Port(r)
  56. _, used := v.portsInUse[port]
  57. if !used {
  58. return port
  59. }
  60. }
  61. }
  62. func (v *InboundDetourHandlerDynamic) GetConnectionHandler() (proxy.InboundHandler, int) {
  63. v.RLock()
  64. defer v.RUnlock()
  65. ich := v.ichs[dice.Roll(len(v.ichs))]
  66. until := int(v.config.GetAllocationStrategyValue().GetRefreshValue()) - int((time.Now().Unix()-v.lastRefresh.Unix())/60/1000)
  67. if until < 0 {
  68. until = 0
  69. }
  70. return ich, int(until)
  71. }
  72. func (v *InboundDetourHandlerDynamic) Close() {
  73. v.Lock()
  74. defer v.Unlock()
  75. for _, ich := range v.ichs {
  76. ich.Close()
  77. }
  78. }
  79. func (v *InboundDetourHandlerDynamic) RecyleHandles() {
  80. if v.ich2Recyle != nil {
  81. for _, ich := range v.ich2Recyle {
  82. if ich == nil {
  83. continue
  84. }
  85. port := ich.Port()
  86. ich.Close()
  87. delete(v.portsInUse, port)
  88. }
  89. v.ich2Recyle = nil
  90. }
  91. }
  92. func (v *InboundDetourHandlerDynamic) refresh() error {
  93. v.lastRefresh = time.Now()
  94. config := v.config
  95. v.ich2Recyle = v.ichs
  96. newIchs := make([]proxy.InboundHandler, config.GetAllocationStrategyValue().GetConcurrencyValue())
  97. for idx := range newIchs {
  98. err := retry.Timed(5, 100).On(func() error {
  99. port := v.pickUnusedPort()
  100. ichConfig, _ := config.GetTypedSettings()
  101. ich, err := proxy.CreateInboundHandler(proxy.ContextWithInboundMeta(v.ctx, &proxy.InboundHandlerMeta{
  102. Address: config.GetListenOnValue(),
  103. Port: port, Tag: config.Tag,
  104. StreamSettings: config.StreamSettings}), ichConfig)
  105. if err != nil {
  106. delete(v.portsInUse, port)
  107. return err
  108. }
  109. err = ich.Start()
  110. if err != nil {
  111. delete(v.portsInUse, port)
  112. return err
  113. }
  114. v.portsInUse[port] = true
  115. newIchs[idx] = ich
  116. return nil
  117. })
  118. if err != nil {
  119. log.Error("Point: Failed to create inbound connection handler: ", err)
  120. return err
  121. }
  122. }
  123. v.Lock()
  124. v.ichs = newIchs
  125. v.Unlock()
  126. return nil
  127. }
  128. func (v *InboundDetourHandlerDynamic) Start() error {
  129. err := v.refresh()
  130. if err != nil {
  131. log.Error("Point: Failed to refresh dynamic allocations: ", err)
  132. return err
  133. }
  134. go func() {
  135. for {
  136. time.Sleep(time.Duration(v.config.GetAllocationStrategyValue().GetRefreshValue())*time.Minute - 1)
  137. v.RecyleHandles()
  138. err := v.refresh()
  139. if err != nil {
  140. log.Error("Point: Failed to refresh dynamic allocations: ", err)
  141. }
  142. time.Sleep(time.Minute)
  143. }
  144. }()
  145. return nil
  146. }