reader.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package buf
  2. import (
  3. "io"
  4. "sync"
  5. )
  6. // BytesToBufferReader is a Reader that adjusts its reading speed automatically.
  7. type BytesToBufferReader struct {
  8. reader io.Reader
  9. largeBuffer *Buffer
  10. highVolumn bool
  11. }
  12. // Read implements Reader.Read().
  13. func (v *BytesToBufferReader) Read() (*Buffer, error) {
  14. if v.highVolumn && v.largeBuffer.IsEmpty() {
  15. if v.largeBuffer == nil {
  16. v.largeBuffer = NewLocal(32 * 1024)
  17. }
  18. err := v.largeBuffer.AppendSupplier(ReadFrom(v.reader))
  19. if err != nil {
  20. return nil, err
  21. }
  22. if v.largeBuffer.Len() < Size {
  23. v.highVolumn = false
  24. }
  25. }
  26. buffer := New()
  27. if !v.largeBuffer.IsEmpty() {
  28. err := buffer.AppendSupplier(ReadFrom(v.largeBuffer))
  29. return buffer, err
  30. }
  31. err := buffer.AppendSupplier(ReadFrom(v.reader))
  32. if err != nil {
  33. buffer.Release()
  34. return nil, err
  35. }
  36. if buffer.IsFull() {
  37. v.highVolumn = true
  38. }
  39. return buffer, nil
  40. }
  41. // Release implements Releasable.Release().
  42. func (v *BytesToBufferReader) Release() {
  43. v.reader = nil
  44. }
  45. type BufferToBytesReader struct {
  46. sync.Mutex
  47. stream Reader
  48. current *Buffer
  49. eof bool
  50. }
  51. // Fill fills in the internal buffer.
  52. // Private: Visible for testing.
  53. func (v *BufferToBytesReader) Fill() {
  54. b, err := v.stream.Read()
  55. v.current = b
  56. if err != nil {
  57. v.eof = true
  58. v.current = nil
  59. }
  60. }
  61. func (v *BufferToBytesReader) Read(b []byte) (int, error) {
  62. if v.eof {
  63. return 0, io.EOF
  64. }
  65. v.Lock()
  66. defer v.Unlock()
  67. if v.current == nil {
  68. v.Fill()
  69. if v.eof {
  70. return 0, io.EOF
  71. }
  72. }
  73. nBytes, err := v.current.Read(b)
  74. if v.current.IsEmpty() {
  75. v.current.Release()
  76. v.current = nil
  77. }
  78. return nBytes, err
  79. }
  80. // Release implements Releasable.Release().
  81. func (v *BufferToBytesReader) Release() {
  82. v.Lock()
  83. defer v.Unlock()
  84. v.eof = true
  85. v.current.Release()
  86. v.current = nil
  87. v.stream = nil
  88. }