chan_reader.go 859 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package io
  2. import (
  3. "io"
  4. "sync"
  5. "v2ray.com/core/common/buf"
  6. )
  7. type ChanReader struct {
  8. sync.Mutex
  9. stream Reader
  10. current *buf.Buffer
  11. eof bool
  12. }
  13. func NewChanReader(stream Reader) *ChanReader {
  14. return &ChanReader{
  15. stream: stream,
  16. }
  17. }
  18. // Private: Visible for testing.
  19. func (v *ChanReader) Fill() {
  20. b, err := v.stream.Read()
  21. v.current = b
  22. if err != nil {
  23. v.eof = true
  24. v.current = nil
  25. }
  26. }
  27. func (v *ChanReader) Read(b []byte) (int, error) {
  28. if v.eof {
  29. return 0, io.EOF
  30. }
  31. v.Lock()
  32. defer v.Unlock()
  33. if v.current == nil {
  34. v.Fill()
  35. if v.eof {
  36. return 0, io.EOF
  37. }
  38. }
  39. nBytes, err := v.current.Read(b)
  40. if v.current.IsEmpty() {
  41. v.current.Release()
  42. v.current = nil
  43. }
  44. return nBytes, err
  45. }
  46. func (v *ChanReader) Release() {
  47. v.Lock()
  48. defer v.Unlock()
  49. v.eof = true
  50. v.current.Release()
  51. v.current = nil
  52. v.stream = nil
  53. }