inboundhandler.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package mocks
  2. import (
  3. "io"
  4. "sync"
  5. "github.com/v2ray/v2ray-core/app/dispatcher"
  6. v2io "github.com/v2ray/v2ray-core/common/io"
  7. v2net "github.com/v2ray/v2ray-core/common/net"
  8. "github.com/v2ray/v2ray-core/proxy"
  9. )
  10. type InboundConnectionHandler struct {
  11. ListeningPort v2net.Port
  12. ListeningAddress v2net.Address
  13. PacketDispatcher dispatcher.PacketDispatcher
  14. ConnInput io.Reader
  15. ConnOutput io.Writer
  16. }
  17. func (this *InboundConnectionHandler) Start() error {
  18. return nil
  19. }
  20. func (this *InboundConnectionHandler) Port() v2net.Port {
  21. return this.ListeningPort
  22. }
  23. func (this *InboundConnectionHandler) Close() {
  24. }
  25. func (this *InboundConnectionHandler) Communicate(destination v2net.Destination) error {
  26. ray := this.PacketDispatcher.DispatchToOutbound(&proxy.InboundHandlerMeta{
  27. AllowPassiveConnection: false,
  28. }, &proxy.SessionInfo{
  29. Source: v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(0)),
  30. Destination: destination,
  31. })
  32. input := ray.InboundInput()
  33. output := ray.InboundOutput()
  34. readFinish := &sync.Mutex{}
  35. writeFinish := &sync.Mutex{}
  36. readFinish.Lock()
  37. writeFinish.Lock()
  38. go func() {
  39. v2reader := v2io.NewAdaptiveReader(this.ConnInput)
  40. defer v2reader.Release()
  41. v2io.Pipe(v2reader, input)
  42. input.Close()
  43. readFinish.Unlock()
  44. }()
  45. go func() {
  46. v2writer := v2io.NewAdaptiveWriter(this.ConnOutput)
  47. defer v2writer.Release()
  48. v2io.Pipe(output, v2writer)
  49. output.Release()
  50. writeFinish.Unlock()
  51. }()
  52. readFinish.Lock()
  53. writeFinish.Lock()
  54. return nil
  55. }