copy.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package buf
  2. import (
  3. "io"
  4. "v2ray.com/core/common/errors"
  5. "v2ray.com/core/common/signal"
  6. )
  7. type errorHandler func(error) error
  8. type dataHandler func(MultiBuffer)
  9. type copyHandler struct {
  10. onReadError []errorHandler
  11. onData []dataHandler
  12. onWriteError []errorHandler
  13. }
  14. func (h *copyHandler) readFrom(reader Reader) (MultiBuffer, error) {
  15. mb, err := reader.ReadMultiBuffer()
  16. if err != nil {
  17. for _, handler := range h.onReadError {
  18. err = handler(err)
  19. }
  20. }
  21. return mb, err
  22. }
  23. func (h *copyHandler) writeTo(writer Writer, mb MultiBuffer) error {
  24. err := writer.WriteMultiBuffer(mb)
  25. if err != nil {
  26. for _, handler := range h.onWriteError {
  27. err = handler(err)
  28. }
  29. }
  30. return err
  31. }
  32. // SizeCounter is for counting bytes copied by Copy().
  33. type SizeCounter struct {
  34. Size int64
  35. }
  36. // CopyOption is an option for copying data.
  37. type CopyOption func(*copyHandler)
  38. // IgnoreReaderError is a CopyOption that ignores errors from reader. Copy will continue in such case.
  39. func IgnoreReaderError() CopyOption {
  40. return func(handler *copyHandler) {
  41. handler.onReadError = append(handler.onReadError, func(err error) error {
  42. return nil
  43. })
  44. }
  45. }
  46. // IgnoreWriterError is a CopyOption that ignores errors from writer. Copy will continue in such case.
  47. func IgnoreWriterError() CopyOption {
  48. return func(handler *copyHandler) {
  49. handler.onWriteError = append(handler.onWriteError, func(err error) error {
  50. return nil
  51. })
  52. }
  53. }
  54. // UpdateActivity is a CopyOption to update activity on each data copy operation.
  55. func UpdateActivity(timer signal.ActivityUpdater) CopyOption {
  56. return func(handler *copyHandler) {
  57. handler.onData = append(handler.onData, func(MultiBuffer) {
  58. timer.Update()
  59. })
  60. }
  61. }
  62. // CountSize is a CopyOption that sums the total size of data copied into the given SizeCounter.
  63. func CountSize(sc *SizeCounter) CopyOption {
  64. return func(handler *copyHandler) {
  65. handler.onData = append(handler.onData, func(b MultiBuffer) {
  66. sc.Size += int64(b.Len())
  67. })
  68. }
  69. }
  70. func copyInternal(reader Reader, writer Writer, handler *copyHandler) error {
  71. for {
  72. buffer, err := handler.readFrom(reader)
  73. if !buffer.IsEmpty() {
  74. for _, handler := range handler.onData {
  75. handler(buffer)
  76. }
  77. if werr := handler.writeTo(writer, buffer); werr != nil {
  78. buffer.Release()
  79. return werr
  80. }
  81. } else if err != nil {
  82. return err
  83. }
  84. }
  85. }
  86. // Copy dumps all payload from reader to writer or stops when an error occurs. It returns nil when EOF.
  87. func Copy(reader Reader, writer Writer, options ...CopyOption) error {
  88. handler := new(copyHandler)
  89. for _, option := range options {
  90. option(handler)
  91. }
  92. err := copyInternal(reader, writer, handler)
  93. if err != nil && errors.Cause(err) != io.EOF {
  94. return err
  95. }
  96. return nil
  97. }