| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 | package rayimport (	"io"	"sync"	"time"	"v2ray.com/core/common/buf")const (	bufferSize = 512)// NewRay creates a new Ray for direct traffic transport.func NewRay() Ray {	return &directRay{		Input:  NewStream(),		Output: NewStream(),	}}type directRay struct {	Input  *Stream	Output *Stream}func (v *directRay) OutboundInput() InputStream {	return v.Input}func (v *directRay) OutboundOutput() OutputStream {	return v.Output}func (v *directRay) InboundInput() OutputStream {	return v.Input}func (v *directRay) InboundOutput() InputStream {	return v.Output}type Stream struct {	access sync.RWMutex	closed bool	buffer chan *buf.Buffer}func NewStream() *Stream {	return &Stream{		buffer: make(chan *buf.Buffer, bufferSize),	}}func (v *Stream) Read() (*buf.Buffer, error) {	if v.buffer == nil {		return nil, io.EOF	}	v.access.RLock()	if v.buffer == nil {		v.access.RUnlock()		return nil, io.EOF	}	channel := v.buffer	v.access.RUnlock()	result, open := <-channel	if !open {		return nil, io.EOF	}	return result, nil}func (v *Stream) Write(data *buf.Buffer) error {	for !v.closed {		err := v.TryWriteOnce(data)		if err != io.ErrNoProgress {			return err		}	}	return io.ErrClosedPipe}func (v *Stream) TryWriteOnce(data *buf.Buffer) error {	v.access.RLock()	defer v.access.RUnlock()	if v.closed {		return io.ErrClosedPipe	}	select {	case v.buffer <- data:		return nil	case <-time.After(2 * time.Second):		return io.ErrNoProgress	}}func (v *Stream) Close() {	if v.closed {		return	}	v.access.Lock()	defer v.access.Unlock()	if v.closed {		return	}	v.closed = true	close(v.buffer)}func (v *Stream) Release() {	if v.buffer == nil {		return	}	v.Close()	v.access.Lock()	defer v.access.Unlock()	if v.buffer == nil {		return	}	for data := range v.buffer {		data.Release()	}	v.buffer = nil}
 |