// dispatcher_split.go (4.5 KB)
  1. package udp
  2. import (
  3. "context"
  4. "io"
  5. "sync"
  6. "time"
  7. "github.com/v2fly/v2ray-core/v5/common"
  8. "github.com/v2fly/v2ray-core/v5/common/buf"
  9. "github.com/v2fly/v2ray-core/v5/common/net"
  10. "github.com/v2fly/v2ray-core/v5/common/protocol/udp"
  11. "github.com/v2fly/v2ray-core/v5/common/session"
  12. "github.com/v2fly/v2ray-core/v5/common/signal"
  13. "github.com/v2fly/v2ray-core/v5/common/signal/done"
  14. "github.com/v2fly/v2ray-core/v5/features/routing"
  15. "github.com/v2fly/v2ray-core/v5/transport"
  16. )
  17. type ResponseCallback func(ctx context.Context, packet *udp.Packet)
  18. type connEntry struct {
  19. link *transport.Link
  20. timer signal.ActivityUpdater
  21. cancel context.CancelFunc
  22. }
  23. type Dispatcher struct {
  24. sync.RWMutex
  25. conns map[net.Destination]*connEntry
  26. dispatcher routing.Dispatcher
  27. callback ResponseCallback
  28. }
  29. func NewSplitDispatcher(dispatcher routing.Dispatcher, callback ResponseCallback) *Dispatcher {
  30. return &Dispatcher{
  31. conns: make(map[net.Destination]*connEntry),
  32. dispatcher: dispatcher,
  33. callback: callback,
  34. }
  35. }
  36. func (v *Dispatcher) RemoveRay(dest net.Destination) {
  37. v.Lock()
  38. defer v.Unlock()
  39. if conn, found := v.conns[dest]; found {
  40. common.Close(conn.link.Reader)
  41. common.Close(conn.link.Writer)
  42. delete(v.conns, dest)
  43. }
  44. }
  45. func (v *Dispatcher) getInboundRay(ctx context.Context, dest net.Destination) *connEntry {
  46. v.Lock()
  47. defer v.Unlock()
  48. if entry, found := v.conns[dest]; found {
  49. return entry
  50. }
  51. newError("establishing new connection for ", dest).WriteToLog()
  52. ctx, cancel := context.WithCancel(ctx)
  53. removeRay := func() {
  54. cancel()
  55. v.RemoveRay(dest)
  56. }
  57. timer := signal.CancelAfterInactivity(ctx, removeRay, time.Second*4)
  58. link, _ := v.dispatcher.Dispatch(ctx, dest)
  59. entry := &connEntry{
  60. link: link,
  61. timer: timer,
  62. cancel: removeRay,
  63. }
  64. v.conns[dest] = entry
  65. go handleInput(ctx, entry, dest, v.callback)
  66. return entry
  67. }
  68. func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination, payload *buf.Buffer) {
  69. // TODO: Add user to destString
  70. newError("dispatch request to: ", destination).AtDebug().WriteToLog(session.ExportIDToError(ctx))
  71. conn := v.getInboundRay(ctx, destination)
  72. outputStream := conn.link.Writer
  73. if outputStream != nil {
  74. if err := outputStream.WriteMultiBuffer(buf.MultiBuffer{payload}); err != nil {
  75. newError("failed to write first UDP payload").Base(err).WriteToLog(session.ExportIDToError(ctx))
  76. conn.cancel()
  77. return
  78. }
  79. }
  80. }
  81. func handleInput(ctx context.Context, conn *connEntry, dest net.Destination, callback ResponseCallback) {
  82. defer conn.cancel()
  83. input := conn.link.Reader
  84. timer := conn.timer
  85. for {
  86. select {
  87. case <-ctx.Done():
  88. return
  89. default:
  90. }
  91. mb, err := input.ReadMultiBuffer()
  92. if err != nil {
  93. newError("failed to handle UDP input").Base(err).WriteToLog(session.ExportIDToError(ctx))
  94. return
  95. }
  96. timer.Update()
  97. for _, b := range mb {
  98. callback(ctx, &udp.Packet{
  99. Payload: b,
  100. Source: dest,
  101. })
  102. }
  103. }
  104. }
  105. type dispatcherConn struct {
  106. dispatcher *Dispatcher
  107. cache chan *udp.Packet
  108. done *done.Instance
  109. }
  110. func DialDispatcher(ctx context.Context, dispatcher routing.Dispatcher) (net.PacketConn, error) {
  111. c := &dispatcherConn{
  112. cache: make(chan *udp.Packet, 16),
  113. done: done.New(),
  114. }
  115. d := NewSplitDispatcher(dispatcher, c.callback)
  116. c.dispatcher = d
  117. return c, nil
  118. }
  119. func (c *dispatcherConn) callback(ctx context.Context, packet *udp.Packet) {
  120. select {
  121. case <-c.done.Wait():
  122. packet.Payload.Release()
  123. return
  124. case c.cache <- packet:
  125. default:
  126. packet.Payload.Release()
  127. return
  128. }
  129. }
  130. func (c *dispatcherConn) ReadFrom(p []byte) (int, net.Addr, error) {
  131. select {
  132. case <-c.done.Wait():
  133. return 0, nil, io.EOF
  134. case packet := <-c.cache:
  135. n := copy(p, packet.Payload.Bytes())
  136. return n, &net.UDPAddr{
  137. IP: packet.Source.Address.IP(),
  138. Port: int(packet.Source.Port),
  139. }, nil
  140. }
  141. }
  142. func (c *dispatcherConn) WriteTo(p []byte, addr net.Addr) (int, error) {
  143. buffer := buf.New()
  144. raw := buffer.Extend(buf.Size)
  145. n := copy(raw, p)
  146. buffer.Resize(0, int32(n))
  147. ctx := context.Background()
  148. c.dispatcher.Dispatch(ctx, net.DestinationFromAddr(addr), buffer)
  149. return n, nil
  150. }
  151. func (c *dispatcherConn) Close() error {
  152. return c.done.Close()
  153. }
  154. func (c *dispatcherConn) LocalAddr() net.Addr {
  155. return &net.UDPAddr{
  156. IP: []byte{0, 0, 0, 0},
  157. Port: 0,
  158. }
  159. }
  160. func (c *dispatcherConn) SetDeadline(t time.Time) error {
  161. return nil
  162. }
  163. func (c *dispatcherConn) SetReadDeadline(t time.Time) error {
  164. return nil
  165. }
  166. func (c *dispatcherConn) SetWriteDeadline(t time.Time) error {
  167. return nil
  168. }