Browse Source

force chunked stream

v2ray 9 years ago
parent
commit
58530e6920
3 changed files with 12 additions and 8 deletions
  1. 5 1
      proxy/vmess/inbound/inbound.go
  2. 3 0
      proxy/vmess/io/reader.go
  3. 4 7
      proxy/vmess/outbound/outbound.go

+ 5 - 1
proxy/vmess/inbound/inbound.go

@@ -6,6 +6,7 @@ import (
 	"github.com/v2ray/v2ray-core/app"
 	"github.com/v2ray/v2ray-core/app"
 	"github.com/v2ray/v2ray-core/app/dispatcher"
 	"github.com/v2ray/v2ray-core/app/dispatcher"
 	"github.com/v2ray/v2ray-core/app/proxyman"
 	"github.com/v2ray/v2ray-core/app/proxyman"
+	"github.com/v2ray/v2ray-core/common/alloc"
 	v2io "github.com/v2ray/v2ray-core/common/io"
 	v2io "github.com/v2ray/v2ray-core/common/io"
 	"github.com/v2ray/v2ray-core/common/log"
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
 	v2net "github.com/v2ray/v2ray-core/common/net"
@@ -185,6 +186,9 @@ func (this *VMessInboundHandler) HandleConnection(connection hub.Connection) {
 				writer = vmessio.NewAuthChunkWriter(writer)
 				writer = vmessio.NewAuthChunkWriter(writer)
 			}
 			}
 			v2io.Pipe(output, writer)
 			v2io.Pipe(output, writer)
+			if request.Option.IsChunkStream() {
+				writer.Write(alloc.NewSmallBuffer().Clear())
+			}
 			output.Release()
 			output.Release()
 			writer.Release()
 			writer.Release()
 			finish.Unlock()
 			finish.Unlock()
@@ -192,8 +196,8 @@ func (this *VMessInboundHandler) HandleConnection(connection hub.Connection) {
 		writeFinish.Lock()
 		writeFinish.Lock()
 	}
 	}
 
 
-	connection.CloseWrite()
 	readFinish.Lock()
 	readFinish.Lock()
+	writeFinish.Lock()
 }
 }
 
 
 func init() {
 func init() {

+ 3 - 0
proxy/vmess/io/reader.go

@@ -27,6 +27,9 @@ func (this *AuthChunkReader) Read() (*alloc.Buffer, error) {
 	}
 	}
 
 
 	length := serial.BytesLiteral(buffer.Value[:2]).Uint16Value()
 	length := serial.BytesLiteral(buffer.Value[:2]).Uint16Value()
+	if length == 4 { // Length of authentication bytes.
+		return nil, io.EOF
+	}
 	if _, err := io.ReadFull(this.reader, buffer.Value[:length]); err != nil {
 	if _, err := io.ReadFull(this.reader, buffer.Value[:length]); err != nil {
 		buffer.Release()
 		buffer.Release()
 		return nil, err
 		return nil, err

+ 4 - 7
proxy/vmess/outbound/outbound.go

@@ -35,9 +35,7 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al
 		Command: command,
 		Command: command,
 		Address: target.Address(),
 		Address: target.Address(),
 		Port:    target.Port(),
 		Port:    target.Port(),
-	}
-	if command == proto.RequestCommandUDP {
-		request.Option |= proto.RequestOptionChunkStream
+		Option:  proto.RequestOptionChunkStream,
 	}
 	}
 
 
 	conn, err := dialer.Dial(destination)
 	conn, err := dialer.Dial(destination)
@@ -65,10 +63,6 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al
 	go this.handleResponse(session, conn, request, destination, output, &responseFinish)
 	go this.handleResponse(session, conn, request, destination, output, &responseFinish)
 
 
 	requestFinish.Lock()
 	requestFinish.Lock()
-	if tcpConn, ok := conn.(*net.TCPConn); ok {
-		tcpConn.CloseWrite()
-	}
-
 	responseFinish.Lock()
 	responseFinish.Lock()
 	output.Close()
 	output.Close()
 	input.Release()
 	input.Release()
@@ -97,6 +91,9 @@ func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn
 		streamWriter = vmessio.NewAuthChunkWriter(streamWriter)
 		streamWriter = vmessio.NewAuthChunkWriter(streamWriter)
 	}
 	}
 	v2io.Pipe(input, streamWriter)
 	v2io.Pipe(input, streamWriter)
+	if request.Option.IsChunkStream() {
+		streamWriter.Write(alloc.NewSmallBuffer().Clear())
+	}
 	streamWriter.Release()
 	streamWriter.Release()
 	return
 	return
 }
 }