copy.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package buf
  2. import (
  3. "io"
  4. "time"
  5. "unsafe"
  6. "v2ray.com/core/common/errors"
  7. "v2ray.com/core/common/signal"
  8. )
  9. type dataHandler func(MultiBuffer)
  10. //go:notinheap
  11. type copyHandler struct {
  12. onData []dataHandler
  13. }
  14. // SizeCounter is for counting bytes copied by Copy().
  15. type SizeCounter struct {
  16. Size int64
  17. }
  18. // CopyOption is an option for copying data.
  19. type CopyOption func(*copyHandler)
  20. // UpdateActivity is a CopyOption to update activity on each data copy operation.
  21. func UpdateActivity(timer signal.ActivityUpdater) CopyOption {
  22. return func(handler *copyHandler) {
  23. handler.onData = append(handler.onData, func(MultiBuffer) {
  24. timer.Update()
  25. })
  26. }
  27. }
  28. // CountSize is a CopyOption that sums the total size of data copied into the given SizeCounter.
  29. func CountSize(sc *SizeCounter) CopyOption {
  30. return func(handler *copyHandler) {
  31. handler.onData = append(handler.onData, func(b MultiBuffer) {
  32. sc.Size += int64(b.Len())
  33. })
  34. }
  35. }
  36. type readError struct {
  37. error
  38. }
  39. func (e readError) Error() string {
  40. return e.error.Error()
  41. }
  42. func (e readError) Inner() error {
  43. return e.error
  44. }
  45. func IsReadError(err error) bool {
  46. _, ok := err.(readError)
  47. return ok
  48. }
  49. type writeError struct {
  50. error
  51. }
  52. func (e writeError) Error() string {
  53. return e.error.Error()
  54. }
  55. func (e writeError) Inner() error {
  56. return e.error
  57. }
  58. func IsWriteError(err error) bool {
  59. _, ok := err.(writeError)
  60. return ok
  61. }
  62. func copyInternal(reader Reader, writer Writer, handler *copyHandler) error {
  63. for {
  64. buffer, err := reader.ReadMultiBuffer()
  65. if !buffer.IsEmpty() {
  66. for _, handler := range handler.onData {
  67. handler(buffer)
  68. }
  69. if werr := writer.WriteMultiBuffer(buffer); werr != nil {
  70. return writeError{werr}
  71. }
  72. }
  73. if err != nil {
  74. return readError{err}
  75. }
  76. }
  77. }
  78. // Copy dumps all payload from reader to writer or stops when an error occurs. It returns nil when EOF.
  79. func Copy(reader Reader, writer Writer, options ...CopyOption) error {
  80. var handler copyHandler
  81. p := uintptr(unsafe.Pointer(&handler))
  82. h := (*copyHandler)(unsafe.Pointer(p))
  83. for _, option := range options {
  84. option(h)
  85. }
  86. err := copyInternal(reader, writer, h)
  87. if err != nil && errors.Cause(err) != io.EOF {
  88. return err
  89. }
  90. return nil
  91. }
  92. var ErrNotTimeoutReader = newError("not a TimeoutReader")
  93. func CopyOnceTimeout(reader Reader, writer Writer, timeout time.Duration) error {
  94. timeoutReader, ok := reader.(TimeoutReader)
  95. if !ok {
  96. return ErrNotTimeoutReader
  97. }
  98. mb, err := timeoutReader.ReadMultiBufferTimeout(timeout)
  99. if err != nil {
  100. return err
  101. }
  102. return writer.WriteMultiBuffer(mb)
  103. }