impl.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package pipe
  2. import (
  3. "io"
  4. "sync"
  5. "time"
  6. "v2ray.com/core/common/buf"
  7. "v2ray.com/core/common/errors"
  8. "v2ray.com/core/common/signal"
  9. )
  10. type state byte
  11. const (
  12. open state = iota
  13. closed
  14. errord
  15. )
  16. type pipe struct {
  17. sync.Mutex
  18. data buf.MultiBuffer
  19. readSignal *signal.Notifier
  20. writeSignal *signal.Notifier
  21. limit int32
  22. state state
  23. }
  24. func (p *pipe) getState(forRead bool) error {
  25. switch p.state {
  26. case open:
  27. return nil
  28. case closed:
  29. if forRead {
  30. if !p.data.IsEmpty() {
  31. return nil
  32. }
  33. return io.EOF
  34. }
  35. return io.ErrClosedPipe
  36. case errord:
  37. return io.ErrClosedPipe
  38. default:
  39. panic("impossible case")
  40. }
  41. }
  42. func (p *pipe) readMultiBufferInternal() (buf.MultiBuffer, error) {
  43. p.Lock()
  44. defer p.Unlock()
  45. if err := p.getState(true); err != nil {
  46. return nil, err
  47. }
  48. data := p.data
  49. p.data = nil
  50. return data, nil
  51. }
  52. func (p *pipe) ReadMultiBuffer() (buf.MultiBuffer, error) {
  53. for {
  54. data, err := p.readMultiBufferInternal()
  55. if data != nil || err != nil {
  56. return data, err
  57. }
  58. <-p.readSignal.Wait()
  59. }
  60. }
  61. var ErrTimeout = errors.New("Timeout on reading pipeline.")
  62. func (p *pipe) ReadMultiBufferWithTimeout(d time.Duration) (buf.MultiBuffer, error) {
  63. timer := time.After(d)
  64. for {
  65. data, err := p.readMultiBufferInternal()
  66. if data != nil || err != nil {
  67. p.writeSignal.Signal()
  68. return data, err
  69. }
  70. select {
  71. case <-p.readSignal.Wait():
  72. case <-timer:
  73. return nil, ErrTimeout
  74. }
  75. }
  76. }
  77. func (p *pipe) writeMultiBufferInternal(mb buf.MultiBuffer) error {
  78. p.Lock()
  79. defer p.Unlock()
  80. if err := p.getState(false); err != nil {
  81. return err
  82. }
  83. p.data.AppendMulti(mb)
  84. return nil
  85. }
  86. func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
  87. if mb.IsEmpty() {
  88. return nil
  89. }
  90. for {
  91. if p.limit < 0 || p.data.Len()+mb.Len() <= p.limit {
  92. defer p.readSignal.Signal()
  93. return p.writeMultiBufferInternal(mb)
  94. }
  95. <-p.writeSignal.Wait()
  96. }
  97. }
  98. func (p *pipe) Close() error {
  99. p.Lock()
  100. defer p.Unlock()
  101. p.state = closed
  102. p.readSignal.Signal()
  103. p.writeSignal.Signal()
  104. return nil
  105. }
  106. func (p *pipe) CloseError() {
  107. p.Lock()
  108. defer p.Unlock()
  109. p.state = errord
  110. if !p.data.IsEmpty() {
  111. p.data.Release()
  112. p.data = nil
  113. }
  114. p.readSignal.Signal()
  115. p.writeSignal.Signal()
  116. }