| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 | package pipeimport (	"errors"	"io"	"sync"	"time"	"v2ray.com/core/common/buf"	"v2ray.com/core/common/signal")type state byteconst (	open state = iota	closed	errord)type pipe struct {	sync.Mutex	data        buf.MultiBuffer	readSignal  *signal.Notifier	writeSignal *signal.Notifier	limit       int32	state       state}var errBufferFull = errors.New("buffer full")func (p *pipe) getState(forRead bool) error {	switch p.state {	case open:		if !forRead && p.limit >= 0 && p.data.Len() > p.limit {			return errBufferFull		}		return nil	case closed:		if forRead {			if !p.data.IsEmpty() {				return nil			}			return io.EOF		}		return io.ErrClosedPipe	case errord:		return io.ErrClosedPipe	default:		panic("impossible case")	}}func (p *pipe) readMultiBufferInternal() (buf.MultiBuffer, error) {	p.Lock()	defer p.Unlock()	if err := p.getState(true); err != nil {		return nil, err	}	data := p.data	p.data = nil	return data, nil}func (p *pipe) ReadMultiBuffer() (buf.MultiBuffer, error) {	for {		data, err := p.readMultiBufferInternal()		if data != nil || err != nil {			p.writeSignal.Signal()			return data, err		}		<-p.readSignal.Wait()	}}func (p *pipe) ReadMultiBufferWithTimeout(d time.Duration) (buf.MultiBuffer, error) {	timer := time.After(d)	for {		data, err := p.readMultiBufferInternal()		if data != nil || err != nil {			p.writeSignal.Signal()			return data, err		}		select {		case <-p.readSignal.Wait():		case <-timer:			return nil, buf.ErrReadTimeout		}	}}func (p *pipe) writeMultiBufferInternal(mb buf.MultiBuffer) error {	p.Lock()	defer p.Unlock()	if err := p.getState(false); err != nil {		return err	}	p.data.AppendMulti(mb)	return nil}func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {	if mb.IsEmpty() {		return nil	}	for {		err := p.writeMultiBufferInternal(mb)		if err == nil || err != errBufferFull {			p.readSignal.Signal()			return err		}		<-p.writeSignal.Wait()	}}func (p *pipe) Close() error {	p.Lock()	defer p.Unlock()	if p.state == closed || p.state == errord {		return nil	}	p.state = closed	p.readSignal.Signal()	p.writeSignal.Signal()	return nil}func (p *pipe) CloseError() {	p.Lock()	defer p.Unlock()	p.state = errord	if !p.data.IsEmpty() {		p.data.Release()		p.data = nil	}	p.readSignal.Signal()	p.writeSignal.Signal()}
 |