Prechádzať zdrojové kódy

completely remove allow passive connection

Darien Raymond 8 rokov pred
rodič
commit
d72029211f

+ 1 - 60
app/dispatcher/impl/default.go

@@ -2,7 +2,6 @@ package impl
 
 import (
 	"context"
-	"time"
 
 	"v2ray.com/core/app"
 	"v2ray.com/core/app/dispatcher"
@@ -10,7 +9,6 @@ import (
 	"v2ray.com/core/app/proxyman"
 	"v2ray.com/core/app/router"
 	"v2ray.com/core/common"
-	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/errors"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/proxy"
@@ -71,70 +69,13 @@ func (v *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destin
 	}
 
 	direct := ray.NewRay(ctx)
-	var waitFunc func() error
-	if allowPassiveConnection, ok := proxy.AllowPassiveConnectionFromContext(ctx); ok && allowPassiveConnection {
-		waitFunc = noOpWait()
-	} else {
-		wdi := &waitDataInspector{
-			hasData: make(chan bool, 1),
-		}
-		direct.AddInspector(wdi)
-		waitFunc = waitForData(wdi)
-	}
-
-	go v.waitAndDispatch(ctx, waitFunc, direct, dispatcher)
+	go dispatcher.Dispatch(ctx, direct)
 
 	return direct, nil
 }
 
-func (v *DefaultDispatcher) waitAndDispatch(ctx context.Context, wait func() error, link ray.OutboundRay, dispatcher proxyman.OutboundHandler) {
-	if err := wait(); err != nil {
-		log.Info("DefaultDispatcher: Failed precondition: ", err)
-		link.OutboundInput().CloseError()
-		link.OutboundOutput().CloseError()
-		return
-	}
-
-	dispatcher.Dispatch(ctx, link)
-}
-
 func init() {
 	common.Must(common.RegisterConfig((*dispatcher.Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
 		return NewDefaultDispatcher(ctx, config.(*dispatcher.Config))
 	}))
 }
-
-type waitDataInspector struct {
-	hasData chan bool
-}
-
-func (wdi *waitDataInspector) Input(*buf.Buffer) {
-	select {
-	case wdi.hasData <- true:
-	default:
-	}
-}
-
-func (wdi *waitDataInspector) WaitForData() bool {
-	select {
-	case <-wdi.hasData:
-		return true
-	case <-time.After(time.Minute):
-		return false
-	}
-}
-
-func waitForData(wdi *waitDataInspector) func() error {
-	return func() error {
-		if wdi.WaitForData() {
-			return nil
-		}
-		return errors.New("DefaultDispatcher: No data.")
-	}
-}
-
-func noOpWait() func() error {
-	return func() error {
-		return nil
-	}
-}

+ 1 - 1
app/proxyman/config.proto

@@ -53,7 +53,7 @@ message ReceiverConfig {
   AllocationStrategy allocation_strategy = 3;
   v2ray.core.transport.internet.StreamConfig stream_settings = 4;
   bool receive_original_destination = 5;
-  bool allow_passive_connection = 6;
+  bool allow_passive_connection = 6 [deprecated=true];
 }
 
 message InboundHandlerConfig {

+ 7 - 8
app/proxyman/inbound/always.go

@@ -37,14 +37,13 @@ func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig *
 		if nl.HasNetwork(net.Network_TCP) {
 			log.Debug("Proxyman|DefaultInboundHandler: creating tcp worker on ", address, ":", port)
 			worker := &tcpWorker{
-				address:          address,
-				port:             net.Port(port),
-				proxy:            p,
-				stream:           receiverConfig.StreamSettings,
-				recvOrigDest:     receiverConfig.ReceiveOriginalDestination,
-				tag:              tag,
-				allowPassiveConn: receiverConfig.AllowPassiveConnection,
-				dispatcher:       h.mux,
+				address:      address,
+				port:         net.Port(port),
+				proxy:        p,
+				stream:       receiverConfig.StreamSettings,
+				recvOrigDest: receiverConfig.ReceiveOriginalDestination,
+				tag:          tag,
+				dispatcher:   h.mux,
 			}
 			h.workers = append(h.workers, worker)
 		}

+ 7 - 8
app/proxyman/inbound/dynamic.go

@@ -97,14 +97,13 @@ func (h *DynamicInboundHandler) refresh() error {
 		nl := p.Network()
 		if nl.HasNetwork(v2net.Network_TCP) {
 			worker := &tcpWorker{
-				tag:              h.tag,
-				address:          address,
-				port:             port,
-				proxy:            p,
-				stream:           h.receiverConfig.StreamSettings,
-				recvOrigDest:     h.receiverConfig.ReceiveOriginalDestination,
-				allowPassiveConn: h.receiverConfig.AllowPassiveConnection,
-				dispatcher:       h.mux,
+				tag:          h.tag,
+				address:      address,
+				port:         port,
+				proxy:        p,
+				stream:       h.receiverConfig.StreamSettings,
+				recvOrigDest: h.receiverConfig.ReceiveOriginalDestination,
+				dispatcher:   h.mux,
 			}
 			if err := worker.Start(); err != nil {
 				log.Warning("Proxyman:InboundHandler: Failed to create TCP worker: ", err)

+ 7 - 9
app/proxyman/inbound/worker.go

@@ -26,14 +26,13 @@ type worker interface {
 }
 
 type tcpWorker struct {
-	address          v2net.Address
-	port             v2net.Port
-	proxy            proxy.Inbound
-	stream           *internet.StreamConfig
-	recvOrigDest     bool
-	tag              string
-	allowPassiveConn bool
-	dispatcher       dispatcher.Interface
+	address      v2net.Address
+	port         v2net.Port
+	proxy        proxy.Inbound
+	stream       *internet.StreamConfig
+	recvOrigDest bool
+	tag          string
+	dispatcher   dispatcher.Interface
 
 	ctx    context.Context
 	cancel context.CancelFunc
@@ -51,7 +50,6 @@ func (w *tcpWorker) callback(conn internet.Connection) {
 	if len(w.tag) > 0 {
 		ctx = proxy.ContextWithInboundTag(ctx, w.tag)
 	}
-	ctx = proxy.ContextWithAllowPassiveConnection(ctx, w.allowPassiveConn)
 	ctx = proxy.ContextWithInboundDestination(ctx, v2net.TCPDestination(w.address, w.port))
 	ctx = proxy.ContextWithSource(ctx, v2net.DestinationFromAddr(conn.RemoteAddr()))
 	if err := w.proxy.Process(ctx, v2net.Network_TCP, conn, w.dispatcher); err != nil {

+ 9 - 22
transport/ray/direct.go

@@ -1,13 +1,11 @@
 package ray
 
 import (
+	"context"
 	"errors"
 	"io"
-
 	"time"
 
-	"context"
-
 	"v2ray.com/core/common/buf"
 )
 
@@ -46,29 +44,19 @@ func (v *directRay) InboundOutput() InputStream {
 	return v.Output
 }
 
-func (v *directRay) AddInspector(inspector Inspector) {
-	if inspector == nil {
-		return
-	}
-	v.Input.inspector.AddInspector(inspector)
-	v.Output.inspector.AddInspector(inspector)
-}
-
 type Stream struct {
-	buffer    chan *buf.Buffer
-	ctx       context.Context
-	close     chan bool
-	err       chan bool
-	inspector *InspectorChain
+	buffer chan *buf.Buffer
+	ctx    context.Context
+	close  chan bool
+	err    chan bool
 }
 
 func NewStream(ctx context.Context) *Stream {
 	return &Stream{
-		ctx:       ctx,
-		buffer:    make(chan *buf.Buffer, bufferSize),
-		close:     make(chan bool),
-		err:       make(chan bool),
-		inspector: &InspectorChain{},
+		ctx:    ctx,
+		buffer: make(chan *buf.Buffer, bufferSize),
+		close:  make(chan bool),
+		err:    make(chan bool),
 	}
 }
 
@@ -139,7 +127,6 @@ func (v *Stream) Write(data *buf.Buffer) (err error) {
 		case <-v.close:
 			return io.ErrClosedPipe
 		case v.buffer <- data:
-			v.inspector.Input(data)
 			return nil
 		}
 	}

+ 0 - 36
transport/ray/inspector.go

@@ -1,36 +0,0 @@
-package ray
-
-import (
-	"sync"
-
-	"v2ray.com/core/common/buf"
-)
-
-type Inspector interface {
-	Input(*buf.Buffer)
-}
-
-type NoOpInspector struct{}
-
-func (NoOpInspector) Input(*buf.Buffer) {}
-
-type InspectorChain struct {
-	sync.RWMutex
-	chain []Inspector
-}
-
-func (ic *InspectorChain) AddInspector(inspector Inspector) {
-	ic.Lock()
-	defer ic.Unlock()
-
-	ic.chain = append(ic.chain, inspector)
-}
-
-func (ic *InspectorChain) Input(b *buf.Buffer) {
-	ic.RLock()
-	defer ic.RUnlock()
-
-	for _, inspector := range ic.chain {
-		inspector.Input(b)
-	}
-}

+ 0 - 1
transport/ray/ray.go

@@ -32,7 +32,6 @@ type InboundRay interface {
 type Ray interface {
 	InboundRay
 	OutboundRay
-	AddInspector(Inspector)
 }
 
 type RayStream interface {