Forráskód Böngészése

fix draining in mux

Darien Raymond 7 éve
szülő
commit
8e62134bdf
1 módosított fájl, 25 hozzáadás és 16 törlés
  1. 25 16
      app/proxyman/mux/mux.go

+ 25 - 16
app/proxyman/mux/mux.go

@@ -204,20 +204,20 @@ func (m *Client) Dispatch(ctx context.Context, link *core.Link) bool {
 	return true
 }
 
-func drain(reader *buf.BufferedReader) error {
-	return buf.Copy(NewStreamReader(reader), buf.Discard)
+func drain(reader buf.Reader) error {
+	return buf.Copy(reader, buf.Discard)
 }
 
 func (m *Client) handleStatueKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error {
 	if meta.Option.Has(OptionData) {
-		return drain(reader)
+		return drain(NewStreamReader(reader))
 	}
 	return nil
 }
 
 func (m *Client) handleStatusNew(meta *FrameMetadata, reader *buf.BufferedReader) error {
 	if meta.Option.Has(OptionData) {
-		return drain(reader)
+		return drain(NewStreamReader(reader))
 	}
 	return nil
 }
@@ -228,14 +228,15 @@ func (m *Client) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReade
 	}
 
 	if s, found := m.sessionManager.Get(meta.SessionID); found {
-		if err := buf.Copy(s.NewReader(reader), s.output); err != nil {
-			drain(reader)
+		rr := s.NewReader(reader)
+		if err := buf.Copy(rr, s.output); err != nil {
+			drain(rr)
 			pipe.CloseError(s.input)
 			return s.Close()
 		}
 		return nil
 	}
-	return drain(reader)
+	return drain(NewStreamReader(reader))
 }
 
 func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
@@ -247,7 +248,7 @@ func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader
 		s.Close()
 	}
 	if meta.Option.Has(OptionData) {
-		return drain(reader)
+		return drain(NewStreamReader(reader))
 	}
 	return nil
 }
@@ -346,7 +347,7 @@ func handle(ctx context.Context, s *Session, output buf.Writer) {
 
 func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error {
 	if meta.Option.Has(OptionData) {
-		return drain(reader)
+		return drain(NewStreamReader(reader))
 	}
 	return nil
 }
@@ -367,7 +368,7 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata,
 	link, err := w.dispatcher.Dispatch(ctx, meta.Target)
 	if err != nil {
 		if meta.Option.Has(OptionData) {
-			drain(reader)
+			drain(NewStreamReader(reader))
 		}
 		return newError("failed to dispatch request.").Base(err)
 	}
@@ -383,8 +384,15 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata,
 	}
 	w.sessionManager.Add(s)
 	go handle(ctx, s, w.link.Writer)
-	if meta.Option.Has(OptionData) {
-		return buf.Copy(s.NewReader(reader), s.output, buf.IgnoreWriterError())
+	if !meta.Option.Has(OptionData) {
+		return nil
+	}
+
+	rr := s.NewReader(reader)
+	if err := buf.Copy(rr, s.output); err != nil {
+		drain(rr)
+		pipe.CloseError(s.input)
+		return s.Close()
 	}
 	return nil
 }
@@ -394,14 +402,15 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.Buffere
 		return nil
 	}
 	if s, found := w.sessionManager.Get(meta.SessionID); found {
-		if err := buf.Copy(s.NewReader(reader), s.output); err != nil {
-			drain(reader)
+		rr := s.NewReader(reader)
+		if err := buf.Copy(rr, s.output); err != nil {
+			drain(rr)
 			pipe.CloseError(s.input)
 			return s.Close()
 		}
 		return nil
 	}
-	return drain(reader)
+	return drain(NewStreamReader(reader))
 }
 
 func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
@@ -413,7 +422,7 @@ func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.Buffered
 		s.Close()
 	}
 	if meta.Option.Has(OptionData) {
-		return drain(reader)
+		return drain(NewStreamReader(reader))
 	}
 	return nil
 }