dispatcher.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package udp
  2. import (
  3. "context"
  4. "sync"
  5. "v2ray.com/core/app/dispatcher"
  6. "v2ray.com/core/app/log"
  7. "v2ray.com/core/common/buf"
  8. "v2ray.com/core/common/net"
  9. "v2ray.com/core/transport/ray"
  10. )
  11. type ResponseCallback func(payload *buf.Buffer)
  12. type Dispatcher struct {
  13. sync.RWMutex
  14. conns map[net.Destination]ray.InboundRay
  15. dispatcher dispatcher.Interface
  16. }
  17. func NewDispatcher(dispatcher dispatcher.Interface) *Dispatcher {
  18. return &Dispatcher{
  19. conns: make(map[net.Destination]ray.InboundRay),
  20. dispatcher: dispatcher,
  21. }
  22. }
  23. func (v *Dispatcher) RemoveRay(dest net.Destination) {
  24. v.Lock()
  25. defer v.Unlock()
  26. if conn, found := v.conns[dest]; found {
  27. conn.InboundInput().Close()
  28. conn.InboundOutput().Close()
  29. delete(v.conns, dest)
  30. }
  31. }
  32. func (v *Dispatcher) getInboundRay(ctx context.Context, dest net.Destination) (ray.InboundRay, bool) {
  33. v.Lock()
  34. defer v.Unlock()
  35. if entry, found := v.conns[dest]; found {
  36. return entry, true
  37. }
  38. log.Trace(newError("establishing new connection for ", dest))
  39. inboundRay, _ := v.dispatcher.Dispatch(ctx, dest)
  40. v.conns[dest] = inboundRay
  41. return inboundRay, false
  42. }
  43. func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination, payload *buf.Buffer, callback ResponseCallback) {
  44. // TODO: Add user to destString
  45. log.Trace(newError("dispatch request to: ", destination).AtDebug())
  46. inboundRay, existing := v.getInboundRay(ctx, destination)
  47. outputStream := inboundRay.InboundInput()
  48. if outputStream != nil {
  49. if err := outputStream.Write(buf.NewMultiBufferValue(payload)); err != nil {
  50. v.RemoveRay(destination)
  51. }
  52. }
  53. if !existing {
  54. go func() {
  55. handleInput(inboundRay.InboundOutput(), callback)
  56. v.RemoveRay(destination)
  57. }()
  58. }
  59. }
  60. func handleInput(input ray.InputStream, callback ResponseCallback) {
  61. for {
  62. mb, err := input.Read()
  63. if err != nil {
  64. break
  65. }
  66. for _, b := range mb {
  67. callback(b)
  68. }
  69. }
  70. }