Prechádzať zdrojové kódy

add exponential backoff as retry logic

Darien Raymond 9 rokov pred
rodič
commit
30cd9e929d

+ 20 - 11
common/retry/retry.go

@@ -16,34 +16,43 @@ type Strategy interface {
 }
 
 type retryer struct {
-	NextDelay func(int) int
+	totalAttempt int
+	nextDelay    func() uint32
 }
 
 // On implements Strategy.On.
 func (r *retryer) On(method func() error) error {
 	attempt := 0
-	for {
+	for attempt < r.totalAttempt {
 		err := method()
 		if err == nil {
 			return nil
 		}
-		delay := r.NextDelay(attempt)
-		if delay < 0 {
-			return ErrRetryFailed
-		}
+		delay := r.nextDelay()
 		<-time.After(time.Duration(delay) * time.Millisecond)
 		attempt++
 	}
+	return ErrRetryFailed
 }
 
 // Timed returns a retry strategy with fixed interval.
-func Timed(attempts int, delay int) Strategy {
+func Timed(attempts int, delay uint32) Strategy {
 	return &retryer{
-		NextDelay: func(attempt int) int {
-			if attempt >= attempts {
-				return -1
-			}
+		totalAttempt: attempts,
+		nextDelay: func() uint32 {
 			return delay
 		},
 	}
 }
+
+func ExponentialBackoff(attempts int, delay uint32) Strategy {
+	nextDelay := uint32(0)
+	return &retryer{
+		totalAttempt: attempts,
+		nextDelay: func() uint32 {
+			r := nextDelay
+			nextDelay += delay
+			return r
+		},
+	}
+}

+ 17 - 5
common/retry/retry_test.go

@@ -68,14 +68,26 @@ func TestRetryExhausted(t *testing.T) {
 	startTime := time.Now()
 	called := 0
 	err := Timed(2, 1000).On(func() error {
-		if called < 5 {
-			called++
-			return errorTestOnly
-		}
-		return nil
+		called++
+		return errorTestOnly
 	})
 	duration := time.Since(startTime)
 
 	assert.Error(err).Equals(ErrRetryFailed)
 	assert.Int64(int64(duration / time.Millisecond)).AtLeast(1900)
 }
+
+func TestExponentialBackoff(t *testing.T) {
+	assert := assert.On(t)
+
+	startTime := time.Now()
+	called := 0
+	err := ExponentialBackoff(10, 100).On(func() error {
+		called++
+		return errorTestOnly
+	})
+	duration := time.Since(startTime)
+
+	assert.Error(err).Equals(ErrRetryFailed)
+	assert.Int64(int64(duration / time.Millisecond)).AtLeast(4000)
+}

+ 1 - 1
inbound_detour_always.go

@@ -58,7 +58,7 @@ func (this *InboundDetourHandlerAlways) Close() {
 // Starts the inbound connection handler.
 func (this *InboundDetourHandlerAlways) Start() error {
 	for _, ich := range this.ich {
-		err := retry.Timed(100 /* times */, 100 /* ms */).On(func() error {
+		err := retry.ExponentialBackoff(10 /* times */, 200 /* ms */).On(func() error {
 			err := ich.Start()
 			if err != nil {
 				log.Error("Failed to start inbound detour:", err)

+ 1 - 1
proxy/freedom/freedom.go

@@ -79,7 +79,7 @@ func (this *FreedomConnection) Dispatch(destination v2net.Destination, payload *
 	if this.domainStrategy == Config_USE_IP && destination.Address.Family().IsDomain() {
 		destination = this.ResolveIP(destination)
 	}
-	err := retry.Timed(5, 100).On(func() error {
+	err := retry.ExponentialBackoff(5, 100).On(func() error {
 		rawConn, err := internet.Dial(this.meta.Address, destination, this.meta.GetDialerOptions())
 		if err != nil {
 			return err

+ 1 - 1
proxy/shadowsocks/client.go

@@ -44,7 +44,7 @@ func (this *Client) Dispatch(destination v2net.Destination, payload *alloc.Buffe
 	var server *protocol.ServerSpec
 	var conn internet.Connection
 
-	err := retry.Timed(5, 100).On(func() error {
+	err := retry.ExponentialBackoff(5, 100).On(func() error {
 		server = this.serverPicker.PickServer()
 		dest := server.Destination()
 		dest.Network = network

+ 1 - 1
proxy/vmess/outbound/outbound.go

@@ -33,7 +33,7 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al
 	var rec *protocol.ServerSpec
 	var conn internet.Connection
 
-	err := retry.Timed(5, 100).On(func() error {
+	err := retry.ExponentialBackoff(5, 100).On(func() error {
 		rec = this.serverPicker.PickServer()
 		rawConn, err := internet.Dial(this.meta.Address, rec.Destination(), this.meta.GetDialerOptions())
 		if err != nil {

+ 17 - 7
transport/internet/tcp_hub.go

@@ -7,6 +7,7 @@ import (
 
 	"v2ray.com/core/common/log"
 	v2net "v2ray.com/core/common/net"
+	"v2ray.com/core/common/retry"
 )
 
 var (
@@ -78,14 +79,23 @@ func (this *TCPHub) Close() {
 func (this *TCPHub) start() {
 	this.accepting = true
 	for this.accepting {
-		conn, err := this.listener.Accept()
-
-		if err != nil {
-			if this.accepting {
-				log.Warning("Internet|Listener: Failed to accept new TCP connection: ", err)
+		var newConn Connection
+		err := retry.ExponentialBackoff(10, 200).On(func() error {
+			if !this.accepting {
+				return nil
+			}
+			conn, err := this.listener.Accept()
+			if err != nil {
+				if this.accepting {
+					log.Warning("Internet|Listener: Failed to accept new TCP connection: ", err)
+				}
+				return err
 			}
-			continue
+			newConn = conn
+			return nil
+		})
+		if err == nil && newConn != nil {
+			go this.connCallback(newConn)
 		}
-		go this.connCallback(conn)
 	}
 }