loopback.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. //go:build !confonly
  2. // +build !confonly
  3. package loopback
  4. import (
  5. "context"
  6. core "github.com/v2fly/v2ray-core/v5"
  7. "github.com/v2fly/v2ray-core/v5/common"
  8. "github.com/v2fly/v2ray-core/v5/common/buf"
  9. "github.com/v2fly/v2ray-core/v5/common/net"
  10. "github.com/v2fly/v2ray-core/v5/common/retry"
  11. "github.com/v2fly/v2ray-core/v5/common/session"
  12. "github.com/v2fly/v2ray-core/v5/common/task"
  13. "github.com/v2fly/v2ray-core/v5/features/routing"
  14. "github.com/v2fly/v2ray-core/v5/transport"
  15. "github.com/v2fly/v2ray-core/v5/transport/internet"
  16. )
  17. type Loopback struct {
  18. config *Config
  19. dispatcherInstance routing.Dispatcher
  20. }
  21. func (l *Loopback) Process(ctx context.Context, link *transport.Link, _ internet.Dialer) error {
  22. outbound := session.OutboundFromContext(ctx)
  23. if outbound == nil || !outbound.Target.IsValid() {
  24. return newError("target not specified.")
  25. }
  26. destination := outbound.Target
  27. newError("opening connection to ", destination).WriteToLog(session.ExportIDToError(ctx))
  28. input := link.Reader
  29. output := link.Writer
  30. var conn internet.Connection
  31. err := retry.ExponentialBackoff(5, 100).On(func() error {
  32. dialDest := destination
  33. content := new(session.Content)
  34. content.SkipDNSResolve = true
  35. ctx = session.ContextWithContent(ctx, content)
  36. inbound := session.InboundFromContext(ctx)
  37. inbound.Tag = l.config.InboundTag
  38. ctx = session.ContextWithInbound(ctx, inbound)
  39. rawConn, err := l.dispatcherInstance.Dispatch(ctx, dialDest)
  40. if err != nil {
  41. return err
  42. }
  43. var readerOpt net.ConnectionOption
  44. if dialDest.Network == net.Network_TCP {
  45. readerOpt = net.ConnectionOutputMulti(rawConn.Reader)
  46. } else {
  47. readerOpt = net.ConnectionOutputMultiUDP(rawConn.Reader)
  48. }
  49. conn = net.NewConnection(net.ConnectionInputMulti(rawConn.Writer), readerOpt)
  50. return nil
  51. })
  52. if err != nil {
  53. return newError("failed to open connection to ", destination).Base(err)
  54. }
  55. defer conn.Close()
  56. requestDone := func() error {
  57. var writer buf.Writer
  58. if destination.Network == net.Network_TCP {
  59. writer = buf.NewWriter(conn)
  60. } else {
  61. writer = &buf.SequentialWriter{Writer: conn}
  62. }
  63. if err := buf.Copy(input, writer); err != nil {
  64. return newError("failed to process request").Base(err)
  65. }
  66. return nil
  67. }
  68. responseDone := func() error {
  69. var reader buf.Reader
  70. if destination.Network == net.Network_TCP {
  71. reader = buf.NewReader(conn)
  72. } else {
  73. reader = buf.NewPacketReader(conn)
  74. }
  75. if err := buf.Copy(reader, output); err != nil {
  76. return newError("failed to process response").Base(err)
  77. }
  78. return nil
  79. }
  80. if err := task.Run(ctx, requestDone, task.OnSuccess(responseDone, task.Close(output))); err != nil {
  81. return newError("connection ends").Base(err)
  82. }
  83. return nil
  84. }
  85. func (l *Loopback) init(config *Config, dispatcherInstance routing.Dispatcher) error {
  86. l.dispatcherInstance = dispatcherInstance
  87. l.config = config
  88. return nil
  89. }
  90. func init() {
  91. common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  92. l := new(Loopback)
  93. err := core.RequireFeatures(ctx, func(dispatcherInstance routing.Dispatcher) error {
  94. return l.init(config.(*Config), dispatcherInstance)
  95. })
  96. return l, err
  97. }))
  98. }