Browse Source

consider closed worker is full. fixes #1414

Darien Raymond 7 years ago
parent
commit
1089a887e0
2 changed files with 71 additions and 2 deletions
  1. 2 2
      common/mux/client.go
  2. 69 0
      common/mux/client_test.go

+ 2 - 2
common/mux/client.go

@@ -35,7 +35,7 @@ func (m *ClientManager) Dispatch(ctx context.Context, link *transport.Link) erro
 		}
 	}
 
-	return newError("unable to find an available mux client")
+	return newError("unable to find an available mux client").AtWarning()
 }
 
 type WorkerPicker interface {
@@ -274,7 +274,7 @@ func (m *ClientWorker) IsClosing() bool {
 }
 
 func (m *ClientWorker) IsFull() bool {
-	if m.IsClosing() {
+	if m.IsClosing() || m.Closed() {
 		return true
 	}
 

+ 69 - 0
common/mux/client_test.go

@@ -9,6 +9,8 @@ import (
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/errors"
 	"v2ray.com/core/common/mux"
+	"v2ray.com/core/common/net"
+	"v2ray.com/core/common/session"
 	"v2ray.com/core/testing/mocks"
 	"v2ray.com/core/transport"
 	"v2ray.com/core/transport/pipe"
@@ -45,3 +47,70 @@ func TestClientWorkerEOF(t *testing.T) {
 		t.Error("expected failed dispatching, but actually not")
 	}
 }
+
+func TestClientWorkerClose(t *testing.T) {
+	mockCtl := gomock.NewController(t)
+	defer mockCtl.Finish()
+
+	r1, w1 := pipe.New(pipe.WithoutSizeLimit())
+	worker1, err := mux.NewClientWorker(transport.Link{
+		Reader: r1,
+		Writer: w1,
+	}, mux.ClientStrategy{
+		MaxConcurrency: 4,
+		MaxConnection:  4,
+	})
+	common.Must(err)
+
+	r2, w2 := pipe.New(pipe.WithoutSizeLimit())
+	worker2, err := mux.NewClientWorker(transport.Link{
+		Reader: r2,
+		Writer: w2,
+	}, mux.ClientStrategy{
+		MaxConcurrency: 4,
+		MaxConnection:  4,
+	})
+	common.Must(err)
+
+	factory := mocks.NewMuxClientWorkerFactory(mockCtl)
+	gomock.InOrder(
+		factory.EXPECT().Create().Return(worker1, nil),
+		factory.EXPECT().Create().Return(worker2, nil),
+	)
+
+	picker := &mux.IncrementalWorkerPicker{
+		Factory: factory,
+	}
+	manager := &mux.ClientManager{
+		Picker: picker,
+	}
+
+	tr1, tw1 := pipe.New(pipe.WithoutSizeLimit())
+	ctx1 := session.ContextWithOutbound(context.Background(), &session.Outbound{
+		Target: net.TCPDestination(net.DomainAddress("www.v2ray.com"), 80),
+	})
+	common.Must(manager.Dispatch(ctx1, &transport.Link{
+		Reader: tr1,
+		Writer: tw1,
+	}))
+	defer tw1.Close()
+
+	common.Must(w1.Close())
+
+	time.Sleep(time.Millisecond * 500)
+	if !worker1.Closed() {
+		t.Error("worker1 is not finished")
+	}
+
+	tr2, tw2 := pipe.New(pipe.WithoutSizeLimit())
+	ctx2 := session.ContextWithOutbound(context.Background(), &session.Outbound{
+		Target: net.TCPDestination(net.DomainAddress("www.v2ray.com"), 80),
+	})
+	common.Must(manager.Dispatch(ctx2, &transport.Link{
+		Reader: tr2,
+		Writer: tw2,
+	}))
+	defer tw2.Close()
+
+	common.Must(w2.Close())
+}