// udp_server.go

  1. package udp
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/v2ray/v2ray-core/app/dispatcher"
  6. "github.com/v2ray/v2ray-core/common/alloc"
  7. "github.com/v2ray/v2ray-core/common/log"
  8. v2net "github.com/v2ray/v2ray-core/common/net"
  9. "github.com/v2ray/v2ray-core/transport/ray"
  10. )
// UDPResponseCallback is the function type used to hand response data back to
// the caller of UDPServer.Dispatch. UDPServer.handleConnection invokes it once
// per buffer read from the ray's output, passing the source given to Dispatch
// as destination.
type UDPResponseCallback func(destination v2net.Destination, payload *alloc.Buffer)
// TimedInboundRay wraps a ray.InboundRay together with an activity marker so
// that an idle ray can be detected and released by Monitor.
type TimedInboundRay struct {
	// name is the key under which the owning server tracks this ray.
	name string
	// inboundRay is the wrapped ray; Release sets it to nil.
	inboundRay ray.InboundRay
	// accessed receives a value (without blocking) whenever InboundInput or
	// InboundOutput is called; Monitor drains it to detect activity.
	accessed chan bool
	// server is the owning UDPServer; Release sets it to nil, which is how the
	// other methods recognise a released ray.
	server *UDPServer
	// The embedded lock guards server and inboundRay.
	sync.RWMutex
}
  19. func NewTimedInboundRay(name string, inboundRay ray.InboundRay, server *UDPServer) *TimedInboundRay {
  20. r := &TimedInboundRay{
  21. name: name,
  22. inboundRay: inboundRay,
  23. accessed: make(chan bool, 1),
  24. server: server,
  25. }
  26. go r.Monitor()
  27. return r
  28. }
  29. func (this *TimedInboundRay) Monitor() {
  30. for {
  31. time.Sleep(time.Second * 16)
  32. select {
  33. case <-this.accessed:
  34. default:
  35. // Ray not accessed for a while, assuming communication is dead.
  36. this.RLock()
  37. if this.server == nil {
  38. this.RUnlock()
  39. return
  40. }
  41. this.server.RemoveRay(this.name)
  42. this.RUnlock()
  43. this.Release()
  44. return
  45. }
  46. }
  47. }
  48. func (this *TimedInboundRay) InboundInput() ray.OutputStream {
  49. this.RLock()
  50. defer this.RUnlock()
  51. if this.inboundRay == nil {
  52. return nil
  53. }
  54. select {
  55. case this.accessed <- true:
  56. default:
  57. }
  58. return this.inboundRay.InboundInput()
  59. }
  60. func (this *TimedInboundRay) InboundOutput() ray.InputStream {
  61. this.RLock()
  62. defer this.RUnlock()
  63. if this.inboundRay == nil {
  64. return nil
  65. }
  66. select {
  67. case this.accessed <- true:
  68. default:
  69. }
  70. return this.inboundRay.InboundOutput()
  71. }
  72. func (this *TimedInboundRay) Release() {
  73. log.Debug("UDP Server: Releasing TimedInboundRay: ", this.name)
  74. this.Lock()
  75. defer this.Unlock()
  76. if this.server == nil {
  77. return
  78. }
  79. this.server = nil
  80. this.inboundRay.InboundInput().Close()
  81. this.inboundRay.InboundOutput().Release()
  82. this.inboundRay = nil
  83. }
// UDPServer keeps one TimedInboundRay per source/destination pair and forwards
// UDP payloads through it.
type UDPServer struct {
	// The embedded lock guards conns.
	sync.RWMutex
	// conns maps the "<source>-<destination>" key built in Dispatch to the ray
	// serving that pair.
	conns map[string]*TimedInboundRay
	// packetDispatcher is used by Dispatch to obtain a ray for a destination
	// that has no existing entry.
	packetDispatcher dispatcher.PacketDispatcher
}
  89. func NewUDPServer(packetDispatcher dispatcher.PacketDispatcher) *UDPServer {
  90. return &UDPServer{
  91. conns: make(map[string]*TimedInboundRay),
  92. packetDispatcher: packetDispatcher,
  93. }
  94. }
  95. func (this *UDPServer) RemoveRay(name string) {
  96. this.Lock()
  97. defer this.Unlock()
  98. delete(this.conns, name)
  99. }
  100. func (this *UDPServer) locateExistingAndDispatch(name string, payload *alloc.Buffer) bool {
  101. log.Debug("UDP Server: Locating existing connection for ", name)
  102. this.RLock()
  103. defer this.RUnlock()
  104. if entry, found := this.conns[name]; found {
  105. outputStream := entry.InboundInput()
  106. if outputStream == nil {
  107. return false
  108. }
  109. err := outputStream.Write(payload)
  110. if err != nil {
  111. go entry.Release()
  112. return false
  113. }
  114. return true
  115. }
  116. return false
  117. }
  118. func (this *UDPServer) Dispatch(source v2net.Destination, destination v2net.Destination, payload *alloc.Buffer, callback UDPResponseCallback) {
  119. destString := source.String() + "-" + destination.String()
  120. log.Debug("UDP Server: Dispatch request: ", destString)
  121. if this.locateExistingAndDispatch(destString, payload) {
  122. return
  123. }
  124. log.Info("UDP Server: establishing new connection for ", destString)
  125. inboundRay := this.packetDispatcher.DispatchToOutbound(destination)
  126. timedInboundRay := NewTimedInboundRay(destString, inboundRay, this)
  127. outputStream := timedInboundRay.InboundInput()
  128. if outputStream != nil {
  129. outputStream.Write(payload)
  130. }
  131. this.Lock()
  132. this.conns[destString] = timedInboundRay
  133. this.Unlock()
  134. go this.handleConnection(timedInboundRay, source, callback)
  135. }
  136. func (this *UDPServer) handleConnection(inboundRay *TimedInboundRay, source v2net.Destination, callback UDPResponseCallback) {
  137. for {
  138. inputStream := inboundRay.InboundOutput()
  139. if inputStream == nil {
  140. break
  141. }
  142. data, err := inboundRay.InboundOutput().Read()
  143. if err != nil {
  144. break
  145. }
  146. callback(source, data)
  147. }
  148. inboundRay.Release()
  149. }