Browse Source

fixing misbehaving code in mux that do not propagate context

Shelikhoo 4 years ago
parent
commit
b585f2283a
2 changed files with 18 additions and 7 deletions
  1. 7 6
      app/proxyman/outbound/handler.go
  2. 11 1
      common/mux/client.go

+ 7 - 6
app/proxyman/outbound/handler.go

@@ -108,14 +108,15 @@ func NewHandler(ctx context.Context, config *core.OutboundHandlerConfig) (outbou
 		h.mux = &mux.ClientManager{
 		h.mux = &mux.ClientManager{
 			Enabled: h.senderSettings.MultiplexSettings.Enabled,
 			Enabled: h.senderSettings.MultiplexSettings.Enabled,
 			Picker: &mux.IncrementalWorkerPicker{
 			Picker: &mux.IncrementalWorkerPicker{
-				Factory: &mux.DialingWorkerFactory{
-					Proxy:  proxyHandler,
-					Dialer: h,
-					Strategy: mux.ClientStrategy{
+				Factory: mux.NewDialingWorkerFactory(
+					ctx,
+					proxyHandler,
+					h,
+					mux.ClientStrategy{
 						MaxConcurrency: config.Concurrency,
 						MaxConcurrency: config.Concurrency,
 						MaxConnection:  128,
 						MaxConnection:  128,
 					},
 					},
-				},
+				),
 			},
 			},
 		}
 		}
 	}
 	}
@@ -199,7 +200,7 @@ func (h *Handler) Dial(ctx context.Context, dest net.Destination) (internet.Conn
 	if h.senderSettings != nil && h.senderSettings.ProxySettings != nil && h.senderSettings.ProxySettings.HasTag() && h.senderSettings.ProxySettings.TransportLayerProxy {
 	if h.senderSettings != nil && h.senderSettings.ProxySettings != nil && h.senderSettings.ProxySettings.HasTag() && h.senderSettings.ProxySettings.TransportLayerProxy {
 		tag := h.senderSettings.ProxySettings.Tag
 		tag := h.senderSettings.ProxySettings.Tag
 		newError("transport layer proxying to ", tag, " for dest ", dest).AtDebug().WriteToLog(session.ExportIDToError(ctx))
 		newError("transport layer proxying to ", tag, " for dest ", dest).AtDebug().WriteToLog(session.ExportIDToError(ctx))
-		session.SetTransportLayerProxyTagToContext(ctx, tag)
+		ctx = session.SetTransportLayerProxyTagToContext(ctx, tag)
 	}
 	}
 
 
 	conn, err := internet.Dial(ctx, dest, h.streamSettings)
 	conn, err := internet.Dial(ctx, dest, h.streamSettings)

+ 11 - 1
common/mux/client.go

@@ -131,8 +131,18 @@ type DialingWorkerFactory struct {
 	Proxy    proxy.Outbound
 	Proxy    proxy.Outbound
 	Dialer   internet.Dialer
 	Dialer   internet.Dialer
 	Strategy ClientStrategy
 	Strategy ClientStrategy
+
+	ctx context.Context
 }
 }
 
 
+func NewDialingWorkerFactory(ctx context.Context, Proxy proxy.Outbound, Dialer internet.Dialer, Strategy ClientStrategy) *DialingWorkerFactory {
+	return &DialingWorkerFactory{
+		Proxy:    Proxy,
+		Dialer:   Dialer,
+		Strategy: Strategy,
+		ctx:      ctx,
+	}
+}
 func (f *DialingWorkerFactory) Create() (*ClientWorker, error) {
 func (f *DialingWorkerFactory) Create() (*ClientWorker, error) {
 	opts := []pipe.Option{pipe.WithSizeLimit(64 * 1024)}
 	opts := []pipe.Option{pipe.WithSizeLimit(64 * 1024)}
 	uplinkReader, upLinkWriter := pipe.New(opts...)
 	uplinkReader, upLinkWriter := pipe.New(opts...)
@@ -148,7 +158,7 @@ func (f *DialingWorkerFactory) Create() (*ClientWorker, error) {
 	}
 	}
 
 
 	go func(p proxy.Outbound, d internet.Dialer, c common.Closable) {
 	go func(p proxy.Outbound, d internet.Dialer, c common.Closable) {
-		ctx := session.ContextWithOutbound(context.Background(), &session.Outbound{
+		ctx := session.ContextWithOutbound(f.ctx, &session.Outbound{
 			Target: net.TCPDestination(muxCoolAddress, muxCoolPort),
 			Target: net.TCPDestination(muxCoolAddress, muxCoolPort),
 		})
 		})
 		ctx, cancel := context.WithCancel(ctx)
 		ctx, cancel := context.WithCancel(ctx)