dispatcher.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package udp
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "v2ray.com/core/app/dispatcher"
  7. "v2ray.com/core/app/log"
  8. "v2ray.com/core/common/buf"
  9. "v2ray.com/core/common/net"
  10. "v2ray.com/core/common/signal"
  11. "v2ray.com/core/transport/ray"
  12. )
  13. type ResponseCallback func(payload *buf.Buffer)
  14. type connEntry struct {
  15. inbound ray.InboundRay
  16. timer signal.ActivityUpdater
  17. cancel context.CancelFunc
  18. }
  19. type Dispatcher struct {
  20. sync.RWMutex
  21. conns map[net.Destination]*connEntry
  22. dispatcher dispatcher.Interface
  23. }
  24. func NewDispatcher(dispatcher dispatcher.Interface) *Dispatcher {
  25. return &Dispatcher{
  26. conns: make(map[net.Destination]*connEntry),
  27. dispatcher: dispatcher,
  28. }
  29. }
  30. func (v *Dispatcher) RemoveRay(dest net.Destination) {
  31. v.Lock()
  32. defer v.Unlock()
  33. if conn, found := v.conns[dest]; found {
  34. conn.inbound.InboundInput().Close()
  35. conn.inbound.InboundOutput().Close()
  36. delete(v.conns, dest)
  37. }
  38. }
  39. func (v *Dispatcher) getInboundRay(dest net.Destination, callback ResponseCallback) *connEntry {
  40. v.Lock()
  41. defer v.Unlock()
  42. if entry, found := v.conns[dest]; found {
  43. return entry
  44. }
  45. log.Trace(newError("establishing new connection for ", dest))
  46. ctx, cancel := context.WithCancel(context.Background())
  47. removeRay := func() {
  48. cancel()
  49. v.RemoveRay(dest)
  50. }
  51. timer := signal.CancelAfterInactivity(ctx, removeRay, time.Second*4)
  52. inboundRay, _ := v.dispatcher.Dispatch(ctx, dest)
  53. entry := &connEntry{
  54. inbound: inboundRay,
  55. timer: timer,
  56. cancel: removeRay,
  57. }
  58. v.conns[dest] = entry
  59. go handleInput(ctx, entry, callback)
  60. return entry
  61. }
  62. func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination, payload *buf.Buffer, callback ResponseCallback) {
  63. // TODO: Add user to destString
  64. log.Trace(newError("dispatch request to: ", destination).AtDebug())
  65. conn := v.getInboundRay(destination, callback)
  66. outputStream := conn.inbound.InboundInput()
  67. if outputStream != nil {
  68. if err := outputStream.WriteMultiBuffer(buf.NewMultiBufferValue(payload)); err != nil {
  69. log.Trace(newError("failed to write first UDP payload").Base(err))
  70. conn.cancel()
  71. return
  72. }
  73. }
  74. }
  75. func handleInput(ctx context.Context, conn *connEntry, callback ResponseCallback) {
  76. input := conn.inbound.InboundOutput()
  77. timer := conn.timer
  78. for {
  79. select {
  80. case <-ctx.Done():
  81. return
  82. default:
  83. }
  84. mb, err := input.ReadMultiBuffer()
  85. if err != nil {
  86. log.Trace(newError("failed to handl UDP input").Base(err))
  87. conn.cancel()
  88. return
  89. }
  90. timer.Update()
  91. for _, b := range mb {
  92. callback(b)
  93. }
  94. }
  95. }