impl.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package pipe
  2. import (
  3. "io"
  4. "sync"
  5. "time"
  6. "v2ray.com/core/common/buf"
  7. "v2ray.com/core/common/signal"
  8. )
  9. type state byte
  10. const (
  11. open state = iota
  12. closed
  13. errord
  14. )
  15. type pipe struct {
  16. sync.Mutex
  17. data buf.MultiBuffer
  18. readSignal *signal.Notifier
  19. writeSignal *signal.Notifier
  20. limit int32
  21. state state
  22. }
  23. func (p *pipe) getState(forRead bool) error {
  24. switch p.state {
  25. case open:
  26. return nil
  27. case closed:
  28. if forRead {
  29. if !p.data.IsEmpty() {
  30. return nil
  31. }
  32. return io.EOF
  33. }
  34. return io.ErrClosedPipe
  35. case errord:
  36. return io.ErrClosedPipe
  37. default:
  38. panic("impossible case")
  39. }
  40. }
  41. func (p *pipe) readMultiBufferInternal() (buf.MultiBuffer, error) {
  42. p.Lock()
  43. defer p.Unlock()
  44. if err := p.getState(true); err != nil {
  45. return nil, err
  46. }
  47. data := p.data
  48. p.data = nil
  49. return data, nil
  50. }
  51. func (p *pipe) ReadMultiBuffer() (buf.MultiBuffer, error) {
  52. for {
  53. data, err := p.readMultiBufferInternal()
  54. if data != nil || err != nil {
  55. return data, err
  56. }
  57. <-p.readSignal.Wait()
  58. }
  59. }
  60. func (p *pipe) ReadMultiBufferWithTimeout(d time.Duration) (buf.MultiBuffer, error) {
  61. timer := time.After(d)
  62. for {
  63. data, err := p.readMultiBufferInternal()
  64. if data != nil || err != nil {
  65. p.writeSignal.Signal()
  66. return data, err
  67. }
  68. select {
  69. case <-p.readSignal.Wait():
  70. case <-timer:
  71. return nil, buf.ErrReadTimeout
  72. }
  73. }
  74. }
  75. func (p *pipe) writeMultiBufferInternal(mb buf.MultiBuffer) error {
  76. p.Lock()
  77. defer p.Unlock()
  78. if err := p.getState(false); err != nil {
  79. return err
  80. }
  81. p.data.AppendMulti(mb)
  82. return nil
  83. }
  84. func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
  85. if mb.IsEmpty() {
  86. return nil
  87. }
  88. for {
  89. if p.limit < 0 || p.data.Len()+mb.Len() <= p.limit {
  90. defer p.readSignal.Signal()
  91. return p.writeMultiBufferInternal(mb)
  92. }
  93. <-p.writeSignal.Wait()
  94. }
  95. }
  96. func (p *pipe) Close() error {
  97. p.Lock()
  98. defer p.Unlock()
  99. if p.state == closed || p.state == errord {
  100. return nil
  101. }
  102. p.state = closed
  103. p.readSignal.Signal()
  104. p.writeSignal.Signal()
  105. return nil
  106. }
  107. func (p *pipe) CloseError() {
  108. p.Lock()
  109. defer p.Unlock()
  110. p.state = errord
  111. if !p.data.IsEmpty() {
  112. p.data.Release()
  113. p.data = nil
  114. }
  115. p.readSignal.Signal()
  116. p.writeSignal.Signal()
  117. }