|
|
@@ -5,7 +5,10 @@ import (
|
|
|
"sync"
|
|
|
"time"
|
|
|
|
|
|
+ "v2ray.com/core/app/dispatcher"
|
|
|
+ "v2ray.com/core/app/log"
|
|
|
"v2ray.com/core/common/buf"
|
|
|
+ "v2ray.com/core/common/net"
|
|
|
"v2ray.com/core/common/signal"
|
|
|
"v2ray.com/core/proxy"
|
|
|
"v2ray.com/core/transport/ray"
|
|
|
@@ -16,16 +19,21 @@ const (
|
|
|
maxTotal = 128
|
|
|
)
|
|
|
|
|
|
-type clientSession struct {
|
|
|
+type manager interface {
|
|
|
+ remove(id uint16)
|
|
|
+}
|
|
|
+
|
|
|
+type session struct {
|
|
|
sync.Mutex
|
|
|
- outboundRay ray.OutboundRay
|
|
|
- parent *Client
|
|
|
+ input ray.InputStream
|
|
|
+ output ray.OutputStream
|
|
|
+ parent manager
|
|
|
id uint16
|
|
|
uplinkClosed bool
|
|
|
downlinkClosed bool
|
|
|
}
|
|
|
|
|
|
-func (s *clientSession) checkAndRemove() {
|
|
|
+func (s *session) checkAndRemove() {
|
|
|
s.Lock()
|
|
|
if s.uplinkClosed && s.downlinkClosed {
|
|
|
s.parent.remove(s.id)
|
|
|
@@ -33,14 +41,14 @@ func (s *clientSession) checkAndRemove() {
|
|
|
s.Unlock()
|
|
|
}
|
|
|
|
|
|
-func (s *clientSession) closeUplink() {
|
|
|
+func (s *session) closeUplink() {
|
|
|
s.Lock()
|
|
|
s.uplinkClosed = true
|
|
|
s.Unlock()
|
|
|
s.checkAndRemove()
|
|
|
}
|
|
|
|
|
|
-func (s *clientSession) closeDownlink() {
|
|
|
+func (s *session) closeDownlink() {
|
|
|
s.Lock()
|
|
|
s.downlinkClosed = true
|
|
|
s.Unlock()
|
|
|
@@ -50,56 +58,101 @@ func (s *clientSession) closeDownlink() {
|
|
|
type Client struct {
|
|
|
access sync.RWMutex
|
|
|
count uint16
|
|
|
- sessions map[uint16]*clientSession
|
|
|
+ sessions map[uint16]*session
|
|
|
inboundRay ray.InboundRay
|
|
|
+ ctx context.Context
|
|
|
+ cancel context.CancelFunc
|
|
|
}
|
|
|
|
|
|
-func (m *Client) IsFullyOccupied() bool {
|
|
|
- m.access.RLock()
|
|
|
- defer m.access.RUnlock()
|
|
|
+var muxCoolDestination = net.TCPDestination(net.DomainAddress("v1.mux.cool"), net.Port(9527))
|
|
|
|
|
|
- return len(m.sessions) >= maxParallel
|
|
|
+func NewClient(p proxy.Outbound, dialer proxy.Dialer) (*Client, error) {
|
|
|
+ ctx, cancel := context.WithCancel(context.Background())
|
|
|
+ ctx = proxy.ContextWithTarget(ctx, muxCoolDestination)
|
|
|
+ pipe := ray.NewRay(ctx)
|
|
|
+ err := p.Process(ctx, pipe, dialer)
|
|
|
+ if err != nil {
|
|
|
+ cancel()
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ return &Client{
|
|
|
+ sessions: make(map[uint16]*session, 256),
|
|
|
+ inboundRay: pipe,
|
|
|
+ ctx: ctx,
|
|
|
+ cancel: cancel,
|
|
|
+ }, nil
|
|
|
}
|
|
|
|
|
|
-func (m *Client) IsFullyUsed() bool {
|
|
|
+func (m *Client) isFullyOccupied() bool {
|
|
|
m.access.RLock()
|
|
|
defer m.access.RUnlock()
|
|
|
|
|
|
- return m.count >= maxTotal
|
|
|
+ return len(m.sessions) >= maxParallel
|
|
|
}
|
|
|
|
|
|
func (m *Client) remove(id uint16) {
|
|
|
m.access.Lock()
|
|
|
+ defer m.access.Unlock()
|
|
|
+
|
|
|
delete(m.sessions, id)
|
|
|
- m.access.Unlock()
|
|
|
+
|
|
|
+ if len(m.sessions) == 0 {
|
|
|
+ m.cancel()
|
|
|
+ m.inboundRay.InboundInput().Close()
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (m *Client) Closed() bool {
|
|
|
+ select {
|
|
|
+ case <-m.ctx.Done():
|
|
|
+ return true
|
|
|
+ default:
|
|
|
+ return false
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-func (m *Client) fetchInput(ctx context.Context, session *clientSession) {
|
|
|
+func (m *Client) fetchInput(ctx context.Context, s *session) {
|
|
|
dest, _ := proxy.TargetFromContext(ctx)
|
|
|
- writer := &muxWriter{
|
|
|
+ writer := &MuxWriter{
|
|
|
dest: dest,
|
|
|
- id: session.id,
|
|
|
+ id: s.id,
|
|
|
writer: m.inboundRay.InboundInput(),
|
|
|
}
|
|
|
_, timer := signal.CancelAfterInactivity(ctx, time.Minute*5)
|
|
|
- buf.PipeUntilEOF(timer, session.outboundRay.OutboundInput(), writer)
|
|
|
+ buf.PipeUntilEOF(timer, s.input, writer)
|
|
|
writer.Close()
|
|
|
- session.closeUplink()
|
|
|
+ s.closeUplink()
|
|
|
}
|
|
|
|
|
|
-func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) {
|
|
|
+func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool {
|
|
|
m.access.Lock()
|
|
|
defer m.access.Unlock()
|
|
|
|
|
|
+ if len(m.sessions) >= maxParallel {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+
|
|
|
+ if m.count >= maxTotal {
|
|
|
+ return false
|
|
|
+ }
|
|
|
+
|
|
|
+ select {
|
|
|
+ case <-m.ctx.Done():
|
|
|
+ return false
|
|
|
+ default:
|
|
|
+ }
|
|
|
+
|
|
|
m.count++
|
|
|
id := m.count
|
|
|
- session := &clientSession{
|
|
|
- outboundRay: outboundRay,
|
|
|
- parent: m,
|
|
|
- id: id,
|
|
|
+ s := &session{
|
|
|
+ input: outboundRay.OutboundInput(),
|
|
|
+ output: outboundRay.OutboundOutput(),
|
|
|
+ parent: m,
|
|
|
+ id: id,
|
|
|
}
|
|
|
- m.sessions[id] = session
|
|
|
- go m.fetchInput(ctx, session)
|
|
|
+ m.sessions[id] = s
|
|
|
+ go m.fetchInput(ctx, s)
|
|
|
+ return true
|
|
|
}
|
|
|
|
|
|
func (m *Client) fetchOutput() {
|
|
|
@@ -110,10 +163,11 @@ func (m *Client) fetchOutput() {
|
|
|
break
|
|
|
}
|
|
|
m.access.RLock()
|
|
|
- session, found := m.sessions[meta.SessionID]
|
|
|
+ s, found := m.sessions[meta.SessionID]
|
|
|
m.access.RUnlock()
|
|
|
if found && meta.SessionStatus == SessionStatusEnd {
|
|
|
- session.closeDownlink()
|
|
|
+ s.closeDownlink()
|
|
|
+ s.output.Close()
|
|
|
}
|
|
|
if !meta.Option.Has(OptionData) {
|
|
|
continue
|
|
|
@@ -125,7 +179,7 @@ func (m *Client) fetchOutput() {
|
|
|
break
|
|
|
}
|
|
|
if found {
|
|
|
- if err := session.outboundRay.OutboundOutput().Write(data); err != nil {
|
|
|
+ if err := s.output.Write(data); err != nil {
|
|
|
break
|
|
|
}
|
|
|
}
|
|
|
@@ -135,3 +189,104 @@ func (m *Client) fetchOutput() {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+type Server struct {
|
|
|
+ dispatcher dispatcher.Interface
|
|
|
+}
|
|
|
+
|
|
|
+func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (ray.InboundRay, error) {
|
|
|
+ if dest != muxCoolDestination {
|
|
|
+ return s.dispatcher.Dispatch(ctx, dest)
|
|
|
+ }
|
|
|
+
|
|
|
+ ray := ray.NewRay(ctx)
|
|
|
+
|
|
|
+ return ray, nil
|
|
|
+}
|
|
|
+
|
|
|
+type ServerWorker struct {
|
|
|
+ dispatcher dispatcher.Interface
|
|
|
+ outboundRay ray.OutboundRay
|
|
|
+ sessions map[uint16]*session
|
|
|
+ access sync.RWMutex
|
|
|
+}
|
|
|
+
|
|
|
+func (w *ServerWorker) remove(id uint16) {
|
|
|
+ w.access.Lock()
|
|
|
+ delete(w.sessions, id)
|
|
|
+ w.access.Unlock()
|
|
|
+}
|
|
|
+
|
|
|
+func (w *ServerWorker) handle(ctx context.Context, s *session) {
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-ctx.Done():
|
|
|
+ return
|
|
|
+ default:
|
|
|
+ data, err := s.input.Read()
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ w.outboundRay.OutboundOutput().Write(data)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (w *ServerWorker) run(ctx context.Context) {
|
|
|
+ input := w.outboundRay.OutboundInput()
|
|
|
+ reader := NewReader(input)
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-ctx.Done():
|
|
|
+ return
|
|
|
+ default:
|
|
|
+ }
|
|
|
+
|
|
|
+ meta, err := reader.ReadMetadata()
|
|
|
+ if err != nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ w.access.RLock()
|
|
|
+ s, found := w.sessions[meta.SessionID]
|
|
|
+ w.access.RUnlock()
|
|
|
+
|
|
|
+ if found && meta.SessionStatus == SessionStatusEnd {
|
|
|
+ s.closeUplink()
|
|
|
+ s.output.Close()
|
|
|
+ }
|
|
|
+
|
|
|
+ if meta.SessionStatus == SessionStatusNew {
|
|
|
+ inboundRay, err := w.dispatcher.Dispatch(ctx, meta.Target)
|
|
|
+ if err != nil {
|
|
|
+ log.Info("Proxyman|Mux: Failed to dispatch request: ", err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ s = &session{
|
|
|
+ input: inboundRay.InboundOutput(),
|
|
|
+ output: inboundRay.InboundInput(),
|
|
|
+ parent: w,
|
|
|
+ id: meta.SessionID,
|
|
|
+ }
|
|
|
+ w.access.Lock()
|
|
|
+ w.sessions[meta.SessionID] = s
|
|
|
+ w.access.Unlock()
|
|
|
+ go w.handle(ctx, s)
|
|
|
+ }
|
|
|
+
|
|
|
+ if meta.Option.Has(OptionData) {
|
|
|
+ for {
|
|
|
+ data, more, err := reader.Read()
|
|
|
+ if err != nil {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ if s != nil {
|
|
|
+ s.output.Write(data)
|
|
|
+ }
|
|
|
+ if !more {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|