|  | @@ -15,6 +15,7 @@ import (
 | 
	
		
			
				|  |  |  	"v2ray.com/core/common/buf"
 | 
	
		
			
				|  |  |  	"v2ray.com/core/common/errors"
 | 
	
		
			
				|  |  |  	"v2ray.com/core/common/net"
 | 
	
		
			
				|  |  | +	"v2ray.com/core/common/protocol"
 | 
	
		
			
				|  |  |  	"v2ray.com/core/proxy"
 | 
	
		
			
				|  |  |  	"v2ray.com/core/transport/ray"
 | 
	
		
			
				|  |  |  )
 | 
	
	
		
			
				|  | @@ -173,37 +174,37 @@ func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool
 | 
	
		
			
				|  |  |  	return true
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -func drain(reader *Reader) error {
 | 
	
		
			
				|  |  | -	buf.Copy(reader, buf.Discard)
 | 
	
		
			
				|  |  | +func drain(reader io.Reader) error {
 | 
	
		
			
				|  |  | +	buf.Copy(NewStreamReader(reader), buf.Discard)
 | 
	
		
			
				|  |  |  	return nil
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -func (m *Client) handleStatueKeepAlive(meta *FrameMetadata, reader *Reader) error {
 | 
	
		
			
				|  |  | +func (m *Client) handleStatueKeepAlive(meta *FrameMetadata, reader io.Reader) error {
 | 
	
		
			
				|  |  |  	if meta.Option.Has(OptionData) {
 | 
	
		
			
				|  |  |  		return drain(reader)
 | 
	
		
			
				|  |  |  	}
 | 
	
		
			
				|  |  |  	return nil
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -func (m *Client) handleStatusNew(meta *FrameMetadata, reader *Reader) error {
 | 
	
		
			
				|  |  | +func (m *Client) handleStatusNew(meta *FrameMetadata, reader io.Reader) error {
 | 
	
		
			
				|  |  |  	if meta.Option.Has(OptionData) {
 | 
	
		
			
				|  |  |  		return drain(reader)
 | 
	
		
			
				|  |  |  	}
 | 
	
		
			
				|  |  |  	return nil
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -func (m *Client) handleStatusKeep(meta *FrameMetadata, reader *Reader) error {
 | 
	
		
			
				|  |  | +func (m *Client) handleStatusKeep(meta *FrameMetadata, reader io.Reader) error {
 | 
	
		
			
				|  |  |  	if !meta.Option.Has(OptionData) {
 | 
	
		
			
				|  |  |  		return nil
 | 
	
		
			
				|  |  |  	}
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  	if s, found := m.sessionManager.Get(meta.SessionID); found {
 | 
	
		
			
				|  |  | -		return buf.Copy(reader, s.output, buf.IgnoreWriterError())
 | 
	
		
			
				|  |  | +		return buf.Copy(s.NewReader(reader), s.output, buf.IgnoreWriterError())
 | 
	
		
			
				|  |  |  	}
 | 
	
		
			
				|  |  |  	return drain(reader)
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *Reader) error {
 | 
	
		
			
				|  |  | +func (m *Client) handleStatusEnd(meta *FrameMetadata, reader io.Reader) error {
 | 
	
		
			
				|  |  |  	if s, found := m.sessionManager.Get(meta.SessionID); found {
 | 
	
		
			
				|  |  |  		s.CloseDownlink()
 | 
	
		
			
				|  |  |  		s.output.Close()
 | 
	
	
		
			
				|  | @@ -217,9 +218,11 @@ func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *Reader) error {
 | 
	
		
			
				|  |  |  func (m *Client) fetchOutput() {
 | 
	
		
			
				|  |  |  	defer m.cancel()
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -	reader := NewReader(m.inboundRay.InboundOutput())
 | 
	
		
			
				|  |  | +	reader := buf.ToBytesReader(m.inboundRay.InboundOutput())
 | 
	
		
			
				|  |  | +	metaReader := NewMetadataReader(reader)
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  	for {
 | 
	
		
			
				|  |  | -		meta, err := reader.ReadMetadata()
 | 
	
		
			
				|  |  | +		meta, err := metaReader.Read()
 | 
	
		
			
				|  |  |  		if err != nil {
 | 
	
		
			
				|  |  |  			if errors.Cause(err) != io.EOF {
 | 
	
		
			
				|  |  |  				log.Trace(newError("failed to read metadata").Base(err))
 | 
	
	
		
			
				|  | @@ -289,7 +292,7 @@ type ServerWorker struct {
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  func handle(ctx context.Context, s *Session, output buf.Writer) {
 | 
	
		
			
				|  |  | -	writer := NewResponseWriter(s.ID, output)
 | 
	
		
			
				|  |  | +	writer := NewResponseWriter(s.ID, output, s.transferType)
 | 
	
		
			
				|  |  |  	if err := buf.Copy(s.input, writer); err != nil {
 | 
	
		
			
				|  |  |  		log.Trace(newError("session ", s.ID, " ends: ").Base(err))
 | 
	
		
			
				|  |  |  	}
 | 
	
	
		
			
				|  | @@ -297,14 +300,14 @@ func handle(ctx context.Context, s *Session, output buf.Writer) {
 | 
	
		
			
				|  |  |  	s.CloseDownlink()
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *Reader) error {
 | 
	
		
			
				|  |  | +func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader io.Reader) error {
 | 
	
		
			
				|  |  |  	if meta.Option.Has(OptionData) {
 | 
	
		
			
				|  |  |  		return drain(reader)
 | 
	
		
			
				|  |  |  	}
 | 
	
		
			
				|  |  |  	return nil
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, reader *Reader) error {
 | 
	
		
			
				|  |  | +func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, reader io.Reader) error {
 | 
	
		
			
				|  |  |  	log.Trace(newError("received request for ", meta.Target))
 | 
	
		
			
				|  |  |  	inboundRay, err := w.dispatcher.Dispatch(ctx, meta.Target)
 | 
	
		
			
				|  |  |  	if err != nil {
 | 
	
	
		
			
				|  | @@ -314,30 +317,34 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata,
 | 
	
		
			
				|  |  |  		return newError("failed to dispatch request.").Base(err)
 | 
	
		
			
				|  |  |  	}
 | 
	
		
			
				|  |  |  	s := &Session{
 | 
	
		
			
				|  |  | -		input:  inboundRay.InboundOutput(),
 | 
	
		
			
				|  |  | -		output: inboundRay.InboundInput(),
 | 
	
		
			
				|  |  | -		parent: w.sessionManager,
 | 
	
		
			
				|  |  | -		ID:     meta.SessionID,
 | 
	
		
			
				|  |  | +		input:        inboundRay.InboundOutput(),
 | 
	
		
			
				|  |  | +		output:       inboundRay.InboundInput(),
 | 
	
		
			
				|  |  | +		parent:       w.sessionManager,
 | 
	
		
			
				|  |  | +		ID:           meta.SessionID,
 | 
	
		
			
				|  |  | +		transferType: protocol.TransferTypeStream,
 | 
	
		
			
				|  |  | +	}
 | 
	
		
			
				|  |  | +	if meta.Target.Network == net.Network_UDP {
 | 
	
		
			
				|  |  | +		s.transferType = protocol.TransferTypePacket
 | 
	
		
			
				|  |  |  	}
 | 
	
		
			
				|  |  |  	w.sessionManager.Add(s)
 | 
	
		
			
				|  |  |  	go handle(ctx, s, w.outboundRay.OutboundOutput())
 | 
	
		
			
				|  |  |  	if meta.Option.Has(OptionData) {
 | 
	
		
			
				|  |  | -		return buf.Copy(reader, s.output, buf.IgnoreWriterError())
 | 
	
		
			
				|  |  | +		return buf.Copy(s.NewReader(reader), s.output, buf.IgnoreWriterError())
 | 
	
		
			
				|  |  |  	}
 | 
	
		
			
				|  |  |  	return nil
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *Reader) error {
 | 
	
		
			
				|  |  | +func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader io.Reader) error {
 | 
	
		
			
				|  |  |  	if !meta.Option.Has(OptionData) {
 | 
	
		
			
				|  |  |  		return nil
 | 
	
		
			
				|  |  |  	}
 | 
	
		
			
				|  |  |  	if s, found := w.sessionManager.Get(meta.SessionID); found {
 | 
	
		
			
				|  |  | -		return buf.Copy(reader, s.output, buf.IgnoreWriterError())
 | 
	
		
			
				|  |  | +		return buf.Copy(s.NewReader(reader), s.output, buf.IgnoreWriterError())
 | 
	
		
			
				|  |  |  	}
 | 
	
		
			
				|  |  |  	return drain(reader)
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *Reader) error {
 | 
	
		
			
				|  |  | +func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader io.Reader) error {
 | 
	
		
			
				|  |  |  	if s, found := w.sessionManager.Get(meta.SessionID); found {
 | 
	
		
			
				|  |  |  		s.CloseUplink()
 | 
	
		
			
				|  |  |  		s.output.Close()
 | 
	
	
		
			
				|  | @@ -348,8 +355,9 @@ func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *Reader) erro
 | 
	
		
			
				|  |  |  	return nil
 | 
	
		
			
				|  |  |  }
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -func (w *ServerWorker) handleFrame(ctx context.Context, reader *Reader) error {
 | 
	
		
			
				|  |  | -	meta, err := reader.ReadMetadata()
 | 
	
		
			
				|  |  | +func (w *ServerWorker) handleFrame(ctx context.Context, reader io.Reader) error {
 | 
	
		
			
				|  |  | +	metaReader := NewMetadataReader(reader)
 | 
	
		
			
				|  |  | +	meta, err := metaReader.Read()
 | 
	
		
			
				|  |  |  	if err != nil {
 | 
	
		
			
				|  |  |  		return newError("failed to read metadata").Base(err)
 | 
	
		
			
				|  |  |  	}
 | 
	
	
		
			
				|  | @@ -375,7 +383,7 @@ func (w *ServerWorker) handleFrame(ctx context.Context, reader *Reader) error {
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  func (w *ServerWorker) run(ctx context.Context) {
 | 
	
		
			
				|  |  |  	input := w.outboundRay.OutboundInput()
 | 
	
		
			
				|  |  | -	reader := NewReader(input)
 | 
	
		
			
				|  |  | +	reader := buf.ToBytesReader(input)
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  	defer w.sessionManager.Close()
 | 
	
		
			
				|  |  |  
 |