ソースを参照

Merge branch 'master' of https://github.com/v2ray/v2ray-core

Darien Raymond 8 年 前
コミット
d742cf71cb

+ 0 - 72
common/net/timed_io.go

@@ -1,72 +0,0 @@
-package net
-
-import (
-	"io"
-	"net"
-	"time"
-)
-
-var (
-	emptyTime time.Time
-)
-
-type TimeOutReader struct {
-	timeout    uint32
-	connection net.Conn
-	worker     io.Reader
-}
-
-func NewTimeOutReader(timeout uint32 /* seconds */, connection net.Conn) *TimeOutReader {
-	reader := &TimeOutReader{
-		connection: connection,
-		timeout:    0,
-	}
-	reader.SetTimeOut(timeout)
-	return reader
-}
-
-func (reader *TimeOutReader) Read(p []byte) (int, error) {
-	return reader.worker.Read(p)
-}
-
-func (reader *TimeOutReader) GetTimeOut() uint32 {
-	return reader.timeout
-}
-
-func (reader *TimeOutReader) SetTimeOut(value uint32) {
-	if reader.worker != nil && value == reader.timeout {
-		return
-	}
-	reader.timeout = value
-	if value > 0 {
-		reader.worker = &timedReaderWorker{
-			timeout:    value,
-			connection: reader.connection,
-		}
-	} else {
-		reader.worker = &noOpReaderWorker{
-			connection: reader.connection,
-		}
-	}
-}
-
-type timedReaderWorker struct {
-	timeout    uint32
-	connection net.Conn
-}
-
-func (v *timedReaderWorker) Read(p []byte) (int, error) {
-	deadline := time.Duration(v.timeout) * time.Second
-	v.connection.SetReadDeadline(time.Now().Add(deadline))
-	nBytes, err := v.connection.Read(p)
-	v.connection.SetReadDeadline(emptyTime)
-	return nBytes, err
-}
-
-type noOpReaderWorker struct {
-	connection net.Conn
-}
-
-func (v *noOpReaderWorker) Read(p []byte) (int, error) {
-	return v.connection.Read(p)
-}

+ 0 - 19
common/net/timed_io_test.go

@@ -1,19 +0,0 @@
-package net_test
-
-import (
-	"testing"
-
-	. "v2ray.com/core/common/net"
-	"v2ray.com/core/testing/assert"
-)
-
-func TestTimeOutSettings(t *testing.T) {
-	assert := assert.On(t)
-
-	reader := NewTimeOutReader(8, nil)
-	assert.Uint32(reader.GetTimeOut()).Equals(8)
-	reader.SetTimeOut(8) // no op
-	assert.Uint32(reader.GetTimeOut()).Equals(8)
-	reader.SetTimeOut(9)
-	assert.Uint32(reader.GetTimeOut()).Equals(9)
-}

+ 11 - 6
common/protocol/user.go

@@ -1,6 +1,8 @@
 package protocol
 
 import (
+	"time"
+
 	"v2ray.com/core/common/errors"
 )
 
@@ -30,15 +32,18 @@ func (v *User) GetTypedAccount() (Account, error) {
 }
 
 func (v *User) GetSettings() UserSettings {
-	settings := UserSettings{
-		PayloadReadTimeout: 120,
-	}
-	if v.Level > 0 {
-		settings.PayloadReadTimeout = 0
+	settings := UserSettings{}
+	switch v.Level {
+	case 0:
+		settings.PayloadTimeout = time.Second * 30
+	case 1:
+		settings.PayloadTimeout = time.Minute * 2
+	default:
+		settings.PayloadTimeout = time.Minute * 5
 	}
 	return settings
 }
 
 type UserSettings struct {
-	PayloadReadTimeout uint32
+	PayloadTimeout time.Duration
 }

+ 6 - 3
proxy/dokodemo/dokodemo.go

@@ -70,15 +70,18 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
 	}
 	ctx = proxy.ContextWithDestination(ctx, dest)
 	ctx, cancel := context.WithCancel(ctx)
-	timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute*2)
+	timeout := time.Second * time.Duration(d.config.Timeout)
+	if timeout == 0 {
+		timeout = time.Minute * 2
+	}
+	timer := signal.CancelAfterInactivity(ctx, cancel, timeout)
 
 	inboundRay := d.packetDispatcher.DispatchToOutbound(ctx)
 
 	requestDone := signal.ExecuteAsync(func() error {
 		defer inboundRay.InboundInput().Close()
 
-		timedReader := net.NewTimeOutReader(d.config.Timeout, conn)
-		chunkReader := buf.NewReader(timedReader)
+		chunkReader := buf.NewReader(conn)
 
 		if err := buf.PipeUntilEOF(timer, chunkReader, inboundRay.InboundInput()); err != nil {
 			log.Info("Dokodemo: Failed to transport request: ", err)

+ 9 - 3
proxy/http/server.go

@@ -82,8 +82,8 @@ func parseHost(rawHost string, defaultPort v2net.Port) (v2net.Destination, error
 func (s *Server) Process(ctx context.Context, network v2net.Network, conn internet.Connection) error {
 	conn.SetReusable(false)
 
-	timedReader := v2net.NewTimeOutReader(s.config.Timeout, conn)
-	reader := bufio.OriginalReaderSize(timedReader, 2048)
+	conn.SetReadDeadline(time.Now().Add(time.Second * 8))
+	reader := bufio.OriginalReaderSize(conn, 2048)
 
 	request, err := http.ReadRequest(reader)
 	if err != nil {
@@ -93,6 +93,8 @@ func (s *Server) Process(ctx context.Context, network v2net.Network, conn intern
 		return err
 	}
 	log.Info("HTTP: Request to Method [", request.Method, "] Host [", request.Host, "] with URL [", request.URL, "]")
+	conn.SetReadDeadline(time.Time{})
+
 	defaultPort := v2net.Port(80)
 	if strings.ToLower(request.URL.Scheme) == "https" {
 		defaultPort = v2net.Port(443)
@@ -133,7 +135,11 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade
 	}
 
 	ctx, cancel := context.WithCancel(ctx)
-	timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute*2)
+	timeout := time.Second * time.Duration(s.config.Timeout)
+	if timeout == 0 {
+		timeout = time.Minute * 2
+	}
+	timer := signal.CancelAfterInactivity(ctx, cancel, timeout)
 	ray := s.packetDispatcher.DispatchToOutbound(ctx)
 
 	requestDone := signal.ExecuteAsync(func() error {

+ 5 - 6
proxy/shadowsocks/server.go

@@ -137,20 +137,18 @@ func (v *Server) handlerUDPPayload(ctx context.Context, conn internet.Connection
 }
 
 func (s *Server) handleConnection(ctx context.Context, conn internet.Connection) error {
-	timedReader := net.NewTimeOutReader(16, conn)
-	bufferedReader := bufio.NewReader(timedReader)
+	conn.SetReadDeadline(time.Now().Add(time.Second * 8))
+	bufferedReader := bufio.NewReader(conn)
 	request, bodyReader, err := ReadTCPSession(s.user, bufferedReader)
 	if err != nil {
 		log.Access(conn.RemoteAddr(), "", log.AccessRejected, err)
 		log.Info("Shadowsocks|Server: Failed to create request from: ", conn.RemoteAddr(), ": ", err)
 		return err
 	}
+	conn.SetReadDeadline(time.Time{})
 
 	bufferedReader.SetBuffered(false)
 
-	userSettings := s.user.GetSettings()
-	timedReader.SetTimeOut(userSettings.PayloadReadTimeout)
-
 	dest := request.Destination()
 	log.Access(conn.RemoteAddr(), dest, log.AccessAccepted, "")
 	log.Info("Shadowsocks|Server: Tunnelling request to ", dest)
@@ -159,7 +157,8 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection)
 	ctx = protocol.ContextWithUser(ctx, request.User)
 
 	ctx, cancel := context.WithCancel(ctx)
-	timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute*2)
+	userSettings := s.user.GetSettings()
+	timer := signal.CancelAfterInactivity(ctx, cancel, userSettings.PayloadTimeout)
 	ray := s.packetDispatcher.DispatchToOutbound(ctx)
 
 	requestDone := signal.ExecuteAsync(func() error {

+ 1 - 1
proxy/socks/client.go

@@ -107,7 +107,7 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay) error {
 		}
 		responseFunc = func() error {
 			defer ray.OutboundOutput().Close()
-			reader := &UDPReader{reader: net.NewTimeOutReader(16, udpConn)}
+			reader := &UDPReader{reader: udpConn}
 			return buf.PipeUntilEOF(timer, reader, ray.OutboundOutput())
 		}
 	}

+ 8 - 4
proxy/socks/server.go

@@ -72,8 +72,8 @@ func (s *Server) Process(ctx context.Context, network net.Network, conn internet
 }
 
 func (s *Server) processTCP(ctx context.Context, conn internet.Connection) error {
-	timedReader := net.NewTimeOutReader(16 /* seconds, for handshake */, conn)
-	reader := bufio.NewReader(timedReader)
+	conn.SetReadDeadline(time.Now().Add(time.Second * 8))
+	reader := bufio.NewReader(conn)
 
 	inboundDest := proxy.InboundDestinationFromContext(ctx)
 	session := &ServerSession{
@@ -88,13 +88,13 @@ func (s *Server) processTCP(ctx context.Context, conn internet.Connection) error
 		log.Info("Socks|Server: Failed to read request: ", err)
 		return err
 	}
+	conn.SetReadDeadline(time.Time{})
 
 	if request.Command == protocol.RequestCommandTCP {
 		dest := request.Destination()
 		log.Info("Socks|Server: TCP Connect request to ", dest)
 		log.Access(source, dest, log.AccessAccepted, "")
 
-		timedReader.SetTimeOut(s.config.Timeout)
 		ctx = proxy.ContextWithDestination(ctx, dest)
 		return s.transport(ctx, reader, conn)
 	}
@@ -117,7 +117,11 @@ func (*Server) handleUDP() error {
 
 func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writer) error {
 	ctx, cancel := context.WithCancel(ctx)
-	timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute*2)
+	timeout := time.Second * time.Duration(v.config.Timeout)
+	if timeout == 0 {
+		timeout = time.Minute * 2
+	}
+	timer := signal.CancelAfterInactivity(ctx, cancel, timeout)
 
 	ray := v.packetDispatcher.DispatchToOutbound(ctx)
 	input := ray.InboundInput()

+ 6 - 5
proxy/vmess/inbound/inbound.go

@@ -178,8 +178,8 @@ func transferResponse(timer *signal.ActivityTimer, session *encoding.ServerSessi
 }
 
 func (v *VMessInboundHandler) Process(ctx context.Context, network net.Network, connection internet.Connection) error {
-	connReader := net.NewTimeOutReader(8, connection)
-	reader := bufio.NewReader(connReader)
+	connection.SetReadDeadline(time.Now().Add(time.Second * 8))
+	reader := bufio.NewReader(connection)
 
 	session := encoding.NewServerSession(v.clients)
 	request, err := session.DecodeRequestHeader(reader)
@@ -195,19 +195,20 @@ func (v *VMessInboundHandler) Process(ctx context.Context, network net.Network,
 	log.Access(connection.RemoteAddr(), request.Destination(), log.AccessAccepted, "")
 	log.Info("VMess|Inbound: Received request for ", request.Destination())
 
+	connection.SetReadDeadline(time.Time{})
+
 	connection.SetReusable(request.Option.Has(protocol.RequestOptionConnectionReuse))
+	userSettings := request.User.GetSettings()
 
 	ctx = proxy.ContextWithDestination(ctx, request.Destination())
 	ctx = protocol.ContextWithUser(ctx, request.User)
 	ctx, cancel := context.WithCancel(ctx)
-	timer := signal.CancelAfterInactivity(ctx, cancel, time.Minute*2)
+	timer := signal.CancelAfterInactivity(ctx, cancel, userSettings.PayloadTimeout)
 	ray := v.packetDispatcher.DispatchToOutbound(ctx)
 
 	input := ray.InboundInput()
 	output := ray.InboundOutput()
 
-	userSettings := request.User.GetSettings()
-	connReader.SetTimeOut(userSettings.PayloadReadTimeout)
 	reader.SetBuffered(false)
 
 	requestDone := signal.ExecuteAsync(func() error {