浏览代码

mutex protected close

Darien Raymond 8 年之前
父节点
当前提交
81d840699a
共有 3 个文件被更改,包括 35 次插入14 次删除
  1. 6 7
      app/proxyman/mux/mux.go
  2. 27 3
      app/proxyman/mux/session.go
  3. 2 4
      app/proxyman/mux/session_test.go

+ 6 - 7
app/proxyman/mux/mux.go

@@ -121,13 +121,12 @@ func (m *Client) monitor() {
 	for {
 		select {
 		case <-m.ctx.Done():
-			m.sessionManager.Close()
 			m.inboundRay.InboundInput().Close()
 			m.inboundRay.InboundOutput().CloseError()
 			return
 		case <-time.After(time.Second * 6):
 			size := m.sessionManager.Size()
-			if size == 0 {
+			if size == 0 && m.sessionManager.CloseIfNoSession() {
 				m.cancel()
 			}
 		}
@@ -169,12 +168,12 @@ func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool
 	default:
 	}
 
-	s := &Session{
-		input:  outboundRay.OutboundInput(),
-		output: outboundRay.OutboundOutput(),
-		parent: m.sessionManager,
+	s := m.sessionManager.Allocate()
+	if s == nil {
+		return false
 	}
-	m.sessionManager.Allocate(s)
+	s.input = outboundRay.OutboundInput()
+	s.output = outboundRay.OutboundOutput()
 	go fetchInput(ctx, s, m.inboundRay.InboundInput())
 	return true
 }

+ 27 - 3
app/proxyman/mux/session.go

@@ -10,6 +10,7 @@ type SessionManager struct {
 	sync.RWMutex
 	count    uint16
 	sessions map[uint16]*Session
+	closed   bool
 }
 
 func NewSessionManager() *SessionManager {
@@ -26,13 +27,21 @@ func (m *SessionManager) Size() int {
 	return len(m.sessions)
 }
 
-func (m *SessionManager) Allocate(s *Session) {
+func (m *SessionManager) Allocate() *Session {
 	m.Lock()
 	defer m.Unlock()
 
+	if m.closed {
+		return nil
+	}
+
 	m.count++
-	s.ID = m.count
+	s := &Session{
+		ID:     m.count,
+		parent: m,
+	}
 	m.sessions[s.ID] = s
+	return s
 }
 
 func (m *SessionManager) Add(s *Session) {
@@ -53,17 +62,32 @@ func (m *SessionManager) Get(id uint16) (*Session, bool) {
 	m.RLock()
 	defer m.RUnlock()
 
+	if m.closed {
+		return nil, false
+	}
+
 	s, found := m.sessions[id]
 	return s, found
 }
 
-func (m *SessionManager) Close() {
+func (m *SessionManager) CloseIfNoSession() bool {
 	m.RLock()
 	defer m.RUnlock()
 
+	if len(m.sessions) == 0 {
+		return false
+	}
+
+	m.closed = true
+
 	for _, s := range m.sessions {
+		s.input.CloseError()
 		s.output.CloseError()
 	}
+
+	m.sessions = make(map[uint16]*Session)
+
+	return true
 }
 
 type Session struct {

+ 2 - 4
app/proxyman/mux/session_test.go

@@ -12,12 +12,10 @@ func TestSessionManagerAdd(t *testing.T) {
 
 	m := NewSessionManager()
 
-	s := &Session{}
-	m.Allocate(s)
+	s := m.Allocate()
 	assert.Uint16(s.ID).Equals(1)
 
-	s = &Session{}
-	m.Allocate(s)
+	s = m.Allocate()
 	assert.Uint16(s.ID).Equals(2)
 
 	s = &Session{