Darien Raymond hace 8 años
padre
commit
9e6a57b2b8
Se han modificado 3 ficheros con 201 adiciones y 29 borrados
  1. 21 2
      app/proxyman/mux/frame.go
  2. 124 6
      app/proxyman/mux/mux.go
  3. 56 21
      app/proxyman/mux/writer.go

+ 21 - 2
app/proxyman/mux/frame.go

@@ -16,6 +16,24 @@ const (
 	SessionStatusEnd  SessionStatus = 0x03
 )
 
+type Option byte
+
+const (
+	OptionData Option = 0x01
+)
+
+func (o Option) Has(x Option) bool {
+	return (o & x) == x
+}
+
+func (o *Option) Add(x Option) {
+	*o = (*o | x)
+}
+
+func (o *Option) Clear(x Option) {
+	*o = (*o & (^x))
+}
+
 type TargetNetwork byte
 
 const (
@@ -36,7 +54,7 @@ Frame format
 2 bytes - length
 2 bytes - session id
 1 bytes - status
-1 bytes - reserved
+1 bytes - option
 
 1 byte - network
 2 bytes - port
@@ -48,6 +66,7 @@ type FrameMetadata struct {
 	SessionID     uint16
 	SessionStatus SessionStatus
 	Target        net.Destination
+	Option        Option
 }
 
 func (f FrameMetadata) AsSupplier() buf.Supplier {
@@ -55,7 +74,7 @@ func (f FrameMetadata) AsSupplier() buf.Supplier {
 		b = serial.Uint16ToBytes(uint16(0), b) // place holder for length
 
 		b = serial.Uint16ToBytes(f.SessionID, b)
-		b = append(b, byte(f.SessionStatus), 0 /* reserved */)
+		b = append(b, byte(f.SessionStatus), byte(f.Option))
 		length := 4
 
 		if f.SessionStatus == SessionStatusNew {

+ 124 - 6
app/proxyman/mux/mux.go

@@ -1,19 +1,137 @@
 package mux
 
-import "v2ray.com/core/common/net"
+import (
+	"context"
+	"sync"
+	"time"
+
+	"v2ray.com/core/common/buf"
+	"v2ray.com/core/common/signal"
+	"v2ray.com/core/proxy"
+	"v2ray.com/core/transport/ray"
+)
 
 const (
 	maxParallel = 8
 	maxTotal    = 128
 )
 
-type mergerWorker struct {
+type clientSession struct {
+	sync.Mutex
+	outboundRay    ray.OutboundRay
+	parent         *Client
+	id             uint16
+	uplinkClosed   bool
+	downlinkClosed bool
+}
+
+func (s *clientSession) checkAndRemove() {
+	s.Lock()
+	if s.uplinkClosed && s.downlinkClosed {
+		s.parent.remove(s.id)
+	}
+	s.Unlock()
+}
+
+func (s *clientSession) closeUplink() {
+	s.Lock()
+	s.uplinkClosed = true
+	s.Unlock()
+	s.checkAndRemove()
+}
+
+func (s *clientSession) closeDownlink() {
+	s.Lock()
+	s.downlinkClosed = true
+	s.Unlock()
+	s.checkAndRemove()
+}
+
+type Client struct {
+	access     sync.RWMutex
+	count      uint16
+	sessions   map[uint16]*clientSession
+	inboundRay ray.InboundRay
+}
+
+func (m *Client) IsFullyOccupied() bool {
+	m.access.RLock()
+	defer m.access.RUnlock()
+
+	return len(m.sessions) >= maxParallel
+}
+
+func (m *Client) IsFullyUsed() bool {
+	m.access.RLock()
+	defer m.access.RUnlock()
+
+	return m.count >= maxTotal
 }
 
-func (w *mergerWorker) isFull() bool {
-	return true
+func (m *Client) remove(id uint16) {
+	m.access.Lock()
+	delete(m.sessions, id)
+	m.access.Unlock()
 }
 
-type Merger struct {
-	sessions map[net.Destination]mergerWorker
+func (m *Client) fetchInput(ctx context.Context, session *clientSession) {
+	dest, _ := proxy.TargetFromContext(ctx)
+	writer := &muxWriter{
+		dest:   dest,
+		id:     session.id,
+		writer: m.inboundRay.InboundInput(),
+	}
+	_, timer := signal.CancelAfterInactivity(ctx, time.Minute*5)
+	buf.PipeUntilEOF(timer, session.outboundRay.OutboundInput(), writer)
+	writer.Close()
+	session.closeUplink()
+}
+
+func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) {
+	m.access.Lock()
+	defer m.access.Unlock()
+
+	m.count++
+	id := m.count
+	session := &clientSession{
+		outboundRay: outboundRay,
+		parent:      m,
+		id:          id,
+	}
+	m.sessions[id] = session
+	go m.fetchInput(ctx, session)
+}
+
+func (m *Client) fetchOutput() {
+	reader := NewReader(m.inboundRay.InboundOutput())
+	for {
+		meta, err := reader.ReadMetadata()
+		if err != nil {
+			break
+		}
+		m.access.RLock()
+		session, found := m.sessions[meta.SessionID]
+		m.access.RUnlock()
+		if found && meta.SessionStatus == SessionStatusEnd {
+			session.closeDownlink()
+		}
+		if !meta.Option.Has(OptionData) {
+			continue
+		}
+
+		for {
+			data, more, err := reader.Read()
+			if err != nil {
+				break
+			}
+			if found {
+				if err := session.outboundRay.OutboundOutput().Write(data); err != nil {
+					break
+				}
+			}
+			if !more {
+				break
+			}
+		}
+	}
 }

+ 56 - 21
app/proxyman/mux/writer.go

@@ -2,41 +2,76 @@ package mux
 
 import (
 	"v2ray.com/core/common/buf"
+	"v2ray.com/core/common/net"
 	"v2ray.com/core/common/serial"
 )
 
 type muxWriter struct {
-	meta   *FrameMetadata
-	writer buf.Writer
+	id       uint16
+	dest     net.Destination
+	writer   buf.Writer
+	followup bool
 }
 
-func (w *muxWriter) Write(b *buf.Buffer) error {
-	frame := buf.New()
-	frame.AppendSupplier(w.meta.AsSupplier())
-	if w.meta.SessionStatus == SessionStatusNew {
-		w.meta.SessionStatus = SessionStatusKeep
+func (w *muxWriter) writeInternal(b *buf.Buffer) error {
+	meta := FrameMetadata{
+		SessionID: w.id,
+		Target:    w.dest,
+	}
+	if w.followup {
+		meta.SessionStatus = SessionStatusKeep
+	} else {
+		w.followup = true
+		meta.SessionStatus = SessionStatusNew
 	}
 
-	frame.AppendSupplier(serial.WriteUint16(0))
-	lengthBytes := frame.BytesFrom(-2)
+	if b.Len() > 0 {
+		meta.Option.Add(OptionData)
+	}
 
-	nBytes, err := frame.Write(b.Bytes())
-	if err != nil {
-		return err
+	frame := buf.New()
+	frame.AppendSupplier(meta.AsSupplier())
+
+	if b.Len() > 0 {
+		frame.AppendSupplier(serial.WriteUint16(0))
+		lengthBytes := frame.BytesFrom(-2)
+
+		nBytes, err := frame.Write(b.Bytes())
+		if err != nil {
+			frame.Release()
+			return err
+		}
+
+		serial.Uint16ToBytes(uint16(nBytes), lengthBytes[:0])
+		b.SliceFrom(nBytes)
 	}
 
-	serial.Uint16ToBytes(uint16(nBytes), lengthBytes[:0])
-	if err := w.writer.Write(frame); err != nil {
-		frame.Release()
-		b.Release()
+	return w.writer.Write(frame)
+}
+
+func (w *muxWriter) Write(b *buf.Buffer) error {
+	defer b.Release()
+
+	if err := w.writeInternal(b); err != nil {
 		return err
 	}
+	for !b.IsEmpty() {
+		if err := w.writeInternal(b); err != nil {
+			return err
+		}
+	}
+	return nil
+}
 
-	b.SliceFrom(nBytes)
-	if !b.IsEmpty() {
-		return w.Write(b)
+func (w *muxWriter) Close() {
+	meta := FrameMetadata{
+		SessionID:     w.id,
+		Target:        w.dest,
+		SessionStatus: SessionStatusEnd,
 	}
-	b.Release()
 
-	return nil
+	frame := buf.New()
+	frame.AppendSupplier(meta.AsSupplier())
+
+	w.writer.Write(frame)
 }