| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 | package rayimport (	"io"	"sync"	"github.com/v2ray/v2ray-core/common/alloc")const (	bufferSize = 128)// NewRay creates a new Ray for direct traffic transport.func NewRay() Ray {	return &directRay{		Input:  NewStream(),		Output: NewStream(),	}}type directRay struct {	Input  *Stream	Output *Stream}func (this *directRay) OutboundInput() InputStream {	return this.Input}func (this *directRay) OutboundOutput() OutputStream {	return this.Output}func (this *directRay) InboundInput() OutputStream {	return this.Input}func (this *directRay) InboundOutput() InputStream {	return this.Output}type Stream struct {	access sync.RWMutex	closed bool	buffer chan *alloc.Buffer}func NewStream() *Stream {	return &Stream{		buffer: make(chan *alloc.Buffer, bufferSize),	}}func (this *Stream) Read() (*alloc.Buffer, error) {	if this.buffer == nil {		return nil, io.EOF	}	this.access.RLock()	defer this.access.RUnlock()	if this.buffer == nil {		return nil, io.EOF	}	result, open := <-this.buffer	if !open {		return nil, io.EOF	}	return result, nil}func (this *Stream) Write(data *alloc.Buffer) error {	if this.closed {		return io.EOF	}	if this.buffer == nil {		return io.EOF	}	this.access.RLock()	defer this.access.RUnlock()	if this.buffer == nil {		return io.EOF	}	this.buffer <- data	return nil}func (this *Stream) Close() {	if this.closed {		return	}	this.access.RLock()	defer this.access.RUnlock()	if this.closed {		return	}	this.closed = true	close(this.buffer)}func (this *Stream) Release() {	if this.buffer == nil {		return	}	this.Close()	this.access.Lock()	defer this.access.Unlock()	if this.buffer == nil {		return	}	for data := range this.buffer {		data.Release()	}	this.buffer = nil}
 |