Pārlūkot izejas kodu

handle transport errors in mux session

Darien Raymond 7 gadi atpakaļ
vecāks
revīzija
90c6113dfc
3 mainītis faili ar 48 papildinājumiem un 2 dzēšanām
  1. 1 0
      app/proxyman/mux/frame.go
  2. 34 2
      app/proxyman/mux/mux.go
  3. 13 0
      app/proxyman/mux/writer.go

+ 1 - 0
app/proxyman/mux/frame.go

@@ -15,6 +15,7 @@ const (
 	SessionStatusKeep      SessionStatus = 0x02
 	SessionStatusEnd       SessionStatus = 0x03
 	SessionStatusKeepAlive SessionStatus = 0x04
+	SessionStatusError     SessionStatus = 0x05
 )
 
 const (

+ 34 - 2
app/proxyman/mux/mux.go

@@ -146,12 +146,14 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
 	}
 	s.transferType = transferType
 	writer := NewWriter(s.ID, dest, output, transferType)
-	defer writer.Close()
 	defer s.Close()
 
 	newError("dispatching request to ", dest).WithContext(ctx).WriteToLog()
 	if err := buf.Copy(s.input, writer); err != nil {
 		newError("failed to fetch all input").Base(err).WithContext(ctx).WriteToLog()
+		writer.Error()
+	} else {
+		writer.Close()
 	}
 }
 
@@ -214,6 +216,18 @@ func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader
 	return nil
 }
 
+func (m *Client) handleStatusError(meta *FrameMetadata, reader *buf.BufferedReader) error {
+	if s, found := m.sessionManager.Get(meta.SessionID); found {
+		s.output.CloseError()
+		s.input.CloseError()
+		s.Close()
+	}
+	if meta.Option.Has(OptionData) {
+		return drain(reader)
+	}
+	return nil
+}
+
 func (m *Client) fetchOutput() {
 	defer m.done.Close()
 
@@ -237,6 +251,8 @@ func (m *Client) fetchOutput() {
 			err = m.handleStatusNew(meta, reader)
 		case SessionStatusKeep:
 			err = m.handleStatusKeep(meta, reader)
+		case SessionStatusError:
+			err = m.handleStatusError(meta, reader)
 		default:
 			newError("unknown status: ", meta.SessionStatus).AtError().WriteToLog()
 			return
@@ -294,8 +310,10 @@ func handle(ctx context.Context, s *Session, output buf.Writer) {
 	writer := NewResponseWriter(s.ID, output, s.transferType)
 	if err := buf.Copy(s.input, writer); err != nil {
 		newError("session ", s.ID, " ends.").Base(err).WithContext(ctx).WriteToLog()
+		writer.Error()
+	} else {
+		writer.Close()
 	}
-	writer.Close()
 	s.Close()
 }
 
@@ -353,6 +371,18 @@ func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.Buffered
 	return nil
 }
 
+func (w *ServerWorker) handleStatusError(meta *FrameMetadata, reader *buf.BufferedReader) error {
+	if s, found := w.sessionManager.Get(meta.SessionID); found {
+		s.input.CloseError()
+		s.output.CloseError()
+		s.Close()
+	}
+	if meta.Option.Has(OptionData) {
+		return drain(reader)
+	}
+	return nil
+}
+
 func (w *ServerWorker) handleFrame(ctx context.Context, reader *buf.BufferedReader) error {
 	meta, err := ReadMetadata(reader)
 	if err != nil {
@@ -368,6 +398,8 @@ func (w *ServerWorker) handleFrame(ctx context.Context, reader *buf.BufferedRead
 		err = w.handleStatusNew(ctx, meta, reader)
 	case SessionStatusKeep:
 		err = w.handleStatusKeep(meta, reader)
+	case SessionStatusError:
+		err = w.handleStatusError(meta, reader)
 	default:
 		return newError("unknown status: ", meta.SessionStatus).AtError()
 	}

+ 13 - 0
app/proxyman/mux/writer.go

@@ -112,3 +112,16 @@ func (w *Writer) Close() error {
 	w.writer.WriteMultiBuffer(buf.NewMultiBufferValue(frame))
 	return nil
 }
+
+func (w *Writer) Error() error {
+	meta := FrameMetadata{
+		SessionID:     w.id,
+		SessionStatus: SessionStatusError,
+	}
+
+	frame := buf.New()
+	common.Must(meta.WriteTo(frame))
+
+	w.writer.WriteMultiBuffer(buf.NewMultiBufferValue(frame))
+	return nil
+}