|  | @@ -6,6 +6,7 @@ import (
 | 
											
												
													
														|  |  	"sync"
 |  |  	"sync"
 | 
											
												
													
														|  |  	"time"
 |  |  	"time"
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | 
 |  | +	"v2ray.com/core/common"
 | 
											
												
													
														|  |  	"v2ray.com/core/common/buf"
 |  |  	"v2ray.com/core/common/buf"
 | 
											
												
													
														|  |  	"v2ray.com/core/common/platform"
 |  |  	"v2ray.com/core/common/platform"
 | 
											
												
													
														|  |  )
 |  |  )
 | 
											
										
											
												
													
														|  | @@ -50,6 +51,7 @@ func init() {
 | 
											
												
													
														|  |  	streamSizeLimit = uint64(size) * 1024 * 1024
 |  |  	streamSizeLimit = uint64(size) * 1024 * 1024
 | 
											
												
													
														|  |  }
 |  |  }
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | 
 |  | +// Stream is a sequential container for data in bytes.
 | 
											
												
													
														|  |  type Stream struct {
 |  |  type Stream struct {
 | 
											
												
													
														|  |  	access      sync.RWMutex
 |  |  	access      sync.RWMutex
 | 
											
												
													
														|  |  	data        buf.MultiBuffer
 |  |  	data        buf.MultiBuffer
 | 
											
										
											
												
													
														|  | @@ -61,6 +63,7 @@ type Stream struct {
 | 
											
												
													
														|  |  	err         bool
 |  |  	err         bool
 | 
											
												
													
														|  |  }
 |  |  }
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | 
 |  | +// NewStream creates a new Stream.
 | 
											
												
													
														|  |  func NewStream(ctx context.Context) *Stream {
 |  |  func NewStream(ctx context.Context) *Stream {
 | 
											
												
													
														|  |  	return &Stream{
 |  |  	return &Stream{
 | 
											
												
													
														|  |  		ctx:         ctx,
 |  |  		ctx:         ctx,
 | 
											
										
											
												
													
														|  | @@ -92,15 +95,17 @@ func (s *Stream) getData() (buf.MultiBuffer, error) {
 | 
											
												
													
														|  |  	return nil, nil
 |  |  	return nil, nil
 | 
											
												
													
														|  |  }
 |  |  }
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | 
 |  | +// Peek fills in the given buffer with data from head of the Stream.
 | 
											
												
													
														|  |  func (s *Stream) Peek(b *buf.Buffer) {
 |  |  func (s *Stream) Peek(b *buf.Buffer) {
 | 
											
												
													
														|  |  	s.access.RLock()
 |  |  	s.access.RLock()
 | 
											
												
													
														|  |  	defer s.access.RUnlock()
 |  |  	defer s.access.RUnlock()
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | -	b.Reset(func(data []byte) (int, error) {
 |  | 
 | 
											
												
													
														|  | 
 |  | +	common.Must(b.Reset(func(data []byte) (int, error) {
 | 
											
												
													
														|  |  		return s.data.Copy(data), nil
 |  |  		return s.data.Copy(data), nil
 | 
											
												
													
														|  | -	})
 |  | 
 | 
											
												
													
														|  | 
 |  | +	}))
 | 
											
												
													
														|  |  }
 |  |  }
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | 
 |  | +// Read reads data from the Stream.
 | 
											
												
													
														|  |  func (s *Stream) Read() (buf.MultiBuffer, error) {
 |  |  func (s *Stream) Read() (buf.MultiBuffer, error) {
 | 
											
												
													
														|  |  	for {
 |  |  	for {
 | 
											
												
													
														|  |  		mb, err := s.getData()
 |  |  		mb, err := s.getData()
 | 
											
										
											
												
													
														|  | @@ -121,6 +126,7 @@ func (s *Stream) Read() (buf.MultiBuffer, error) {
 | 
											
												
													
														|  |  	}
 |  |  	}
 | 
											
												
													
														|  |  }
 |  |  }
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | 
 |  | +// ReadTimeout reads from the Stream with a specified timeout.
 | 
											
												
													
														|  |  func (s *Stream) ReadTimeout(timeout time.Duration) (buf.MultiBuffer, error) {
 |  |  func (s *Stream) ReadTimeout(timeout time.Duration) (buf.MultiBuffer, error) {
 | 
											
												
													
														|  |  	for {
 |  |  	for {
 | 
											
												
													
														|  |  		mb, err := s.getData()
 |  |  		mb, err := s.getData()
 | 
											
										
											
												
													
														|  | @@ -151,6 +157,7 @@ func (s *Stream) Size() uint64 {
 | 
											
												
													
														|  |  	return s.size
 |  |  	return s.size
 | 
											
												
													
														|  |  }
 |  |  }
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | 
 |  | +// waitForStreamSize waits until the Stream has room for more data, or any error happens.
 | 
											
												
													
														|  |  func (s *Stream) waitForStreamSize() error {
 |  |  func (s *Stream) waitForStreamSize() error {
 | 
											
												
													
														|  |  	if streamSizeLimit == 0 {
 |  |  	if streamSizeLimit == 0 {
 | 
											
												
													
														|  |  		return nil
 |  |  		return nil
 | 
											
										
											
												
													
														|  | @@ -170,6 +177,7 @@ func (s *Stream) waitForStreamSize() error {
 | 
											
												
													
														|  |  	return nil
 |  |  	return nil
 | 
											
												
													
														|  |  }
 |  |  }
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | 
 |  | +// Write writes more data into the Stream.
 | 
											
												
													
														|  |  func (s *Stream) Write(data buf.MultiBuffer) error {
 |  |  func (s *Stream) Write(data buf.MultiBuffer) error {
 | 
											
												
													
														|  |  	if data.IsEmpty() {
 |  |  	if data.IsEmpty() {
 | 
											
												
													
														|  |  		return nil
 |  |  		return nil
 | 
											
										
											
												
													
														|  | @@ -213,6 +221,7 @@ func (s *Stream) notifyWrite() {
 | 
											
												
													
														|  |  	}
 |  |  	}
 | 
											
												
													
														|  |  }
 |  |  }
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | 
 |  | +// Close closes the stream for writing. Read() still works until EOF.
 | 
											
												
													
														|  |  func (s *Stream) Close() {
 |  |  func (s *Stream) Close() {
 | 
											
												
													
														|  |  	s.access.Lock()
 |  |  	s.access.Lock()
 | 
											
												
													
														|  |  	s.close = true
 |  |  	s.close = true
 | 
											
										
											
												
													
														|  | @@ -221,6 +230,7 @@ func (s *Stream) Close() {
 | 
											
												
													
														|  |  	s.access.Unlock()
 |  |  	s.access.Unlock()
 | 
											
												
													
														|  |  }
 |  |  }
 | 
											
												
													
														|  |  
 |  |  
 | 
											
												
													
														|  | 
 |  | +// CloseError closes the Stream with error. Read() will return an error afterwards.
 | 
											
												
													
														|  |  func (s *Stream) CloseError() {
 |  |  func (s *Stream) CloseError() {
 | 
											
												
													
														|  |  	s.access.Lock()
 |  |  	s.access.Lock()
 | 
											
												
													
														|  |  	s.err = true
 |  |  	s.err = true
 |