dispatcher.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package udp
  2. import (
  3. "context"
  4. "sync"
  5. "v2ray.com/core/app/dispatcher"
  6. "v2ray.com/core/app/log"
  7. "v2ray.com/core/common/buf"
  8. "v2ray.com/core/common/errors"
  9. v2net "v2ray.com/core/common/net"
  10. "v2ray.com/core/transport/ray"
  11. )
  // ResponseCallback is the function type used to deliver response payloads
  // back to the party that issued a Dispatch call. It is passed to handleInput,
  // which invokes it once for every buffer successfully read from the inbound
  // ray's output stream.
  12. type ResponseCallback func(payload *buf.Buffer)
  // Dispatcher keeps one inbound ray per destination and forwards UDP payloads
  // through it, creating rays on demand via the wrapped dispatcher.Interface.
  13. type Dispatcher struct {
  // Embedded lock; RemoveRay and getInboundRay take it (in write mode) before
  // touching conns.
  14. sync.RWMutex
  // conns maps a destination to the inbound ray currently serving it.
  15. conns map[v2net.Destination]ray.InboundRay
  // dispatcher is asked for a new inbound ray when conns has no entry for a
  // destination.
  16. dispatcher dispatcher.Interface
  17. }
  18. func NewDispatcher(dispatcher dispatcher.Interface) *Dispatcher {
  19. return &Dispatcher{
  20. conns: make(map[v2net.Destination]ray.InboundRay),
  21. dispatcher: dispatcher,
  22. }
  23. }
  24. func (v *Dispatcher) RemoveRay(dest v2net.Destination) {
  25. v.Lock()
  26. defer v.Unlock()
  27. if conn, found := v.conns[dest]; found {
  28. conn.InboundInput().Close()
  29. conn.InboundOutput().Close()
  30. delete(v.conns, dest)
  31. }
  32. }
  33. func (v *Dispatcher) getInboundRay(ctx context.Context, dest v2net.Destination) (ray.InboundRay, bool) {
  34. v.Lock()
  35. defer v.Unlock()
  36. if entry, found := v.conns[dest]; found {
  37. return entry, true
  38. }
  39. log.Trace(errors.New("UDP|Server: establishing new connection for ", dest))
  40. inboundRay, _ := v.dispatcher.Dispatch(ctx, dest)
  41. v.conns[dest] = inboundRay
  42. return inboundRay, false
  43. }
  44. func (v *Dispatcher) Dispatch(ctx context.Context, destination v2net.Destination, payload *buf.Buffer, callback ResponseCallback) {
  45. // TODO: Add user to destString
  46. log.Trace(errors.New("UDP|Server: Dispatch request: ", destination).AtDebug())
  47. inboundRay, existing := v.getInboundRay(ctx, destination)
  48. outputStream := inboundRay.InboundInput()
  49. if outputStream != nil {
  50. if err := outputStream.Write(payload); err != nil {
  51. v.RemoveRay(destination)
  52. }
  53. }
  54. if !existing {
  55. go func() {
  56. handleInput(inboundRay.InboundOutput(), callback)
  57. v.RemoveRay(destination)
  58. }()
  59. }
  60. }
  61. func handleInput(input ray.InputStream, callback ResponseCallback) {
  62. for {
  63. data, err := input.Read()
  64. if err != nil {
  65. break
  66. }
  67. callback(data)
  68. }
  69. }