udp_server.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package hub
  2. import (
  3. "sync"
  4. "github.com/v2ray/v2ray-core/app/dispatcher"
  5. "github.com/v2ray/v2ray-core/common/alloc"
  6. v2net "github.com/v2ray/v2ray-core/common/net"
  7. "github.com/v2ray/v2ray-core/transport/ray"
  8. )
  9. type UDPResponseCallback func(destination v2net.Destination, payload *alloc.Buffer)
  10. type connEntry struct {
  11. inboundRay ray.InboundRay
  12. callback UDPResponseCallback
  13. }
  14. type UDPServer struct {
  15. sync.RWMutex
  16. conns map[string]*connEntry
  17. packetDispatcher dispatcher.PacketDispatcher
  18. }
  19. func NewUDPServer(packetDispatcher dispatcher.PacketDispatcher) *UDPServer {
  20. return &UDPServer{
  21. conns: make(map[string]*connEntry),
  22. packetDispatcher: packetDispatcher,
  23. }
  24. }
  25. func (this *UDPServer) locateExistingAndDispatch(dest string, payload *alloc.Buffer) bool {
  26. this.RLock()
  27. defer this.RUnlock()
  28. if entry, found := this.conns[dest]; found {
  29. entry.inboundRay.InboundInput().Write(payload)
  30. return true
  31. }
  32. return false
  33. }
  34. func (this *UDPServer) Dispatch(source v2net.Destination, destination v2net.Destination, payload *alloc.Buffer, callback UDPResponseCallback) {
  35. destString := source.String() + "-" + destination.NetAddr()
  36. if this.locateExistingAndDispatch(destString, payload) {
  37. return
  38. }
  39. this.Lock()
  40. inboundRay := this.packetDispatcher.DispatchToOutbound(destination)
  41. inboundRay.InboundInput().Write(payload)
  42. this.conns[destString] = &connEntry{
  43. inboundRay: inboundRay,
  44. callback: callback,
  45. }
  46. this.Unlock()
  47. go this.handleConnection(destString, inboundRay, source, callback)
  48. }
  49. func (this *UDPServer) handleConnection(destString string, inboundRay ray.InboundRay, source v2net.Destination, callback UDPResponseCallback) {
  50. for {
  51. data, err := inboundRay.InboundOutput().Read()
  52. if err != nil {
  53. break
  54. }
  55. callback(source, data)
  56. }
  57. this.Lock()
  58. inboundRay.InboundInput().Release()
  59. inboundRay.InboundOutput().Release()
  60. delete(this.conns, destString)
  61. this.Unlock()
  62. }