udp_server.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package hub
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/v2ray/v2ray-core/app/dispatcher"
  6. "github.com/v2ray/v2ray-core/common/alloc"
  7. "github.com/v2ray/v2ray-core/common/log"
  8. v2net "github.com/v2ray/v2ray-core/common/net"
  9. "github.com/v2ray/v2ray-core/transport/ray"
  10. )
  11. type UDPResponseCallback func(destination v2net.Destination, payload *alloc.Buffer)
  12. type TimedInboundRay struct {
  13. name string
  14. inboundRay ray.InboundRay
  15. accessed chan bool
  16. server *UDPServer
  17. sync.RWMutex
  18. }
  19. func NewTimedInboundRay(name string, inboundRay ray.InboundRay) *TimedInboundRay {
  20. r := &TimedInboundRay{
  21. name: name,
  22. inboundRay: inboundRay,
  23. accessed: make(chan bool),
  24. }
  25. go r.Monitor()
  26. return r
  27. }
  28. func (this *TimedInboundRay) Monitor() {
  29. for {
  30. time.Sleep(time.Second * 16)
  31. select {
  32. case <-this.accessed:
  33. default:
  34. // Ray not accessed for a while, assuming communication is dead.
  35. this.Release()
  36. return
  37. }
  38. }
  39. }
  40. func (this *TimedInboundRay) InboundInput() ray.OutputStream {
  41. this.RLock()
  42. defer this.RUnlock()
  43. if this.inboundRay == nil {
  44. return nil
  45. }
  46. select {
  47. case this.accessed <- true:
  48. default:
  49. }
  50. return this.inboundRay.InboundInput()
  51. }
  52. func (this *TimedInboundRay) InboundOutput() ray.InputStream {
  53. this.RLock()
  54. defer this.RUnlock()
  55. if this.inboundRay == nil {
  56. return nil
  57. }
  58. select {
  59. case this.accessed <- true:
  60. default:
  61. }
  62. return this.inboundRay.InboundOutput()
  63. }
  64. func (this *TimedInboundRay) Release() {
  65. log.Debug("UDP Server: Releasing TimedInboundRay: ", this.name)
  66. this.Lock()
  67. defer this.Unlock()
  68. if this.server == nil {
  69. return
  70. }
  71. this.server.RemoveRay(this.name)
  72. this.server = nil
  73. this.inboundRay.InboundInput().Close()
  74. this.inboundRay.InboundOutput().Release()
  75. this.inboundRay = nil
  76. }
  77. type UDPServer struct {
  78. sync.RWMutex
  79. conns map[string]*TimedInboundRay
  80. packetDispatcher dispatcher.PacketDispatcher
  81. }
  82. func NewUDPServer(packetDispatcher dispatcher.PacketDispatcher) *UDPServer {
  83. return &UDPServer{
  84. conns: make(map[string]*TimedInboundRay),
  85. packetDispatcher: packetDispatcher,
  86. }
  87. }
  88. func (this *UDPServer) RemoveRay(name string) {
  89. this.Lock()
  90. defer this.Unlock()
  91. delete(this.conns, name)
  92. }
  93. func (this *UDPServer) locateExistingAndDispatch(name string, payload *alloc.Buffer) bool {
  94. log.Debug("UDP Server: Locating existing connection for ", name)
  95. this.RLock()
  96. defer this.RUnlock()
  97. if entry, found := this.conns[name]; found {
  98. outputStream := entry.InboundInput()
  99. if outputStream == nil {
  100. return false
  101. }
  102. err := outputStream.Write(payload)
  103. if err != nil {
  104. go this.RemoveRay(name)
  105. return false
  106. }
  107. return true
  108. }
  109. return false
  110. }
  111. func (this *UDPServer) Dispatch(source v2net.Destination, destination v2net.Destination, payload *alloc.Buffer, callback UDPResponseCallback) {
  112. destString := source.Address().String() + "-" + destination.NetAddr()
  113. log.Debug("UDP Server: Dispatch request: ", destString)
  114. if this.locateExistingAndDispatch(destString, payload) {
  115. return
  116. }
  117. log.Info("UDP Server: establishing new connection for ", destString)
  118. inboundRay := this.packetDispatcher.DispatchToOutbound(destination)
  119. timedInboundRay := NewTimedInboundRay(destString, inboundRay)
  120. outputStream := timedInboundRay.InboundInput()
  121. if outputStream != nil {
  122. outputStream.Write(payload)
  123. }
  124. this.Lock()
  125. this.conns[destString] = timedInboundRay
  126. this.Unlock()
  127. go this.handleConnection(timedInboundRay, source, callback)
  128. }
  129. func (this *UDPServer) handleConnection(inboundRay *TimedInboundRay, source v2net.Destination, callback UDPResponseCallback) {
  130. for {
  131. inputStream := inboundRay.InboundOutput()
  132. if inputStream == nil {
  133. break
  134. }
  135. data, err := inboundRay.InboundOutput().Read()
  136. if err != nil {
  137. break
  138. }
  139. callback(source, data)
  140. }
  141. inboundRay.Release()
  142. }