udp_server.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package udp
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/v2ray/v2ray-core/app/dispatcher"
  6. "github.com/v2ray/v2ray-core/common/alloc"
  7. "github.com/v2ray/v2ray-core/common/log"
  8. v2net "github.com/v2ray/v2ray-core/common/net"
  9. "github.com/v2ray/v2ray-core/proxy"
  10. "github.com/v2ray/v2ray-core/transport/ray"
  11. )
  12. type UDPResponseCallback func(destination v2net.Destination, payload *alloc.Buffer)
  13. type TimedInboundRay struct {
  14. name string
  15. inboundRay ray.InboundRay
  16. accessed chan bool
  17. server *UDPServer
  18. sync.RWMutex
  19. }
  20. func NewTimedInboundRay(name string, inboundRay ray.InboundRay, server *UDPServer) *TimedInboundRay {
  21. r := &TimedInboundRay{
  22. name: name,
  23. inboundRay: inboundRay,
  24. accessed: make(chan bool, 1),
  25. server: server,
  26. }
  27. go r.Monitor()
  28. return r
  29. }
  30. func (this *TimedInboundRay) Monitor() {
  31. for {
  32. time.Sleep(time.Second * 16)
  33. select {
  34. case <-this.accessed:
  35. default:
  36. // Ray not accessed for a while, assuming communication is dead.
  37. this.RLock()
  38. if this.server == nil {
  39. this.RUnlock()
  40. return
  41. }
  42. this.server.RemoveRay(this.name)
  43. this.RUnlock()
  44. this.Release()
  45. return
  46. }
  47. }
  48. }
  49. func (this *TimedInboundRay) InboundInput() ray.OutputStream {
  50. this.RLock()
  51. defer this.RUnlock()
  52. if this.inboundRay == nil {
  53. return nil
  54. }
  55. select {
  56. case this.accessed <- true:
  57. default:
  58. }
  59. return this.inboundRay.InboundInput()
  60. }
  61. func (this *TimedInboundRay) InboundOutput() ray.InputStream {
  62. this.RLock()
  63. defer this.RUnlock()
  64. if this.inboundRay == nil {
  65. return nil
  66. }
  67. select {
  68. case this.accessed <- true:
  69. default:
  70. }
  71. return this.inboundRay.InboundOutput()
  72. }
  73. func (this *TimedInboundRay) Release() {
  74. log.Debug("UDP Server: Releasing TimedInboundRay: ", this.name)
  75. this.Lock()
  76. defer this.Unlock()
  77. if this.server == nil {
  78. return
  79. }
  80. this.server = nil
  81. this.inboundRay.InboundInput().Close()
  82. this.inboundRay.InboundOutput().Release()
  83. this.inboundRay = nil
  84. }
  85. type UDPServer struct {
  86. sync.RWMutex
  87. conns map[string]*TimedInboundRay
  88. packetDispatcher dispatcher.PacketDispatcher
  89. meta *proxy.InboundHandlerMeta
  90. }
  91. func NewUDPServer(meta *proxy.InboundHandlerMeta, packetDispatcher dispatcher.PacketDispatcher) *UDPServer {
  92. return &UDPServer{
  93. conns: make(map[string]*TimedInboundRay),
  94. packetDispatcher: packetDispatcher,
  95. meta: meta,
  96. }
  97. }
  98. func (this *UDPServer) RemoveRay(name string) {
  99. this.Lock()
  100. defer this.Unlock()
  101. delete(this.conns, name)
  102. }
  103. func (this *UDPServer) locateExistingAndDispatch(name string, payload *alloc.Buffer) bool {
  104. log.Debug("UDP Server: Locating existing connection for ", name)
  105. this.RLock()
  106. defer this.RUnlock()
  107. if entry, found := this.conns[name]; found {
  108. outputStream := entry.InboundInput()
  109. if outputStream == nil {
  110. return false
  111. }
  112. err := outputStream.Write(payload)
  113. if err != nil {
  114. go entry.Release()
  115. return false
  116. }
  117. return true
  118. }
  119. return false
  120. }
  121. func (this *UDPServer) Dispatch(session *proxy.SessionInfo, payload *alloc.Buffer, callback UDPResponseCallback) {
  122. source := session.Source
  123. destination := session.Destination
  124. // TODO: Add user to destString
  125. destString := source.String() + "-" + destination.String()
  126. log.Debug("UDP Server: Dispatch request: ", destString)
  127. if this.locateExistingAndDispatch(destString, payload) {
  128. return
  129. }
  130. log.Info("UDP Server: establishing new connection for ", destString)
  131. inboundRay := this.packetDispatcher.DispatchToOutbound(this.meta, session)
  132. timedInboundRay := NewTimedInboundRay(destString, inboundRay, this)
  133. outputStream := timedInboundRay.InboundInput()
  134. if outputStream != nil {
  135. outputStream.Write(payload)
  136. }
  137. this.Lock()
  138. this.conns[destString] = timedInboundRay
  139. this.Unlock()
  140. go this.handleConnection(timedInboundRay, source, callback)
  141. }
  142. func (this *UDPServer) handleConnection(inboundRay *TimedInboundRay, source v2net.Destination, callback UDPResponseCallback) {
  143. for {
  144. inputStream := inboundRay.InboundOutput()
  145. if inputStream == nil {
  146. break
  147. }
  148. data, err := inboundRay.InboundOutput().Read()
  149. if err != nil {
  150. break
  151. }
  152. callback(source, data)
  153. }
  154. inboundRay.Release()
  155. }