Pārlūkot izejas kodu

fix error handling in ray

Darien Raymond 7 gadi atpakaļ
vecāks
revīzija
f1231822f7
1 mainītis faili ar 10 papildinājumiem un 8 dzēšanām
  1. 10 8
      transport/ray/direct.go

+ 10 - 8
transport/ray/direct.go

@@ -85,14 +85,14 @@ func (s *Stream) getData() (buf.MultiBuffer, error) {
 		return mb, nil
 	}
 
-	if s.close {
-		return nil, io.EOF
-	}
-
 	if s.err {
 		return nil, io.ErrClosedPipe
 	}
 
+	if s.close {
+		return nil, io.EOF
+	}
+
 	return nil, nil
 }
 
@@ -121,7 +121,7 @@ func (s *Stream) ReadMultiBuffer() (buf.MultiBuffer, error) {
 
 		select {
 		case <-s.ctx.Done():
-			return nil, io.EOF
+			return nil, s.ctx.Err()
 		case <-s.writeSignal.Wait():
 		}
 	}
@@ -142,7 +142,7 @@ func (s *Stream) ReadTimeout(timeout time.Duration) (buf.MultiBuffer, error) {
 
 		select {
 		case <-s.ctx.Done():
-			return nil, io.EOF
+			return nil, s.ctx.Err()
 		case <-time.After(timeout):
 			return nil, buf.ErrReadTimeout
 		case <-s.writeSignal.Wait():
@@ -167,7 +167,7 @@ func (s *Stream) waitForStreamSize() error {
 	for s.Size() >= streamSizeLimit {
 		select {
 		case <-s.ctx.Done():
-			return io.ErrClosedPipe
+			return s.ctx.Err()
 		case <-s.readSignal.Wait():
 			if s.err || s.close {
 				return io.ErrClosedPipe
@@ -227,7 +227,9 @@ func (s *Stream) CloseError() {
 		s.data = nil
 		s.size = 0
 	}
+	s.access.Unlock()
+
 	s.readSignal.Signal()
 	s.writeSignal.Signal()
-	s.access.Unlock()
+
 }