Prechádzať zdrojové kódy

refine copy handler

Darien Raymond 8 rokov pred
rodič
commit
07847576b5
2 zmenil súbory, kde vykonal 100 pridanie a 88 odobranie
  1. 100 0
      common/buf/copy.go
  2. 0 88
      common/buf/io.go

+ 100 - 0
common/buf/copy.go

@@ -0,0 +1,100 @@
+package buf
+
+import (
+	"io"
+
+	"v2ray.com/core/common/errors"
+	"v2ray.com/core/common/signal"
+)
+
+type errorHandler func(error) error
+type dataHandler func(MultiBuffer)
+
+type copyHandler struct {
+	onReadError  []errorHandler
+	onData       []dataHandler
+	onWriteError []errorHandler
+}
+
+func (h *copyHandler) readFrom(reader Reader) (MultiBuffer, error) {
+	mb, err := reader.Read()
+	if err != nil {
+		for _, handler := range h.onReadError {
+			err = handler(err)
+		}
+	}
+	return mb, err
+}
+
+func (h *copyHandler) writeTo(writer Writer, mb MultiBuffer) error {
+	err := writer.Write(mb)
+	if err != nil {
+		for _, handler := range h.onWriteError {
+			err = handler(err)
+		}
+	}
+	return err
+}
+
+type CopyOption func(*copyHandler)
+
+func IgnoreReaderError() CopyOption {
+	return func(handler *copyHandler) {
+		handler.onReadError = append(handler.onReadError, func(err error) error {
+			return nil
+		})
+	}
+}
+
+func IgnoreWriterError() CopyOption {
+	return func(handler *copyHandler) {
+		handler.onWriteError = append(handler.onWriteError, func(err error) error {
+			return nil
+		})
+	}
+}
+
+func UpdateActivity(timer signal.ActivityTimer) CopyOption {
+	return func(handler *copyHandler) {
+		handler.onData = append(handler.onData, func(MultiBuffer) {
+			timer.Update()
+		})
+	}
+}
+
+func copyInternal(reader Reader, writer Writer, handler *copyHandler) error {
+	for {
+		buffer, err := handler.readFrom(reader)
+		if err != nil {
+			return err
+		}
+
+		if buffer.IsEmpty() {
+			buffer.Release()
+			continue
+		}
+
+		for _, handler := range handler.onData {
+			handler(buffer)
+		}
+
+		if err := handler.writeTo(writer, buffer); err != nil {
+			buffer.Release()
+			return err
+		}
+	}
+}
+
+// Copy dumps all payload from reader to writer or stops when an error occurs.
+// ActivityTimer gets updated as soon as there is a payload.
+func Copy(reader Reader, writer Writer, options ...CopyOption) error {
+	handler := new(copyHandler)
+	for _, option := range options {
+		option(handler)
+	}
+	err := copyInternal(reader, writer, handler)
+	if err != nil && errors.Cause(err) != io.EOF {
+		return err
+	}
+	return nil
+}

+ 0 - 88
common/buf/io.go

@@ -3,9 +3,6 @@ package buf
 import (
 	"io"
 	"time"
-
-	"v2ray.com/core/common/errors"
-	"v2ray.com/core/common/signal"
 )
 
 // Reader extends io.Reader with alloc.Buffer.
@@ -47,91 +44,6 @@ func ReadAtLeastFrom(reader io.Reader, size int) Supplier {
 	}
 }
 
-type copyHandler struct {
-	onReadError  func(error) error
-	onData       func()
-	onWriteError func(error) error
-}
-
-func (h *copyHandler) readFrom(reader Reader) (MultiBuffer, error) {
-	mb, err := reader.Read()
-	if err != nil && h.onReadError != nil {
-		err = h.onReadError(err)
-	}
-	return mb, err
-}
-
-func (h *copyHandler) writeTo(writer Writer, mb MultiBuffer) error {
-	err := writer.Write(mb)
-	if err != nil && h.onWriteError != nil {
-		err = h.onWriteError(err)
-	}
-	return err
-}
-
-type CopyOption func(*copyHandler)
-
-func IgnoreReaderError() CopyOption {
-	return func(handler *copyHandler) {
-		handler.onReadError = func(err error) error {
-			return nil
-		}
-	}
-}
-
-func IgnoreWriterError() CopyOption {
-	return func(handler *copyHandler) {
-		handler.onWriteError = func(err error) error {
-			return nil
-		}
-	}
-}
-
-func UpdateActivity(timer signal.ActivityTimer) CopyOption {
-	return func(handler *copyHandler) {
-		handler.onData = func() {
-			timer.Update()
-		}
-	}
-}
-
-func copyInternal(reader Reader, writer Writer, handler *copyHandler) error {
-	for {
-		buffer, err := handler.readFrom(reader)
-		if err != nil {
-			return err
-		}
-
-		if buffer.IsEmpty() {
-			buffer.Release()
-			continue
-		}
-
-		if handler.onData != nil {
-			handler.onData()
-		}
-
-		if err := handler.writeTo(writer, buffer); err != nil {
-			buffer.Release()
-			return err
-		}
-	}
-}
-
-// Copy dumps all payload from reader to writer or stops when an error occurs.
-// ActivityTimer gets updated as soon as there is a payload.
-func Copy(reader Reader, writer Writer, options ...CopyOption) error {
-	handler := new(copyHandler)
-	for _, option := range options {
-		option(handler)
-	}
-	err := copyInternal(reader, writer, handler)
-	if err != nil && errors.Cause(err) != io.EOF {
-		return err
-	}
-	return nil
-}
-
 // NewReader creates a new Reader.
 // The Reader instance doesn't take the ownership of reader.
 func NewReader(reader io.Reader) Reader {