Browse Source

generalized event handler

Darien Raymond 8 năm trước cách đây
mục cha
commit
fd8db49dc9

+ 1 - 1
app/dispatcher/impl/default.go

@@ -39,7 +39,7 @@ func NewDefaultDispatcher(ctx context.Context, config *dispatcher.Config) (*Defa
 		return nil, newError("no space in context")
 	}
 	d := &DefaultDispatcher{}
-	space.OnInitialize(func() error {
+	space.On(app.SpaceInitializing, func(interface{}) error {
 		d.ohm = proxyman.OutboundHandlerManagerFromSpace(space)
 		if d.ohm == nil {
 			return newError("OutboundHandlerManager is not found in the space")

+ 1 - 1
app/dns/server/server.go

@@ -52,7 +52,7 @@ func NewCacheServer(ctx context.Context, config *dns.Config) (*CacheServer, erro
 		servers: make([]NameServer, len(config.NameServers)),
 		hosts:   config.GetInternalHosts(),
 	}
-	space.OnInitialize(func() error {
+	space.On(app.SpaceInitializing, func(interface{}) error {
 		disp := dispatcher.FromSpace(space)
 		if disp == nil {
 			return newError("dispatcher is not found in the space")

+ 4 - 0
app/policy/manager/manager.go

@@ -37,6 +37,7 @@ func global() policy.Policy {
 	}
 }
 
+// GetPolicy implements policy.Manager.
 func (m *Instance) GetPolicy(level uint32) policy.Policy {
 	if p, ok := m.levels[level]; ok {
 		return *p
@@ -44,13 +45,16 @@ func (m *Instance) GetPolicy(level uint32) policy.Policy {
 	return global()
 }
 
+// Start implements app.Application.Start().
 func (m *Instance) Start() error {
 	return nil
 }
 
+// Close implements app.Application.Close().
 func (m *Instance) Close() {
 }
 
+// Interface implement app.Application.Interface().
 func (m *Instance) Interface() interface{} {
 	return (*policy.Manager)(nil)
 }

+ 1 - 1
app/proxyman/mux/mux.go

@@ -274,7 +274,7 @@ type Server struct {
 func NewServer(ctx context.Context) *Server {
 	s := &Server{}
 	space := app.SpaceFromContext(ctx)
-	space.OnInitialize(func() error {
+	space.On(app.SpaceInitializing, func(interface{}) error {
 		d := dispatcher.FromSpace(space)
 		if d == nil {
 			return newError("no dispatcher in space")

+ 1 - 1
app/proxyman/outbound/handler.go

@@ -33,7 +33,7 @@ func NewHandler(ctx context.Context, config *proxyman.OutboundHandlerConfig) (*H
 	if space == nil {
 		return nil, newError("no space in context")
 	}
-	space.OnInitialize(func() error {
+	space.On(app.SpaceInitializing, func(interface{}) error {
 		ohm := proxyman.OutboundHandlerManagerFromSpace(space)
 		if ohm == nil {
 			return newError("no OutboundManager in space")

+ 1 - 1
app/router/router.go

@@ -33,7 +33,7 @@ func NewRouter(ctx context.Context, config *Config) (*Router, error) {
 		rules:          make([]Rule, len(config.Rule)),
 	}
 
-	space.OnInitialize(func() error {
+	space.On(app.SpaceInitializing, func(interface{}) error {
 		for idx, rule := range config.Rule {
 			r.rules[idx].Tag = rule.Tag
 			cond, err := rule.BuildCondition()

+ 17 - 19
app/space.go

@@ -5,6 +5,7 @@ import (
 	"reflect"
 
 	"v2ray.com/core/common"
+	"v2ray.com/core/common/event"
 )
 
 type Application interface {
@@ -13,8 +14,6 @@ type Application interface {
 	Close()
 }
 
-type InitializationCallback func() error
-
 func CreateAppFromConfig(ctx context.Context, config interface{}) (Application, error) {
 	application, err := common.CreateObject(ctx, config)
 	if err != nil {
@@ -29,46 +28,45 @@ func CreateAppFromConfig(ctx context.Context, config interface{}) (Application,
 }
 
 // A Space contains all apps that may be available in a V2Ray runtime.
-// Caller must check the availability of an app by calling HasXXX before getting its instance.
 type Space interface {
+	event.Registry
 	GetApplication(appInterface interface{}) Application
 	AddApplication(application Application) error
 	Initialize() error
-	OnInitialize(InitializationCallback)
 	Start() error
 	Close()
 }
 
+const (
+	SpaceInitializing event.Event = iota
+)
+
 type spaceImpl struct {
-	initialized bool
+	event.Listener
 	cache       map[reflect.Type]Application
-	appInit     []InitializationCallback
+	initialized bool
 }
 
 func NewSpace() Space {
 	return &spaceImpl{
-		cache:   make(map[reflect.Type]Application),
-		appInit: make([]InitializationCallback, 0, 32),
+		cache: make(map[reflect.Type]Application),
 	}
 }
 
-func (s *spaceImpl) OnInitialize(f InitializationCallback) {
-	if s.initialized {
-		f()
-	} else {
-		s.appInit = append(s.appInit, f)
+func (s *spaceImpl) On(e event.Event, h event.Handler) {
+	if e == SpaceInitializing && s.initialized {
+		_ = h(nil) // Ignore error
+		return
 	}
+	s.Listener.On(e, h)
 }
 
 func (s *spaceImpl) Initialize() error {
-	for _, f := range s.appInit {
-		if err := f(); err != nil {
-			return err
-		}
+	if s.initialized {
+		return nil
 	}
-	s.appInit = nil
 	s.initialized = true
-	return nil
+	return s.Fire(SpaceInitializing, nil)
 }
 
 func (s *spaceImpl) GetApplication(appInterface interface{}) Application {

+ 46 - 0
common/event/event.go

@@ -0,0 +1,46 @@
+package event
+
+import "sync"
+
+type Event uint16
+
+type Handler func(data interface{}) error
+
+type Registry interface {
+	On(Event, Handler)
+}
+
+type Listener struct {
+	sync.RWMutex
+	events map[Event][]Handler
+}
+
+func (l *Listener) On(e Event, h Handler) {
+	l.Lock()
+	defer l.Unlock()
+
+	if l.events == nil {
+		l.events = make(map[Event][]Handler)
+	}
+
+	handlers := l.events[e]
+	handlers = append(handlers, h)
+	l.events[e] = handlers
+}
+
+func (l *Listener) Fire(e Event, data interface{}) error {
+	l.RLock()
+	defer l.RUnlock()
+
+	if l.events == nil {
+		return nil
+	}
+
+	for _, h := range l.events[e] {
+		if err := h(data); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}

+ 1 - 1
proxy/dokodemo/dokodemo.go

@@ -38,7 +38,7 @@ func New(ctx context.Context, config *Config) (*DokodemoDoor, error) {
 		address: config.GetPredefinedAddress(),
 		port:    net.Port(config.Port),
 	}
-	space.OnInitialize(func() error {
+	space.On(app.SpaceInitializing, func(interface{}) error {
 		pm := policy.FromSpace(space)
 		if pm == nil {
 			return newError("Policy not found in space.")

+ 1 - 1
proxy/freedom/freedom.go

@@ -40,7 +40,7 @@ func New(ctx context.Context, config *Config) (*Handler, error) {
 		timeout:        config.Timeout,
 		destOverride:   config.DestinationOverride,
 	}
-	space.OnInitialize(func() error {
+	space.On(app.SpaceInitializing, func(interface{}) error {
 		if config.DomainStrategy == Config_USE_IP {
 			f.dns = dns.FromSpace(space)
 			if f.dns == nil {

+ 1 - 1
proxy/http/server.go

@@ -37,7 +37,7 @@ func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) {
 	s := &Server{
 		config: config,
 	}
-	space.OnInitialize(func() error {
+	space.On(app.SpaceInitializing, func(interface{}) error {
 		pm := policy.FromSpace(space)
 		if pm == nil {
 			return newError("Policy not found in space.")

+ 1 - 1
proxy/shadowsocks/client.go

@@ -39,7 +39,7 @@ func NewClient(ctx context.Context, config *ClientConfig) (*Client, error) {
 	if space == nil {
 		return nil, newError("Space not found.")
 	}
-	space.OnInitialize(func() error {
+	space.On(app.SpaceInitializing, func(interface{}) error {
 		pm := policy.FromSpace(space)
 		if pm == nil {
 			return newError("Policy not found in space.")

+ 1 - 1
proxy/shadowsocks/server.go

@@ -47,7 +47,7 @@ func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) {
 		account: account,
 	}
 
-	space.OnInitialize(func() error {
+	space.On(app.SpaceInitializing, func(interface{}) error {
 		pm := policy.FromSpace(space)
 		if pm == nil {
 			return newError("Policy not found in space.")

+ 1 - 1
proxy/socks/server.go

@@ -34,7 +34,7 @@ func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) {
 	s := &Server{
 		config: config,
 	}
-	space.OnInitialize(func() error {
+	space.On(app.SpaceInitializing, func(interface{}) error {
 		pm := policy.FromSpace(space)
 		if pm == nil {
 			return newError("Policy not found in space.")

+ 1 - 1
proxy/vmess/inbound/inbound.go

@@ -103,7 +103,7 @@ func New(ctx context.Context, config *Config) (*Handler, error) {
 		sessionHistory: encoding.NewSessionHistory(ctx),
 	}
 
-	space.OnInitialize(func() error {
+	space.On(app.SpaceInitializing, func(interface{}) error {
 		handler.inboundHandlerManager = proxyman.InboundHandlerManagerFromSpace(space)
 		if handler.inboundHandlerManager == nil {
 			return newError("InboundHandlerManager is not found is space.")

+ 1 - 1
proxy/vmess/outbound/outbound.go

@@ -44,7 +44,7 @@ func New(ctx context.Context, config *Config) (*Handler, error) {
 		serverPicker: protocol.NewRoundRobinServerPicker(serverList),
 	}
 
-	space.OnInitialize(func() error {
+	space.On(app.SpaceInitializing, func(interface{}) error {
 		pm := policy.FromSpace(space)
 		if pm == nil {
 			return newError("Policy is not found in space.")