worker.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. package inbound
  2. import (
  3. "context"
  4. "io"
  5. "net"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "v2ray.com/core/app/dispatcher"
  10. "v2ray.com/core/app/log"
  11. "v2ray.com/core/common/buf"
  12. v2net "v2ray.com/core/common/net"
  13. "v2ray.com/core/proxy"
  14. "v2ray.com/core/transport/internet"
  15. "v2ray.com/core/transport/internet/tcp"
  16. "v2ray.com/core/transport/internet/udp"
  17. )
  18. type worker interface {
  19. Start() error
  20. Close()
  21. Port() v2net.Port
  22. Proxy() proxy.Inbound
  23. }
  24. type tcpWorker struct {
  25. address v2net.Address
  26. port v2net.Port
  27. proxy proxy.Inbound
  28. stream *internet.StreamConfig
  29. recvOrigDest bool
  30. tag string
  31. dispatcher dispatcher.Interface
  32. ctx context.Context
  33. cancel context.CancelFunc
  34. hub *internet.TCPHub
  35. }
  36. func (w *tcpWorker) callback(conn internet.Connection) {
  37. ctx, cancel := context.WithCancel(w.ctx)
  38. if w.recvOrigDest {
  39. dest := tcp.GetOriginalDestination(conn)
  40. if dest.IsValid() {
  41. ctx = proxy.ContextWithOriginalDestination(ctx, dest)
  42. }
  43. }
  44. if len(w.tag) > 0 {
  45. ctx = proxy.ContextWithInboundTag(ctx, w.tag)
  46. }
  47. ctx = proxy.ContextWithInboundDestination(ctx, v2net.TCPDestination(w.address, w.port))
  48. ctx = proxy.ContextWithSource(ctx, v2net.DestinationFromAddr(conn.RemoteAddr()))
  49. if err := w.proxy.Process(ctx, v2net.Network_TCP, conn, w.dispatcher); err != nil {
  50. log.Info("Proxyman|TCPWorker: Connection ends with ", err)
  51. }
  52. cancel()
  53. conn.Close()
  54. }
  55. func (w *tcpWorker) Proxy() proxy.Inbound {
  56. return w.proxy
  57. }
  58. func (w *tcpWorker) Start() error {
  59. ctx, cancel := context.WithCancel(context.Background())
  60. w.ctx = ctx
  61. w.cancel = cancel
  62. hub, err := internet.ListenTCP(w.address, w.port, w.callback, w.stream)
  63. if err != nil {
  64. return err
  65. }
  66. w.hub = hub
  67. return nil
  68. }
  69. func (w *tcpWorker) Close() {
  70. if w.hub != nil {
  71. w.hub.Close()
  72. w.cancel()
  73. }
  74. }
  75. func (w *tcpWorker) Port() v2net.Port {
  76. return w.port
  77. }
  78. type udpConn struct {
  79. lastActivityTime int64 // in seconds
  80. input chan *buf.Buffer
  81. output func([]byte) (int, error)
  82. remote net.Addr
  83. local net.Addr
  84. cancel context.CancelFunc
  85. }
  86. func (c *udpConn) updateActivity() {
  87. atomic.StoreInt64(&c.lastActivityTime, time.Now().Unix())
  88. }
  89. func (c *udpConn) Read(buf []byte) (int, error) {
  90. in, open := <-c.input
  91. if !open {
  92. return 0, io.EOF
  93. }
  94. defer in.Release()
  95. c.updateActivity()
  96. return copy(buf, in.Bytes()), nil
  97. }
  98. func (c *udpConn) Write(buf []byte) (int, error) {
  99. n, err := c.output(buf)
  100. if err == nil {
  101. c.updateActivity()
  102. }
  103. return n, err
  104. }
  105. func (c *udpConn) Close() error {
  106. return nil
  107. }
  108. func (c *udpConn) RemoteAddr() net.Addr {
  109. return c.remote
  110. }
  111. func (c *udpConn) LocalAddr() net.Addr {
  112. return c.remote
  113. }
  114. func (*udpConn) SetDeadline(time.Time) error {
  115. return nil
  116. }
  117. func (*udpConn) SetReadDeadline(time.Time) error {
  118. return nil
  119. }
  120. func (*udpConn) SetWriteDeadline(time.Time) error {
  121. return nil
  122. }
  123. func (*udpConn) Reusable() bool {
  124. return false
  125. }
  126. func (*udpConn) SetReusable(bool) {}
  127. type udpWorker struct {
  128. sync.RWMutex
  129. proxy proxy.Inbound
  130. hub *udp.Hub
  131. address v2net.Address
  132. port v2net.Port
  133. recvOrigDest bool
  134. tag string
  135. dispatcher dispatcher.Interface
  136. ctx context.Context
  137. cancel context.CancelFunc
  138. activeConn map[v2net.Destination]*udpConn
  139. }
  140. func (w *udpWorker) getConnection(src v2net.Destination) (*udpConn, bool) {
  141. w.Lock()
  142. defer w.Unlock()
  143. if conn, found := w.activeConn[src]; found {
  144. return conn, true
  145. }
  146. conn := &udpConn{
  147. input: make(chan *buf.Buffer, 32),
  148. output: func(b []byte) (int, error) {
  149. return w.hub.WriteTo(b, src)
  150. },
  151. remote: &net.UDPAddr{
  152. IP: src.Address.IP(),
  153. Port: int(src.Port),
  154. },
  155. local: &net.UDPAddr{
  156. IP: w.address.IP(),
  157. Port: int(w.port),
  158. },
  159. }
  160. w.activeConn[src] = conn
  161. conn.updateActivity()
  162. return conn, false
  163. }
  164. func (w *udpWorker) callback(b *buf.Buffer, source v2net.Destination, originalDest v2net.Destination) {
  165. conn, existing := w.getConnection(source)
  166. select {
  167. case conn.input <- b:
  168. default:
  169. b.Release()
  170. }
  171. if !existing {
  172. go func() {
  173. ctx := w.ctx
  174. ctx, cancel := context.WithCancel(ctx)
  175. conn.cancel = cancel
  176. if originalDest.IsValid() {
  177. ctx = proxy.ContextWithOriginalDestination(ctx, originalDest)
  178. }
  179. if len(w.tag) > 0 {
  180. ctx = proxy.ContextWithInboundTag(ctx, w.tag)
  181. }
  182. ctx = proxy.ContextWithSource(ctx, source)
  183. ctx = proxy.ContextWithInboundDestination(ctx, v2net.UDPDestination(w.address, w.port))
  184. if err := w.proxy.Process(ctx, v2net.Network_UDP, conn, w.dispatcher); err != nil {
  185. log.Info("Proxyman|UDPWorker: Connection ends with ", err)
  186. }
  187. w.removeConn(source)
  188. cancel()
  189. }()
  190. }
  191. }
  192. func (w *udpWorker) removeConn(src v2net.Destination) {
  193. w.Lock()
  194. delete(w.activeConn, src)
  195. w.Unlock()
  196. }
  197. func (w *udpWorker) Start() error {
  198. w.activeConn = make(map[v2net.Destination]*udpConn)
  199. ctx, cancel := context.WithCancel(context.Background())
  200. w.ctx = ctx
  201. w.cancel = cancel
  202. h, err := udp.ListenUDP(w.address, w.port, udp.ListenOption{
  203. Callback: w.callback,
  204. ReceiveOriginalDest: w.recvOrigDest,
  205. })
  206. if err != nil {
  207. return err
  208. }
  209. go w.monitor()
  210. w.hub = h
  211. return nil
  212. }
  213. func (w *udpWorker) Close() {
  214. if w.hub != nil {
  215. w.hub.Close()
  216. w.cancel()
  217. }
  218. }
  219. func (w *udpWorker) monitor() {
  220. for {
  221. select {
  222. case <-w.ctx.Done():
  223. return
  224. case <-time.After(time.Second * 16):
  225. nowSec := time.Now().Unix()
  226. w.Lock()
  227. for addr, conn := range w.activeConn {
  228. if nowSec-conn.lastActivityTime > 8 {
  229. delete(w.activeConn, addr)
  230. conn.cancel()
  231. }
  232. }
  233. w.Unlock()
  234. }
  235. }
  236. }
  237. func (w *udpWorker) Port() v2net.Port {
  238. return w.port
  239. }
  240. func (w *udpWorker) Proxy() proxy.Inbound {
  241. return w.proxy
  242. }