dispatcher.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package udp
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "v2ray.com/core"
  7. "v2ray.com/core/common/buf"
  8. "v2ray.com/core/common/net"
  9. "v2ray.com/core/common/signal"
  10. "v2ray.com/core/transport/ray"
  11. )
  12. type ResponseCallback func(payload *buf.Buffer)
  13. type connEntry struct {
  14. inbound ray.InboundRay
  15. timer signal.ActivityUpdater
  16. cancel context.CancelFunc
  17. }
  18. type Dispatcher struct {
  19. sync.RWMutex
  20. conns map[net.Destination]*connEntry
  21. dispatcher core.Dispatcher
  22. }
  23. func NewDispatcher(dispatcher core.Dispatcher) *Dispatcher {
  24. return &Dispatcher{
  25. conns: make(map[net.Destination]*connEntry),
  26. dispatcher: dispatcher,
  27. }
  28. }
  29. func (v *Dispatcher) RemoveRay(dest net.Destination) {
  30. v.Lock()
  31. defer v.Unlock()
  32. if conn, found := v.conns[dest]; found {
  33. conn.inbound.InboundInput().Close()
  34. conn.inbound.InboundOutput().Close()
  35. delete(v.conns, dest)
  36. }
  37. }
  38. func (v *Dispatcher) getInboundRay(dest net.Destination, callback ResponseCallback) *connEntry {
  39. v.Lock()
  40. defer v.Unlock()
  41. if entry, found := v.conns[dest]; found {
  42. return entry
  43. }
  44. newError("establishing new connection for ", dest).WriteToLog()
  45. ctx, cancel := context.WithCancel(context.Background())
  46. removeRay := func() {
  47. cancel()
  48. v.RemoveRay(dest)
  49. }
  50. timer := signal.CancelAfterInactivity(ctx, removeRay, time.Second*4)
  51. inboundRay, _ := v.dispatcher.Dispatch(ctx, dest)
  52. entry := &connEntry{
  53. inbound: inboundRay,
  54. timer: timer,
  55. cancel: removeRay,
  56. }
  57. v.conns[dest] = entry
  58. go handleInput(ctx, entry, callback)
  59. return entry
  60. }
  61. func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination, payload *buf.Buffer, callback ResponseCallback) {
  62. // TODO: Add user to destString
  63. newError("dispatch request to: ", destination).AtDebug().WithContext(ctx).WriteToLog()
  64. conn := v.getInboundRay(destination, callback)
  65. outputStream := conn.inbound.InboundInput()
  66. if outputStream != nil {
  67. if err := outputStream.WriteMultiBuffer(buf.NewMultiBufferValue(payload)); err != nil {
  68. newError("failed to write first UDP payload").Base(err).WithContext(ctx).WriteToLog()
  69. conn.cancel()
  70. return
  71. }
  72. }
  73. }
  74. func handleInput(ctx context.Context, conn *connEntry, callback ResponseCallback) {
  75. input := conn.inbound.InboundOutput()
  76. timer := conn.timer
  77. for {
  78. select {
  79. case <-ctx.Done():
  80. return
  81. default:
  82. }
  83. mb, err := input.ReadMultiBuffer()
  84. if err != nil {
  85. newError("failed to handle UDP input").Base(err).WithContext(ctx).WriteToLog()
  86. conn.cancel()
  87. return
  88. }
  89. timer.Update()
  90. for _, b := range mb {
  91. callback(b)
  92. }
  93. }
  94. }