portal.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. package reverse
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "github.com/golang/protobuf/proto"
  7. "v2ray.com/core/common"
  8. "v2ray.com/core/common/buf"
  9. "v2ray.com/core/common/dice"
  10. "v2ray.com/core/common/mux"
  11. "v2ray.com/core/common/session"
  12. "v2ray.com/core/common/task"
  13. "v2ray.com/core/common/vio"
  14. "v2ray.com/core/features/outbound"
  15. "v2ray.com/core/transport/pipe"
  16. )
  17. type Portal struct {
  18. ohm outbound.Manager
  19. tag string
  20. domain string
  21. picker *StaticMuxPicker
  22. client *mux.ClientManager
  23. }
  24. func NewPortal(config *PortalConfig, ohm outbound.Manager) (*Portal, error) {
  25. if len(config.Tag) == 0 {
  26. return nil, newError("portal tag is empty")
  27. }
  28. if len(config.Domain) == 0 {
  29. return nil, newError("portal domain is empty")
  30. }
  31. picker, err := NewStaticMuxPicker()
  32. if err != nil {
  33. return nil, err
  34. }
  35. return &Portal{
  36. ohm: ohm,
  37. tag: config.Tag,
  38. domain: config.Domain,
  39. picker: picker,
  40. client: &mux.ClientManager{
  41. Picker: picker,
  42. },
  43. }, nil
  44. }
  45. func (p *Portal) Start() error {
  46. return p.ohm.AddHandler(context.Background(), &Outbound{
  47. portal: p,
  48. })
  49. }
  50. func (p *Portal) Close() error {
  51. return p.ohm.RemoveHandler(context.Background(), p.tag)
  52. }
  53. func (s *Portal) HandleConnection(ctx context.Context, link *vio.Link) error {
  54. outboundMeta := session.OutboundFromContext(ctx)
  55. if outboundMeta == nil {
  56. return newError("outbound metadata not found").AtError()
  57. }
  58. if isInternalDomain(outboundMeta.Target) {
  59. muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{
  60. MaxConcurrency: 0,
  61. MaxConnection: 256,
  62. })
  63. if err != nil {
  64. return newError("failed to create mux client worker").Base(err).AtWarning()
  65. }
  66. worker, err := NewPortalWorker(muxClient)
  67. if err != nil {
  68. return newError("failed to create portal worker").Base(err)
  69. }
  70. s.picker.AddWorker(worker)
  71. return nil
  72. }
  73. return s.client.Dispatch(ctx, link)
  74. }
  75. type Outbound struct {
  76. portal *Portal
  77. tag string
  78. }
  79. func (o *Outbound) Tag() string {
  80. return o.tag
  81. }
  82. func (o *Outbound) Dispatch(ctx context.Context, link *vio.Link) {
  83. if err := o.portal.HandleConnection(ctx, link); err != nil {
  84. newError("failed to process reverse connection").Base(err).WriteToLog(session.ExportIDToError(ctx))
  85. pipe.CloseError(link.Writer)
  86. }
  87. }
  88. func (o *Outbound) Start() error {
  89. return nil
  90. }
  91. func (o *Outbound) Close() error {
  92. return nil
  93. }
  94. type StaticMuxPicker struct {
  95. access sync.Mutex
  96. workers []*PortalWorker
  97. cTask *task.Periodic
  98. }
  99. func NewStaticMuxPicker() (*StaticMuxPicker, error) {
  100. p := &StaticMuxPicker{}
  101. p.cTask = &task.Periodic{
  102. Execute: p.cleanup,
  103. Interval: time.Second * 30,
  104. }
  105. p.cTask.Start()
  106. return p, nil
  107. }
  108. func (p *StaticMuxPicker) cleanup() error {
  109. p.access.Lock()
  110. defer p.access.Unlock()
  111. var activeWorkers []*PortalWorker
  112. for _, w := range p.workers {
  113. if !w.Closed() {
  114. activeWorkers = append(activeWorkers, w)
  115. }
  116. }
  117. if len(activeWorkers) != len(p.workers) {
  118. p.workers = activeWorkers
  119. }
  120. return nil
  121. }
  122. func (p *StaticMuxPicker) PickAvailable() (*mux.ClientWorker, error) {
  123. p.access.Lock()
  124. defer p.access.Unlock()
  125. n := len(p.workers)
  126. if n == 0 {
  127. return nil, newError("empty worker list")
  128. }
  129. idx := dice.Roll(n)
  130. for i := 0; i < n; i++ {
  131. w := p.workers[(i+idx)%n]
  132. if !w.IsFull() {
  133. return w.client, nil
  134. }
  135. }
  136. return nil, newError("no mux client worker available")
  137. }
  138. func (p *StaticMuxPicker) AddWorker(worker *PortalWorker) {
  139. p.access.Lock()
  140. defer p.access.Unlock()
  141. p.workers = append(p.workers, worker)
  142. }
  143. type PortalWorker struct {
  144. client *mux.ClientWorker
  145. control *task.Periodic
  146. writer buf.Writer
  147. reader buf.Reader
  148. }
  149. func NewPortalWorker(client *mux.ClientWorker) (*PortalWorker, error) {
  150. opt := []pipe.Option{pipe.WithSizeLimit(16 * 1024)}
  151. uplinkReader, uplinkWriter := pipe.New(opt...)
  152. downlinkReader, downlinkWriter := pipe.New(opt...)
  153. f := client.Dispatch(context.Background(), &vio.Link{
  154. Reader: uplinkReader,
  155. Writer: downlinkWriter,
  156. })
  157. if !f {
  158. return nil, newError("unable to dispatch control connection")
  159. }
  160. w := &PortalWorker{
  161. client: client,
  162. reader: downlinkReader,
  163. writer: uplinkWriter,
  164. }
  165. w.control = &task.Periodic{
  166. Execute: w.heartbeat,
  167. Interval: time.Second * 2,
  168. }
  169. w.control.Start()
  170. return w, nil
  171. }
  172. func (w *PortalWorker) heartbeat() error {
  173. if w.client.Closed() {
  174. return newError("client worker stopped")
  175. }
  176. if w.writer == nil {
  177. return newError("already disposed")
  178. }
  179. msg := &Control{}
  180. msg.FillInRandom()
  181. if w.client.IsClosing() {
  182. msg.State = Control_DRAIN
  183. defer func() {
  184. common.Close(w.writer)
  185. pipe.CloseError(w.reader)
  186. w.writer = nil
  187. }()
  188. }
  189. b, err := proto.Marshal(msg)
  190. common.Must(err)
  191. var mb buf.MultiBuffer
  192. common.Must2(mb.Write(b))
  193. return w.writer.WriteMultiBuffer(mb)
  194. }
  195. func (w *PortalWorker) IsFull() bool {
  196. return w.client.IsFull()
  197. }
  198. func (w *PortalWorker) Closed() bool {
  199. return w.client.Closed()
  200. }