merge_reader.go 782 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
  1. package buf
  2. type MergingReader struct {
  3. reader Reader
  4. timeoutReader TimeoutReader
  5. leftover *Buffer
  6. }
  7. func NewMergingReader(reader Reader) Reader {
  8. return &MergingReader{
  9. reader: reader,
  10. timeoutReader: reader.(TimeoutReader),
  11. }
  12. }
  13. func (r *MergingReader) Read() (*Buffer, error) {
  14. if r.leftover != nil {
  15. b := r.leftover
  16. r.leftover = nil
  17. return b, nil
  18. }
  19. b, err := r.reader.Read()
  20. if err != nil {
  21. return nil, err
  22. }
  23. if b.IsFull() {
  24. return b, nil
  25. }
  26. if r.timeoutReader == nil {
  27. return b, nil
  28. }
  29. for {
  30. b2, err := r.timeoutReader.ReadTimeout(0)
  31. if err != nil {
  32. break
  33. }
  34. nBytes := b.Append(b2.Bytes())
  35. b2.SliceFrom(nBytes)
  36. if b2.IsEmpty() {
  37. b2.Release()
  38. } else {
  39. r.leftover = b2
  40. break
  41. }
  42. }
  43. return b, nil
  44. }