Browse Source

better handle errors from copy

Darien Raymond 7 years ago
parent
commit
c272677065
2 changed files with 41 additions and 29 deletions
  1. 19 16
      common/mux/client.go
  2. 22 13
      common/mux/server.go

+ 19 - 16
common/mux/client.go

@@ -201,20 +201,16 @@ func (m *Client) Dispatch(ctx context.Context, link *vio.Link) bool {
 	return true
 }
 
-func drain(reader buf.Reader) error {
-	return buf.Copy(reader, buf.Discard)
-}
-
 func (m *Client) handleStatueKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error {
 	if meta.Option.Has(OptionData) {
-		return drain(NewStreamReader(reader))
+		return buf.Copy(NewStreamReader(reader), buf.Discard)
 	}
 	return nil
 }
 
 func (m *Client) handleStatusNew(meta *FrameMetadata, reader *buf.BufferedReader) error {
 	if meta.Option.Has(OptionData) {
-		return drain(NewStreamReader(reader))
+		return buf.Copy(NewStreamReader(reader), buf.Discard)
 	}
 	return nil
 }
@@ -224,16 +220,23 @@ func (m *Client) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReade
 		return nil
 	}
 
-	if s, found := m.sessionManager.Get(meta.SessionID); found {
-		rr := s.NewReader(reader)
-		if err := buf.Copy(rr, s.output); err != nil {
-			drain(rr)
-			pipe.CloseError(s.input)
-			return s.Close()
-		}
-		return nil
+	s, found := m.sessionManager.Get(meta.SessionID)
+	if !found {
+		return buf.Copy(NewStreamReader(reader), buf.Discard)
 	}
-	return drain(NewStreamReader(reader))
+
+	rr := s.NewReader(reader)
+	err := buf.Copy(rr, s.output)
+	if err != nil && buf.IsWriteError(err) {
+		newError("failed to write to downstream. closing session ", s.ID).Base(err).WriteToLog()
+
+		drainErr := buf.Copy(rr, buf.Discard)
+		pipe.CloseError(s.input)
+		s.Close()
+		return drainErr
+	}
+
+	return err
 }
 
 func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
@@ -245,7 +248,7 @@ func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader
 		s.Close()
 	}
 	if meta.Option.Has(OptionData) {
-		return drain(NewStreamReader(reader))
+		return buf.Copy(NewStreamReader(reader), buf.Discard)
 	}
 	return nil
 }

+ 22 - 13
common/mux/server.go

@@ -85,7 +85,7 @@ func handle(ctx context.Context, s *Session, output buf.Writer) {
 
 func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error {
 	if meta.Option.Has(OptionData) {
-		return drain(NewStreamReader(reader))
+		return buf.Copy(NewStreamReader(reader), buf.Discard)
 	}
 	return nil
 }
@@ -106,7 +106,7 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata,
 	link, err := w.dispatcher.Dispatch(ctx, meta.Target)
 	if err != nil {
 		if meta.Option.Has(OptionData) {
-			drain(NewStreamReader(reader))
+			buf.Copy(NewStreamReader(reader), buf.Discard)
 		}
 		return newError("failed to dispatch request.").Base(err)
 	}
@@ -128,7 +128,7 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata,
 
 	rr := s.NewReader(reader)
 	if err := buf.Copy(rr, s.output); err != nil {
-		drain(rr)
+		buf.Copy(rr, buf.Discard)
 		pipe.CloseError(s.input)
 		return s.Close()
 	}
@@ -139,16 +139,25 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.Buffere
 	if !meta.Option.Has(OptionData) {
 		return nil
 	}
-	if s, found := w.sessionManager.Get(meta.SessionID); found {
-		rr := s.NewReader(reader)
-		if err := buf.Copy(rr, s.output); err != nil {
-			drain(rr)
-			pipe.CloseError(s.input)
-			return s.Close()
-		}
-		return nil
+
+	s, found := w.sessionManager.Get(meta.SessionID)
+	if !found {
+		buf.Copy(NewStreamReader(reader), buf.Discard)
 	}
-	return drain(NewStreamReader(reader))
+
+	rr := s.NewReader(reader)
+	err := buf.Copy(rr, s.output)
+
+	if err != nil && buf.IsWriteError(err) {
+		newError("failed to write to downstream writer. closing session ", s.ID).Base(err)
+
+		drainErr := buf.Copy(rr, buf.Discard)
+		pipe.CloseError(s.input)
+		s.Close()
+		return drainErr
+	}
+
+	return err
 }
 
 func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
@@ -160,7 +169,7 @@ func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.Buffered
 		s.Close()
 	}
 	if meta.Option.Has(OptionData) {
-		return drain(NewStreamReader(reader))
+		return buf.Copy(NewStreamReader(reader), buf.Discard)
 	}
 	return nil
 }