dispatcher.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package udp
  2. import (
  3. "context"
  4. "sync"
  5. "v2ray.com/core/app/dispatcher"
  6. "v2ray.com/core/app/log"
  7. "v2ray.com/core/common/buf"
  8. v2net "v2ray.com/core/common/net"
  9. "v2ray.com/core/transport/ray"
  10. )
// ResponseCallback is a function that receives a payload buffer. Within this
// file it is invoked by handleInput once for every buffer successfully read
// from a ray's inbound output stream.
type ResponseCallback func(payload *buf.Buffer)
  12. type Dispatcher struct {
  13. sync.RWMutex
  14. conns map[string]ray.InboundRay
  15. dispatcher dispatcher.Interface
  16. }
  17. func NewDispatcher(dispatcher dispatcher.Interface) *Dispatcher {
  18. return &Dispatcher{
  19. conns: make(map[string]ray.InboundRay),
  20. dispatcher: dispatcher,
  21. }
  22. }
  23. func (v *Dispatcher) RemoveRay(name string) {
  24. v.Lock()
  25. defer v.Unlock()
  26. if conn, found := v.conns[name]; found {
  27. conn.InboundInput().Close()
  28. conn.InboundOutput().Close()
  29. delete(v.conns, name)
  30. }
  31. }
  32. func (v *Dispatcher) getInboundRay(ctx context.Context, dest v2net.Destination) (ray.InboundRay, bool) {
  33. destString := dest.String()
  34. v.Lock()
  35. defer v.Unlock()
  36. if entry, found := v.conns[destString]; found {
  37. return entry, true
  38. }
  39. log.Info("UDP|Server: establishing new connection for ", dest)
  40. inboundRay, _ := v.dispatcher.Dispatch(ctx, dest)
  41. return inboundRay, false
  42. }
  43. func (v *Dispatcher) Dispatch(ctx context.Context, destination v2net.Destination, payload *buf.Buffer, callback ResponseCallback) {
  44. // TODO: Add user to destString
  45. destString := destination.String()
  46. log.Debug("UDP|Server: Dispatch request: ", destString)
  47. inboundRay, existing := v.getInboundRay(ctx, destination)
  48. outputStream := inboundRay.InboundInput()
  49. if outputStream != nil {
  50. if err := outputStream.Write(payload); err != nil {
  51. v.RemoveRay(destString)
  52. }
  53. }
  54. if !existing {
  55. go func() {
  56. handleInput(inboundRay.InboundOutput(), callback)
  57. v.RemoveRay(destString)
  58. }()
  59. }
  60. }
  61. func handleInput(input ray.InputStream, callback ResponseCallback) {
  62. for {
  63. data, err := input.Read()
  64. if err != nil {
  65. break
  66. }
  67. callback(data)
  68. }
  69. }