|
|
@@ -144,6 +144,19 @@ func (m *Client) monitor() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func copyFirstPayload(reader *pipe.Reader, writer *Writer) error {
|
|
|
+ data, err := reader.ReadMultiBufferWithTimeout(time.Millisecond * 200)
|
|
|
+ if err == buf.ErrReadTimeout {
|
|
|
+ return writer.writeMetaOnly()
|
|
|
+ }
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ return writer.WriteMultiBuffer(data)
|
|
|
+}
|
|
|
+
|
|
|
func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
|
|
|
dest, _ := proxy.TargetFromContext(ctx)
|
|
|
transferType := protocol.TransferTypeStream
|
|
|
@@ -153,14 +166,22 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
|
|
|
s.transferType = transferType
|
|
|
writer := NewWriter(s.ID, dest, output, transferType)
|
|
|
defer s.Close()
|
|
|
+ defer writer.Close()
|
|
|
|
|
|
newError("dispatching request to ", dest).WithContext(ctx).WriteToLog()
|
|
|
+ if pReader, ok := s.input.(*pipe.Reader); ok {
|
|
|
+ if err := copyFirstPayload(pReader, writer); err != nil {
|
|
|
+ newError("failed to fetch first payload").Base(err).WithContext(ctx).WriteToLog()
|
|
|
+ writer.hasError = true
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
if err := buf.Copy(s.input, writer); err != nil {
|
|
|
newError("failed to fetch all input").Base(err).WithContext(ctx).WriteToLog()
|
|
|
writer.hasError = true
|
|
|
+ return
|
|
|
}
|
|
|
-
|
|
|
- writer.Close()
|
|
|
}
|
|
|
|
|
|
func (m *Client) Dispatch(ctx context.Context, link *core.Link) bool {
|