|
@@ -1,6 +1,7 @@
|
|
|
package pipe
|
|
package pipe
|
|
|
|
|
|
|
|
import (
|
|
import (
|
|
|
|
|
+ "errors"
|
|
|
"io"
|
|
"io"
|
|
|
"sync"
|
|
"sync"
|
|
|
"time"
|
|
"time"
|
|
@@ -26,9 +27,14 @@ type pipe struct {
|
|
|
state state
|
|
state state
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+var errBufferFull = errors.New("buffer full")
|
|
|
|
|
+
|
|
|
func (p *pipe) getState(forRead bool) error {
|
|
func (p *pipe) getState(forRead bool) error {
|
|
|
switch p.state {
|
|
switch p.state {
|
|
|
case open:
|
|
case open:
|
|
|
|
|
+ if !forRead && p.limit >= 0 && p.data.Len() > p.limit {
|
|
|
|
|
+ return errBufferFull
|
|
|
|
|
+ }
|
|
|
return nil
|
|
return nil
|
|
|
case closed:
|
|
case closed:
|
|
|
if forRead {
|
|
if forRead {
|
|
@@ -105,9 +111,10 @@ func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
for {
|
|
for {
|
|
|
- if p.limit < 0 || p.data.Len()+mb.Len() <= p.limit {
|
|
|
|
|
- defer p.readSignal.Signal()
|
|
|
|
|
- return p.writeMultiBufferInternal(mb)
|
|
|
|
|
|
|
+ err := p.writeMultiBufferInternal(mb)
|
|
|
|
|
+ if err == nil || err != errBufferFull {
|
|
|
|
|
+ p.readSignal.Signal()
|
|
|
|
|
+ return err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
<-p.writeSignal.Wait()
|
|
<-p.writeSignal.Wait()
|