Browse Source

fix lint warnings

Darien Raymond 7 years ago
parent
commit
adade2bffd

+ 2 - 1
.vscode/settings.json

@@ -7,7 +7,8 @@
     "--enable-gc",
     "--no-config",
     "--exclude=.*\\.pb\\.go",
-    "--disable=gas"
+    "--disable=gas",
+    "--disable=gocyclo"
   ],
   "go.formatTool": "goimports",
 

+ 1 - 1
app/commander/commander.go

@@ -50,7 +50,7 @@ func (c *Commander) Start() error {
 		if err != nil {
 			return err
 		}
-		rawService, err := c.v.CreateObject(config)
+		rawService, err := core.CreateObject(c.v, config)
 		if err != nil {
 			return err
 		}

+ 2 - 2
app/proxyman/command/command.go

@@ -62,7 +62,7 @@ type handlerServer struct {
 }
 
 func (s *handlerServer) AddInbound(ctx context.Context, request *AddInboundRequest) (*AddInboundResponse, error) {
-	rawHandler, err := s.s.CreateObject(request.Inbound)
+	rawHandler, err := core.CreateObject(s.s, request.Inbound)
 	if err != nil {
 		return nil, err
 	}
@@ -96,7 +96,7 @@ func (s *handlerServer) AlterInbound(ctx context.Context, request *AlterInboundR
 }
 
 func (s *handlerServer) AddOutbound(ctx context.Context, request *AddOutboundRequest) (*AddOutboundResponse, error) {
-	rawHandler, err := s.s.CreateObject(request.Outbound)
+	rawHandler, err := core.CreateObject(s.s, request.Outbound)
 	if err != nil {
 		return nil, err
 	}

+ 11 - 2
app/proxyman/inbound/always.go

@@ -9,6 +9,7 @@ import (
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/dice"
 	"v2ray.com/core/common/net"
+	"v2ray.com/core/common/serial"
 	"v2ray.com/core/proxy"
 )
 
@@ -113,10 +114,18 @@ func (h *AlwaysOnInboundHandler) Start() error {
 }
 
 func (h *AlwaysOnInboundHandler) Close() error {
+	var errors []interface{}
 	for _, worker := range h.workers {
-		worker.Close()
+		if err := worker.Close(); err != nil {
+			errors = append(errors, err)
+		}
+	}
+	if err := h.mux.Close(); err != nil {
+		errors = append(errors, err)
+	}
+	if len(errors) > 0 {
+		return newError("failed to close all resources").Base(newError(serial.Concat(errors...)))
 	}
-	h.mux.Close()
 	return nil
 }
 

+ 4 - 2
app/proxyman/inbound/dynamic.go

@@ -69,7 +69,9 @@ func (h *DynamicInboundHandler) closeWorkers(workers []worker) {
 	ports2Del := make([]net.Port, len(workers))
 	for idx, worker := range workers {
 		ports2Del[idx] = worker.Port()
-		worker.Close()
+		if err := worker.Close(); err != nil {
+			newError("failed to close worker").Base(err).WriteToLog()
+		}
 	}
 
 	h.portMutex.Lock()
@@ -95,7 +97,7 @@ func (h *DynamicInboundHandler) refresh() error {
 
 	for i := uint32(0); i < concurrency; i++ {
 		port := h.allocatePort()
-		rawProxy, err := h.v.CreateObject(h.proxyConfig)
+		rawProxy, err := core.CreateObject(h.v, h.proxyConfig)
 		if err != nil {
 			newError("failed to create proxy instance").Base(err).AtWarning().WriteToLog()
 			continue

+ 12 - 2
app/proxyman/inbound/inbound.go

@@ -9,6 +9,7 @@ import (
 	"v2ray.com/core"
 	"v2ray.com/core/app/proxyman"
 	"v2ray.com/core/common"
+	"v2ray.com/core/common/serial"
 )
 
 // Manager is to manage all inbound handlers.
@@ -110,11 +111,20 @@ func (m *Manager) Close() error {
 
 	m.running = false
 
+	var errors []interface{}
 	for _, handler := range m.taggedHandlers {
-		handler.Close()
+		if err := handler.Close(); err != nil {
+			errors = append(errors, err)
+		}
 	}
 	for _, handler := range m.untaggedHandler {
-		handler.Close()
+		if err := handler.Close(); err != nil {
+			errors = append(errors, err)
+		}
+	}
+
+	if len(errors) > 0 {
+		return newError("failed to close all handlers").Base(newError(serial.Concat(errors...)))
 	}
 
 	return nil

+ 56 - 31
app/proxyman/inbound/worker.go

@@ -12,8 +12,10 @@ import (
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/net"
+	"v2ray.com/core/common/serial"
 	"v2ray.com/core/common/session"
 	"v2ray.com/core/common/signal/done"
+	"v2ray.com/core/common/task"
 	"v2ray.com/core/proxy"
 	"v2ray.com/core/transport/internet"
 	"v2ray.com/core/transport/internet/tcp"
@@ -97,9 +99,17 @@ func (w *tcpWorker) Start() error {
 }
 
 func (w *tcpWorker) Close() error {
+	var errors []interface{}
 	if w.hub != nil {
-		common.Close(w.hub)
-		common.Close(w.proxy)
+		if err := common.Close(w.hub); err != nil {
+			errors = append(errors, err)
+		}
+		if err := common.Close(w.proxy); err != nil {
+			errors = append(errors, err)
+		}
+	}
+	if len(errors) > 0 {
+		return newError("failed to close all resources").Base(newError(serial.Concat(errors...)))
 	}
 
 	return nil
@@ -227,7 +237,7 @@ type udpWorker struct {
 	uplinkCounter   core.StatCounter
 	downlinkCounter core.StatCounter
 
-	done       *done.Instance
+	checker    *task.Periodic
 	activeConn map[connID]*udpConn
 }
 
@@ -295,7 +305,7 @@ func (w *udpWorker) callback(b *buf.Buffer, source net.Destination, originalDest
 			if err := w.proxy.Process(ctx, net.Network_UDP, conn, w.dispatcher); err != nil {
 				newError("connection ends").Base(err).WriteToLog()
 			}
-			conn.Close()
+			conn.Close() // nolint: errcheck
 			w.removeConn(id)
 		}()
 	}
@@ -309,12 +319,37 @@ func (w *udpWorker) removeConn(id connID) {
 
 func (w *udpWorker) Start() error {
 	w.activeConn = make(map[connID]*udpConn, 16)
-	w.done = done.New()
 	h, err := udp.ListenUDP(w.address, w.port, w.callback, udp.HubReceiveOriginalDestination(w.recvOrigDest), udp.HubCapacity(256))
 	if err != nil {
 		return err
 	}
-	go w.monitor()
+	w.checker = &task.Periodic{
+		Interval: time.Second * 16,
+		Execute: func() error {
+			nowSec := time.Now().Unix()
+			w.Lock()
+			if len(w.activeConn) == 0 {
+				return nil
+			}
+
+			for addr, conn := range w.activeConn {
+				if nowSec-atomic.LoadInt64(&conn.lastActivityTime) > 8 {
+					delete(w.activeConn, addr)
+					conn.Close() // nolint: errcheck
+				}
+			}
+
+			if len(w.activeConn) == 0 {
+				w.activeConn = make(map[connID]*udpConn, 16)
+			}
+			w.Unlock()
+
+			return nil
+		},
+	}
+	if err := w.checker.Start(); err != nil {
+		return err
+	}
 	w.hub = h
 	return nil
 }
@@ -323,38 +358,28 @@ func (w *udpWorker) Close() error {
 	w.Lock()
 	defer w.Unlock()
 
+	var errors []interface{}
+
 	if w.hub != nil {
-		w.hub.Close()
+		if err := w.hub.Close(); err != nil {
+			errors = append(errors, err)
+		}
 	}
 
-	if w.done != nil {
-		common.Must(w.done.Close())
+	if w.checker != nil {
+		if err := w.checker.Close(); err != nil {
+			errors = append(errors, err)
+		}
 	}
 
-	common.Close(w.proxy)
-	return nil
-}
-
-func (w *udpWorker) monitor() {
-	timer := time.NewTicker(time.Second * 16)
-	defer timer.Stop()
+	if err := common.Close(w.proxy); err != nil {
+		errors = append(errors, err)
+	}
 
-	for {
-		select {
-		case <-w.done.Wait():
-			return
-		case <-timer.C:
-			nowSec := time.Now().Unix()
-			w.Lock()
-			for addr, conn := range w.activeConn {
-				if nowSec-atomic.LoadInt64(&conn.lastActivityTime) > 8 {
-					delete(w.activeConn, addr)
-					conn.Close()
-				}
-			}
-			w.Unlock()
-		}
+	if len(errors) > 0 {
+		return newError("failed to close all resources").Base(newError(serial.Concat(errors...)))
 	}
+	return nil
 }
 
 func (w *udpWorker) Port() net.Port {

+ 3 - 1
app/proxyman/mux/mux.go

@@ -327,10 +327,12 @@ func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*core.Link
 	return &core.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil
 }
 
+// Start implements common.Runnable.
 func (s *Server) Start() error {
 	return nil
 }
 
+// Close implements common.Closable.
 func (s *Server) Close() error {
 	return nil
 }
@@ -463,7 +465,7 @@ func (w *ServerWorker) run(ctx context.Context) {
 	input := w.link.Reader
 	reader := &buf.BufferedReader{Reader: input}
 
-	defer w.sessionManager.Close()
+	defer w.sessionManager.Close() // nolint: errcheck
 
 	for {
 		select {

+ 10 - 8
common/buf/writer.go

@@ -12,13 +12,6 @@ type BufferToBytesWriter struct {
 	io.Writer
 }
 
-// NewBufferToBytesWriter returns a new BufferToBytesWriter.
-func NewBufferToBytesWriter(writer io.Writer) *BufferToBytesWriter {
-	return &BufferToBytesWriter{
-		Writer: writer,
-	}
-}
-
 // WriteMultiBuffer implements Writer. This method takes ownership of the given buffer.
 func (w *BufferToBytesWriter) WriteMultiBuffer(mb MultiBuffer) error {
 	defer mb.Release()
@@ -113,7 +106,16 @@ func (w *BufferedWriter) WriteMultiBuffer(b MultiBuffer) error {
 // Flush flushes buffered content into underlying writer.
 func (w *BufferedWriter) Flush() error {
 	if !w.buffer.IsEmpty() {
-		if err := w.writer.WriteMultiBuffer(NewMultiBufferValue(w.buffer)); err != nil {
+		b := w.buffer
+		w.buffer = nil
+
+		if writer, ok := w.writer.(io.Writer); ok {
+			_, err := writer.Write(b.Bytes())
+			b.Release()
+			if err != nil {
+				return err
+			}
+		} else if err := w.writer.WriteMultiBuffer(NewMultiBufferValue(b)); err != nil {
 			return err
 		}
 

+ 1 - 1
dns.go

@@ -55,6 +55,6 @@ func (d *syncDNSClient) Set(client DNSClient) {
 	d.Lock()
 	defer d.Unlock()
 
-	common.Close(d.DNSClient)
+	common.Close(d.DNSClient) // nolint: errcheck
 	d.DNSClient = client
 }

+ 16 - 0
functions.go

@@ -0,0 +1,16 @@
+package core
+
+import (
+	"context"
+
+	"v2ray.com/core/common"
+)
+
+// CreateObject creates a new object based on the given V2Ray instance and config. The V2Ray instance may be nil.
+func CreateObject(v *Instance, config interface{}) (interface{}, error) {
+	ctx := context.Background()
+	if v != nil {
+		ctx = context.WithValue(ctx, v2rayKey, v)
+	}
+	return common.CreateObject(ctx, config)
+}

+ 2 - 2
network.go

@@ -90,7 +90,7 @@ func (m *syncInboundHandlerManager) Set(manager InboundHandlerManager) {
 	m.Lock()
 	defer m.Unlock()
 
-	common.Close(m.InboundHandlerManager)
+	common.Close(m.InboundHandlerManager) // nolint: errcheck
 	m.InboundHandlerManager = manager
 }
 
@@ -172,6 +172,6 @@ func (m *syncOutboundHandlerManager) Set(manager OutboundHandlerManager) {
 	m.Lock()
 	defer m.Unlock()
 
-	common.Close(m.OutboundHandlerManager)
+	common.Close(m.OutboundHandlerManager) // nolint: errcheck
 	m.OutboundHandlerManager = manager
 }

+ 3 - 1
policy.go

@@ -35,6 +35,7 @@ type BufferPolicy struct {
 	PerConnection int32
 }
 
+// SystemStatsPolicy contains stat policy settings on system level.
 type SystemStatsPolicy struct {
 	// Whether or not to enable stat counter for uplink traffic in inbound handlers.
 	InboundUplink bool
@@ -42,6 +43,7 @@ type SystemStatsPolicy struct {
 	InboundDownlink bool
 }
 
+// SystemPolicy contains policy settings at system level.
 type SystemPolicy struct {
 	Stats  SystemStatsPolicy
 	Buffer BufferPolicy
@@ -178,6 +180,6 @@ func (m *syncPolicyManager) Set(manager PolicyManager) {
 	m.Lock()
 	defer m.Unlock()
 
-	common.Close(m.PolicyManager)
+	common.Close(m.PolicyManager) // nolint: errcheck
 	m.PolicyManager = manager
 }

+ 4 - 6
proxy/http/server.go

@@ -10,10 +10,6 @@ import (
 	"strings"
 	"time"
 
-	"v2ray.com/core/common/task"
-
-	"v2ray.com/core/transport/pipe"
-
 	"v2ray.com/core"
 	"v2ray.com/core/common"
 	"v2ray.com/core/common/buf"
@@ -22,7 +18,9 @@ import (
 	"v2ray.com/core/common/net"
 	http_proto "v2ray.com/core/common/protocol/http"
 	"v2ray.com/core/common/signal"
+	"v2ray.com/core/common/task"
 	"v2ray.com/core/transport/internet"
+	"v2ray.com/core/transport/pipe"
 )
 
 // Server is an HTTP proxy server.
@@ -114,7 +112,7 @@ Start:
 	if err != nil {
 		trace := newError("failed to read http request").Base(err)
 		if errors.Cause(err) != io.EOF && !isTimeout(errors.Cause(err)) {
-			trace.AtWarning()
+			trace.AtWarning() // nolint: errcheck
 		}
 		return trace
 	}
@@ -258,7 +256,7 @@ func (s *Server) handlePlainHTTP(ctx context.Context, request *http.Request, wri
 	}
 
 	// Plain HTTP request is not a stream. The request always finishes before response. Hense request has to be closed later.
-	defer common.Close(link.Writer)
+	defer common.Close(link.Writer) // nolint: errcheck
 	var result error = errWaitAnother
 
 	requestDone := func() error {

+ 2 - 2
router.go

@@ -67,7 +67,7 @@ func (d *syncDispatcher) Set(disp Dispatcher) {
 	d.Lock()
 	defer d.Unlock()
 
-	common.Close(d.Dispatcher)
+	common.Close(d.Dispatcher) // nolint: errorcheck
 	d.Dispatcher = disp
 }
 
@@ -126,6 +126,6 @@ func (r *syncRouter) Set(router Router) {
 	r.Lock()
 	defer r.Unlock()
 
-	common.Close(r.Router)
+	common.Close(r.Router) // nolint: errcheck
 	r.Router = router
 }

+ 1 - 1
stats.go

@@ -81,7 +81,7 @@ func (s *syncStatManager) Set(m StatManager) {
 	defer s.Unlock()
 
 	if s.StatManager != nil {
-		s.StatManager.Close()
+		s.StatManager.Close() // nolint: errcheck
 	}
 	s.StatManager = m
 }

+ 13 - 3
transport/internet/websocket/connection.go

@@ -9,6 +9,7 @@ import (
 
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/errors"
+	"v2ray.com/core/common/serial"
 )
 
 var (
@@ -70,7 +71,7 @@ func (c *connection) Write(b []byte) (int, error) {
 
 func (c *connection) WriteMultiBuffer(mb buf.MultiBuffer) error {
 	if c.mergingWriter == nil {
-		c.mergingWriter = buf.NewBufferedWriter(buf.NewBufferToBytesWriter(c))
+		c.mergingWriter = buf.NewBufferedWriter(&buf.BufferToBytesWriter{Writer: c})
 	}
 	if err := c.mergingWriter.WriteMultiBuffer(mb); err != nil {
 		return err
@@ -79,8 +80,17 @@ func (c *connection) WriteMultiBuffer(mb buf.MultiBuffer) error {
 }
 
 func (c *connection) Close() error {
-	c.conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(time.Second*5))
-	return c.conn.Close()
+	var errors []interface{}
+	if err := c.conn.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(time.Second*5)); err != nil {
+		errors = append(errors, err)
+	}
+	if err := c.conn.Close(); err != nil {
+		errors = append(errors, err)
+	}
+	if len(errors) > 0 {
+		return newError("failed to close connection").Base(newError(serial.Concat(errors...)))
+	}
+	return nil
 }
 
 func (c *connection) LocalAddr() net.Addr {

+ 0 - 1
transport/internet/websocket/hub.go

@@ -7,7 +7,6 @@ import (
 	"strconv"
 	"sync"
 	"time"
-
 	"websocket"
 
 	"v2ray.com/core/common"

+ 12 - 9
v2ray.go

@@ -5,6 +5,7 @@ import (
 	"sync"
 
 	"v2ray.com/core/common"
+	"v2ray.com/core/common/serial"
 	"v2ray.com/core/common/uuid"
 )
 
@@ -53,13 +54,13 @@ func New(config *Config) (*Instance, error) {
 		if err != nil {
 			return nil, err
 		}
-		if _, err := server.CreateObject(settings); err != nil {
+		if _, err := CreateObject(server, settings); err != nil {
 			return nil, err
 		}
 	}
 
 	for _, inbound := range config.Inbound {
-		rawHandler, err := server.CreateObject(inbound)
+		rawHandler, err := CreateObject(server, inbound)
 		if err != nil {
 			return nil, err
 		}
@@ -73,7 +74,7 @@ func New(config *Config) (*Instance, error) {
 	}
 
 	for _, outbound := range config.Outbound {
-		rawHandler, err := server.CreateObject(outbound)
+		rawHandler, err := CreateObject(server, outbound)
 		if err != nil {
 			return nil, err
 		}
@@ -89,11 +90,6 @@ func New(config *Config) (*Instance, error) {
 	return server, nil
 }
 
-func (s *Instance) CreateObject(config interface{}) (interface{}, error) {
-	ctx := context.WithValue(context.Background(), v2rayKey, s)
-	return common.CreateObject(ctx, config)
-}
-
 // ID returns a unique ID for this V2Ray instance.
 func (s *Instance) ID() uuid.UUID {
 	return s.id
@@ -105,8 +101,15 @@ func (s *Instance) Close() error {
 	defer s.access.Unlock()
 
 	s.running = false
+
+	var errors []interface{}
 	for _, f := range s.allFeatures() {
-		f.Close()
+		if err := f.Close(); err != nil {
+			errors = append(errors, err)
+		}
+	}
+	if len(errors) > 0 {
+		return newError("failed to close all features").Base(newError(serial.Concat(errors...)))
 	}
 
 	return nil