dispatcher.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package udp
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "v2ray.com/core"
  7. "v2ray.com/core/common"
  8. "v2ray.com/core/common/buf"
  9. "v2ray.com/core/common/net"
  10. "v2ray.com/core/common/session"
  11. "v2ray.com/core/common/signal"
  12. )
  13. type ResponseCallback func(ctx context.Context, payload *buf.Buffer)
  14. type connEntry struct {
  15. link *core.Link
  16. timer signal.ActivityUpdater
  17. cancel context.CancelFunc
  18. }
  19. type Dispatcher struct {
  20. sync.RWMutex
  21. conns map[net.Destination]*connEntry
  22. dispatcher core.Dispatcher
  23. callback ResponseCallback
  24. }
  25. func NewDispatcher(dispatcher core.Dispatcher, callback ResponseCallback) *Dispatcher {
  26. return &Dispatcher{
  27. conns: make(map[net.Destination]*connEntry),
  28. dispatcher: dispatcher,
  29. callback: callback,
  30. }
  31. }
  32. func (v *Dispatcher) RemoveRay(dest net.Destination) {
  33. v.Lock()
  34. defer v.Unlock()
  35. if conn, found := v.conns[dest]; found {
  36. common.Close(conn.link.Reader)
  37. common.Close(conn.link.Writer)
  38. delete(v.conns, dest)
  39. }
  40. }
  41. func (v *Dispatcher) getInboundRay(ctx context.Context, dest net.Destination) *connEntry {
  42. v.Lock()
  43. defer v.Unlock()
  44. if entry, found := v.conns[dest]; found {
  45. return entry
  46. }
  47. newError("establishing new connection for ", dest).WriteToLog()
  48. ctx, cancel := context.WithCancel(ctx)
  49. removeRay := func() {
  50. cancel()
  51. v.RemoveRay(dest)
  52. }
  53. timer := signal.CancelAfterInactivity(ctx, removeRay, time.Second*4)
  54. link, _ := v.dispatcher.Dispatch(ctx, dest)
  55. entry := &connEntry{
  56. link: link,
  57. timer: timer,
  58. cancel: removeRay,
  59. }
  60. v.conns[dest] = entry
  61. go handleInput(ctx, entry, v.callback)
  62. return entry
  63. }
  64. func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination, payload *buf.Buffer) {
  65. // TODO: Add user to destString
  66. newError("dispatch request to: ", destination).AtDebug().WriteToLog(session.ExportIDToError(ctx))
  67. conn := v.getInboundRay(ctx, destination)
  68. outputStream := conn.link.Writer
  69. if outputStream != nil {
  70. if err := outputStream.WriteMultiBuffer(buf.NewMultiBufferValue(payload)); err != nil {
  71. newError("failed to write first UDP payload").Base(err).WriteToLog(session.ExportIDToError(ctx))
  72. conn.cancel()
  73. return
  74. }
  75. }
  76. }
  77. func handleInput(ctx context.Context, conn *connEntry, callback ResponseCallback) {
  78. defer conn.cancel()
  79. input := conn.link.Reader
  80. timer := conn.timer
  81. for {
  82. select {
  83. case <-ctx.Done():
  84. return
  85. default:
  86. }
  87. mb, err := input.ReadMultiBuffer()
  88. if err != nil {
  89. newError("failed to handle UDP input").Base(err).WriteToLog(session.ExportIDToError(ctx))
  90. return
  91. }
  92. timer.Update()
  93. for _, b := range mb {
  94. callback(ctx, b)
  95. }
  96. }
  97. }