udp_server.go 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package hub
  2. import (
  3. "sync"
  4. "github.com/v2ray/v2ray-core/app/dispatcher"
  5. v2net "github.com/v2ray/v2ray-core/common/net"
  6. "github.com/v2ray/v2ray-core/transport/ray"
  7. )
  8. type UDPResponseCallback func(packet v2net.Packet)
  9. type connEntry struct {
  10. inboundRay ray.InboundRay
  11. callback UDPResponseCallback
  12. }
  13. type UDPServer struct {
  14. sync.RWMutex
  15. conns map[string]*connEntry
  16. packetDispatcher dispatcher.PacketDispatcher
  17. }
  18. func NewUDPServer(packetDispatcher dispatcher.PacketDispatcher) *UDPServer {
  19. return &UDPServer{
  20. conns: make(map[string]*connEntry),
  21. packetDispatcher: packetDispatcher,
  22. }
  23. }
  24. func (this *UDPServer) locateExistingAndDispatch(dest string, packet v2net.Packet) bool {
  25. this.RLock()
  26. defer this.RUnlock()
  27. if entry, found := this.conns[dest]; found {
  28. entry.inboundRay.InboundInput().Write(packet.Chunk())
  29. return true
  30. }
  31. return false
  32. }
  33. func (this *UDPServer) Dispatch(source v2net.Destination, packet v2net.Packet, callback UDPResponseCallback) {
  34. destString := source.String() + "-" + packet.Destination().NetAddr()
  35. if this.locateExistingAndDispatch(destString, packet) {
  36. return
  37. }
  38. this.Lock()
  39. inboundRay := this.packetDispatcher.DispatchToOutbound(v2net.NewPacket(packet.Destination(), packet.Chunk(), true))
  40. this.conns[destString] = &connEntry{
  41. inboundRay: inboundRay,
  42. callback: callback,
  43. }
  44. this.Unlock()
  45. go this.handleConnection(destString, inboundRay, source, callback)
  46. }
  47. func (this *UDPServer) handleConnection(destString string, inboundRay ray.InboundRay, source v2net.Destination, callback UDPResponseCallback) {
  48. for {
  49. data, err := inboundRay.InboundOutput().Read()
  50. if err != nil {
  51. break
  52. }
  53. callback(v2net.NewPacket(source, data, false))
  54. }
  55. this.Lock()
  56. delete(this.conns, destString)
  57. this.Unlock()
  58. }