Sfoglia il codice sorgente

retry if vmess conn fails

v2ray 9 anni fa
parent
commit
e878973def
2 ha cambiato i file con 27 aggiunte e 15 eliminazioni
  1. 25 11
      proxy/vmess/outbound/outbound.go
  2. 2 4
      proxy/vmess/outbound/receiver.go

+ 25 - 11
proxy/vmess/outbound/outbound.go

@@ -11,6 +11,7 @@ import (
 	v2net "github.com/v2ray/v2ray-core/common/net"
 	"github.com/v2ray/v2ray-core/common/protocol"
 	"github.com/v2ray/v2ray-core/common/protocol/raw"
+	"github.com/v2ray/v2ray-core/common/retry"
 	"github.com/v2ray/v2ray-core/proxy"
 	"github.com/v2ray/v2ray-core/proxy/internal"
 	vmessio "github.com/v2ray/v2ray-core/proxy/vmess/io"
@@ -28,7 +29,24 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al
 	defer ray.OutboundInput().Release()
 	defer ray.OutboundOutput().Close()
 
-	destination, vNextUser := this.receiverManager.PickReceiver()
+	var rec *Receiver
+	var conn *hub.Connection
+
+	err := retry.Timed(5, 100).On(func() error {
+		rec = this.receiverManager.PickReceiver()
+		rawConn, err := hub.Dial(this.meta.Address, rec.Destination)
+		if err != nil {
+			return err
+		}
+		conn = rawConn
+
+		return nil
+	})
+	if err != nil {
+		log.Error("Failed to find an available destination:", err)
+		return err
+	}
+	log.Info("VMessOut: Tunneling request to ", target, " via ", rec.Destination)
 
 	command := protocol.RequestCommandTCP
 	if target.IsUDP() {
@@ -36,20 +54,13 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al
 	}
 	request := &protocol.RequestHeader{
 		Version: raw.Version,
-		User:    vNextUser,
+		User:    rec.PickUser(),
 		Command: command,
 		Address: target.Address(),
 		Port:    target.Port(),
 		Option:  protocol.RequestOptionChunkStream,
 	}
 
-	conn, err := hub.Dial(this.meta.Address, destination)
-	if err != nil {
-		log.Error("Failed to open ", destination, ": ", err)
-		return err
-	}
-	log.Info("VMessOut: Tunneling request to ", request.Address, " via ", destination)
-
 	defer conn.Close()
 
 	if transport.IsConnectionReusable() {
@@ -67,7 +78,7 @@ func (this *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *al
 	session := raw.NewClientSession(protocol.DefaultIDHash)
 
 	go this.handleRequest(session, conn, request, payload, input, &requestFinish)
-	go this.handleResponse(session, conn, request, destination, output, &responseFinish)
+	go this.handleResponse(session, conn, request, rec.Destination, output, &responseFinish)
 
 	requestFinish.Lock()
 	responseFinish.Lock()
@@ -86,7 +97,9 @@ func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn
 	if request.Option.Has(protocol.RequestOptionChunkStream) {
 		streamWriter = vmessio.NewAuthChunkWriter(streamWriter)
 	}
-	streamWriter.Write(payload)
+	if err := streamWriter.Write(payload); err != nil {
+		conn.SetReusable(false)
+	}
 	writer.SetCached(false)
 
 	err := v2io.Pipe(input, streamWriter)
@@ -112,6 +125,7 @@ func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, con
 
 	header, err := session.DecodeResponseHeader(reader)
 	if err != nil {
+		conn.SetReusable(false)
 		log.Warning("VMessOut: Failed to read response: ", err)
 		return
 	}

+ 2 - 4
proxy/vmess/outbound/receiver.go

@@ -127,12 +127,10 @@ func (this *ReceiverManager) pickStdReceiver() *Receiver {
 	return this.receivers[dice.Roll(len(this.receivers))]
 }
 
-func (this *ReceiverManager) PickReceiver() (v2net.Destination, *protocol.User) {
+func (this *ReceiverManager) PickReceiver() *Receiver {
 	rec := this.pickDetour()
 	if rec == nil {
 		rec = this.pickStdReceiver()
 	}
-	user := rec.PickUser()
-
-	return rec.Destination, user
+	return rec
 }