Darien Raymond 8 年 前
コミット
5ff2b3453a
2 ファイル変更124 行追加38 行削除
  1. 41 0
      common/task/task.go
  2. 83 38
      proxy/vmess/inbound/inbound.go

+ 41 - 0
common/task/task.go

@@ -0,0 +1,41 @@
+package task
+
+import (
+	"sync"
+)
+
+type Task interface {
+	Execute() error
+}
+
+type ParallelExecutor struct {
+	sync.Mutex
+	tasks  sync.WaitGroup
+	errors []error
+}
+
+func (pe *ParallelExecutor) track(err error) {
+	if err == nil {
+		return
+	}
+
+	pe.Lock()
+	pe.errors = append(pe.errors, err)
+	pe.Unlock()
+}
+
+func (pe *ParallelExecutor) Execute(task Task) {
+	pe.tasks.Add(1)
+	go func() {
+		pe.track(task.Execute())
+		pe.tasks.Done()
+	}()
+}
+
+func (pe *ParallelExecutor) Wait() {
+	pe.tasks.Wait()
+}
+
+func (pe *ParallelExecutor) Errors() []error {
+	return pe.errors
+}

+ 83 - 38
proxy/vmess/inbound/inbound.go

@@ -15,13 +15,75 @@ import (
 	v2net "v2ray.com/core/common/net"
 	"v2ray.com/core/common/protocol"
 	"v2ray.com/core/common/serial"
+	"v2ray.com/core/common/task"
 	"v2ray.com/core/common/uuid"
 	"v2ray.com/core/proxy"
 	"v2ray.com/core/proxy/vmess"
 	"v2ray.com/core/proxy/vmess/encoding"
 	"v2ray.com/core/transport/internet"
+	"v2ray.com/core/transport/ray"
 )
 
+type requestProcessor struct {
+	session *encoding.ServerSession
+	request *protocol.RequestHeader
+	input   io.Reader
+	output  ray.OutputStream
+}
+
+func (r *requestProcessor) Execute() error {
+	defer r.output.Close()
+
+	bodyReader := r.session.DecodeRequestBody(r.request, r.input)
+	defer bodyReader.Release()
+
+	if err := buf.PipeUntilEOF(bodyReader, r.output); err != nil {
+		log.Debug("VMess|Inbound: Error when sending data to outbound: ", err)
+		return err
+	}
+
+	return nil
+}
+
+type responseProcessor struct {
+	session  *encoding.ServerSession
+	request  *protocol.RequestHeader
+	response *protocol.ResponseHeader
+	input    ray.InputStream
+	output   io.Writer
+}
+
+func (r *responseProcessor) Execute() error {
+	defer r.input.Release()
+	r.session.EncodeResponseHeader(r.response, r.output)
+
+	bodyWriter := r.session.EncodeResponseBody(r.request, r.output)
+
+	// Optimize for small response packet
+	if data, err := r.input.Read(); err == nil {
+		if err := bodyWriter.Write(data); err != nil {
+			return err
+		}
+
+		if bufferedWriter, ok := r.output.(*bufio.BufferedWriter); ok {
+			bufferedWriter.SetBuffered(false)
+		}
+
+		if err := buf.PipeUntilEOF(r.input, bodyWriter); err != nil {
+			log.Debug("VMess|Inbound: Error when sending data to downstream: ", err)
+			return err
+		}
+	}
+
+	if r.request.Option.Has(protocol.RequestOptionChunkStream) {
+		if err := bodyWriter.Write(buf.NewLocal(8)); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 type userByEmail struct {
 	sync.RWMutex
 	cache           map[string]*protocol.User
@@ -176,24 +238,17 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) {
 	defer input.Close()
 	defer output.Release()
 
-	var readFinish sync.Mutex
-	readFinish.Lock()
-
 	userSettings := request.User.GetSettings()
 	connReader.SetTimeOut(userSettings.PayloadReadTimeout)
 	reader.SetBuffered(false)
 
-	go func() {
-		bodyReader := session.DecodeRequestBody(request, reader)
-		if err := buf.PipeUntilEOF(bodyReader, input); err != nil {
-			log.Debug("VMess|Inbound: Error when sending data to outbound: ", err)
-			connection.SetReusable(false)
-		}
-		bodyReader.Release()
-
-		input.Close()
-		readFinish.Unlock()
-	}()
+	var executor task.ParallelExecutor
+	executor.Execute(&requestProcessor{
+		session: session,
+		request: request,
+		input:   reader,
+		output:  input,
+	})
 
 	writer := bufio.NewWriter(connection)
 	defer writer.Release()
@@ -206,34 +261,24 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) {
 		response.Option.Set(protocol.ResponseOptionConnectionReuse)
 	}
 
-	session.EncodeResponseHeader(response, writer)
-
-	bodyWriter := session.EncodeResponseBody(request, writer)
-
-	// Optimize for small response packet
-	if data, err := output.Read(); err == nil {
-		if err := bodyWriter.Write(data); err != nil {
-			connection.SetReusable(false)
-		}
-
-		writer.SetBuffered(false)
+	executor.Execute(&responseProcessor{
+		session:  session,
+		request:  request,
+		response: response,
+		input:    output,
+		output:   writer,
+	})
 
-		if err := buf.PipeUntilEOF(output, bodyWriter); err != nil {
-			log.Debug("VMess|Inbound: Error when sending data to downstream: ", err)
-			connection.SetReusable(false)
-		}
+	executor.Wait()
 
+	if err := writer.Flush(); err != nil {
+		connection.SetReusable(false)
 	}
-	output.Release()
-	if request.Option.Has(protocol.RequestOptionChunkStream) {
-		if err := bodyWriter.Write(buf.NewLocal(8)); err != nil {
-			connection.SetReusable(false)
-		}
-	}
-	writer.Flush()
-	bodyWriter.Release()
 
-	readFinish.Lock()
+	errors := executor.Errors()
+	if len(errors) > 0 {
+		connection.SetReusable(false)
+	}
 }
 
 type Factory struct{}