portal.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. // +build !confonly
  2. package reverse
  3. import (
  4. "context"
  5. "sync"
  6. "time"
  7. "github.com/golang/protobuf/proto"
  8. "github.com/v2fly/v2ray-core/v4/common"
  9. "github.com/v2fly/v2ray-core/v4/common/buf"
  10. "github.com/v2fly/v2ray-core/v4/common/mux"
  11. "github.com/v2fly/v2ray-core/v4/common/net"
  12. "github.com/v2fly/v2ray-core/v4/common/session"
  13. "github.com/v2fly/v2ray-core/v4/common/task"
  14. "github.com/v2fly/v2ray-core/v4/features/outbound"
  15. "github.com/v2fly/v2ray-core/v4/transport"
  16. "github.com/v2fly/v2ray-core/v4/transport/pipe"
  17. )
  18. type Portal struct {
  19. ohm outbound.Manager
  20. tag string
  21. domain string
  22. picker *StaticMuxPicker
  23. client *mux.ClientManager
  24. }
  25. func NewPortal(config *PortalConfig, ohm outbound.Manager) (*Portal, error) {
  26. if config.Tag == "" {
  27. return nil, newError("portal tag is empty")
  28. }
  29. if config.Domain == "" {
  30. return nil, newError("portal domain is empty")
  31. }
  32. picker, err := NewStaticMuxPicker()
  33. if err != nil {
  34. return nil, err
  35. }
  36. return &Portal{
  37. ohm: ohm,
  38. tag: config.Tag,
  39. domain: config.Domain,
  40. picker: picker,
  41. client: &mux.ClientManager{
  42. Picker: picker,
  43. },
  44. }, nil
  45. }
  46. func (p *Portal) Start() error {
  47. return p.ohm.AddHandler(context.Background(), &Outbound{
  48. portal: p,
  49. tag: p.tag,
  50. })
  51. }
  52. func (p *Portal) Close() error {
  53. return p.ohm.RemoveHandler(context.Background(), p.tag)
  54. }
  55. func (p *Portal) HandleConnection(ctx context.Context, link *transport.Link) error {
  56. outboundMeta := session.OutboundFromContext(ctx)
  57. if outboundMeta == nil {
  58. return newError("outbound metadata not found").AtError()
  59. }
  60. if isDomain(outboundMeta.Target, p.domain) {
  61. muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{})
  62. if err != nil {
  63. return newError("failed to create mux client worker").Base(err).AtWarning()
  64. }
  65. worker, err := NewPortalWorker(muxClient)
  66. if err != nil {
  67. return newError("failed to create portal worker").Base(err)
  68. }
  69. p.picker.AddWorker(worker)
  70. return nil
  71. }
  72. return p.client.Dispatch(ctx, link)
  73. }
  74. type Outbound struct {
  75. portal *Portal
  76. tag string
  77. }
  78. func (o *Outbound) Tag() string {
  79. return o.tag
  80. }
  81. func (o *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
  82. if err := o.portal.HandleConnection(ctx, link); err != nil {
  83. newError("failed to process reverse connection").Base(err).WriteToLog(session.ExportIDToError(ctx))
  84. common.Interrupt(link.Writer)
  85. }
  86. }
  87. func (o *Outbound) Start() error {
  88. return nil
  89. }
  90. func (o *Outbound) Close() error {
  91. return nil
  92. }
  93. type StaticMuxPicker struct {
  94. access sync.Mutex
  95. workers []*PortalWorker
  96. cTask *task.Periodic
  97. }
  98. func NewStaticMuxPicker() (*StaticMuxPicker, error) {
  99. p := &StaticMuxPicker{}
  100. p.cTask = &task.Periodic{
  101. Execute: p.cleanup,
  102. Interval: time.Second * 30,
  103. }
  104. p.cTask.Start()
  105. return p, nil
  106. }
  107. func (p *StaticMuxPicker) cleanup() error {
  108. p.access.Lock()
  109. defer p.access.Unlock()
  110. var activeWorkers []*PortalWorker
  111. for _, w := range p.workers {
  112. if !w.Closed() {
  113. activeWorkers = append(activeWorkers, w)
  114. }
  115. }
  116. if len(activeWorkers) != len(p.workers) {
  117. p.workers = activeWorkers
  118. }
  119. return nil
  120. }
  121. func (p *StaticMuxPicker) PickAvailable() (*mux.ClientWorker, error) {
  122. p.access.Lock()
  123. defer p.access.Unlock()
  124. if len(p.workers) == 0 {
  125. return nil, newError("empty worker list")
  126. }
  127. var minIdx int = -1
  128. var minConn uint32 = 9999
  129. for i, w := range p.workers {
  130. if w.draining {
  131. continue
  132. }
  133. if w.client.ActiveConnections() < minConn {
  134. minConn = w.client.ActiveConnections()
  135. minIdx = i
  136. }
  137. }
  138. if minIdx == -1 {
  139. for i, w := range p.workers {
  140. if w.IsFull() {
  141. continue
  142. }
  143. if w.client.ActiveConnections() < minConn {
  144. minConn = w.client.ActiveConnections()
  145. minIdx = i
  146. }
  147. }
  148. }
  149. if minIdx != -1 {
  150. return p.workers[minIdx].client, nil
  151. }
  152. return nil, newError("no mux client worker available")
  153. }
  154. func (p *StaticMuxPicker) AddWorker(worker *PortalWorker) {
  155. p.access.Lock()
  156. defer p.access.Unlock()
  157. p.workers = append(p.workers, worker)
  158. }
  159. type PortalWorker struct {
  160. client *mux.ClientWorker
  161. control *task.Periodic
  162. writer buf.Writer
  163. reader buf.Reader
  164. draining bool
  165. }
  166. func NewPortalWorker(client *mux.ClientWorker) (*PortalWorker, error) {
  167. opt := []pipe.Option{pipe.WithSizeLimit(16 * 1024)}
  168. uplinkReader, uplinkWriter := pipe.New(opt...)
  169. downlinkReader, downlinkWriter := pipe.New(opt...)
  170. ctx := context.Background()
  171. ctx = session.ContextWithOutbound(ctx, &session.Outbound{
  172. Target: net.UDPDestination(net.DomainAddress(internalDomain), 0),
  173. })
  174. f := client.Dispatch(ctx, &transport.Link{
  175. Reader: uplinkReader,
  176. Writer: downlinkWriter,
  177. })
  178. if !f {
  179. return nil, newError("unable to dispatch control connection")
  180. }
  181. w := &PortalWorker{
  182. client: client,
  183. reader: downlinkReader,
  184. writer: uplinkWriter,
  185. }
  186. w.control = &task.Periodic{
  187. Execute: w.heartbeat,
  188. Interval: time.Second * 2,
  189. }
  190. w.control.Start()
  191. return w, nil
  192. }
  193. func (w *PortalWorker) heartbeat() error {
  194. if w.client.Closed() {
  195. return newError("client worker stopped")
  196. }
  197. if w.draining || w.writer == nil {
  198. return newError("already disposed")
  199. }
  200. msg := &Control{}
  201. msg.FillInRandom()
  202. if w.client.TotalConnections() > 256 {
  203. w.draining = true
  204. msg.State = Control_DRAIN
  205. defer func() {
  206. common.Close(w.writer)
  207. common.Interrupt(w.reader)
  208. w.writer = nil
  209. }()
  210. }
  211. b, err := proto.Marshal(msg)
  212. common.Must(err)
  213. mb := buf.MergeBytes(nil, b)
  214. return w.writer.WriteMultiBuffer(mb)
  215. }
  216. func (w *PortalWorker) IsFull() bool {
  217. return w.client.IsFull()
  218. }
  219. func (w *PortalWorker) Closed() bool {
  220. return w.client.Closed()
  221. }