dispatcher.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package udp
  2. import (
  3. "context"
  4. "sync"
  5. "v2ray.com/core/app/dispatcher"
  6. "v2ray.com/core/common/buf"
  7. "v2ray.com/core/app/log"
  8. v2net "v2ray.com/core/common/net"
  9. "v2ray.com/core/proxy"
  10. "v2ray.com/core/transport/ray"
  11. )
  12. type ResponseCallback func(payload *buf.Buffer)
  13. type Dispatcher struct {
  14. sync.RWMutex
  15. conns map[string]ray.InboundRay
  16. packetDispatcher dispatcher.Interface
  17. }
  18. func NewDispatcher(packetDispatcher dispatcher.Interface) *Dispatcher {
  19. return &Dispatcher{
  20. conns: make(map[string]ray.InboundRay),
  21. packetDispatcher: packetDispatcher,
  22. }
  23. }
  24. func (v *Dispatcher) RemoveRay(name string) {
  25. v.Lock()
  26. defer v.Unlock()
  27. if conn, found := v.conns[name]; found {
  28. conn.InboundInput().Close()
  29. conn.InboundOutput().Close()
  30. delete(v.conns, name)
  31. }
  32. }
  33. func (v *Dispatcher) getInboundRay(ctx context.Context, dest v2net.Destination) (ray.InboundRay, bool) {
  34. destString := dest.String()
  35. v.Lock()
  36. defer v.Unlock()
  37. if entry, found := v.conns[destString]; found {
  38. return entry, true
  39. }
  40. log.Info("UDP|Server: establishing new connection for ", dest)
  41. ctx = proxy.ContextWithDestination(ctx, dest)
  42. return v.packetDispatcher.DispatchToOutbound(ctx), false
  43. }
  44. func (v *Dispatcher) Dispatch(ctx context.Context, destination v2net.Destination, payload *buf.Buffer, callback ResponseCallback) {
  45. // TODO: Add user to destString
  46. destString := destination.String()
  47. log.Debug("UDP|Server: Dispatch request: ", destString)
  48. inboundRay, existing := v.getInboundRay(ctx, destination)
  49. outputStream := inboundRay.InboundInput()
  50. if outputStream != nil {
  51. if err := outputStream.Write(payload); err != nil {
  52. v.RemoveRay(destString)
  53. }
  54. }
  55. if !existing {
  56. go func() {
  57. handleInput(inboundRay.InboundOutput(), callback)
  58. v.RemoveRay(destString)
  59. }()
  60. }
  61. }
  62. func handleInput(input ray.InputStream, callback ResponseCallback) {
  63. for {
  64. data, err := input.Read()
  65. if err != nil {
  66. break
  67. }
  68. callback(data)
  69. }
  70. }