dispatcher_split.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. package udp
  2. import (
  3. "context"
  4. "io"
  5. "sync"
  6. "time"
  7. "github.com/v2fly/v2ray-core/v5/common"
  8. "github.com/v2fly/v2ray-core/v5/common/buf"
  9. "github.com/v2fly/v2ray-core/v5/common/net"
  10. "github.com/v2fly/v2ray-core/v5/common/protocol/udp"
  11. "github.com/v2fly/v2ray-core/v5/common/session"
  12. "github.com/v2fly/v2ray-core/v5/common/signal"
  13. "github.com/v2fly/v2ray-core/v5/common/signal/done"
  14. "github.com/v2fly/v2ray-core/v5/features/routing"
  15. "github.com/v2fly/v2ray-core/v5/transport"
  16. )
  17. type ResponseCallback func(ctx context.Context, packet *udp.Packet)
  18. type connEntry struct {
  19. link *transport.Link
  20. timer signal.ActivityUpdater
  21. cancel context.CancelFunc
  22. }
  23. type Dispatcher struct {
  24. sync.RWMutex
  25. conns map[net.Destination]*connEntry
  26. dispatcher routing.Dispatcher
  27. callback ResponseCallback
  28. }
  29. func (v *Dispatcher) Close() error {
  30. return nil
  31. }
  32. func NewSplitDispatcher(dispatcher routing.Dispatcher, callback ResponseCallback) DispatcherI {
  33. return &Dispatcher{
  34. conns: make(map[net.Destination]*connEntry),
  35. dispatcher: dispatcher,
  36. callback: callback,
  37. }
  38. }
  39. func (v *Dispatcher) RemoveRay(dest net.Destination) {
  40. v.Lock()
  41. defer v.Unlock()
  42. if conn, found := v.conns[dest]; found {
  43. common.Close(conn.link.Reader)
  44. common.Close(conn.link.Writer)
  45. delete(v.conns, dest)
  46. }
  47. }
  48. func (v *Dispatcher) getInboundRay(ctx context.Context, dest net.Destination) *connEntry {
  49. v.Lock()
  50. defer v.Unlock()
  51. if entry, found := v.conns[dest]; found {
  52. return entry
  53. }
  54. newError("establishing new connection for ", dest).WriteToLog()
  55. ctx, cancel := context.WithCancel(ctx)
  56. removeRay := func() {
  57. cancel()
  58. v.RemoveRay(dest)
  59. }
  60. timer := signal.CancelAfterInactivity(ctx, removeRay, time.Second*300)
  61. link, _ := v.dispatcher.Dispatch(ctx, dest)
  62. entry := &connEntry{
  63. link: link,
  64. timer: timer,
  65. cancel: removeRay,
  66. }
  67. v.conns[dest] = entry
  68. go handleInput(ctx, entry, dest, v.callback)
  69. return entry
  70. }
  71. func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination, payload *buf.Buffer) {
  72. // TODO: Add user to destString
  73. newError("dispatch request to: ", destination).AtDebug().WriteToLog(session.ExportIDToError(ctx))
  74. conn := v.getInboundRay(ctx, destination)
  75. outputStream := conn.link.Writer
  76. if outputStream != nil {
  77. if err := outputStream.WriteMultiBuffer(buf.MultiBuffer{payload}); err != nil {
  78. newError("failed to write first UDP payload").Base(err).WriteToLog(session.ExportIDToError(ctx))
  79. conn.cancel()
  80. return
  81. }
  82. }
  83. }
  84. func handleInput(ctx context.Context, conn *connEntry, dest net.Destination, callback ResponseCallback) {
  85. defer conn.cancel()
  86. input := conn.link.Reader
  87. timer := conn.timer
  88. for {
  89. select {
  90. case <-ctx.Done():
  91. return
  92. default:
  93. }
  94. mb, err := input.ReadMultiBuffer()
  95. if err != nil {
  96. newError("failed to handle UDP input").Base(err).WriteToLog(session.ExportIDToError(ctx))
  97. return
  98. }
  99. timer.Update()
  100. for _, b := range mb {
  101. callback(ctx, &udp.Packet{
  102. Payload: b,
  103. Source: dest,
  104. })
  105. }
  106. }
  107. }
  108. type dispatcherConn struct {
  109. dispatcher *Dispatcher
  110. cache chan *udp.Packet
  111. done *done.Instance
  112. }
  113. func DialDispatcher(ctx context.Context, dispatcher routing.Dispatcher) (net.PacketConn, error) {
  114. c := &dispatcherConn{
  115. cache: make(chan *udp.Packet, 16),
  116. done: done.New(),
  117. }
  118. d := NewSplitDispatcher(dispatcher, c.callback)
  119. c.dispatcher = d.(*Dispatcher)
  120. return c, nil
  121. }
  122. func (c *dispatcherConn) callback(ctx context.Context, packet *udp.Packet) {
  123. select {
  124. case <-c.done.Wait():
  125. packet.Payload.Release()
  126. return
  127. case c.cache <- packet:
  128. default:
  129. packet.Payload.Release()
  130. return
  131. }
  132. }
  133. func (c *dispatcherConn) ReadFrom(p []byte) (int, net.Addr, error) {
  134. select {
  135. case <-c.done.Wait():
  136. return 0, nil, io.EOF
  137. case packet := <-c.cache:
  138. n := copy(p, packet.Payload.Bytes())
  139. return n, &net.UDPAddr{
  140. IP: packet.Source.Address.IP(),
  141. Port: int(packet.Source.Port),
  142. }, nil
  143. }
  144. }
  145. func (c *dispatcherConn) WriteTo(p []byte, addr net.Addr) (int, error) {
  146. buffer := buf.New()
  147. raw := buffer.Extend(buf.Size)
  148. n := copy(raw, p)
  149. buffer.Resize(0, int32(n))
  150. ctx := context.Background()
  151. c.dispatcher.Dispatch(ctx, net.DestinationFromAddr(addr), buffer)
  152. return n, nil
  153. }
  154. func (c *dispatcherConn) Close() error {
  155. return c.done.Close()
  156. }
  157. func (c *dispatcherConn) LocalAddr() net.Addr {
  158. return &net.UDPAddr{
  159. IP: []byte{0, 0, 0, 0},
  160. Port: 0,
  161. }
  162. }
  163. func (c *dispatcherConn) SetDeadline(t time.Time) error {
  164. return nil
  165. }
  166. func (c *dispatcherConn) SetReadDeadline(t time.Time) error {
  167. return nil
  168. }
  169. func (c *dispatcherConn) SetWriteDeadline(t time.Time) error {
  170. return nil
  171. }