inboundhandler.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package mocks
  2. import (
  3. "io"
  4. "sync"
  5. "v2ray.com/core/app/dispatcher"
  6. "v2ray.com/core/common/buf"
  7. v2net "v2ray.com/core/common/net"
  8. "v2ray.com/core/proxy"
  9. )
  10. type InboundConnectionHandler struct {
  11. ListeningPort v2net.Port
  12. ListeningAddress v2net.Address
  13. PacketDispatcher dispatcher.PacketDispatcher
  14. ConnInput io.Reader
  15. ConnOutput io.Writer
  16. }
  17. func (v *InboundConnectionHandler) Start() error {
  18. return nil
  19. }
  20. func (v *InboundConnectionHandler) Port() v2net.Port {
  21. return v.ListeningPort
  22. }
  23. func (v *InboundConnectionHandler) Close() {
  24. }
  25. func (v *InboundConnectionHandler) Communicate(destination v2net.Destination) error {
  26. ray := v.PacketDispatcher.DispatchToOutbound(&proxy.SessionInfo{
  27. Source: v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(0)),
  28. Destination: destination,
  29. Inbound: &proxy.InboundHandlerMeta{
  30. AllowPassiveConnection: false,
  31. },
  32. })
  33. input := ray.InboundInput()
  34. output := ray.InboundOutput()
  35. readFinish := &sync.Mutex{}
  36. writeFinish := &sync.Mutex{}
  37. readFinish.Lock()
  38. writeFinish.Lock()
  39. go func() {
  40. v2reader := buf.NewReader(v.ConnInput)
  41. defer v2reader.Release()
  42. buf.Pipe(v2reader, input)
  43. input.Close()
  44. readFinish.Unlock()
  45. }()
  46. go func() {
  47. v2writer := buf.NewWriter(v.ConnOutput)
  48. defer v2writer.Release()
  49. buf.Pipe(output, v2writer)
  50. output.Release()
  51. writeFinish.Unlock()
  52. }()
  53. readFinish.Lock()
  54. writeFinish.Lock()
  55. return nil
  56. }