dokodemo.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package dokodemo
  2. import (
  3. "sync"
  4. "github.com/v2ray/v2ray-core/app"
  5. "github.com/v2ray/v2ray-core/app/dispatcher"
  6. "github.com/v2ray/v2ray-core/common/alloc"
  7. v2io "github.com/v2ray/v2ray-core/common/io"
  8. "github.com/v2ray/v2ray-core/common/log"
  9. v2net "github.com/v2ray/v2ray-core/common/net"
  10. "github.com/v2ray/v2ray-core/proxy"
  11. "github.com/v2ray/v2ray-core/proxy/internal"
  12. "github.com/v2ray/v2ray-core/transport/hub"
  13. )
  14. type DokodemoDoor struct {
  15. tcpMutex sync.RWMutex
  16. udpMutex sync.RWMutex
  17. config *Config
  18. accepting bool
  19. address v2net.Address
  20. port v2net.Port
  21. packetDispatcher dispatcher.PacketDispatcher
  22. tcpListener *hub.TCPHub
  23. udpHub *hub.UDPHub
  24. udpServer *hub.UDPServer
  25. listeningPort v2net.Port
  26. listeningAddress v2net.Address
  27. }
  28. func NewDokodemoDoor(config *Config, space app.Space, listen v2net.Address, port v2net.Port) *DokodemoDoor {
  29. d := &DokodemoDoor{
  30. config: config,
  31. address: config.Address,
  32. port: config.Port,
  33. listeningAddress: listen,
  34. listeningPort: port,
  35. }
  36. space.InitializeApplication(func() error {
  37. if !space.HasApp(dispatcher.APP_ID) {
  38. log.Error("Dokodemo: Dispatcher is not found in the space.")
  39. return app.ErrorMissingApplication
  40. }
  41. d.packetDispatcher = space.GetApp(dispatcher.APP_ID).(dispatcher.PacketDispatcher)
  42. return nil
  43. })
  44. return d
  45. }
  46. func (this *DokodemoDoor) Port() v2net.Port {
  47. return this.listeningPort
  48. }
  49. func (this *DokodemoDoor) Close() {
  50. this.accepting = false
  51. if this.tcpListener != nil {
  52. this.tcpMutex.Lock()
  53. this.tcpListener.Close()
  54. this.tcpListener = nil
  55. this.tcpMutex.Unlock()
  56. }
  57. if this.udpHub != nil {
  58. this.udpMutex.Lock()
  59. this.udpHub.Close()
  60. this.udpHub = nil
  61. this.udpMutex.Unlock()
  62. }
  63. }
  64. func (this *DokodemoDoor) Start() error {
  65. if this.accepting {
  66. return nil
  67. }
  68. this.accepting = true
  69. if this.config.Network.HasNetwork(v2net.TCPNetwork) {
  70. err := this.ListenTCP(this.listeningAddress, this.listeningPort)
  71. if err != nil {
  72. return err
  73. }
  74. }
  75. if this.config.Network.HasNetwork(v2net.UDPNetwork) {
  76. err := this.ListenUDP(this.listeningAddress, this.listeningPort)
  77. if err != nil {
  78. return err
  79. }
  80. }
  81. return nil
  82. }
  83. func (this *DokodemoDoor) ListenUDP(address v2net.Address, port v2net.Port) error {
  84. this.udpServer = hub.NewUDPServer(this.packetDispatcher)
  85. udpHub, err := hub.ListenUDP(address, port, this.handleUDPPackets)
  86. if err != nil {
  87. log.Error("Dokodemo failed to listen on port ", port, ": ", err)
  88. return err
  89. }
  90. this.udpMutex.Lock()
  91. this.udpHub = udpHub
  92. this.udpMutex.Unlock()
  93. return nil
  94. }
  95. func (this *DokodemoDoor) handleUDPPackets(payload *alloc.Buffer, dest v2net.Destination) {
  96. this.udpServer.Dispatch(dest, v2net.UDPDestination(this.address, this.port), payload, this.handleUDPResponse)
  97. }
  98. func (this *DokodemoDoor) handleUDPResponse(dest v2net.Destination, payload *alloc.Buffer) {
  99. defer payload.Release()
  100. this.udpMutex.RLock()
  101. defer this.udpMutex.RUnlock()
  102. if !this.accepting {
  103. return
  104. }
  105. this.udpHub.WriteTo(payload.Value, dest)
  106. }
  107. func (this *DokodemoDoor) ListenTCP(address v2net.Address, port v2net.Port) error {
  108. tcpListener, err := hub.ListenTCP(address, port, this.HandleTCPConnection, nil)
  109. if err != nil {
  110. log.Error("Dokodemo: Failed to listen on port ", port, ": ", err)
  111. return err
  112. }
  113. this.tcpMutex.Lock()
  114. this.tcpListener = tcpListener
  115. this.tcpMutex.Unlock()
  116. return nil
  117. }
  118. func (this *DokodemoDoor) HandleTCPConnection(conn *hub.Connection) {
  119. defer conn.Close()
  120. ray := this.packetDispatcher.DispatchToOutbound(v2net.TCPDestination(this.address, this.port))
  121. defer ray.InboundOutput().Release()
  122. var inputFinish, outputFinish sync.Mutex
  123. inputFinish.Lock()
  124. outputFinish.Lock()
  125. reader := v2net.NewTimeOutReader(this.config.Timeout, conn)
  126. defer reader.Release()
  127. go func() {
  128. v2reader := v2io.NewAdaptiveReader(reader)
  129. defer v2reader.Release()
  130. v2io.Pipe(v2reader, ray.InboundInput())
  131. inputFinish.Unlock()
  132. ray.InboundInput().Close()
  133. }()
  134. go func() {
  135. v2writer := v2io.NewAdaptiveWriter(conn)
  136. defer v2writer.Release()
  137. v2io.Pipe(ray.InboundOutput(), v2writer)
  138. outputFinish.Unlock()
  139. }()
  140. outputFinish.Lock()
  141. inputFinish.Lock()
  142. }
  143. func init() {
  144. internal.MustRegisterInboundHandlerCreator("dokodemo-door",
  145. func(space app.Space, rawConfig interface{}, listen v2net.Address, port v2net.Port) (proxy.InboundHandler, error) {
  146. return NewDokodemoDoor(rawConfig.(*Config), space, listen, port), nil
  147. })
  148. }