mux.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package mux
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "v2ray.com/core/common/buf"
  7. "v2ray.com/core/common/signal"
  8. "v2ray.com/core/proxy"
  9. "v2ray.com/core/transport/ray"
  10. )
  // Capacity limits that the Client's occupancy checks compare against.
  const (
  	// maxParallel is the number of simultaneously registered sessions at
  	// which Client.IsFullyOccupied starts reporting true.
  	maxParallel = 8

  	// maxTotal is the value of the Client's running session counter at which
  	// Client.IsFullyUsed starts reporting true.
  	maxTotal = 128
  )
  15. type clientSession struct {
  16. sync.Mutex
  17. outboundRay ray.OutboundRay
  18. parent *Client
  19. id uint16
  20. uplinkClosed bool
  21. downlinkClosed bool
  22. }
  23. func (s *clientSession) checkAndRemove() {
  24. s.Lock()
  25. if s.uplinkClosed && s.downlinkClosed {
  26. s.parent.remove(s.id)
  27. }
  28. s.Unlock()
  29. }
  30. func (s *clientSession) closeUplink() {
  31. s.Lock()
  32. s.uplinkClosed = true
  33. s.Unlock()
  34. s.checkAndRemove()
  35. }
  36. func (s *clientSession) closeDownlink() {
  37. s.Lock()
  38. s.downlinkClosed = true
  39. s.Unlock()
  40. s.checkAndRemove()
  41. }
  42. type Client struct {
  43. access sync.RWMutex
  44. count uint16
  45. sessions map[uint16]*clientSession
  46. inboundRay ray.InboundRay
  47. }
  48. func (m *Client) IsFullyOccupied() bool {
  49. m.access.RLock()
  50. defer m.access.RUnlock()
  51. return len(m.sessions) >= maxParallel
  52. }
  53. func (m *Client) IsFullyUsed() bool {
  54. m.access.RLock()
  55. defer m.access.RUnlock()
  56. return m.count >= maxTotal
  57. }
  58. func (m *Client) remove(id uint16) {
  59. m.access.Lock()
  60. delete(m.sessions, id)
  61. m.access.Unlock()
  62. }
  63. func (m *Client) fetchInput(ctx context.Context, session *clientSession) {
  64. dest, _ := proxy.TargetFromContext(ctx)
  65. writer := &muxWriter{
  66. dest: dest,
  67. id: session.id,
  68. writer: m.inboundRay.InboundInput(),
  69. }
  70. _, timer := signal.CancelAfterInactivity(ctx, time.Minute*5)
  71. buf.PipeUntilEOF(timer, session.outboundRay.OutboundInput(), writer)
  72. writer.Close()
  73. session.closeUplink()
  74. }
  75. func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) {
  76. m.access.Lock()
  77. defer m.access.Unlock()
  78. m.count++
  79. id := m.count
  80. session := &clientSession{
  81. outboundRay: outboundRay,
  82. parent: m,
  83. id: id,
  84. }
  85. m.sessions[id] = session
  86. go m.fetchInput(ctx, session)
  87. }
  88. func (m *Client) fetchOutput() {
  89. reader := NewReader(m.inboundRay.InboundOutput())
  90. for {
  91. meta, err := reader.ReadMetadata()
  92. if err != nil {
  93. break
  94. }
  95. m.access.RLock()
  96. session, found := m.sessions[meta.SessionID]
  97. m.access.RUnlock()
  98. if found && meta.SessionStatus == SessionStatusEnd {
  99. session.closeDownlink()
  100. }
  101. if !meta.Option.Has(OptionData) {
  102. continue
  103. }
  104. for {
  105. data, more, err := reader.Read()
  106. if err != nil {
  107. break
  108. }
  109. if found {
  110. if err := session.outboundRay.OutboundOutput().Write(data); err != nil {
  111. break
  112. }
  113. }
  114. if !more {
  115. break
  116. }
  117. }
  118. }
  119. }