udp_server.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package hub
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/v2ray/v2ray-core/app/dispatcher"
  6. "github.com/v2ray/v2ray-core/common/alloc"
  7. v2net "github.com/v2ray/v2ray-core/common/net"
  8. "github.com/v2ray/v2ray-core/transport/ray"
  9. )
  10. type UDPResponseCallback func(destination v2net.Destination, payload *alloc.Buffer)
  11. type TimedInboundRay struct {
  12. name string
  13. inboundRay ray.InboundRay
  14. accessed chan bool
  15. server *UDPServer
  16. sync.RWMutex
  17. }
  18. func NewTimedInboundRay(name string, inboundRay ray.InboundRay) *TimedInboundRay {
  19. r := &TimedInboundRay{
  20. name: name,
  21. inboundRay: inboundRay,
  22. accessed: make(chan bool),
  23. }
  24. go r.Monitor()
  25. return r
  26. }
  27. func (this *TimedInboundRay) Monitor() {
  28. for {
  29. time.Sleep(16 * time.Second)
  30. select {
  31. case <-this.accessed:
  32. default:
  33. // Ray not accessed for a while, assuming communication is dead.
  34. this.Release()
  35. return
  36. }
  37. }
  38. }
  39. func (this *TimedInboundRay) InboundInput() ray.OutputStream {
  40. this.RLock()
  41. defer this.RUnlock()
  42. if this.inboundRay == nil {
  43. return nil
  44. }
  45. select {
  46. case this.accessed <- true:
  47. default:
  48. }
  49. return this.inboundRay.InboundInput()
  50. }
  51. func (this *TimedInboundRay) InboundOutput() ray.InputStream {
  52. this.RLock()
  53. this.RUnlock()
  54. if this.inboundRay == nil {
  55. return nil
  56. }
  57. select {
  58. case this.accessed <- true:
  59. default:
  60. }
  61. return this.inboundRay.InboundOutput()
  62. }
  63. func (this *TimedInboundRay) Release() {
  64. this.Lock()
  65. defer this.Unlock()
  66. if this.server == nil {
  67. return
  68. }
  69. this.server.RemoveRay(this.name)
  70. this.server = nil
  71. this.inboundRay.InboundInput().Close()
  72. this.inboundRay.InboundOutput().Release()
  73. this.inboundRay = nil
  74. }
  75. type UDPServer struct {
  76. sync.RWMutex
  77. conns map[string]*TimedInboundRay
  78. packetDispatcher dispatcher.PacketDispatcher
  79. }
  80. func NewUDPServer(packetDispatcher dispatcher.PacketDispatcher) *UDPServer {
  81. return &UDPServer{
  82. conns: make(map[string]*TimedInboundRay),
  83. packetDispatcher: packetDispatcher,
  84. }
  85. }
  86. func (this *UDPServer) RemoveRay(name string) {
  87. this.Lock()
  88. defer this.Unlock()
  89. delete(this.conns, name)
  90. }
  91. func (this *UDPServer) locateExistingAndDispatch(name string, payload *alloc.Buffer) bool {
  92. this.RLock()
  93. defer this.RUnlock()
  94. if entry, found := this.conns[name]; found {
  95. err := entry.InboundInput().Write(payload)
  96. if err != nil {
  97. this.RemoveRay(name)
  98. return false
  99. }
  100. return true
  101. }
  102. return false
  103. }
  104. func (this *UDPServer) Dispatch(source v2net.Destination, destination v2net.Destination, payload *alloc.Buffer, callback UDPResponseCallback) {
  105. destString := source.Address().String() + "-" + destination.Address().String()
  106. if this.locateExistingAndDispatch(destString, payload) {
  107. return
  108. }
  109. this.Lock()
  110. inboundRay := this.packetDispatcher.DispatchToOutbound(destination)
  111. inboundRay.InboundInput().Write(payload)
  112. timedInboundRay := NewTimedInboundRay(destString, inboundRay)
  113. this.conns[destString] = timedInboundRay
  114. this.Unlock()
  115. go this.handleConnection(timedInboundRay, source, callback)
  116. }
  117. func (this *UDPServer) handleConnection(inboundRay *TimedInboundRay, source v2net.Destination, callback UDPResponseCallback) {
  118. for {
  119. data, err := inboundRay.InboundOutput().Read()
  120. if err != nil {
  121. break
  122. }
  123. callback(source, data)
  124. }
  125. inboundRay.Release()
  126. }