reader.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package buf
  2. import "io"
  3. // BytesToBufferReader is a Reader that adjusts its reading speed automatically.
  4. type BytesToBufferReader struct {
  5. reader io.Reader
  6. largeBuffer *Buffer
  7. highVolumn bool
  8. }
  9. // Read implements Reader.Read().
  10. func (v *BytesToBufferReader) Read() (*Buffer, error) {
  11. if v.highVolumn && v.largeBuffer.IsEmpty() {
  12. if v.largeBuffer == nil {
  13. v.largeBuffer = NewLocal(32 * 1024)
  14. }
  15. err := v.largeBuffer.AppendSupplier(ReadFrom(v.reader))
  16. if err != nil {
  17. return nil, err
  18. }
  19. if v.largeBuffer.Len() < Size {
  20. v.highVolumn = false
  21. }
  22. }
  23. buffer := New()
  24. if !v.largeBuffer.IsEmpty() {
  25. err := buffer.AppendSupplier(ReadFrom(v.largeBuffer))
  26. return buffer, err
  27. }
  28. err := buffer.AppendSupplier(ReadFrom(v.reader))
  29. if err != nil {
  30. buffer.Release()
  31. return nil, err
  32. }
  33. if buffer.IsFull() {
  34. v.highVolumn = true
  35. }
  36. return buffer, nil
  37. }
  38. // Release implements Releasable.Release().
  39. func (v *BytesToBufferReader) Release() {
  40. v.reader = nil
  41. }
  42. type BufferToBytesReader struct {
  43. stream Reader
  44. current *Buffer
  45. eof bool
  46. }
  47. // Fill fills in the internal buffer.
  48. // Private: Visible for testing.
  49. func (v *BufferToBytesReader) Fill() {
  50. b, err := v.stream.Read()
  51. v.current = b
  52. if err != nil {
  53. v.eof = true
  54. v.current = nil
  55. }
  56. }
  57. func (v *BufferToBytesReader) Read(b []byte) (int, error) {
  58. if v.eof {
  59. return 0, io.EOF
  60. }
  61. if v.current == nil {
  62. v.Fill()
  63. if v.eof {
  64. return 0, io.EOF
  65. }
  66. }
  67. nBytes, err := v.current.Read(b)
  68. if v.current.IsEmpty() {
  69. v.current.Release()
  70. v.current = nil
  71. }
  72. return nBytes, err
  73. }
  74. // Release implements Releasable.Release().
  75. func (v *BufferToBytesReader) Release() {
  76. v.eof = true
  77. v.current.Release()
  78. v.current = nil
  79. }