reader.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package buf
  2. import (
  3. "io"
  4. "sync"
  5. )
  6. // BytesToBufferReader is a Reader that adjusts its reading speed automatically.
  7. type BytesToBufferReader struct {
  8. reader io.Reader
  9. largeBuffer *Buffer
  10. highVolumn bool
  11. }
  12. // Read implements Reader.Read().
  13. func (v *BytesToBufferReader) Read() (*Buffer, error) {
  14. if v.highVolumn && v.largeBuffer.IsEmpty() {
  15. if v.largeBuffer == nil {
  16. v.largeBuffer = NewLocal(32 * 1024)
  17. }
  18. err := v.largeBuffer.AppendSupplier(ReadFrom(v.reader))
  19. if err != nil {
  20. return nil, err
  21. }
  22. if v.largeBuffer.Len() < Size {
  23. v.highVolumn = false
  24. }
  25. }
  26. buffer := New()
  27. if !v.largeBuffer.IsEmpty() {
  28. buffer.AppendSupplier(ReadFrom(v.largeBuffer))
  29. return buffer, nil
  30. }
  31. err := buffer.AppendSupplier(ReadFrom(v.reader))
  32. if err != nil {
  33. buffer.Release()
  34. return nil, err
  35. }
  36. if buffer.IsFull() {
  37. v.highVolumn = true
  38. }
  39. return buffer, nil
  40. }
  41. // Release implements Releasable.Release().
  42. func (v *BytesToBufferReader) Release() {
  43. v.reader = nil
  44. }
  45. type BufferToBytesReader struct {
  46. sync.Mutex
  47. stream Reader
  48. current *Buffer
  49. eof bool
  50. }
  51. // Private: Visible for testing.
  52. func (v *BufferToBytesReader) Fill() {
  53. b, err := v.stream.Read()
  54. v.current = b
  55. if err != nil {
  56. v.eof = true
  57. v.current = nil
  58. }
  59. }
  60. func (v *BufferToBytesReader) Read(b []byte) (int, error) {
  61. if v.eof {
  62. return 0, io.EOF
  63. }
  64. v.Lock()
  65. defer v.Unlock()
  66. if v.current == nil {
  67. v.Fill()
  68. if v.eof {
  69. return 0, io.EOF
  70. }
  71. }
  72. nBytes, err := v.current.Read(b)
  73. if v.current.IsEmpty() {
  74. v.current.Release()
  75. v.current = nil
  76. }
  77. return nBytes, err
  78. }
  79. func (v *BufferToBytesReader) Release() {
  80. v.Lock()
  81. defer v.Unlock()
  82. v.eof = true
  83. v.current.Release()
  84. v.current = nil
  85. v.stream = nil
  86. }