浏览代码

switch to pubsub in dns service

Darien Raymond 7 年之前
父节点
当前提交
4368edf87c

+ 13 - 6
app/dns/udpns.go

@@ -11,7 +11,7 @@ import (
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/net"
-	"v2ray.com/core/common/signal"
+	"v2ray.com/core/common/signal/pubsub"
 	"v2ray.com/core/common/task"
 	"v2ray.com/core/transport/internet/udp"
 )
@@ -33,7 +33,7 @@ type ClassicNameServer struct {
 	sync.RWMutex
 	address   net.Destination
 	ips       map[string][]IPRecord
-	updated   signal.Notifier
+	pub       *pubsub.Service
 	udpServer *udp.Dispatcher
 	cleanup   *task.Periodic
 	reqID     uint32
@@ -46,6 +46,7 @@ func NewClassicNameServer(address net.Destination, dispatcher core.Dispatcher, c
 		ips:       make(map[string][]IPRecord),
 		udpServer: udp.NewDispatcher(dispatcher),
 		clientIP:  clientIP,
+		pub:       pubsub.NewService(),
 	}
 	s.cleanup = &task.Periodic{
 		Interval: time.Minute,
@@ -96,7 +97,10 @@ func (s *ClassicNameServer) HandleResponse(payload *buf.Buffer) {
 	now := time.Now()
 	for _, rr := range msg.Answer {
 		var ip net.IP
-		domain = rr.Header().Name
+		name := rr.Header().Name
+		if len(name) > 0 {
+			domain = rr.Header().Name
+		}
 		ttl := rr.Header().Ttl
 		switch rr := rr.(type) {
 		case *dns.A:
@@ -105,7 +109,7 @@ func (s *ClassicNameServer) HandleResponse(payload *buf.Buffer) {
 			ip = rr.AAAA
 		}
 		if ttl == 0 {
-			ttl = 300
+			ttl = 600
 		}
 		if len(ip) > 0 {
 			ips = append(ips, IPRecord{
@@ -133,7 +137,7 @@ func (s *ClassicNameServer) updateIP(domain string, ips []IPRecord) {
 		}
 	}
 	s.ips[domain] = ips
-	s.updated.Signal()
+	s.pub.Publish(domain, nil)
 }
 
 func (s *ClassicNameServer) getMsgOptions() *dns.OPT {
@@ -255,6 +259,9 @@ func (s *ClassicNameServer) QueryIP(ctx context.Context, domain string) ([]net.I
 		return ips, nil
 	}
 
+	sub := s.pub.Subscribe(fqdn)
+	defer sub.Close()
+
 	s.sendQuery(ctx, fqdn)
 
 	for {
@@ -266,7 +273,7 @@ func (s *ClassicNameServer) QueryIP(ctx context.Context, domain string) ([]net.I
 		select {
 		case <-ctx.Done():
 			return nil, ctx.Err()
-		case <-s.updated.Wait():
+		case <-sub.Wait():
 		}
 	}
 }

+ 9 - 30
common/signal/notifier.go

@@ -1,47 +1,26 @@
 package signal
 
-import "sync"
-
 // Notifier is a utility for notifying changes. The change producer may notify changes multiple time, and the consumer may get notified asynchronously.
 type Notifier struct {
-	sync.Mutex
-	waiters    []chan struct{}
-	notCosumed bool
+	c chan struct{}
 }
 
 // NewNotifier creates a new Notifier.
 func NewNotifier() *Notifier {
-	return &Notifier{}
+	return &Notifier{
+		c: make(chan struct{}, 1),
+	}
 }
 
 // Signal signals a change, usually by producer. This method never blocks.
 func (n *Notifier) Signal() {
-	n.Lock()
-	defer n.Unlock()
-
-	if len(n.waiters) == 0 {
-		n.notCosumed = true
-		return
-	}
-
-	for _, w := range n.waiters {
-		close(w)
+	select {
+	case n.c <- struct{}{}:
+	default:
 	}
-	n.waiters = make([]chan struct{}, 0, 8)
 }
 
-// Wait returns a channel for waiting for changes.
+// Wait returns a channel for waiting for changes. The returned channel never gets closed.
 func (n *Notifier) Wait() <-chan struct{} {
-	n.Lock()
-	defer n.Unlock()
-
-	w := make(chan struct{})
-	if n.notCosumed {
-		n.notCosumed = false
-		close(w)
-		return w
-	}
-
-	n.waiters = append(n.waiters, w)
-	return w
+	return n.c
 }

+ 1 - 1
common/signal/notifier_test.go

@@ -10,7 +10,7 @@ import (
 func TestNotifierSignal(t *testing.T) {
 	//assert := With(t)
 
-	var n Notifier
+	n := NewNotifier()
 
 	w := n.Wait()
 	n.Signal()

+ 97 - 0
common/signal/pubsub/pubsub.go

@@ -0,0 +1,97 @@
+package pubsub
+
+import (
+	"sync"
+	"time"
+
+	"v2ray.com/core/common"
+	"v2ray.com/core/common/task"
+)
+
+type Subscriber struct {
+	name    string
+	buffer  chan interface{}
+	removed chan struct{}
+}
+
+func (s *Subscriber) push(msg interface{}) {
+	select {
+	case s.buffer <- msg:
+	default:
+	}
+}
+
+func (s *Subscriber) Wait() <-chan interface{} {
+	return s.buffer
+}
+
+func (s *Subscriber) Close() {
+	close(s.removed)
+}
+
+func (s *Subscriber) IsClosed() bool {
+	select {
+	case <-s.removed:
+		return true
+	default:
+		return false
+	}
+}
+
+type Service struct {
+	sync.RWMutex
+	subs  []*Subscriber
+	ctask *task.Periodic
+}
+
+func NewService() *Service {
+	s := &Service{}
+	s.ctask = &task.Periodic{
+		Execute:  s.cleanup,
+		Interval: time.Second * 30,
+	}
+	common.Must(s.ctask.Start())
+	return s
+}
+
+func (s *Service) cleanup() error {
+	s.Lock()
+	defer s.Unlock()
+
+	if len(s.subs) < 16 {
+		return nil
+	}
+
+	newSub := make([]*Subscriber, 0, len(s.subs))
+	for _, sub := range s.subs {
+		if !sub.IsClosed() {
+			newSub = append(newSub, sub)
+		}
+	}
+
+	s.subs = newSub
+	return nil
+}
+
+func (s *Service) Subscribe(name string) *Subscriber {
+	sub := &Subscriber{
+		name:    name,
+		buffer:  make(chan interface{}, 16),
+		removed: make(chan struct{}),
+	}
+	s.Lock()
+	s.subs = append(s.subs, sub)
+	s.Unlock()
+	return sub
+}
+
+func (s *Service) Publish(name string, message interface{}) {
+	s.RLock()
+	defer s.RUnlock()
+
+	for _, sub := range s.subs {
+		if sub.name == name && !sub.IsClosed() {
+			sub.push(message)
+		}
+	}
+}

+ 33 - 0
common/signal/pubsub/pubsub_test.go

@@ -0,0 +1,33 @@
+package pubsub_test
+
+import (
+	"testing"
+
+	. "v2ray.com/core/common/signal/pubsub"
+	. "v2ray.com/ext/assert"
+)
+
+func TestPubsub(t *testing.T) {
+	assert := With(t)
+
+	service := NewService()
+
+	sub := service.Subscribe("a")
+	service.Publish("a", 1)
+
+	select {
+	case v := <-sub.Wait():
+		assert(v.(int), Equals, 1)
+	default:
+		t.Fail()
+	}
+
+	sub.Close()
+	service.Publish("a", 2)
+
+	select {
+	case <-sub.Wait():
+		t.Fail()
+	default:
+	}
+}

+ 1 - 1
proxy/vmess/outbound/outbound.go

@@ -62,7 +62,7 @@ func (v *Handler) Process(ctx context.Context, link *core.Link, dialer proxy.Dia
 	if err != nil {
 		return newError("failed to find an available destination").Base(err).AtWarning()
 	}
-	defer conn.Close()
+	defer conn.Close() //nolint: errcheck
 
 	target, ok := proxy.TargetFromContext(ctx)
 	if !ok {