inboundhandler.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package mocks
  2. import (
  3. "io"
  4. "sync"
  5. "github.com/v2ray/v2ray-core/app/dispatcher"
  6. v2io "github.com/v2ray/v2ray-core/common/io"
  7. v2net "github.com/v2ray/v2ray-core/common/net"
  8. "github.com/v2ray/v2ray-core/proxy"
  9. )
  10. type InboundConnectionHandler struct {
  11. ListeningPort v2net.Port
  12. ListeningAddress v2net.Address
  13. PacketDispatcher dispatcher.PacketDispatcher
  14. ConnInput io.Reader
  15. ConnOutput io.Writer
  16. }
  17. func (this *InboundConnectionHandler) Start() error {
  18. return nil
  19. }
  20. func (this *InboundConnectionHandler) Port() v2net.Port {
  21. return this.ListeningPort
  22. }
  23. func (this *InboundConnectionHandler) Close() {
  24. }
  25. func (this *InboundConnectionHandler) Communicate(destination v2net.Destination) error {
  26. ray := this.PacketDispatcher.DispatchToOutbound(&proxy.InboundHandlerMeta{
  27. AllowPassiveConnection: false,
  28. }, destination)
  29. input := ray.InboundInput()
  30. output := ray.InboundOutput()
  31. readFinish := &sync.Mutex{}
  32. writeFinish := &sync.Mutex{}
  33. readFinish.Lock()
  34. writeFinish.Lock()
  35. go func() {
  36. v2reader := v2io.NewAdaptiveReader(this.ConnInput)
  37. defer v2reader.Release()
  38. v2io.Pipe(v2reader, input)
  39. input.Close()
  40. readFinish.Unlock()
  41. }()
  42. go func() {
  43. v2writer := v2io.NewAdaptiveWriter(this.ConnOutput)
  44. defer v2writer.Release()
  45. v2io.Pipe(output, v2writer)
  46. output.Release()
  47. writeFinish.Unlock()
  48. }()
  49. readFinish.Lock()
  50. writeFinish.Lock()
  51. return nil
  52. }