mux.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. package mux
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "v2ray.com/core/app"
  7. "v2ray.com/core/app/dispatcher"
  8. "v2ray.com/core/app/log"
  9. "v2ray.com/core/common/buf"
  10. "v2ray.com/core/common/errors"
  11. "v2ray.com/core/common/net"
  12. "v2ray.com/core/common/signal"
  13. "v2ray.com/core/proxy"
  14. "v2ray.com/core/transport/ray"
  15. )
  16. const (
  17. maxParallel = 8
  18. maxTotal = 128
  19. )
  20. type manager interface {
  21. remove(id uint16)
  22. }
  23. type session struct {
  24. sync.Mutex
  25. input ray.InputStream
  26. output ray.OutputStream
  27. parent manager
  28. id uint16
  29. uplinkClosed bool
  30. downlinkClosed bool
  31. }
  32. func (s *session) checkAndRemove() {
  33. s.Lock()
  34. if s.uplinkClosed && s.downlinkClosed {
  35. s.parent.remove(s.id)
  36. }
  37. s.Unlock()
  38. }
  39. func (s *session) closeUplink() {
  40. s.Lock()
  41. s.uplinkClosed = true
  42. s.Unlock()
  43. s.checkAndRemove()
  44. }
  45. func (s *session) closeDownlink() {
  46. s.Lock()
  47. s.downlinkClosed = true
  48. s.Unlock()
  49. s.checkAndRemove()
  50. }
  51. type Client struct {
  52. access sync.RWMutex
  53. count uint16
  54. sessions map[uint16]*session
  55. inboundRay ray.InboundRay
  56. ctx context.Context
  57. cancel context.CancelFunc
  58. }
  59. var muxCoolDestination = net.TCPDestination(net.DomainAddress("v1.mux.cool"), net.Port(9527))
  60. func NewClient(p proxy.Outbound, dialer proxy.Dialer) (*Client, error) {
  61. ctx, cancel := context.WithCancel(context.Background())
  62. ctx = proxy.ContextWithTarget(ctx, muxCoolDestination)
  63. pipe := ray.NewRay(ctx)
  64. err := p.Process(ctx, pipe, dialer)
  65. if err != nil {
  66. cancel()
  67. return nil, err
  68. }
  69. return &Client{
  70. sessions: make(map[uint16]*session, 256),
  71. inboundRay: pipe,
  72. ctx: ctx,
  73. cancel: cancel,
  74. }, nil
  75. }
  76. func (m *Client) isFullyOccupied() bool {
  77. m.access.RLock()
  78. defer m.access.RUnlock()
  79. return len(m.sessions) >= maxParallel
  80. }
  81. func (m *Client) remove(id uint16) {
  82. m.access.Lock()
  83. defer m.access.Unlock()
  84. delete(m.sessions, id)
  85. if len(m.sessions) == 0 {
  86. m.cancel()
  87. m.inboundRay.InboundInput().Close()
  88. }
  89. }
  90. func (m *Client) Closed() bool {
  91. select {
  92. case <-m.ctx.Done():
  93. return true
  94. default:
  95. return false
  96. }
  97. }
  98. func (m *Client) fetchInput(ctx context.Context, s *session) {
  99. dest, _ := proxy.TargetFromContext(ctx)
  100. writer := &Writer{
  101. dest: dest,
  102. id: s.id,
  103. writer: m.inboundRay.InboundInput(),
  104. }
  105. _, timer := signal.CancelAfterInactivity(ctx, time.Minute*5)
  106. buf.PipeUntilEOF(timer, s.input, writer)
  107. writer.Close()
  108. s.closeUplink()
  109. }
  110. func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool {
  111. m.access.Lock()
  112. defer m.access.Unlock()
  113. if len(m.sessions) >= maxParallel {
  114. return false
  115. }
  116. if m.count >= maxTotal {
  117. return false
  118. }
  119. select {
  120. case <-m.ctx.Done():
  121. return false
  122. default:
  123. }
  124. m.count++
  125. id := m.count
  126. s := &session{
  127. input: outboundRay.OutboundInput(),
  128. output: outboundRay.OutboundOutput(),
  129. parent: m,
  130. id: id,
  131. }
  132. m.sessions[id] = s
  133. go m.fetchInput(ctx, s)
  134. return true
  135. }
  136. func (m *Client) fetchOutput() {
  137. reader := NewReader(m.inboundRay.InboundOutput())
  138. for {
  139. meta, err := reader.ReadMetadata()
  140. if err != nil {
  141. break
  142. }
  143. m.access.RLock()
  144. s, found := m.sessions[meta.SessionID]
  145. m.access.RUnlock()
  146. if found && meta.SessionStatus == SessionStatusEnd {
  147. s.closeDownlink()
  148. s.output.Close()
  149. }
  150. if !meta.Option.Has(OptionData) {
  151. continue
  152. }
  153. for {
  154. data, more, err := reader.Read()
  155. if err != nil {
  156. break
  157. }
  158. if found {
  159. if err := s.output.Write(data); err != nil {
  160. break
  161. }
  162. }
  163. if !more {
  164. break
  165. }
  166. }
  167. }
  168. }
  169. type Server struct {
  170. dispatcher dispatcher.Interface
  171. }
  172. func NewServer(ctx context.Context) *Server {
  173. s := &Server{}
  174. space := app.SpaceFromContext(ctx)
  175. space.OnInitialize(func() error {
  176. d := dispatcher.FromSpace(space)
  177. if d == nil {
  178. return errors.New("Proxyman|Mux: No dispatcher in space.")
  179. }
  180. s.dispatcher = d
  181. return nil
  182. })
  183. return s
  184. }
  185. func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (ray.InboundRay, error) {
  186. if dest != muxCoolDestination {
  187. return s.dispatcher.Dispatch(ctx, dest)
  188. }
  189. ray := ray.NewRay(ctx)
  190. worker := &ServerWorker{
  191. dispatcher: s.dispatcher,
  192. outboundRay: ray,
  193. sessions: make(map[uint16]*session),
  194. }
  195. go worker.run(ctx)
  196. return ray, nil
  197. }
  198. type ServerWorker struct {
  199. dispatcher dispatcher.Interface
  200. outboundRay ray.OutboundRay
  201. sessions map[uint16]*session
  202. access sync.RWMutex
  203. }
  204. func (w *ServerWorker) remove(id uint16) {
  205. w.access.Lock()
  206. delete(w.sessions, id)
  207. w.access.Unlock()
  208. }
  209. func (w *ServerWorker) handle(ctx context.Context, s *session) {
  210. for {
  211. select {
  212. case <-ctx.Done():
  213. return
  214. default:
  215. data, err := s.input.Read()
  216. if err != nil {
  217. return
  218. }
  219. w.outboundRay.OutboundOutput().Write(data)
  220. }
  221. }
  222. }
  223. func (w *ServerWorker) run(ctx context.Context) {
  224. input := w.outboundRay.OutboundInput()
  225. reader := NewReader(input)
  226. for {
  227. select {
  228. case <-ctx.Done():
  229. return
  230. default:
  231. }
  232. meta, err := reader.ReadMetadata()
  233. if err != nil {
  234. return
  235. }
  236. w.access.RLock()
  237. s, found := w.sessions[meta.SessionID]
  238. w.access.RUnlock()
  239. if found && meta.SessionStatus == SessionStatusEnd {
  240. s.closeUplink()
  241. s.output.Close()
  242. }
  243. if meta.SessionStatus == SessionStatusNew {
  244. inboundRay, err := w.dispatcher.Dispatch(ctx, meta.Target)
  245. if err != nil {
  246. log.Info("Proxyman|Mux: Failed to dispatch request: ", err)
  247. continue
  248. }
  249. s = &session{
  250. input: inboundRay.InboundOutput(),
  251. output: inboundRay.InboundInput(),
  252. parent: w,
  253. id: meta.SessionID,
  254. }
  255. w.access.Lock()
  256. w.sessions[meta.SessionID] = s
  257. w.access.Unlock()
  258. go w.handle(ctx, s)
  259. }
  260. if meta.Option.Has(OptionData) {
  261. for {
  262. data, more, err := reader.Read()
  263. if err != nil {
  264. break
  265. }
  266. if s != nil {
  267. s.output.Write(data)
  268. }
  269. if !more {
  270. break
  271. }
  272. }
  273. }
  274. }
  275. }