client.go

package mux

import (
    "context"
    "io"
    "sync"
    "time"

    "v2ray.com/core/common"
    "v2ray.com/core/common/buf"
    "v2ray.com/core/common/errors"
    "v2ray.com/core/common/net"
    "v2ray.com/core/common/protocol"
    "v2ray.com/core/common/session"
    "v2ray.com/core/common/signal/done"
    "v2ray.com/core/common/task"
    "v2ray.com/core/proxy"
    "v2ray.com/core/transport"
    "v2ray.com/core/transport/internet"
    "v2ray.com/core/transport/pipe"
)
// ClientManager dispatches outbound links onto mux client workers chosen by
// its WorkerPicker.
type ClientManager struct {
    Picker WorkerPicker
}

// Dispatch asks the Picker for a worker until one accepts the link, giving
// up after 16 attempts.
func (m *ClientManager) Dispatch(ctx context.Context, link *transport.Link) error {
    for i := 0; i < 16; i++ {
        worker, err := m.Picker.PickAvailable()
        if err != nil {
            return err
        }
        if worker.Dispatch(ctx, link) {
            return nil
        }
    }

    return newError("unable to find an available mux client").AtWarning()
}
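// Usage sketch (illustrative; ctx, link, and factory come from the caller
// and are assumptions, not part of this file):
//
//	manager := &ClientManager{Picker: &IncrementalWorkerPicker{Factory: factory}}
//	if err := manager.Dispatch(ctx, link); err != nil {
//		// no worker accepted the link within 16 attempts
//	}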
// WorkerPicker selects a ClientWorker that can accept one more session.
type WorkerPicker interface {
    PickAvailable() (*ClientWorker, error)
}

// IncrementalWorkerPicker reuses existing workers when possible and creates
// new ones through Factory when every existing worker is full.
type IncrementalWorkerPicker struct {
    Factory ClientWorkerFactory

    access      sync.Mutex
    workers     []*ClientWorker
    cleanupTask *task.Periodic
}
// cleanupFunc runs periodically to drop closed workers. It returns an error
// when no workers remain, which stops the periodic task.
func (p *IncrementalWorkerPicker) cleanupFunc() error {
    p.access.Lock()
    defer p.access.Unlock()

    if len(p.workers) == 0 {
        return newError("no worker")
    }

    p.cleanup()
    return nil
}

// cleanup removes closed workers. The caller must hold p.access.
func (p *IncrementalWorkerPicker) cleanup() {
    var activeWorkers []*ClientWorker
    for _, w := range p.workers {
        if !w.Closed() {
            activeWorkers = append(activeWorkers, w)
        }
    }
    p.workers = activeWorkers
}

// findAvailable returns the index of the first worker that can take another
// session, or -1 if all workers are full.
func (p *IncrementalWorkerPicker) findAvailable() int {
    for idx, w := range p.workers {
        if !w.IsFull() {
            return idx
        }
    }
    return -1
}
// pickInternal returns an available worker, creating one if necessary. The
// boolean result reports whether the cleanup task still needs to be started.
func (p *IncrementalWorkerPicker) pickInternal() (*ClientWorker, error, bool) {
    p.access.Lock()
    defer p.access.Unlock()

    idx := p.findAvailable()
    if idx >= 0 {
        n := len(p.workers)
        // Capture the picked worker before moving it to the tail; the swap
        // makes future scans try other workers first.
        worker := p.workers[idx]
        if n > 1 && idx != n-1 {
            p.workers[n-1], p.workers[idx] = p.workers[idx], p.workers[n-1]
        }
        return worker, nil, false
    }

    p.cleanup()

    worker, err := p.Factory.Create()
    if err != nil {
        return nil, err, false
    }
    p.workers = append(p.workers, worker)

    if p.cleanupTask == nil {
        p.cleanupTask = &task.Periodic{
            Interval: time.Second * 30,
            Execute:  p.cleanupFunc,
        }
    }

    return worker, nil, true
}

func (p *IncrementalWorkerPicker) PickAvailable() (*ClientWorker, error) {
    worker, err, start := p.pickInternal()
    if start {
        common.Must(p.cleanupTask.Start())
    }

    return worker, err
}
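// Growth behavior (illustrative): starting from an empty picker, the first
// call creates a worker and starts the 30-second cleanup task; later calls
// reuse that worker until it reports IsFull, then append a fresh one.
//
//	picker := &IncrementalWorkerPicker{Factory: factory} // factory: any ClientWorkerFactory
//	w1, _ := picker.PickAvailable()                      // creates worker #1
//	w2, _ := picker.PickAvailable()                      // w2 == w1 while #1 is not full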
// ClientWorkerFactory creates ClientWorkers.
type ClientWorkerFactory interface {
    Create() (*ClientWorker, error)
}

// DialingWorkerFactory creates workers that carry mux traffic over a real
// outbound connection made through the given proxy and dialer.
type DialingWorkerFactory struct {
    Proxy    proxy.Outbound
    Dialer   internet.Dialer
    Strategy ClientStrategy
}
func (f *DialingWorkerFactory) Create() (*ClientWorker, error) {
    opts := []pipe.Option{pipe.WithSizeLimit(64 * 1024)}
    uplinkReader, uplinkWriter := pipe.New(opts...)
    downlinkReader, downlinkWriter := pipe.New(opts...)

    c, err := NewClientWorker(transport.Link{
        Reader: downlinkReader,
        Writer: uplinkWriter,
    }, f.Strategy)
    if err != nil {
        return nil, err
    }

    go func(p proxy.Outbound, d internet.Dialer, c common.Closable) {
        ctx := session.ContextWithOutbound(context.Background(), &session.Outbound{
            Target: net.TCPDestination(muxCoolAddress, muxCoolPort),
        })
        ctx, cancel := context.WithCancel(ctx)

        if err := p.Process(ctx, &transport.Link{Reader: uplinkReader, Writer: downlinkWriter}, d); err != nil {
            errors.New("failed to handle mux client connection").Base(err).WriteToLog()
        }
        common.Must(c.Close())
        cancel()
    }(f.Proxy, f.Dialer, c.done)

    return c, nil
}
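// Pipe topology for a worker built by Create (sketch): the worker owns the
// downlink reader and the uplink writer, while the proxy goroutine drives
// the opposite pipe ends toward the network:
//
//	ClientWorker --uplink pipe--> proxy.Process --dialer--> remote server
//	ClientWorker <--downlink pipe-- proxy.Process <--dialer-- remote server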
// ClientStrategy limits how much traffic a single worker may carry.
type ClientStrategy struct {
    // MaxConcurrency caps the number of simultaneous sessions per worker.
    MaxConcurrency uint32
    // MaxConnection caps the lifetime total of sessions per worker.
    MaxConnection uint32
}
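// Example values (illustrative only, not defaults defined in this file):
//
//	strategy := ClientStrategy{
//		MaxConcurrency: 8,   // at most 8 live sessions per worker
//		MaxConnection:  128, // retire a worker after 128 sessions in total
//	}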
// ClientWorker multiplexes many sessions over one underlying link.
type ClientWorker struct {
    sessionManager *SessionManager
    link           transport.Link
    done           *done.Instance
    strategy       ClientStrategy
}

// muxCoolAddress and muxCoolPort form the pseudo destination that marks a
// connection as Mux.Cool traffic to the server side.
var muxCoolAddress = net.DomainAddress("v1.mux.cool")
var muxCoolPort = net.Port(9527)
// NewClientWorker creates a new mux.Client.
func NewClientWorker(stream transport.Link, s ClientStrategy) (*ClientWorker, error) {
    c := &ClientWorker{
        sessionManager: NewSessionManager(),
        link:           stream,
        done:           done.New(),
        strategy:       s,
    }

    go c.fetchOutput()
    go c.monitor()

    return c, nil
}
// TotalConnections returns the number of sessions this worker has ever carried.
func (m *ClientWorker) TotalConnections() uint32 {
    return uint32(m.sessionManager.Count())
}

// ActiveConnections returns the number of currently open sessions.
func (m *ClientWorker) ActiveConnections() uint32 {
    return uint32(m.sessionManager.Size())
}

// Closed returns true if this Client is closed.
func (m *ClientWorker) Closed() bool {
    return m.done.Done()
}
// monitor closes the worker once it is idle and no longer accepts sessions,
// and tears down the link when the worker is done.
func (m *ClientWorker) monitor() {
    timer := time.NewTicker(time.Second * 16)
    defer timer.Stop()

    for {
        select {
        case <-m.done.Wait():
            m.sessionManager.Close()
            common.Close(m.link.Writer)     // nolint: errcheck
            common.Interrupt(m.link.Reader) // nolint: errcheck
            return
        case <-timer.C:
            size := m.sessionManager.Size()
            if size == 0 && m.sessionManager.CloseIfNoSession() {
                common.Must(m.done.Close())
            }
        }
    }
}
// writeFirstPayload waits up to 100ms for the first chunk of data so it can
// be sent together with the session's opening frame. If nothing arrives in
// time, or the reader cannot time out, an empty write flushes the frame.
func writeFirstPayload(reader buf.Reader, writer *Writer) error {
    err := buf.CopyOnceTimeout(reader, writer, time.Millisecond*100)
    if err == buf.ErrNotTimeoutReader || err == buf.ErrReadTimeout {
        return writer.WriteMultiBuffer(buf.MultiBuffer{})
    }

    return err
}
// fetchInput pumps data from a session's input into the shared mux link,
// framing it for the destination recorded in the context.
func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
    dest := session.OutboundFromContext(ctx).Target
    transferType := protocol.TransferTypeStream
    if dest.Network == net.Network_UDP {
        transferType = protocol.TransferTypePacket
    }
    s.transferType = transferType
    writer := NewWriter(s.ID, dest, output, transferType)
    defer s.Close()      // nolint: errcheck
    defer writer.Close() // nolint: errcheck

    newError("dispatching request to ", dest).WriteToLog(session.ExportIDToError(ctx))
    if err := writeFirstPayload(s.input, writer); err != nil {
        newError("failed to write first payload").Base(err).WriteToLog(session.ExportIDToError(ctx))
        writer.hasError = true
        common.Interrupt(s.input)
        return
    }

    if err := buf.Copy(s.input, writer); err != nil {
        newError("failed to fetch all input").Base(err).WriteToLog(session.ExportIDToError(ctx))
        writer.hasError = true
        common.Interrupt(s.input)
        return
    }
}
// IsClosing reports whether this worker has reached its lifetime connection
// limit and should not accept new sessions.
func (m *ClientWorker) IsClosing() bool {
    sm := m.sessionManager
    return m.strategy.MaxConnection > 0 && sm.Count() >= int(m.strategy.MaxConnection)
}

// IsFull reports whether this worker cannot accept another session right now.
func (m *ClientWorker) IsFull() bool {
    if m.IsClosing() || m.Closed() {
        return true
    }

    sm := m.sessionManager
    return m.strategy.MaxConcurrency > 0 && sm.Size() >= int(m.strategy.MaxConcurrency)
}
// Dispatch starts a new session on this worker for the given link. It
// returns false if the worker is full or closed.
func (m *ClientWorker) Dispatch(ctx context.Context, link *transport.Link) bool {
    if m.IsFull() || m.Closed() {
        return false
    }

    sm := m.sessionManager
    s := sm.Allocate()
    if s == nil {
        return false
    }
    s.input = link.Reader
    s.output = link.Writer
    go fetchInput(ctx, s, m.link.Writer)
    return true
}
// handleStatusKeepAlive drains any data attached to a keep-alive frame.
func (m *ClientWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error {
    if meta.Option.Has(OptionData) {
        return buf.Copy(NewStreamReader(reader), buf.Discard)
    }
    return nil
}

// handleStatusNew ignores New frames on the client side, draining any
// attached data.
func (m *ClientWorker) handleStatusNew(meta *FrameMetadata, reader *buf.BufferedReader) error {
    if meta.Option.Has(OptionData) {
        return buf.Copy(NewStreamReader(reader), buf.Discard)
    }
    return nil
}
// handleStatusKeep copies payload for an existing session to its output. If
// the session is unknown or the downstream write fails, the remote peer is
// told to close the session and the remaining payload is drained.
func (m *ClientWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReader) error {
    if !meta.Option.Has(OptionData) {
        return nil
    }

    s, found := m.sessionManager.Get(meta.SessionID)
    if !found {
        // Notify remote peer to close this session.
        closingWriter := NewResponseWriter(meta.SessionID, m.link.Writer, protocol.TransferTypeStream)
        closingWriter.Close()

        return buf.Copy(NewStreamReader(reader), buf.Discard)
    }

    rr := s.NewReader(reader)
    err := buf.Copy(rr, s.output)
    if err != nil && buf.IsWriteError(err) {
        newError("failed to write to downstream. closing session ", s.ID).Base(err).WriteToLog()

        // Notify remote peer to close this session.
        closingWriter := NewResponseWriter(meta.SessionID, m.link.Writer, protocol.TransferTypeStream)
        closingWriter.Close()

        drainErr := buf.Copy(rr, buf.Discard)
        common.Interrupt(s.input)
        s.Close()
        return drainErr
    }

    return err
}
// handleStatusEnd closes the session named by the frame, interrupting both
// directions if the peer reported an error, and drains any attached data.
func (m *ClientWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
    if s, found := m.sessionManager.Get(meta.SessionID); found {
        if meta.Option.Has(OptionError) {
            common.Interrupt(s.input)
            common.Interrupt(s.output)
        }
        s.Close()
    }
    if meta.Option.Has(OptionData) {
        return buf.Copy(NewStreamReader(reader), buf.Discard)
    }
    return nil
}
// fetchOutput reads frames from the mux link and routes each one to the
// handler for its session status. It runs until the link fails or an
// unknown status is seen, then marks the worker done.
func (m *ClientWorker) fetchOutput() {
    defer func() {
        common.Must(m.done.Close())
    }()

    reader := &buf.BufferedReader{Reader: m.link.Reader}

    var meta FrameMetadata
    for {
        err := meta.Unmarshal(reader)
        if err != nil {
            if errors.Cause(err) != io.EOF {
                newError("failed to read metadata").Base(err).WriteToLog()
            }
            break
        }

        switch meta.SessionStatus {
        case SessionStatusKeepAlive:
            err = m.handleStatusKeepAlive(&meta, reader)
        case SessionStatusEnd:
            err = m.handleStatusEnd(&meta, reader)
        case SessionStatusNew:
            err = m.handleStatusNew(&meta, reader)
        case SessionStatusKeep:
            err = m.handleStatusKeep(&meta, reader)
        default:
            status := meta.SessionStatus
            newError("unknown status: ", status).AtError().WriteToLog()
            return
        }

        if err != nil {
            newError("failed to process data").Base(err).WriteToLog()
            return
        }
    }
}