| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 | package rayimport (	"errors"	"io"	"time"	"context"	"v2ray.com/core/common/buf")const (	bufferSize = 512)var ErrReadTimeout = errors.New("Ray: timeout.")// NewRay creates a new Ray for direct traffic transport.func NewRay(ctx context.Context) Ray {	return &directRay{		Input:  NewStream(ctx),		Output: NewStream(ctx),	}}type directRay struct {	Input  *Stream	Output *Stream}func (v *directRay) OutboundInput() InputStream {	return v.Input}func (v *directRay) OutboundOutput() OutputStream {	return v.Output}func (v *directRay) InboundInput() OutputStream {	return v.Input}func (v *directRay) InboundOutput() InputStream {	return v.Output}func (v *directRay) AddInspector(inspector Inspector) {	if inspector == nil {		return	}	v.Input.inspector.AddInspector(inspector)	v.Output.inspector.AddInspector(inspector)}type Stream struct {	buffer    chan *buf.Buffer	ctx       context.Context	close     chan bool	err       chan bool	inspector *InspectorChain}func NewStream(ctx context.Context) *Stream {	return &Stream{		ctx:       ctx,		buffer:    make(chan *buf.Buffer, bufferSize),		close:     make(chan bool),		err:       make(chan bool),		inspector: &InspectorChain{},	}}func (v *Stream) Read() (*buf.Buffer, error) {	select {	case <-v.ctx.Done():		return nil, io.ErrClosedPipe	case <-v.err:		return nil, io.ErrClosedPipe	case b := <-v.buffer:		return b, nil	default:		select {		case <-v.ctx.Done():			return nil, io.ErrClosedPipe		case b := <-v.buffer:			return b, nil		case <-v.close:			return nil, io.EOF		case <-v.err:			return nil, io.ErrClosedPipe		}	}}func (v *Stream) ReadTimeout(timeout time.Duration) (*buf.Buffer, error) {	select {	case <-v.ctx.Done():		return nil, io.ErrClosedPipe	case <-v.err:		return nil, io.ErrClosedPipe	case b := <-v.buffer:		return b, nil	default:		select {		case <-v.ctx.Done():			return nil, io.ErrClosedPipe		case b := <-v.buffer:			return b, nil		case <-v.close:			return nil, io.EOF		case <-v.err:			return nil, io.ErrClosedPipe		case <-time.After(timeout):			return nil, ErrReadTimeout		}	}}func (v *Stream) Write(data *buf.Buffer) (err error) {	if data.IsEmpty() {		return	}	select {	case <-v.ctx.Done():		return io.ErrClosedPipe	case <-v.err:		return io.ErrClosedPipe	case <-v.close:		return io.ErrClosedPipe	default:		select {		case <-v.ctx.Done():			return io.ErrClosedPipe		case <-v.err:			return io.ErrClosedPipe		case <-v.close:			return io.ErrClosedPipe		case v.buffer <- data:			v.inspector.Input(data)			return nil		}	}}func (v *Stream) Close() {	defer swallowPanic()	close(v.close)}func (v *Stream) CloseError() {	defer swallowPanic()	close(v.err)	n := len(v.buffer)	for i := 0; i < n; i++ {		select {		case b := <-v.buffer:			b.Release()		default:			return		}	}}func (v *Stream) Release() {}func swallowPanic() {	recover()}
 |