浏览代码

change status to option

Darien Raymond 7 年之前
父节点
当前提交
79c1087311
共有 3 个文件被更改,包括 34 次插入49 次删除
  1. 2 2
      app/proxyman/mux/frame.go
  2. 28 35
      app/proxyman/mux/mux.go
  3. 4 12
      app/proxyman/mux/writer.go

+ 2 - 2
app/proxyman/mux/frame.go

@@ -15,11 +15,11 @@ const (
 	SessionStatusKeep      SessionStatus = 0x02
 	SessionStatusEnd       SessionStatus = 0x03
 	SessionStatusKeepAlive SessionStatus = 0x04
-	SessionStatusError     SessionStatus = 0x05
 )
 
 const (
-	OptionData bitmask.Byte = 0x01
+	OptionData  bitmask.Byte = 0x01
+	OptionError bitmask.Byte = 0x02
 )
 
 type TargetNetwork byte

+ 28 - 35
app/proxyman/mux/mux.go

@@ -10,8 +10,10 @@ import (
 
 	"v2ray.com/core"
 	"v2ray.com/core/app/proxyman"
+	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/errors"
+	"v2ray.com/core/common/log"
 	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/protocol"
 	"v2ray.com/core/common/signal"
@@ -131,7 +133,7 @@ func (m *Client) monitor() {
 		case <-timer.C:
 			size := m.sessionManager.Size()
 			if size == 0 && m.sessionManager.CloseIfNoSession() {
-				m.done.Close()
+				common.Must(m.done.Close())
 				return
 			}
 		}
@@ -151,10 +153,10 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
 	newError("dispatching request to ", dest).WithContext(ctx).WriteToLog()
 	if err := buf.Copy(s.input, writer); err != nil {
 		newError("failed to fetch all input").Base(err).WithContext(ctx).WriteToLog()
-		writer.Error()
-	} else {
-		writer.Close()
+		writer.hasError = true
 	}
+
+	writer.Close()
 }
 
 func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool {
@@ -208,18 +210,10 @@ func (m *Client) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReade
 
 func (m *Client) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
 	if s, found := m.sessionManager.Get(meta.SessionID); found {
-		s.Close()
-	}
-	if meta.Option.Has(OptionData) {
-		return drain(reader)
-	}
-	return nil
-}
-
-func (m *Client) handleStatusError(meta *FrameMetadata, reader *buf.BufferedReader) error {
-	if s, found := m.sessionManager.Get(meta.SessionID); found {
-		s.output.CloseError()
-		s.input.CloseError()
+		if meta.Option.Has(OptionError) {
+			s.input.CloseError()
+			s.output.CloseError()
+		}
 		s.Close()
 	}
 	if meta.Option.Has(OptionData) {
@@ -251,8 +245,6 @@ func (m *Client) fetchOutput() {
 			err = m.handleStatusNew(meta, reader)
 		case SessionStatusKeep:
 			err = m.handleStatusKeep(meta, reader)
-		case SessionStatusError:
-			err = m.handleStatusError(meta, reader)
 		default:
 			newError("unknown status: ", meta.SessionStatus).AtError().WriteToLog()
 			return
@@ -310,10 +302,10 @@ func handle(ctx context.Context, s *Session, output buf.Writer) {
 	writer := NewResponseWriter(s.ID, output, s.transferType)
 	if err := buf.Copy(s.input, writer); err != nil {
 		newError("session ", s.ID, " ends.").Base(err).WithContext(ctx).WriteToLog()
-		writer.Error()
-	} else {
-		writer.Close()
+		writer.hasError = true
 	}
+
+	writer.Close()
 	s.Close()
 }
 
@@ -326,6 +318,17 @@ func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *buf.Bu
 
 func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, reader *buf.BufferedReader) error {
 	newError("received request for ", meta.Target).WithContext(ctx).WriteToLog()
+	{
+		msg := &log.AccessMessage{
+			To:     meta.Target,
+			Status: log.AccessAccepted,
+			Reason: "",
+		}
+		if src, f := proxy.SourceFromContext(ctx); f {
+			msg.From = src
+		}
+		log.Record(msg)
+	}
 	inboundRay, err := w.dispatcher.Dispatch(ctx, meta.Target)
 	if err != nil {
 		if meta.Option.Has(OptionData) {
@@ -363,18 +366,10 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.Buffere
 
 func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
 	if s, found := w.sessionManager.Get(meta.SessionID); found {
-		s.Close()
-	}
-	if meta.Option.Has(OptionData) {
-		return drain(reader)
-	}
-	return nil
-}
-
-func (w *ServerWorker) handleStatusError(meta *FrameMetadata, reader *buf.BufferedReader) error {
-	if s, found := w.sessionManager.Get(meta.SessionID); found {
-		s.input.CloseError()
-		s.output.CloseError()
+		if meta.Option.Has(OptionError) {
+			s.input.CloseError()
+			s.output.CloseError()
+		}
 		s.Close()
 	}
 	if meta.Option.Has(OptionData) {
@@ -398,8 +393,6 @@ func (w *ServerWorker) handleFrame(ctx context.Context, reader *buf.BufferedRead
 		err = w.handleStatusNew(ctx, meta, reader)
 	case SessionStatusKeep:
 		err = w.handleStatusKeep(meta, reader)
-	case SessionStatusError:
-		err = w.handleStatusError(meta, reader)
 	default:
 		return newError("unknown status: ", meta.SessionStatus).AtError()
 	}

+ 4 - 12
app/proxyman/mux/writer.go

@@ -13,6 +13,7 @@ type Writer struct {
 	writer       buf.Writer
 	id           uint16
 	followup     bool
+	hasError     bool
 	transferType protocol.TransferType
 }
 
@@ -40,6 +41,7 @@ func (w *Writer) getNextFrameMeta() FrameMetadata {
 		SessionID: w.id,
 		Target:    w.dest,
 	}
+
 	if w.followup {
 		meta.SessionStatus = SessionStatusKeep
 	} else {
@@ -105,18 +107,8 @@ func (w *Writer) Close() error {
 		SessionID:     w.id,
 		SessionStatus: SessionStatusEnd,
 	}
-
-	frame := buf.New()
-	common.Must(meta.WriteTo(frame))
-
-	w.writer.WriteMultiBuffer(buf.NewMultiBufferValue(frame))
-	return nil
-}
-
-func (w *Writer) Error() error {
-	meta := FrameMetadata{
-		SessionID:     w.id,
-		SessionStatus: SessionStatusError,
+	if w.hasError {
+		meta.Option.Set(OptionError)
 	}
 
 	frame := buf.New()