Darien Raymond 7 years ago
parent
commit
6c89940e65
1 changed files with 18 additions and 9 deletions
  1. 18 9
      common/mux/server.go

+ 18 - 9
common/mux/server.go

@@ -44,15 +44,14 @@ func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*vio.Link,
 	uplinkReader, uplinkWriter := pipe.New(opts...)
 	downlinkReader, downlinkWriter := pipe.New(opts...)
 
-	worker := &ServerWorker{
-		dispatcher: s.dispatcher,
-		link: &vio.Link{
-			Reader: uplinkReader,
-			Writer: downlinkWriter,
-		},
-		sessionManager: NewSessionManager(),
+	_, err := NewServerWorker(ctx, s.dispatcher, &vio.Link{
+		Reader: uplinkReader,
+		Writer: downlinkWriter,
+	})
+	if err != nil {
+		return nil, err
 	}
-	go worker.run(ctx)
+
 	return &vio.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil
 }
 
@@ -72,6 +71,16 @@ type ServerWorker struct {
 	sessionManager *SessionManager
 }
 
+func NewServerWorker(ctx context.Context, d routing.Dispatcher, link *vio.Link) (*ServerWorker, error) {
+	worker := &ServerWorker{
+		dispatcher:     d,
+		link:           link,
+		sessionManager: NewSessionManager(),
+	}
+	go worker.run(ctx)
+	return worker, nil
+}
+
 func handle(ctx context.Context, s *Session, output buf.Writer) {
 	writer := NewResponseWriter(s.ID, output, s.transferType)
 	if err := buf.Copy(s.input, writer); err != nil {
@@ -142,7 +151,7 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.Buffere
 
 	s, found := w.sessionManager.Get(meta.SessionID)
 	if !found {
-		buf.Copy(NewStreamReader(reader), buf.Discard)
+		return buf.Copy(NewStreamReader(reader), buf.Discard)
 	}
 
 	rr := s.NewReader(reader)