Darien Raymond 8 лет назад
Родитель
Сommit
bf0a4c428e
1 измененных файлов с 64 добавлено и 49 удалено
  1. 64 49
      app/proxyman/mux/mux.go

+ 64 - 49
app/proxyman/mux/mux.go

@@ -94,13 +94,18 @@ func (m *ClientManager) onClientFinish() {
 	m.access.Lock()
 	defer m.access.Unlock()
 
-	nActive := 0
-	for idx, client := range m.clients {
-		if nActive != idx && !client.Closed() {
-			m.clients[nActive] = client
+	if len(m.clients) < 10 {
+		return
+	}
+
+	activeClients := make([]*Client, 0, len(m.clients))
+
+	for _, client := range m.clients {
+		if !client.Closed() {
+			activeClients = append(activeClients, client)
 		}
 	}
-	m.clients = m.clients[:nActive]
+	m.clients = activeClients
 }
 
 type Client struct {
@@ -209,12 +214,40 @@ func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool
 	return true
 }
 
+func drain(reader *Reader) error {
+	for {
+		data, more, err := reader.Read()
+		if err != nil {
+			return err
+		}
+		data.Release()
+		if !more {
+			return nil
+		}
+	}
+}
+
+func pipe(reader *Reader, writer buf.Writer) error {
+	for {
+		data, more, err := reader.Read()
+		if err != nil {
+			return err
+		}
+		if err := writer.Write(data); err != nil {
+			return err
+		}
+		if !more {
+			return nil
+		}
+	}
+}
+
 func (m *Client) fetchOutput() {
 	reader := NewReader(m.inboundRay.InboundOutput())
 	for {
 		meta, err := reader.ReadMetadata()
 		if err != nil {
-			log.Warning("Proxyman|Mux|Client: Failed to read metadata: ", err)
+			log.Info("Proxyman|Mux|Client: Failed to read metadata: ", err)
 			break
 		}
 		m.access.RLock()
@@ -228,19 +261,15 @@ func (m *Client) fetchOutput() {
 			continue
 		}
 
-		for {
-			data, more, err := reader.Read()
-			if err != nil {
-				break
-			}
-			if found {
-				if err := s.output.Write(data); err != nil {
-					break
-				}
-			}
-			if !more {
-				break
-			}
+		if found {
+			err = pipe(reader, s.output)
+		} else {
+			err = drain(reader)
+		}
+
+		if err != nil {
+			log.Info("Proxyman|Mux|Client: Failed to read data: ", err)
+			break
 		}
 	}
 }
@@ -295,22 +324,9 @@ func handle(ctx context.Context, s *session, output buf.Writer) {
 	writer := NewResponseWriter(s.id, output)
 	defer writer.Close()
 
-	for {
-		select {
-		case <-ctx.Done():
-			log.Debug("Proxyman|Mux|ServerWorker: Session ", s.id, " ends by context.")
-			return
-		default:
-			data, err := s.input.Read()
-			if err != nil {
-				log.Info("Proxyman|Mux|ServerWorker: Session ", s.id, " ends: ", err)
-				return
-			}
-			if err := writer.Write(data); err != nil {
-				log.Info("Proxyman|Mux|ServerWorker: Session ", s.id, " ends: ", err)
-				return
-			}
-		}
+	_, timer := signal.CancelAfterInactivity(ctx, time.Minute*30)
+	if err := buf.PipeUntilEOF(timer, s.input, writer); err != nil {
+		log.Info("Proxyman|Mux|ServerWorker: Session ", s.id, " ends: ", err)
 	}
 }
 
@@ -357,20 +373,19 @@ func (w *ServerWorker) run(ctx context.Context) {
 			go handle(ctx, s, w.outboundRay.OutboundOutput())
 		}
 
-		if meta.Option.Has(OptionData) {
-			for {
-				data, more, err := reader.Read()
-				if err != nil {
-					break
-				}
-				if s != nil {
-					if err := s.output.Write(data); err != nil {
-					}
-				}
-				if !more {
-					break
-				}
-			}
+		if !meta.Option.Has(OptionData) {
+			continue
+		}
+
+		if s != nil {
+			err = pipe(reader, s.output)
+		} else {
+			err = drain(reader)
+		}
+
+		if err != nil {
+			log.Info("Proxyman|Mux|ServerWorker: Failed to read data: ", err)
+			break
 		}
 	}
 }