merge_reader.go 754 B

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. package buf
  2. type MergingReader struct {
  3. reader Reader
  4. timeoutReader TimeoutReader
  5. leftover *Buffer
  6. }
  7. func NewMergingReader(reader Reader) Reader {
  8. return &MergingReader{
  9. reader: reader,
  10. timeoutReader: reader.(TimeoutReader),
  11. }
  12. }
  13. func (r *MergingReader) Read() (*Buffer, error) {
  14. if r.leftover != nil {
  15. return r.leftover, nil
  16. }
  17. b, err := r.reader.Read()
  18. if err != nil {
  19. return nil, err
  20. }
  21. if b.IsFull() {
  22. return b, nil
  23. }
  24. if r.timeoutReader == nil {
  25. return b, nil
  26. }
  27. for {
  28. b2, err := r.timeoutReader.ReadTimeout(0)
  29. if err != nil {
  30. break
  31. }
  32. nBytes := b.Append(b2.Bytes())
  33. b2.SliceFrom(nBytes)
  34. if b2.IsEmpty() {
  35. b2.Release()
  36. } else {
  37. r.leftover = b2
  38. break
  39. }
  40. }
  41. return b, nil
  42. }