Darien Raymond 8 years ago
parent
commit
c9f661f018
2 changed files with 63 additions and 58 deletions
  1. 63 55
      app/proxyman/mux/mux.go
  2. 0 3
      app/proxyman/mux/status.go

+ 63 - 55
app/proxyman/mux/mux.go

@@ -211,40 +211,43 @@ func (m *Client) fetchOutput() {
 	defer m.cancel()
 
 	reader := NewReader(m.inboundRay.InboundOutput())
+L:
 	for {
 		meta, err := reader.ReadMetadata()
 		if err != nil {
 			log.Trace(newError("failed to read metadata").Base(err))
 			break
 		}
-		if meta.SessionStatus == SessionStatusKeepAlive {
-			if meta.Option.Has(OptionData) {
-				if err := drain(reader); err != nil {
-					log.Trace(newError("failed to read data").Base(err))
-					break
+
+		var drainData bool
+		switch meta.SessionStatus {
+		case SessionStatusKeepAlive:
+			drainData = true
+		case SessionStatusEnd:
+			if s, found := m.sessionManager.Get(meta.SessionID); found {
+				s.CloseDownlink()
+				s.output.Close()
+			}
+			drainData = true
+		case SessionStatusNew:
+			drainData = true
+		case SessionStatusKeep:
+			if !meta.Option.Has(OptionData) {
+				break
+			}
+			if s, found := m.sessionManager.Get(meta.SessionID); found {
+				if err := pipe(reader, s.output); err != nil {
+					log.Trace(newError("failed to pipe data").Base(err))
+					break L
 				}
 			}
-			continue
-		}
-
-		s, found := m.sessionManager.Get(meta.SessionID)
-		if found && meta.SessionStatus == SessionStatusEnd {
-			s.CloseDownlink()
-			s.output.Close()
-		}
-		if !meta.Option.Has(OptionData) {
-			continue
-		}
-
-		if found {
-			err = pipe(reader, s.output)
-		} else {
-			err = drain(reader)
 		}
 
-		if err != nil {
-			log.Trace(newError("failed to read data").Base(err))
-			break
+		if drainData && meta.Option.Has(OptionData) {
+			if err := drain(reader); err != nil {
+				log.Trace(newError("failed to drain data").Base(err))
+				break
+			}
 		}
 	}
 }
@@ -300,6 +303,7 @@ func handle(ctx context.Context, s *Session, output buf.Writer) {
 func (w *ServerWorker) run(ctx context.Context) {
 	input := w.outboundRay.OutboundInput()
 	reader := NewReader(input)
+L:
 	for {
 		select {
 		case <-ctx.Done():
@@ -313,30 +317,25 @@ func (w *ServerWorker) run(ctx context.Context) {
 			return
 		}
 
-		if meta.SessionStatus == SessionStatusKeepAlive {
-			if meta.Option.Has(OptionData) {
-				if err := drain(reader); err != nil {
-					log.Trace(newError("failed to read data").Base(err))
-					break
-				}
+		var drainData bool
+		switch meta.SessionStatus {
+		case SessionStatusKeepAlive:
+			drainData = true
+		case SessionStatusEnd:
+			if s, found := w.sessionManager.Get(meta.SessionID); found {
+				s.CloseUplink()
+				s.output.Close()
 			}
-			continue
-		}
-
-		s, found := w.sessionManager.Get(meta.SessionID)
-		if found && meta.SessionStatus == SessionStatusEnd {
-			s.CloseUplink()
-			s.output.Close()
-		}
-
-		if meta.SessionStatus == SessionStatusNew {
+			drainData = true
+		case SessionStatusNew:
 			log.Trace(newError("received request for ", meta.Target))
 			inboundRay, err := w.dispatcher.Dispatch(ctx, meta.Target)
 			if err != nil {
 				log.Trace(newError("failed to dispatch request.").Base(err))
-				continue
+				drainData = true
+				break
 			}
-			s = &Session{
+			s := &Session{
 				input:  inboundRay.InboundOutput(),
 				output: inboundRay.InboundInput(),
 				parent: w.sessionManager,
@@ -344,21 +343,30 @@ func (w *ServerWorker) run(ctx context.Context) {
 			}
 			w.sessionManager.Add(s)
 			go handle(ctx, s, w.outboundRay.OutboundOutput())
+			if !meta.Option.Has(OptionData) {
+				break
+			}
+			if err := pipe(reader, s.output); err != nil {
+				log.Trace(newError("failed to read data").Base(err))
+				break L
+			}
+		case SessionStatusKeep:
+			if !meta.Option.Has(OptionData) {
+				break
+			}
+			if s, found := w.sessionManager.Get(meta.SessionID); found {
+				if err := pipe(reader, s.output); err != nil {
+					log.Trace(newError("failed to read data").Base(err))
+					break L
+				}
+			}
 		}
 
-		if !meta.Option.Has(OptionData) {
-			continue
-		}
-
-		if s != nil {
-			err = pipe(reader, s.output)
-		} else {
-			err = drain(reader)
-		}
-
-		if err != nil {
-			log.Trace(newError("failed to read data").Base(err))
-			break
+		if meta.Option.Has(OptionData) && drainData {
+			if err := drain(reader); err != nil {
+				log.Trace(newError("failed to drain data").Base(err))
+				break
+			}
 		}
 	}
 }

+ 0 - 3
app/proxyman/mux/status.go

@@ -1,3 +0,0 @@
-package mux
-
-type statusHandler func(meta *FrameMetadata) error