Darien Raymond před 8 roky
rodič
revize
72d9cb8a74

+ 1 - 0
app/proxyman/mux/frame.go

@@ -106,6 +106,7 @@ func (f FrameMetadata) AsSupplier() buf.Supplier {
 				length += nDomain + 2
 			}
 		}
+
 		serial.Uint16ToBytes(uint16(length), lengthBytes[:0])
 		return length + 2, nil
 	}

+ 29 - 23
app/proxyman/mux/mux.go

@@ -83,7 +83,7 @@ func (m *ClientManager) Dispatch(ctx context.Context, outboundRay ray.OutboundRa
 
 	client, err := NewClient(m.proxy, m.dialer, m)
 	if err != nil {
-		return err
+		return errors.Base(err).Message("Proxyman|Mux|ClientManager: Failed to create client.")
 	}
 	m.clients = append(m.clients, client)
 	client.Dispatch(ctx, outboundRay)
@@ -119,25 +119,17 @@ func NewClient(p proxy.Outbound, dialer proxy.Dialer, m *ClientManager) (*Client
 	ctx, cancel := context.WithCancel(context.Background())
 	ctx = proxy.ContextWithTarget(ctx, muxCoolDestination)
 	pipe := ray.NewRay(ctx)
-	err := p.Process(ctx, pipe, dialer)
-	if err != nil {
-		cancel()
-		return nil, err
-	}
-	return &Client{
+	go p.Process(ctx, pipe, dialer)
+	c := &Client{
 		sessions:   make(map[uint16]*session, 256),
 		inboundRay: pipe,
 		ctx:        ctx,
 		cancel:     cancel,
 		manager:    m,
-	}, nil
-}
-
-func (m *Client) isFullyOccupied() bool {
-	m.access.RLock()
-	defer m.access.RUnlock()
-
-	return len(m.sessions) >= maxParallel
+		count:      0,
+	}
+	go c.fetchOutput()
+	return c, nil
 }
 
 func (m *Client) remove(id uint16) {
@@ -162,20 +154,28 @@ func (m *Client) Closed() bool {
 	}
 }
 
-func (m *Client) fetchInput(ctx context.Context, s *session) {
+func fetchInput(ctx context.Context, s *session, output buf.Writer) {
 	dest, _ := proxy.TargetFromContext(ctx)
 	writer := &Writer{
 		dest:   dest,
 		id:     s.id,
-		writer: m.inboundRay.InboundInput(),
+		writer: output,
+	}
+	defer writer.Close()
+	defer s.closeUplink()
+
+	log.Info("Proxyman|Mux|Client: Dispatching request to ", dest)
+	data, _ := s.input.ReadTimeout(time.Millisecond * 500)
+	if data != nil {
+		if err := writer.Write(data); err != nil {
+			log.Info("Proxyman|Mux|Client: Failed to write first payload: ", err)
+			return
+		}
 	}
 	_, timer := signal.CancelAfterInactivity(ctx, time.Minute*5)
 	if err := buf.PipeUntilEOF(timer, s.input, writer); err != nil {
 		log.Info("Proxyman|Mux|Client: Failed to fetch all input: ", err)
 	}
-
-	writer.Close()
-	s.closeUplink()
 }
 
 func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool {
@@ -205,7 +205,7 @@ func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool
 		id:     id,
 	}
 	m.sessions[id] = s
-	go m.fetchInput(ctx, s)
+	go fetchInput(ctx, s, m.inboundRay.InboundInput())
 	return true
 }
 
@@ -214,6 +214,7 @@ func (m *Client) fetchOutput() {
 	for {
 		meta, err := reader.ReadMetadata()
 		if err != nil {
+			log.Warning("Proxyman|Mux|Client: Failed to read metadata: ", err)
 			break
 		}
 		m.access.RLock()
@@ -291,6 +292,9 @@ func (w *ServerWorker) remove(id uint16) {
 }
 
 func (w *ServerWorker) handle(ctx context.Context, s *session) {
+	writer := NewResponseWriter(s.id, w.outboundRay.OutboundOutput())
+	defer writer.Close()
+
 	for {
 		select {
 		case <-ctx.Done():
@@ -300,7 +304,7 @@ func (w *ServerWorker) handle(ctx context.Context, s *session) {
 			if err != nil {
 				return
 			}
-			w.outboundRay.OutboundOutput().Write(data)
+			writer.Write(data)
 		}
 	}
 }
@@ -330,6 +334,7 @@ func (w *ServerWorker) run(ctx context.Context) {
 		}
 
 		if meta.SessionStatus == SessionStatusNew {
+			log.Info("Proxyman|Mux|Server: Received request for ", meta.Target)
 			inboundRay, err := w.dispatcher.Dispatch(ctx, meta.Target)
 			if err != nil {
 				log.Info("Proxyman|Mux: Failed to dispatch request: ", err)
@@ -354,7 +359,8 @@ func (w *ServerWorker) run(ctx context.Context) {
 					break
 				}
 				if s != nil {
-					s.output.Write(data)
+					if err := s.output.Write(data); err != nil {
+					}
 				}
 				if !more {
 					break

+ 4 - 0
app/proxyman/mux/reader.go

@@ -4,6 +4,7 @@ import (
 	"io"
 
 	"v2ray.com/core/common/buf"
+	"v2ray.com/core/common/errors"
 	"v2ray.com/core/common/serial"
 )
 
@@ -28,6 +29,9 @@ func (r *Reader) ReadMetadata() (*FrameMetadata, error) {
 		return nil, err
 	}
 	metaLen := serial.BytesToUint16(b.Bytes())
+	if metaLen > 512 {
+		return nil, errors.New("Proxyman|Mux|Reader: Invalid metalen ", metaLen)
+	}
 	b.Clear()
 	if err := b.AppendSupplier(buf.ReadFullFrom(r.reader, int(metaLen))); err != nil {
 		return nil, err

+ 8 - 1
app/proxyman/mux/writer.go

@@ -21,6 +21,14 @@ func NewWriter(id uint16, dest net.Destination, writer buf.Writer) *Writer {
 	}
 }
 
+func NewResponseWriter(id uint16, writer buf.Writer) *Writer {
+	return &Writer{
+		id:       id,
+		writer:   writer,
+		followup: true,
+	}
+}
+
 func (w *Writer) writeInternal(b *buf.Buffer) error {
 	meta := FrameMetadata{
 		SessionID: w.id,
@@ -74,7 +82,6 @@ func (w *Writer) Write(b *buf.Buffer) error {
 func (w *Writer) Close() {
 	meta := FrameMetadata{
 		SessionID:     w.id,
-		Target:        w.dest,
 		SessionStatus: SessionStatusEnd,
 	}