udp_server.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. package udp
  2. import (
  3. "sync"
  4. "time"
  5. "v2ray.com/core/app/dispatcher"
  6. "v2ray.com/core/common/buf"
  7. "v2ray.com/core/common/log"
  8. v2net "v2ray.com/core/common/net"
  9. "v2ray.com/core/proxy"
  10. "v2ray.com/core/transport/ray"
  11. )
  12. type UDPResponseCallback func(destination v2net.Destination, payload *buf.Buffer)
  13. type TimedInboundRay struct {
  14. name string
  15. inboundRay ray.InboundRay
  16. accessed chan bool
  17. server *UDPServer
  18. sync.RWMutex
  19. }
  20. func NewTimedInboundRay(name string, inboundRay ray.InboundRay, server *UDPServer) *TimedInboundRay {
  21. r := &TimedInboundRay{
  22. name: name,
  23. inboundRay: inboundRay,
  24. accessed: make(chan bool, 1),
  25. server: server,
  26. }
  27. go r.Monitor()
  28. return r
  29. }
  30. func (v *TimedInboundRay) Monitor() {
  31. for {
  32. time.Sleep(time.Second * 16)
  33. select {
  34. case <-v.accessed:
  35. default:
  36. // Ray not accessed for a while, assuming communication is dead.
  37. v.RLock()
  38. if v.server == nil {
  39. v.RUnlock()
  40. return
  41. }
  42. v.server.RemoveRay(v.name)
  43. v.RUnlock()
  44. v.Release()
  45. return
  46. }
  47. }
  48. }
  49. func (v *TimedInboundRay) InboundInput() ray.OutputStream {
  50. v.RLock()
  51. defer v.RUnlock()
  52. if v.inboundRay == nil {
  53. return nil
  54. }
  55. select {
  56. case v.accessed <- true:
  57. default:
  58. }
  59. return v.inboundRay.InboundInput()
  60. }
  61. func (v *TimedInboundRay) InboundOutput() ray.InputStream {
  62. v.RLock()
  63. defer v.RUnlock()
  64. if v.inboundRay == nil {
  65. return nil
  66. }
  67. select {
  68. case v.accessed <- true:
  69. default:
  70. }
  71. return v.inboundRay.InboundOutput()
  72. }
  73. func (v *TimedInboundRay) Release() {
  74. log.Debug("UDP Server: Releasing TimedInboundRay: ", v.name)
  75. v.Lock()
  76. defer v.Unlock()
  77. if v.server == nil {
  78. return
  79. }
  80. v.server = nil
  81. v.inboundRay.InboundInput().Close()
  82. v.inboundRay.InboundOutput().Release()
  83. v.inboundRay = nil
  84. }
  85. type UDPServer struct {
  86. sync.RWMutex
  87. conns map[string]*TimedInboundRay
  88. packetDispatcher dispatcher.PacketDispatcher
  89. }
  90. func NewUDPServer(packetDispatcher dispatcher.PacketDispatcher) *UDPServer {
  91. return &UDPServer{
  92. conns: make(map[string]*TimedInboundRay),
  93. packetDispatcher: packetDispatcher,
  94. }
  95. }
  96. func (v *UDPServer) RemoveRay(name string) {
  97. v.Lock()
  98. defer v.Unlock()
  99. delete(v.conns, name)
  100. }
  101. func (v *UDPServer) locateExistingAndDispatch(name string, payload *buf.Buffer) bool {
  102. log.Debug("UDP Server: Locating existing connection for ", name)
  103. v.RLock()
  104. defer v.RUnlock()
  105. if entry, found := v.conns[name]; found {
  106. outputStream := entry.InboundInput()
  107. if outputStream == nil {
  108. return false
  109. }
  110. err := outputStream.Write(payload)
  111. if err != nil {
  112. go entry.Release()
  113. return false
  114. }
  115. return true
  116. }
  117. return false
  118. }
  119. func (v *UDPServer) Dispatch(session *proxy.SessionInfo, payload *buf.Buffer, callback UDPResponseCallback) {
  120. source := session.Source
  121. destination := session.Destination
  122. // TODO: Add user to destString
  123. destString := source.String() + "-" + destination.String()
  124. log.Debug("UDP Server: Dispatch request: ", destString)
  125. if v.locateExistingAndDispatch(destString, payload) {
  126. return
  127. }
  128. log.Info("UDP Server: establishing new connection for ", destString)
  129. inboundRay := v.packetDispatcher.DispatchToOutbound(session)
  130. timedInboundRay := NewTimedInboundRay(destString, inboundRay, v)
  131. outputStream := timedInboundRay.InboundInput()
  132. if outputStream != nil {
  133. outputStream.Write(payload)
  134. }
  135. v.Lock()
  136. v.conns[destString] = timedInboundRay
  137. v.Unlock()
  138. go v.handleConnection(timedInboundRay, source, callback)
  139. }
  140. func (v *UDPServer) handleConnection(inboundRay *TimedInboundRay, source v2net.Destination, callback UDPResponseCallback) {
  141. for {
  142. inputStream := inboundRay.InboundOutput()
  143. if inputStream == nil {
  144. break
  145. }
  146. data, err := inboundRay.InboundOutput().Read()
  147. if err != nil {
  148. break
  149. }
  150. callback(source, data)
  151. }
  152. inboundRay.Release()
  153. }