udp_server.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package udp
  2. import (
  3. "sync"
  4. "time"
  5. "v2ray.com/core/app/dispatcher"
  6. "v2ray.com/core/common/buf"
  7. "v2ray.com/core/common/log"
  8. v2net "v2ray.com/core/common/net"
  9. "v2ray.com/core/proxy"
  10. "v2ray.com/core/transport/ray"
  11. )
  12. type ResponseCallback func(destination v2net.Destination, payload *buf.Buffer)
  13. type TimedInboundRay struct {
  14. name string
  15. inboundRay ray.InboundRay
  16. accessed chan bool
  17. server *Server
  18. sync.RWMutex
  19. }
  20. func NewTimedInboundRay(name string, inboundRay ray.InboundRay, server *Server) *TimedInboundRay {
  21. r := &TimedInboundRay{
  22. name: name,
  23. inboundRay: inboundRay,
  24. accessed: make(chan bool, 1),
  25. server: server,
  26. }
  27. go r.Monitor()
  28. return r
  29. }
  30. func (v *TimedInboundRay) Monitor() {
  31. for {
  32. time.Sleep(time.Second * 16)
  33. select {
  34. case <-v.accessed:
  35. default:
  36. // Ray not accessed for a while, assuming communication is dead.
  37. v.RLock()
  38. if v.server == nil {
  39. v.RUnlock()
  40. return
  41. }
  42. v.server.RemoveRay(v.name)
  43. v.RUnlock()
  44. v.Release()
  45. return
  46. }
  47. }
  48. }
  49. func (v *TimedInboundRay) InboundInput() ray.OutputStream {
  50. v.RLock()
  51. defer v.RUnlock()
  52. if v.inboundRay == nil {
  53. return nil
  54. }
  55. select {
  56. case v.accessed <- true:
  57. default:
  58. }
  59. return v.inboundRay.InboundInput()
  60. }
  61. func (v *TimedInboundRay) InboundOutput() ray.InputStream {
  62. v.RLock()
  63. defer v.RUnlock()
  64. if v.inboundRay == nil {
  65. return nil
  66. }
  67. select {
  68. case v.accessed <- true:
  69. default:
  70. }
  71. return v.inboundRay.InboundOutput()
  72. }
  73. func (v *TimedInboundRay) Release() {
  74. log.Debug("UDP Server: Releasing TimedInboundRay: ", v.name)
  75. v.Lock()
  76. defer v.Unlock()
  77. if v.server == nil {
  78. return
  79. }
  80. v.server = nil
  81. v.inboundRay.InboundInput().Close()
  82. v.inboundRay.InboundOutput().CloseError()
  83. v.inboundRay = nil
  84. }
  85. type Server struct {
  86. sync.RWMutex
  87. conns map[string]*TimedInboundRay
  88. packetDispatcher dispatcher.PacketDispatcher
  89. }
  90. func NewServer(packetDispatcher dispatcher.PacketDispatcher) *Server {
  91. return &Server{
  92. conns: make(map[string]*TimedInboundRay),
  93. packetDispatcher: packetDispatcher,
  94. }
  95. }
  96. func (v *Server) RemoveRay(name string) {
  97. v.Lock()
  98. defer v.Unlock()
  99. delete(v.conns, name)
  100. }
  101. func (v *Server) locateExistingAndDispatch(name string, payload *buf.Buffer) bool {
  102. log.Debug("UDP Server: Locating existing connection for ", name)
  103. v.RLock()
  104. defer v.RUnlock()
  105. if entry, found := v.conns[name]; found {
  106. outputStream := entry.InboundInput()
  107. if outputStream == nil {
  108. return false
  109. }
  110. err := outputStream.Write(payload)
  111. if err != nil {
  112. go entry.Release()
  113. return false
  114. }
  115. return true
  116. }
  117. return false
  118. }
  119. func (v *Server) getInboundRay(dest string, session *proxy.SessionInfo) (*TimedInboundRay, bool) {
  120. v.Lock()
  121. defer v.Unlock()
  122. if entry, found := v.conns[dest]; found {
  123. return entry, true
  124. }
  125. log.Info("UDP|Server: establishing new connection for ", dest)
  126. inboundRay := v.packetDispatcher.DispatchToOutbound(session)
  127. return NewTimedInboundRay(dest, inboundRay, v), false
  128. }
  129. func (v *Server) Dispatch(session *proxy.SessionInfo, payload *buf.Buffer, callback ResponseCallback) {
  130. source := session.Source
  131. destination := session.Destination
  132. // TODO: Add user to destString
  133. destString := source.String() + "-" + destination.String()
  134. log.Debug("UDP|Server: Dispatch request: ", destString)
  135. inboundRay, existing := v.getInboundRay(destString, session)
  136. outputStream := inboundRay.InboundInput()
  137. if outputStream != nil {
  138. outputStream.Write(payload)
  139. }
  140. if !existing {
  141. go v.handleConnection(inboundRay, source, callback)
  142. }
  143. }
  144. func (v *Server) handleConnection(inboundRay *TimedInboundRay, source v2net.Destination, callback ResponseCallback) {
  145. for {
  146. inputStream := inboundRay.InboundOutput()
  147. if inputStream == nil {
  148. break
  149. }
  150. data, err := inputStream.Read()
  151. if err != nil {
  152. break
  153. }
  154. callback(source, data)
  155. }
  156. inboundRay.Release()
  157. }