impl.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. package pipe
  2. import (
  3. "io"
  4. "sync"
  5. "time"
  6. "v2ray.com/core/common/buf"
  7. "v2ray.com/core/common/signal"
  8. )
  9. type state byte
  10. const (
  11. open state = iota
  12. closed
  13. errord
  14. )
  15. type pipe struct {
  16. sync.Mutex
  17. data buf.MultiBuffer
  18. readSignal *signal.Notifier
  19. writeSignal *signal.Notifier
  20. limit int32
  21. state state
  22. }
  23. func (p *pipe) getState(forRead bool) error {
  24. switch p.state {
  25. case open:
  26. return nil
  27. case closed:
  28. if forRead {
  29. if !p.data.IsEmpty() {
  30. return nil
  31. }
  32. return io.EOF
  33. }
  34. return io.ErrClosedPipe
  35. case errord:
  36. return io.ErrClosedPipe
  37. default:
  38. panic("impossible case")
  39. }
  40. }
  41. func (p *pipe) readMultiBufferInternal() (buf.MultiBuffer, error) {
  42. p.Lock()
  43. defer p.Unlock()
  44. if err := p.getState(true); err != nil {
  45. return nil, err
  46. }
  47. data := p.data
  48. p.data = nil
  49. return data, nil
  50. }
  51. func (p *pipe) ReadMultiBuffer() (buf.MultiBuffer, error) {
  52. for {
  53. data, err := p.readMultiBufferInternal()
  54. if data != nil || err != nil {
  55. p.writeSignal.Signal()
  56. return data, err
  57. }
  58. <-p.readSignal.Wait()
  59. }
  60. }
  61. func (p *pipe) ReadMultiBufferWithTimeout(d time.Duration) (buf.MultiBuffer, error) {
  62. timer := time.After(d)
  63. for {
  64. data, err := p.readMultiBufferInternal()
  65. if data != nil || err != nil {
  66. p.writeSignal.Signal()
  67. return data, err
  68. }
  69. select {
  70. case <-p.readSignal.Wait():
  71. case <-timer:
  72. return nil, buf.ErrReadTimeout
  73. }
  74. }
  75. }
  76. func (p *pipe) writeMultiBufferInternal(mb buf.MultiBuffer) error {
  77. p.Lock()
  78. defer p.Unlock()
  79. if err := p.getState(false); err != nil {
  80. return err
  81. }
  82. p.data.AppendMulti(mb)
  83. return nil
  84. }
  85. func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
  86. if mb.IsEmpty() {
  87. return nil
  88. }
  89. for {
  90. if p.limit < 0 || p.data.Len()+mb.Len() <= p.limit {
  91. defer p.readSignal.Signal()
  92. return p.writeMultiBufferInternal(mb)
  93. }
  94. <-p.writeSignal.Wait()
  95. }
  96. }
  97. func (p *pipe) Close() error {
  98. p.Lock()
  99. defer p.Unlock()
  100. if p.state == closed || p.state == errord {
  101. return nil
  102. }
  103. p.state = closed
  104. p.readSignal.Signal()
  105. p.writeSignal.Signal()
  106. return nil
  107. }
  108. func (p *pipe) CloseError() {
  109. p.Lock()
  110. defer p.Unlock()
  111. p.state = errord
  112. if !p.data.IsEmpty() {
  113. p.data.Release()
  114. p.data = nil
  115. }
  116. p.readSignal.Signal()
  117. p.writeSignal.Signal()
  118. }