copy.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. package buf
  2. import (
  3. "io"
  4. "time"
  5. "v2ray.com/core/common/errors"
  6. "v2ray.com/core/common/signal"
  7. )
  8. type errorHandler func(error) error
  9. type dataHandler func(MultiBuffer)
  10. type copyHandler struct {
  11. onReadError []errorHandler
  12. onData []dataHandler
  13. onWriteError []errorHandler
  14. }
  15. func (h *copyHandler) readFrom(reader Reader) (MultiBuffer, error) {
  16. mb, err := reader.ReadMultiBuffer()
  17. if err != nil {
  18. for _, handler := range h.onReadError {
  19. err = handler(err)
  20. }
  21. }
  22. return mb, err
  23. }
  24. func (h *copyHandler) writeTo(writer Writer, mb MultiBuffer) error {
  25. err := writer.WriteMultiBuffer(mb)
  26. if err != nil {
  27. for _, handler := range h.onWriteError {
  28. err = handler(err)
  29. }
  30. }
  31. return err
  32. }
  33. // SizeCounter is for counting bytes copied by Copy().
  34. type SizeCounter struct {
  35. Size int64
  36. }
  37. // CopyOption is an option for copying data.
  38. type CopyOption func(*copyHandler)
  39. // UpdateActivity is a CopyOption to update activity on each data copy operation.
  40. func UpdateActivity(timer signal.ActivityUpdater) CopyOption {
  41. return func(handler *copyHandler) {
  42. handler.onData = append(handler.onData, func(MultiBuffer) {
  43. timer.Update()
  44. })
  45. }
  46. }
  47. // CountSize is a CopyOption that sums the total size of data copied into the given SizeCounter.
  48. func CountSize(sc *SizeCounter) CopyOption {
  49. return func(handler *copyHandler) {
  50. handler.onData = append(handler.onData, func(b MultiBuffer) {
  51. sc.Size += int64(b.Len())
  52. })
  53. }
  54. }
  55. type readError struct {
  56. error
  57. }
  58. func (e readError) Error() string {
  59. return e.error.Error()
  60. }
  61. func (e readError) Inner() error {
  62. return e.error
  63. }
  64. func IsReadError(err error) bool {
  65. _, ok := err.(readError)
  66. return ok
  67. }
  68. type writeError struct {
  69. error
  70. }
  71. func (e writeError) Error() string {
  72. return e.error.Error()
  73. }
  74. func (e writeError) Inner() error {
  75. return e.error
  76. }
  77. func IsWriteError(err error) bool {
  78. _, ok := err.(writeError)
  79. return ok
  80. }
  81. func copyInternal(reader Reader, writer Writer, handler *copyHandler) error {
  82. for {
  83. buffer, err := handler.readFrom(reader)
  84. if !buffer.IsEmpty() {
  85. for _, handler := range handler.onData {
  86. handler(buffer)
  87. }
  88. if werr := handler.writeTo(writer, buffer); werr != nil {
  89. return writeError{werr}
  90. }
  91. }
  92. if err != nil {
  93. return readError{err}
  94. }
  95. }
  96. }
  97. // Copy dumps all payload from reader to writer or stops when an error occurs. It returns nil when EOF.
  98. func Copy(reader Reader, writer Writer, options ...CopyOption) error {
  99. var handler copyHandler
  100. for _, option := range options {
  101. option(&handler)
  102. }
  103. err := copyInternal(reader, writer, &handler)
  104. if err != nil && errors.Cause(err) != io.EOF {
  105. return err
  106. }
  107. return nil
  108. }
  109. var ErrNotTimeoutReader = newError("not a TimeoutReader")
  110. func CopyOnceTimeout(reader Reader, writer Writer, timeout time.Duration) error {
  111. timeoutReader, ok := reader.(TimeoutReader)
  112. if !ok {
  113. return ErrNotTimeoutReader
  114. }
  115. mb, err := timeoutReader.ReadMultiBufferTimeout(timeout)
  116. if err != nil {
  117. return err
  118. }
  119. return writer.WriteMultiBuffer(mb)
  120. }