Browse Source

stats counter for inbound traffic

Darien Raymond 7 years ago
parent
commit
c76d492c0f

+ 2 - 14
app/dispatcher/default.go

@@ -52,18 +52,6 @@ func (*DefaultDispatcher) Start() error {
 // Close implements common.Closable.
 func (*DefaultDispatcher) Close() error { return nil }
 
-func (d *DefaultDispatcher) getStatCounter(name string) core.StatCounter {
-	c := d.stats.GetCounter(name)
-	if c != nil {
-		return c
-	}
-	c, err := d.stats.RegisterCounter(name)
-	if err != nil {
-		return nil
-	}
-	return c
-}
-
 func (d *DefaultDispatcher) getRayOption(user *protocol.User) []ray.Option {
 	var rayOptions []ray.Option
 
@@ -71,13 +59,13 @@ func (d *DefaultDispatcher) getRayOption(user *protocol.User) []ray.Option {
 		p := d.policy.ForLevel(user.Level)
 		if p.Stats.UserUplink {
 			name := "user>>>" + user.Email + ">>>traffic>>>uplink"
-			if c := d.getStatCounter(name); c != nil {
+			if c, _ := core.GetOrRegisterStatCounter(d.stats, name); c != nil {
 				rayOptions = append(rayOptions, ray.WithUplinkStatCounter(c))
 			}
 		}
 		if p.Stats.UserDownlink {
 			name := "user>>>" + user.Email + ">>>traffic>>>downlink"
-			if c := d.getStatCounter(name); c != nil {
+			if c, _ := core.GetOrRegisterStatCounter(d.stats, name); c != nil {
 				rayOptions = append(rayOptions, ray.WithDownlinkStatCounter(c))
 			}
 		}

+ 9 - 0
app/policy/config.go

@@ -67,3 +67,12 @@ func (p *Policy) ToCorePolicy() core.Policy {
 	}
 	return cp
 }
+
+func (p *SystemPolicy) ToCorePolicy() core.SystemPolicy {
+	return core.SystemPolicy{
+		Stats: core.SystemStatsPolicy{
+			InboundUplink:   p.Stats.InboundUplink,
+			InboundDownlink: p.Stats.InboundDownlink,
+		},
+	}
+}

+ 83 - 29
app/policy/config.pb.go

@@ -120,14 +120,55 @@ func (m *Policy_Stats) GetUserDownlink() bool {
 	return false
 }
 
+type SystemPolicy struct {
+	Stats *SystemPolicy_Stats `protobuf:"bytes,1,opt,name=stats" json:"stats,omitempty"`
+}
+
+func (m *SystemPolicy) Reset()                    { *m = SystemPolicy{} }
+func (m *SystemPolicy) String() string            { return proto.CompactTextString(m) }
+func (*SystemPolicy) ProtoMessage()               {}
+func (*SystemPolicy) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
+
+func (m *SystemPolicy) GetStats() *SystemPolicy_Stats {
+	if m != nil {
+		return m.Stats
+	}
+	return nil
+}
+
+type SystemPolicy_Stats struct {
+	InboundUplink   bool `protobuf:"varint,1,opt,name=inbound_uplink,json=inboundUplink" json:"inbound_uplink,omitempty"`
+	InboundDownlink bool `protobuf:"varint,2,opt,name=inbound_downlink,json=inboundDownlink" json:"inbound_downlink,omitempty"`
+}
+
+func (m *SystemPolicy_Stats) Reset()                    { *m = SystemPolicy_Stats{} }
+func (m *SystemPolicy_Stats) String() string            { return proto.CompactTextString(m) }
+func (*SystemPolicy_Stats) ProtoMessage()               {}
+func (*SystemPolicy_Stats) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2, 0} }
+
+func (m *SystemPolicy_Stats) GetInboundUplink() bool {
+	if m != nil {
+		return m.InboundUplink
+	}
+	return false
+}
+
+func (m *SystemPolicy_Stats) GetInboundDownlink() bool {
+	if m != nil {
+		return m.InboundDownlink
+	}
+	return false
+}
+
 type Config struct {
-	Level map[uint32]*Policy `protobuf:"bytes,1,rep,name=level" json:"level,omitempty" protobuf_key:"varint,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
+	Level  map[uint32]*Policy `protobuf:"bytes,1,rep,name=level" json:"level,omitempty" protobuf_key:"varint,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
+	System *SystemPolicy      `protobuf:"bytes,2,opt,name=system" json:"system,omitempty"`
 }
 
 func (m *Config) Reset()                    { *m = Config{} }
 func (m *Config) String() string            { return proto.CompactTextString(m) }
 func (*Config) ProtoMessage()               {}
-func (*Config) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
+func (*Config) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
 
 func (m *Config) GetLevel() map[uint32]*Policy {
 	if m != nil {
@@ -136,42 +177,55 @@ func (m *Config) GetLevel() map[uint32]*Policy {
 	return nil
 }
 
+func (m *Config) GetSystem() *SystemPolicy {
+	if m != nil {
+		return m.System
+	}
+	return nil
+}
+
 func init() {
 	proto.RegisterType((*Second)(nil), "v2ray.core.app.policy.Second")
 	proto.RegisterType((*Policy)(nil), "v2ray.core.app.policy.Policy")
 	proto.RegisterType((*Policy_Timeout)(nil), "v2ray.core.app.policy.Policy.Timeout")
 	proto.RegisterType((*Policy_Stats)(nil), "v2ray.core.app.policy.Policy.Stats")
+	proto.RegisterType((*SystemPolicy)(nil), "v2ray.core.app.policy.SystemPolicy")
+	proto.RegisterType((*SystemPolicy_Stats)(nil), "v2ray.core.app.policy.SystemPolicy.Stats")
 	proto.RegisterType((*Config)(nil), "v2ray.core.app.policy.Config")
 }
 
 func init() { proto.RegisterFile("v2ray.com/core/app/policy/config.proto", fileDescriptor0) }
 
 var fileDescriptor0 = []byte{
-	// 410 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x93, 0x5d, 0xab, 0xd3, 0x30,
-	0x18, 0xc7, 0x69, 0x6b, 0x7b, 0x8e, 0x4f, 0xcf, 0x54, 0x82, 0x07, 0xea, 0x40, 0x3d, 0x6c, 0x28,
-	0xbb, 0x4a, 0xa1, 0xbb, 0xf1, 0x05, 0x27, 0xce, 0x17, 0x10, 0x14, 0x47, 0xe6, 0x0b, 0x78, 0x33,
-	0x62, 0x1a, 0x5d, 0x59, 0x96, 0x84, 0xbe, 0x4c, 0xfa, 0x35, 0xfc, 0x06, 0xde, 0xfa, 0xc9, 0xfc,
-	0x18, 0xd2, 0xa4, 0xa5, 0x37, 0xdb, 0xdc, 0x5d, 0xfa, 0xf0, 0xfb, 0xff, 0x78, 0x12, 0xfe, 0x85,
-	0x87, 0xbb, 0x24, 0xa7, 0x35, 0x66, 0x6a, 0x1b, 0x33, 0x95, 0xf3, 0x98, 0x6a, 0x1d, 0x6b, 0x25,
-	0x32, 0x56, 0xc7, 0x4c, 0xc9, 0xef, 0xd9, 0x0f, 0xac, 0x73, 0x55, 0x2a, 0x74, 0xd9, 0x71, 0x39,
-	0xc7, 0x54, 0x6b, 0x6c, 0x99, 0xd1, 0x3d, 0x08, 0x96, 0x9c, 0x29, 0x99, 0xa2, 0xdb, 0xe0, 0xef,
-	0xa8, 0xa8, 0x78, 0xe4, 0x5c, 0x39, 0x93, 0x01, 0xb1, 0x1f, 0xa3, 0xbf, 0x1e, 0x04, 0x0b, 0x83,
-	0xa2, 0xe7, 0x70, 0x56, 0x66, 0x5b, 0xae, 0xaa, 0xd2, 0x20, 0x61, 0xf2, 0x00, 0xef, 0x75, 0x62,
-	0xcb, 0xe3, 0x8f, 0x16, 0x26, 0x5d, 0x0a, 0x3d, 0x06, 0xbf, 0x28, 0x69, 0x59, 0x44, 0xae, 0x89,
-	0x8f, 0x8f, 0xc7, 0x97, 0x0d, 0x4a, 0x6c, 0x62, 0xf8, 0xcb, 0x85, 0xb3, 0xd6, 0x87, 0x9e, 0xc2,
-	0xf5, 0x35, 0x95, 0x69, 0xb1, 0xa6, 0x1b, 0xde, 0x6e, 0x72, 0xf7, 0x80, 0xca, 0x5e, 0x8d, 0xf4,
-	0x3c, 0x7a, 0x03, 0x37, 0x99, 0x92, 0x92, 0xb3, 0x32, 0x53, 0x72, 0x95, 0xa5, 0x82, 0xb7, 0xdb,
-	0xfc, 0x47, 0x71, 0xa3, 0x4f, 0xbd, 0x4d, 0x05, 0x47, 0x33, 0x08, 0x2b, 0x2d, 0x32, 0xb9, 0x59,
-	0x29, 0x29, 0xea, 0xc8, 0x3b, 0xc5, 0x01, 0x36, 0xf1, 0x41, 0x8a, 0x1a, 0xcd, 0x61, 0x90, 0xaa,
-	0x9f, 0xb2, 0x37, 0x5c, 0x3b, 0xc5, 0x70, 0xd1, 0x65, 0x1a, 0xc7, 0xf0, 0x3d, 0xf8, 0xe6, 0x91,
-	0xd0, 0x7d, 0x08, 0xab, 0x82, 0xe7, 0x2b, 0xeb, 0x37, 0x6f, 0x72, 0x4e, 0xa0, 0x19, 0x7d, 0x32,
-	0x13, 0x34, 0x86, 0x81, 0x01, 0xba, 0xb8, 0xb9, 0xf3, 0x39, 0xb9, 0x68, 0x86, 0xaf, 0xda, 0xd9,
-	0xe8, 0xb7, 0x03, 0xc1, 0x4b, 0x53, 0x19, 0x34, 0x03, 0x5f, 0xf0, 0x1d, 0x17, 0x91, 0x73, 0xe5,
-	0x4d, 0xc2, 0x64, 0x72, 0x60, 0x2b, 0x4b, 0xe3, 0x77, 0x0d, 0xfa, 0x5a, 0x96, 0x79, 0x4d, 0x6c,
-	0x6c, 0xf8, 0x05, 0xa0, 0x1f, 0xa2, 0x5b, 0xe0, 0x6d, 0x78, 0xdd, 0xf6, 0xaa, 0x39, 0xa2, 0x69,
-	0xd7, 0xb5, 0xe3, 0x6f, 0x6f, 0x9b, 0xd0, 0x56, 0xf1, 0x89, 0xfb, 0xc8, 0x99, 0x3f, 0x83, 0x3b,
-	0x4c, 0x6d, 0xf7, 0xe3, 0x0b, 0xe7, 0x6b, 0x60, 0x4f, 0x7f, 0xdc, 0xcb, 0xcf, 0x09, 0xa1, 0xcd,
-	0x82, 0x39, 0xc7, 0x2f, 0xb4, 0x6e, 0x4d, 0xdf, 0x02, 0xf3, 0x2f, 0x4c, 0xff, 0x05, 0x00, 0x00,
-	0xff, 0xff, 0x98, 0xa8, 0x2f, 0xbe, 0x35, 0x03, 0x00, 0x00,
+	// 478 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x93, 0xdf, 0x6e, 0xd3, 0x30,
+	0x18, 0xc5, 0x95, 0x96, 0x66, 0xe3, 0x6b, 0xbb, 0x4d, 0x16, 0x93, 0x4a, 0x25, 0x60, 0xea, 0x34,
+	0xd4, 0xdd, 0xb8, 0x52, 0x76, 0x03, 0x4c, 0x0c, 0x31, 0xfe, 0x48, 0x48, 0x20, 0x26, 0x97, 0x3f,
+	0x82, 0x9b, 0xca, 0x73, 0x0c, 0x8b, 0xea, 0xd8, 0x56, 0xe2, 0x14, 0xe5, 0x35, 0x78, 0x8c, 0x3d,
+	0x14, 0xd7, 0x3c, 0x06, 0x8a, 0xed, 0x2c, 0x1b, 0x5a, 0xb7, 0xde, 0x25, 0x47, 0xbf, 0x73, 0x74,
+	0x3e, 0xdb, 0x1f, 0x3c, 0x5e, 0x44, 0x19, 0x2d, 0x31, 0x53, 0xe9, 0x84, 0xa9, 0x8c, 0x4f, 0xa8,
+	0xd6, 0x13, 0xad, 0x44, 0xc2, 0xca, 0x09, 0x53, 0xf2, 0x47, 0xf2, 0x13, 0xeb, 0x4c, 0x19, 0x85,
+	0xb6, 0x6b, 0x2e, 0xe3, 0x98, 0x6a, 0x8d, 0x1d, 0x33, 0x7a, 0x08, 0xe1, 0x94, 0x33, 0x25, 0x63,
+	0x74, 0x0f, 0x3a, 0x0b, 0x2a, 0x0a, 0x3e, 0x08, 0x76, 0x82, 0x71, 0x9f, 0xb8, 0x9f, 0xd1, 0xdf,
+	0x36, 0x84, 0x27, 0x16, 0x45, 0x2f, 0x60, 0xcd, 0x24, 0x29, 0x57, 0x85, 0xb1, 0x48, 0x37, 0xda,
+	0xc3, 0xd7, 0x66, 0x62, 0xc7, 0xe3, 0x4f, 0x0e, 0x26, 0xb5, 0x0b, 0x3d, 0x85, 0x4e, 0x6e, 0xa8,
+	0xc9, 0x07, 0x2d, 0x6b, 0xdf, 0xbd, 0xd9, 0x3e, 0xad, 0x50, 0xe2, 0x1c, 0xc3, 0xdf, 0x2d, 0x58,
+	0xf3, 0x79, 0xe8, 0x10, 0xee, 0x9e, 0x51, 0x19, 0xe7, 0x67, 0x74, 0xce, 0x7d, 0x93, 0x07, 0x4b,
+	0xa2, 0xdc, 0x68, 0xa4, 0xe1, 0xd1, 0x5b, 0xd8, 0x64, 0x4a, 0x4a, 0xce, 0x4c, 0xa2, 0xe4, 0x2c,
+	0x89, 0x05, 0xf7, 0x6d, 0x6e, 0x89, 0xd8, 0x68, 0x5c, 0xef, 0x62, 0xc1, 0xd1, 0x11, 0x74, 0x0b,
+	0x2d, 0x12, 0x39, 0x9f, 0x29, 0x29, 0xca, 0x41, 0x7b, 0x95, 0x0c, 0x70, 0x8e, 0x8f, 0x52, 0x94,
+	0xe8, 0x18, 0xfa, 0xb1, 0xfa, 0x25, 0x9b, 0x84, 0x3b, 0xab, 0x24, 0xf4, 0x6a, 0x4f, 0x95, 0x31,
+	0xfc, 0x00, 0x1d, 0x7b, 0x48, 0xe8, 0x11, 0x74, 0x8b, 0x9c, 0x67, 0x33, 0x97, 0x6f, 0xcf, 0x64,
+	0x9d, 0x40, 0x25, 0x7d, 0xb6, 0x0a, 0xda, 0x85, 0xbe, 0x05, 0x6a, 0xbb, 0x9d, 0x79, 0x9d, 0xf4,
+	0x2a, 0xf1, 0xb5, 0xd7, 0x46, 0xe7, 0x01, 0xf4, 0xa6, 0x65, 0x6e, 0x78, 0x7a, 0x71, 0xe1, 0xfe,
+	0xbe, 0xdc, 0x21, 0xef, 0x2f, 0xeb, 0x76, 0xc9, 0x73, 0xf5, 0xd6, 0xbe, 0xd5, 0x05, 0xf7, 0x60,
+	0x23, 0x91, 0xa7, 0xaa, 0x90, 0xf1, 0xd5, 0x8e, 0x7d, 0xaf, 0xfa, 0x9a, 0xfb, 0xb0, 0x55, 0x63,
+	0xff, 0x35, 0xdd, 0xf4, 0xfa, 0x45, 0xd9, 0x3f, 0x01, 0x84, 0xaf, 0xec, 0xfb, 0x46, 0x47, 0xd0,
+	0x11, 0x7c, 0xc1, 0xc5, 0x20, 0xd8, 0x69, 0x8f, 0xbb, 0xd1, 0x78, 0x49, 0x4d, 0x47, 0xe3, 0xf7,
+	0x15, 0xfa, 0x46, 0x9a, 0xac, 0x24, 0xce, 0x86, 0x0e, 0x21, 0xcc, 0xed, 0x08, 0xb7, 0xbc, 0xcb,
+	0xcb, 0x73, 0x12, 0x6f, 0x19, 0x7e, 0x05, 0x68, 0x12, 0xd1, 0x16, 0xb4, 0xe7, 0xbc, 0xf4, 0x1b,
+	0x54, 0x7d, 0xa2, 0x83, 0x7a, 0xab, 0x6e, 0x7e, 0x65, 0x3e, 0xd5, 0xb1, 0xcf, 0x5a, 0x4f, 0x82,
+	0xe3, 0xe7, 0x70, 0x9f, 0xa9, 0xf4, 0x7a, 0xfc, 0x24, 0xf8, 0x1e, 0xba, 0xaf, 0xf3, 0xd6, 0xf6,
+	0x97, 0x88, 0xd0, 0x6a, 0xba, 0x8c, 0xe3, 0x97, 0x5a, 0xfb, 0xa4, 0xd3, 0xd0, 0x6e, 0xfd, 0xc1,
+	0xbf, 0x00, 0x00, 0x00, 0xff, 0xff, 0xa9, 0x9f, 0x00, 0x31, 0x1f, 0x04, 0x00, 0x00,
 }

+ 10 - 0
app/policy/config.proto

@@ -28,6 +28,16 @@ message Policy {
   Stats stats = 2;
 }
 
+message SystemPolicy {
+  message Stats {
+    bool inbound_uplink = 1;
+    bool inbound_downlink = 2;
+  }
+
+  Stats stats = 1;
+}
+
 message Config {
   map<uint32, Policy> level = 1;
+  SystemPolicy system = 2;
 }

+ 9 - 0
app/policy/manager.go

@@ -10,12 +10,14 @@ import (
 // Instance is an instance of Policy manager.
 type Instance struct {
 	levels map[uint32]*Policy
+	system *SystemPolicy
 }
 
 // New creates new Policy manager instance.
 func New(ctx context.Context, config *Config) (*Instance, error) {
 	m := &Instance{
 		levels: make(map[uint32]*Policy),
+		system: config.System,
 	}
 	if len(config.Level) > 0 {
 		for lv, p := range config.Level {
@@ -43,6 +45,13 @@ func (m *Instance) ForLevel(level uint32) core.Policy {
 	return core.DefaultPolicy()
 }
 
+func (m *Instance) ForSystem() core.SystemPolicy {
+	if m.system == nil {
+		return core.SystemPolicy{}
+	}
+	return m.system.ToCorePolicy()
+}
+
 // Start implements common.Runnable.Start().
 func (m *Instance) Start() error {
 	return nil

+ 45 - 14
app/proxyman/inbound/always.go

@@ -3,6 +3,7 @@ package inbound
 import (
 	"context"
 
+	"v2ray.com/core"
 	"v2ray.com/core/app/proxyman"
 	"v2ray.com/core/app/proxyman/mux"
 	"v2ray.com/core/common"
@@ -11,6 +12,30 @@ import (
 	"v2ray.com/core/proxy"
 )
 
+func getStatCounter(v *core.Instance, tag string) (core.StatCounter, core.StatCounter) {
+	var uplinkCounter core.StatCounter
+	var downlinkCounter core.StatCounter
+
+	policy := v.PolicyManager()
+	stats := v.Stats()
+	if len(tag) > 0 && policy.ForSystem().Stats.InboundUplink {
+		name := "inbound>>>" + tag + ">>>traffic>>>uplink"
+		c, _ := core.GetOrRegisterStatCounter(stats, name)
+		if c != nil {
+			uplinkCounter = c
+		}
+	}
+	if len(tag) > 0 && policy.ForSystem().Stats.InboundDownlink {
+		name := "inbound>>>" + tag + ">>>traffic>>>downlink"
+		c, _ := core.GetOrRegisterStatCounter(stats, name)
+		if c != nil {
+			downlinkCounter = c
+		}
+	}
+
+	return uplinkCounter, downlinkCounter
+}
+
 type AlwaysOnInboundHandler struct {
 	proxy   proxy.Inbound
 	workers []worker
@@ -34,6 +59,8 @@ func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig *
 		tag:   tag,
 	}
 
+	uplinkCounter, downlinkCounter := getStatCounter(core.MustFromContext(ctx), tag)
+
 	nl := p.Network()
 	pr := receiverConfig.PortRange
 	address := receiverConfig.Listen.AsAddress()
@@ -44,26 +71,30 @@ func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig *
 		if nl.HasNetwork(net.Network_TCP) {
 			newError("creating stream worker on ", address, ":", port).AtDebug().WriteToLog()
 			worker := &tcpWorker{
-				address:      address,
-				port:         net.Port(port),
-				proxy:        p,
-				stream:       receiverConfig.StreamSettings,
-				recvOrigDest: receiverConfig.ReceiveOriginalDestination,
-				tag:          tag,
-				dispatcher:   h.mux,
-				sniffers:     receiverConfig.DomainOverride,
+				address:         address,
+				port:            net.Port(port),
+				proxy:           p,
+				stream:          receiverConfig.StreamSettings,
+				recvOrigDest:    receiverConfig.ReceiveOriginalDestination,
+				tag:             tag,
+				dispatcher:      h.mux,
+				sniffers:        receiverConfig.DomainOverride,
+				uplinkCounter:   uplinkCounter,
+				downlinkCounter: downlinkCounter,
 			}
 			h.workers = append(h.workers, worker)
 		}
 
 		if nl.HasNetwork(net.Network_UDP) {
 			worker := &udpWorker{
-				tag:          tag,
-				proxy:        p,
-				address:      address,
-				port:         net.Port(port),
-				recvOrigDest: receiverConfig.ReceiveOriginalDestination,
-				dispatcher:   h.mux,
+				tag:             tag,
+				proxy:           p,
+				address:         address,
+				port:            net.Port(port),
+				recvOrigDest:    receiverConfig.ReceiveOriginalDestination,
+				dispatcher:      h.mux,
+				uplinkCounter:   uplinkCounter,
+				downlinkCounter: downlinkCounter,
 			}
 			h.workers = append(h.workers, worker)
 		}

+ 21 - 14
app/proxyman/inbound/dynamic.go

@@ -90,6 +90,9 @@ func (h *DynamicInboundHandler) refresh() error {
 	if address == nil {
 		address = net.AnyIP
 	}
+
+	uplinkCounter, downlinkCounter := getStatCounter(h.v, h.tag)
+
 	for i := uint32(0); i < concurrency; i++ {
 		port := h.allocatePort()
 		rawProxy, err := h.v.CreateObject(h.proxyConfig)
@@ -101,14 +104,16 @@ func (h *DynamicInboundHandler) refresh() error {
 		nl := p.Network()
 		if nl.HasNetwork(net.Network_TCP) {
 			worker := &tcpWorker{
-				tag:          h.tag,
-				address:      address,
-				port:         port,
-				proxy:        p,
-				stream:       h.receiverConfig.StreamSettings,
-				recvOrigDest: h.receiverConfig.ReceiveOriginalDestination,
-				dispatcher:   h.mux,
-				sniffers:     h.receiverConfig.DomainOverride,
+				tag:             h.tag,
+				address:         address,
+				port:            port,
+				proxy:           p,
+				stream:          h.receiverConfig.StreamSettings,
+				recvOrigDest:    h.receiverConfig.ReceiveOriginalDestination,
+				dispatcher:      h.mux,
+				sniffers:        h.receiverConfig.DomainOverride,
+				uplinkCounter:   uplinkCounter,
+				downlinkCounter: downlinkCounter,
 			}
 			if err := worker.Start(); err != nil {
 				newError("failed to create TCP worker").Base(err).AtWarning().WriteToLog()
@@ -119,12 +124,14 @@ func (h *DynamicInboundHandler) refresh() error {
 
 		if nl.HasNetwork(net.Network_UDP) {
 			worker := &udpWorker{
-				tag:          h.tag,
-				proxy:        p,
-				address:      address,
-				port:         port,
-				recvOrigDest: h.receiverConfig.ReceiveOriginalDestination,
-				dispatcher:   h.mux,
+				tag:             h.tag,
+				proxy:           p,
+				address:         address,
+				port:            port,
+				recvOrigDest:    h.receiverConfig.ReceiveOriginalDestination,
+				dispatcher:      h.mux,
+				uplinkCounter:   uplinkCounter,
+				downlinkCounter: downlinkCounter,
 			}
 			if err := worker.Start(); err != nil {
 				newError("failed to create UDP worker").Base(err).AtWarning().WriteToLog()

+ 40 - 19
app/proxyman/inbound/worker.go

@@ -7,13 +7,12 @@ import (
 	"sync/atomic"
 	"time"
 
-	"v2ray.com/core/common/session"
-
 	"v2ray.com/core"
 	"v2ray.com/core/app/proxyman"
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/net"
+	"v2ray.com/core/common/session"
 	"v2ray.com/core/common/signal"
 	"v2ray.com/core/proxy"
 	"v2ray.com/core/transport/internet"
@@ -29,14 +28,16 @@ type worker interface {
 }
 
 type tcpWorker struct {
-	address      net.Address
-	port         net.Port
-	proxy        proxy.Inbound
-	stream       *internet.StreamConfig
-	recvOrigDest bool
-	tag          string
-	dispatcher   core.Dispatcher
-	sniffers     []proxyman.KnownProtocols
+	address         net.Address
+	port            net.Port
+	proxy           proxy.Inbound
+	stream          *internet.StreamConfig
+	recvOrigDest    bool
+	tag             string
+	dispatcher      core.Dispatcher
+	sniffers        []proxyman.KnownProtocols
+	uplinkCounter   core.StatCounter
+	downlinkCounter core.StatCounter
 
 	hub internet.Listener
 }
@@ -63,6 +64,13 @@ func (w *tcpWorker) callback(conn internet.Connection) {
 	if len(w.sniffers) > 0 {
 		ctx = proxyman.ContextWithProtocolSniffers(ctx, w.sniffers)
 	}
+	if w.uplinkCounter != nil || w.downlinkCounter != nil {
+		conn = &internet.StatCouterConnection{
+			Connection: conn,
+			Uplink:     w.uplinkCounter,
+			Downlink:   w.downlinkCounter,
+		}
+	}
 	if err := w.proxy.Process(ctx, net.Network_TCP, conn, w.dispatcher); err != nil {
 		newError("connection ends").Base(err).WithContext(ctx).WriteToLog()
 	}
@@ -108,6 +116,8 @@ type udpConn struct {
 	remote           net.Addr
 	local            net.Addr
 	done             *signal.Done
+	uplink           core.StatCounter
+	downlink         core.StatCounter
 }
 
 func (c *udpConn) updateActivity() {
@@ -119,7 +129,11 @@ func (c *udpConn) Read(buf []byte) (int, error) {
 	case in := <-c.input:
 		defer in.Release()
 		c.updateActivity()
-		return copy(buf, in.Bytes()), nil
+		nBytes := copy(buf, in.Bytes())
+		if c.uplink != nil {
+			c.uplink.Add(int64(nBytes))
+		}
+		return nBytes, nil
 	case <-c.done.C():
 		return 0, io.EOF
 	}
@@ -128,6 +142,9 @@ func (c *udpConn) Read(buf []byte) (int, error) {
 // Write implements io.Writer.
 func (c *udpConn) Write(buf []byte) (int, error) {
 	n, err := c.output(buf)
+	if c.downlink != nil {
+		c.downlink.Add(int64(n))
+	}
 	if err == nil {
 		c.updateActivity()
 	}
@@ -167,13 +184,15 @@ type connID struct {
 type udpWorker struct {
 	sync.RWMutex
 
-	proxy        proxy.Inbound
-	hub          *udp.Hub
-	address      net.Address
-	port         net.Port
-	recvOrigDest bool
-	tag          string
-	dispatcher   core.Dispatcher
+	proxy           proxy.Inbound
+	hub             *udp.Hub
+	address         net.Address
+	port            net.Port
+	recvOrigDest    bool
+	tag             string
+	dispatcher      core.Dispatcher
+	uplinkCounter   core.StatCounter
+	downlinkCounter core.StatCounter
 
 	done       *signal.Done
 	activeConn map[connID]*udpConn
@@ -200,7 +219,9 @@ func (w *udpWorker) getConnection(id connID) (*udpConn, bool) {
 			IP:   w.address.IP(),
 			Port: int(w.port),
 		},
-		done: signal.NewDone(),
+		done:     signal.NewDone(),
+		uplink:   w.uplinkCounter,
+		downlink: w.downlinkCounter,
 	}
 	w.activeConn[id] = conn
 

+ 16 - 2
policy.go

@@ -27,6 +27,17 @@ type StatsPolicy struct {
 	UserDownlink bool
 }
 
+type SystemStatsPolicy struct {
+	// Whether or not to enable stat counter for uplink traffic in inbound handlers.
+	InboundUplink bool
+	// Whether or not to enable stat counter for downlink traffic in inbound handlers.
+	InboundDownlink bool
+}
+
+type SystemPolicy struct {
+	Stats SystemStatsPolicy
+}
+
 // Policy is session based settings for controlling V2Ray requests. It contains various settings (or limits) that may differ for different users in the context.
 type Policy struct {
 	Timeouts TimeoutPolicy // Timeout settings
@@ -39,6 +50,9 @@ type PolicyManager interface {
 
 	// ForLevel returns the Policy for the given user level.
 	ForLevel(level uint32) Policy
+
+	// ForSystem returns the Policy for V2Ray system.
+	ForSystem() SystemPolicy
 }
 
 // DefaultPolicy returns the Policy when user is not specified.
@@ -47,8 +61,8 @@ func DefaultPolicy() Policy {
 		Timeouts: TimeoutPolicy{
 			Handshake:      time.Second * 4,
 			ConnectionIdle: time.Second * 300,
-			UplinkOnly:     time.Second * 5,
-			DownlinkOnly:   time.Second * 30,
+			UplinkOnly:     time.Second * 2,
+			DownlinkOnly:   time.Second * 5,
 		},
 		Stats: StatsPolicy{
 			UserUplink:   false,

+ 10 - 0
stats.go

@@ -17,6 +17,16 @@ type StatManager interface {
 	GetCounter(string) StatCounter
 }
 
+// GetOrRegisterStatCounter tries to get the StatCounter first. If not exist, it then tries to create a new counter.
+func GetOrRegisterStatCounter(m StatManager, name string) (StatCounter, error) {
+	counter := m.GetCounter(name)
+	if counter != nil {
+		return counter, nil
+	}
+
+	return m.RegisterCounter(name)
+}
+
 type syncStatManager struct {
 	sync.RWMutex
 	StatManager

+ 13 - 0
testing/scenarios/command_test.go

@@ -395,10 +395,16 @@ func TestCommanderStats(t *testing.T) {
 						},
 					},
 				},
+				System: &policy.SystemPolicy{
+					Stats: &policy.SystemPolicy_Stats{
+						InboundUplink: true,
+					},
+				},
 			}),
 		},
 		Inbound: []*core.InboundHandlerConfig{
 			{
+				Tag: "vmess",
 				ReceiverSettings: serial.ToTypedMessage(&proxyman.ReceiverConfig{
 					PortRange: net.SinglePortRange(serverPort),
 					Listen:    net.NewIPOrDomain(net.LocalHostIP),
@@ -521,5 +527,12 @@ func TestCommanderStats(t *testing.T) {
 	assert(sresp.Stat.Name, Equals, name)
 	assert(sresp.Stat.Value, Equals, int64(0))
 
+	sresp, err = sClient.GetStats(context.Background(), &statscmd.GetStatsRequest{
+		Name:   "inbound>>>vmess>>>traffic>>>uplink",
+		Reset_: true,
+	})
+	assert(err, IsNil)
+	assert(sresp.Stat.Value, Equals, int64(10240*1024))
+
 	CloseAllServers(servers)
 }

+ 27 - 0
transport/internet/connection.go

@@ -7,3 +7,30 @@ import (
 type Connection interface {
 	net.Conn
 }
+
+type addInt64 interface {
+	Add(int64) int64
+}
+
+type StatCouterConnection struct {
+	Connection
+	Uplink   addInt64
+	Downlink addInt64
+}
+
+func (c *StatCouterConnection) Read(b []byte) (int, error) {
+	nBytes, err := c.Connection.Read(b)
+	if c.Uplink != nil {
+		c.Uplink.Add(int64(nBytes))
+	}
+
+	return nBytes, err
+}
+
+func (c *StatCouterConnection) Write(b []byte) (int, error) {
+	nBytes, err := c.Connection.Write(b)
+	if c.Downlink != nil {
+		c.Downlink.Add(int64(nBytes))
+	}
+	return nBytes, err
+}