dispatcher.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package udp
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "v2ray.com/core/common"
  7. "v2ray.com/core/common/buf"
  8. "v2ray.com/core/common/net"
  9. "v2ray.com/core/common/session"
  10. "v2ray.com/core/common/signal"
  11. "v2ray.com/core/common/vio"
  12. "v2ray.com/core/features/routing"
  13. )
  14. type ResponseCallback func(ctx context.Context, payload *buf.Buffer)
  15. type connEntry struct {
  16. link *vio.Link
  17. timer signal.ActivityUpdater
  18. cancel context.CancelFunc
  19. }
  20. type Dispatcher struct {
  21. sync.RWMutex
  22. conns map[net.Destination]*connEntry
  23. dispatcher routing.Dispatcher
  24. callback ResponseCallback
  25. }
  26. func NewDispatcher(dispatcher routing.Dispatcher, callback ResponseCallback) *Dispatcher {
  27. return &Dispatcher{
  28. conns: make(map[net.Destination]*connEntry),
  29. dispatcher: dispatcher,
  30. callback: callback,
  31. }
  32. }
  33. func (v *Dispatcher) RemoveRay(dest net.Destination) {
  34. v.Lock()
  35. defer v.Unlock()
  36. if conn, found := v.conns[dest]; found {
  37. common.Close(conn.link.Reader)
  38. common.Close(conn.link.Writer)
  39. delete(v.conns, dest)
  40. }
  41. }
  42. func (v *Dispatcher) getInboundRay(ctx context.Context, dest net.Destination) *connEntry {
  43. v.Lock()
  44. defer v.Unlock()
  45. if entry, found := v.conns[dest]; found {
  46. return entry
  47. }
  48. newError("establishing new connection for ", dest).WriteToLog()
  49. ctx, cancel := context.WithCancel(ctx)
  50. removeRay := func() {
  51. cancel()
  52. v.RemoveRay(dest)
  53. }
  54. timer := signal.CancelAfterInactivity(ctx, removeRay, time.Second*4)
  55. link, _ := v.dispatcher.Dispatch(ctx, dest)
  56. entry := &connEntry{
  57. link: link,
  58. timer: timer,
  59. cancel: removeRay,
  60. }
  61. v.conns[dest] = entry
  62. go handleInput(ctx, entry, v.callback)
  63. return entry
  64. }
  65. func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination, payload *buf.Buffer) {
  66. // TODO: Add user to destString
  67. newError("dispatch request to: ", destination).AtDebug().WriteToLog(session.ExportIDToError(ctx))
  68. conn := v.getInboundRay(ctx, destination)
  69. outputStream := conn.link.Writer
  70. if outputStream != nil {
  71. if err := outputStream.WriteMultiBuffer(buf.NewMultiBufferValue(payload)); err != nil {
  72. newError("failed to write first UDP payload").Base(err).WriteToLog(session.ExportIDToError(ctx))
  73. conn.cancel()
  74. return
  75. }
  76. }
  77. }
  78. func handleInput(ctx context.Context, conn *connEntry, callback ResponseCallback) {
  79. defer conn.cancel()
  80. input := conn.link.Reader
  81. timer := conn.timer
  82. for {
  83. select {
  84. case <-ctx.Done():
  85. return
  86. default:
  87. }
  88. mb, err := input.ReadMultiBuffer()
  89. if err != nil {
  90. newError("failed to handle UDP input").Base(err).WriteToLog(session.ExportIDToError(ctx))
  91. return
  92. }
  93. timer.Update()
  94. for _, b := range mb {
  95. callback(ctx, b)
  96. }
  97. }
  98. }