Darien Raymond 8 年 前
コミット
18b0b87c52
4 ファイル変更23 行追加13 行削除
  1. 2 4
      app/proxyman/mux/mux.go
  2. 2 2
      common/buf/io.go
  3. 17 5
      common/signal/timer.go
  4. 2 2
      proxy/vmess/inbound/inbound.go

+ 2 - 4
app/proxyman/mux/mux.go

@@ -177,8 +177,7 @@ func fetchInput(ctx context.Context, s *session, output buf.Writer) {
 			return
 		}
 	}
-	_, timer := signal.CancelAfterInactivity(ctx, time.Minute*30)
-	if err := buf.PipeUntilEOF(timer, s.input, writer); err != nil {
+	if err := buf.PipeUntilEOF(signal.BackgroundTimer(), s.input, writer); err != nil {
 		log.Info("Proxyman|Mux|Client: Failed to fetch all input: ", err)
 	}
 }
@@ -324,8 +323,7 @@ func handle(ctx context.Context, s *session, output buf.Writer) {
 	writer := NewResponseWriter(s.id, output)
 	defer writer.Close()
 
-	_, timer := signal.CancelAfterInactivity(ctx, time.Minute*30)
-	if err := buf.PipeUntilEOF(timer, s.input, writer); err != nil {
+	if err := buf.PipeUntilEOF(signal.BackgroundTimer(), s.input, writer); err != nil {
 		log.Info("Proxyman|Mux|ServerWorker: Session ", s.id, " ends: ", err)
 	}
 }

+ 2 - 2
common/buf/io.go

@@ -42,7 +42,7 @@ func ReadFullFrom(reader io.Reader, size int) Supplier {
 
 // Pipe dumps all payload from reader to writer, until an error occurs.
 // ActivityTimer gets updated as soon as there is a payload.
-func Pipe(timer *signal.ActivityTimer, reader Reader, writer Writer) error {
+func Pipe(timer signal.ActivityTimer, reader Reader, writer Writer) error {
 	for {
 		buffer, err := reader.Read()
 		if err != nil {
@@ -65,7 +65,7 @@ func Pipe(timer *signal.ActivityTimer, reader Reader, writer Writer) error {
 }
 
 // PipeUntilEOF behaves the same as Pipe(). The only difference is PipeUntilEOF returns nil on EOF.
-func PipeUntilEOF(timer *signal.ActivityTimer, reader Reader, writer Writer) error {
+func PipeUntilEOF(timer signal.ActivityTimer, reader Reader, writer Writer) error {
 	err := Pipe(timer, reader, writer)
 	if err != nil && errors.Cause(err) != io.EOF {
 		return err

+ 17 - 5
common/signal/timer.go

@@ -5,21 +5,25 @@ import (
 	"time"
 )
 
-type ActivityTimer struct {
+type ActivityTimer interface {
+	Update()
+}
+
+type realActivityTimer struct {
 	updated chan bool
 	timeout time.Duration
 	ctx     context.Context
 	cancel  context.CancelFunc
 }
 
-func (t *ActivityTimer) Update() {
+func (t *realActivityTimer) Update() {
 	select {
 	case t.updated <- true:
 	default:
 	}
 }
 
-func (t *ActivityTimer) run() {
+func (t *realActivityTimer) run() {
 	for {
 		select {
 		case <-time.After(t.timeout):
@@ -37,9 +41,9 @@ func (t *ActivityTimer) run() {
 	}
 }
 
-func CancelAfterInactivity(ctx context.Context, timeout time.Duration) (context.Context, *ActivityTimer) {
+func CancelAfterInactivity(ctx context.Context, timeout time.Duration) (context.Context, ActivityTimer) {
 	ctx, cancel := context.WithCancel(ctx)
-	timer := &ActivityTimer{
+	timer := &realActivityTimer{
 		ctx:     ctx,
 		cancel:  cancel,
 		timeout: timeout,
@@ -48,3 +52,11 @@ func CancelAfterInactivity(ctx context.Context, timeout time.Duration) (context.
 	go timer.run()
 	return ctx, timer
 }
+
+type noOpActivityTimer struct{}
+
+func (noOpActivityTimer) Update() {}
+
+func BackgroundTimer() ActivityTimer {
+	return noOpActivityTimer{}
+}

+ 2 - 2
proxy/vmess/inbound/inbound.go

@@ -123,7 +123,7 @@ func (v *Handler) GetUser(email string) *protocol.User {
 	return user
 }
 
-func transferRequest(timer *signal.ActivityTimer, session *encoding.ServerSession, request *protocol.RequestHeader, input io.Reader, output ray.OutputStream) error {
+func transferRequest(timer signal.ActivityTimer, session *encoding.ServerSession, request *protocol.RequestHeader, input io.Reader, output ray.OutputStream) error {
 	defer output.Close()
 
 	bodyReader := session.DecodeRequestBody(request, input)
@@ -133,7 +133,7 @@ func transferRequest(timer *signal.ActivityTimer, session *encoding.ServerSessio
 	return nil
 }
 
-func transferResponse(timer *signal.ActivityTimer, session *encoding.ServerSession, request *protocol.RequestHeader, response *protocol.ResponseHeader, input ray.InputStream, output io.Writer) error {
+func transferResponse(timer signal.ActivityTimer, session *encoding.ServerSession, request *protocol.RequestHeader, response *protocol.ResponseHeader, input ray.InputStream, output io.Writer) error {
 	session.EncodeResponseHeader(response, output)
 
 	bodyWriter := session.EncodeResponseBody(request, output)