chan_reader.go 932 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package io
  2. import (
  3. "io"
  4. "sync"
  5. "v2ray.com/core/common/alloc"
  6. )
  7. type ChanReader struct {
  8. sync.Mutex
  9. stream Reader
  10. current *alloc.Buffer
  11. eof bool
  12. }
  13. func NewChanReader(stream Reader) *ChanReader {
  14. return &ChanReader{
  15. stream: stream,
  16. }
  17. }
  18. // Private: Visible for testing.
  19. func (this *ChanReader) Fill() {
  20. b, err := this.stream.Read()
  21. this.current = b
  22. if err != nil {
  23. this.eof = true
  24. this.current = nil
  25. }
  26. }
  27. func (this *ChanReader) Read(b []byte) (int, error) {
  28. if this.eof {
  29. return 0, io.EOF
  30. }
  31. this.Lock()
  32. defer this.Unlock()
  33. if this.current == nil {
  34. this.Fill()
  35. if this.eof {
  36. return 0, io.EOF
  37. }
  38. }
  39. nBytes, err := this.current.Read(b)
  40. if this.current.IsEmpty() {
  41. this.current.Release()
  42. this.current = nil
  43. }
  44. return nBytes, err
  45. }
  46. func (this *ChanReader) Release() {
  47. this.Lock()
  48. defer this.Unlock()
  49. this.eof = true
  50. this.current.Release()
  51. this.current = nil
  52. this.stream = nil
  53. }