瀏覽代碼

cleanup handler functions

Darien Raymond 8 年之前
父節點
當前提交
0a2547b285
共有 1 個文件被更改,包括 108 次插入67 次删除
  1. 108 67
      app/proxyman/mux/mux.go

+ 108 - 67
app/proxyman/mux/mux.go

@@ -207,11 +207,46 @@ func pipe(reader *Reader, writer buf.Writer) error {
 	}
 }
 
+func (m *Client) handleStatueKeepAlive(meta *FrameMetadata, reader *Reader) error {
+	if meta.Option.Has(OptionData) {
+		return drain(reader)
+	}
+	return nil
+}
+
+func (m *Client) handleStatusNew(meta *FrameMetadata, reader *Reader) error {
+	if meta.Option.Has(OptionData) {
+		return drain(reader)
+	}
+	return nil
+}
+
+func (m *Client) handleStatusKeep(meta *FrameMetadata, reader *Reader) error {
+	if !meta.Option.Has(OptionData) {
+		return nil
+	}
+
+	if s, found := m.sessionManager.Get(meta.SessionID); found {
+		return pipe(reader, s.output)
+	}
+	return drain(reader)
+}
+
+func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *Reader) error {
+	if s, found := m.sessionManager.Get(meta.SessionID); found {
+		s.CloseDownlink()
+		s.output.Close()
+	}
+	if meta.Option.Has(OptionData) {
+		return drain(reader)
+	}
+	return nil
+}
+
 func (m *Client) fetchOutput() {
 	defer m.cancel()
 
 	reader := NewReader(m.inboundRay.InboundOutput())
-L:
 	for {
 		meta, err := reader.ReadMetadata()
 		if err != nil {
@@ -219,35 +254,23 @@ L:
 			break
 		}
 
-		var drainData bool
 		switch meta.SessionStatus {
 		case SessionStatusKeepAlive:
-			drainData = true
+			err = m.handleStatueKeepAlive(meta, reader)
 		case SessionStatusEnd:
-			if s, found := m.sessionManager.Get(meta.SessionID); found {
-				s.CloseDownlink()
-				s.output.Close()
-			}
-			drainData = true
+			err = m.handleStatusEnd(meta, reader)
 		case SessionStatusNew:
-			drainData = true
+			err = m.handleStatusNew(meta, reader)
 		case SessionStatusKeep:
-			if !meta.Option.Has(OptionData) {
-				break
-			}
-			if s, found := m.sessionManager.Get(meta.SessionID); found {
-				if err := pipe(reader, s.output); err != nil {
-					log.Trace(newError("failed to pipe data").Base(err))
-					break L
-				}
-			}
+			err = m.handleStatusKeep(meta, reader)
+		default:
+			log.Trace(newError("unknown status: ", meta.SessionStatus).AtWarning())
+			return
 		}
 
-		if drainData && meta.Option.Has(OptionData) {
-			if err := drain(reader); err != nil {
-				log.Trace(newError("failed to drain data").Base(err))
-				break
-			}
+		if err != nil {
+			log.Trace(newError("failed to process data").Base(err))
+			return
 		}
 	}
 }
@@ -300,12 +323,63 @@ func handle(ctx context.Context, s *Session, output buf.Writer) {
 	s.CloseDownlink()
 }
 
+func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *Reader) error {
+	if meta.Option.Has(OptionData) {
+		return drain(reader)
+	}
+	return nil
+}
+
+func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, reader *Reader) error {
+	log.Trace(newError("received request for ", meta.Target))
+	inboundRay, err := w.dispatcher.Dispatch(ctx, meta.Target)
+	if err != nil {
+		if meta.Option.Has(OptionData) {
+			drain(reader)
+		}
+		return newError("failed to dispatch request.").Base(err)
+	}
+	s := &Session{
+		input:  inboundRay.InboundOutput(),
+		output: inboundRay.InboundInput(),
+		parent: w.sessionManager,
+		ID:     meta.SessionID,
+	}
+	w.sessionManager.Add(s)
+	go handle(ctx, s, w.outboundRay.OutboundOutput())
+	if meta.Option.Has(OptionData) {
+		return pipe(reader, s.output)
+	}
+	return nil
+}
+
+func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *Reader) error {
+	if !meta.Option.Has(OptionData) {
+		return nil
+	}
+	if s, found := w.sessionManager.Get(meta.SessionID); found {
+		return pipe(reader, s.output)
+	}
+	return drain(reader)
+}
+
+func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *Reader) error {
+	if s, found := w.sessionManager.Get(meta.SessionID); found {
+		s.CloseUplink()
+		s.output.Close()
+	}
+	if meta.Option.Has(OptionData) {
+		return drain(reader)
+	}
+	return nil
+}
+
 func (w *ServerWorker) run(ctx context.Context) {
 	input := w.outboundRay.OutboundInput()
 	reader := NewReader(input)
 
 	defer w.sessionManager.Close()
-L:
+
 	for {
 		select {
 		case <-ctx.Done():
@@ -319,56 +393,23 @@ L:
 			return
 		}
 
-		var drainData bool
 		switch meta.SessionStatus {
 		case SessionStatusKeepAlive:
-			drainData = true
+			err = w.handleStatusKeepAlive(meta, reader)
 		case SessionStatusEnd:
-			if s, found := w.sessionManager.Get(meta.SessionID); found {
-				s.CloseUplink()
-				s.output.Close()
-			}
-			drainData = true
+			err = w.handleStatusEnd(meta, reader)
 		case SessionStatusNew:
-			log.Trace(newError("received request for ", meta.Target))
-			inboundRay, err := w.dispatcher.Dispatch(ctx, meta.Target)
-			if err != nil {
-				log.Trace(newError("failed to dispatch request.").Base(err))
-				drainData = true
-				break
-			}
-			s := &Session{
-				input:  inboundRay.InboundOutput(),
-				output: inboundRay.InboundInput(),
-				parent: w.sessionManager,
-				ID:     meta.SessionID,
-			}
-			w.sessionManager.Add(s)
-			go handle(ctx, s, w.outboundRay.OutboundOutput())
-			if !meta.Option.Has(OptionData) {
-				break
-			}
-			if err := pipe(reader, s.output); err != nil {
-				log.Trace(newError("failed to read data").Base(err))
-				break L
-			}
+			err = w.handleStatusNew(ctx, meta, reader)
 		case SessionStatusKeep:
-			if !meta.Option.Has(OptionData) {
-				break
-			}
-			if s, found := w.sessionManager.Get(meta.SessionID); found {
-				if err := pipe(reader, s.output); err != nil {
-					log.Trace(newError("failed to read data").Base(err))
-					break L
-				}
-			}
+			err = w.handleStatusKeep(meta, reader)
+		default:
+			log.Trace(newError("unknown status: ", meta.SessionStatus).AtWarning())
+			return
 		}
 
-		if meta.Option.Has(OptionData) && drainData {
-			if err := drain(reader); err != nil {
-				log.Trace(newError("failed to drain data").Base(err))
-				break
-			}
+		if err != nil {
+			log.Trace(newError("failed to process data").Base(err))
+			return
 		}
 	}
 }