浏览代码

replace channel with pipe in udp conn

Darien Raymond 7 年之前
父节点
当前提交
91109f3657
共有 3 个文件被更改,包括 39 次插入55 次删除
  1. 16 47
      app/proxyman/inbound/worker.go
  2. 13 8
      transport/pipe/impl.go
  3. 10 0
      transport/pipe/pipe.go

+ 16 - 47
app/proxyman/inbound/worker.go

@@ -2,7 +2,6 @@ package inbound
 
 import (
 	"context"
-	"io"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -20,6 +19,7 @@ import (
 	"v2ray.com/core/transport/internet"
 	"v2ray.com/core/transport/internet/tcp"
 	"v2ray.com/core/transport/internet/udp"
+	"v2ray.com/core/transport/pipe"
 )
 
 type worker interface {
@@ -121,7 +121,8 @@ func (w *tcpWorker) Port() net.Port {
 
 type udpConn struct {
 	lastActivityTime int64 // in seconds
-	input            chan *buf.Buffer
+	reader           buf.Reader
+	writer           buf.Writer
 	output           func([]byte) (int, error)
 	remote           net.Addr
 	local            net.Addr
@@ -136,52 +137,21 @@ func (c *udpConn) updateActivity() {
 
 // ReadMultiBuffer implements buf.Reader
 func (c *udpConn) ReadMultiBuffer() (buf.MultiBuffer, error) {
-	var payload buf.MultiBuffer
-
-	select {
-	case in := <-c.input:
-		payload.Append(in)
-	default:
-		select {
-		case in := <-c.input:
-			payload.Append(in)
-		case <-c.done.Wait():
-			return nil, io.EOF
-		}
-	}
-
-L:
-	for {
-		select {
-		case in := <-c.input:
-			payload.Append(in)
-		default:
-			break L
-		}
+	mb, err := c.reader.ReadMultiBuffer()
+	if err != nil {
+		return nil, err
 	}
-
 	c.updateActivity()
 
 	if c.uplink != nil {
-		c.uplink.Add(int64(payload.Len()))
+		c.uplink.Add(int64(mb.Len()))
 	}
 
-	return payload, nil
+	return mb, nil
 }
 
 func (c *udpConn) Read(buf []byte) (int, error) {
-	select {
-	case in := <-c.input:
-		defer in.Release()
-		c.updateActivity()
-		nBytes := copy(buf, in.Bytes())
-		if c.uplink != nil {
-			c.uplink.Add(int64(nBytes))
-		}
-		return nBytes, nil
-	case <-c.done.Wait():
-		return 0, io.EOF
-	}
+	panic("not implemented")
 }
 
 // Write implements io.Writer.
@@ -198,6 +168,7 @@ func (c *udpConn) Write(buf []byte) (int, error) {
 
 func (c *udpConn) Close() error {
 	common.Must(c.done.Close())
+	common.Must(common.Close(c.writer))
 	return nil
 }
 
@@ -251,8 +222,10 @@ func (w *udpWorker) getConnection(id connID) (*udpConn, bool) {
 		return conn, true
 	}
 
+	pReader, pWriter := pipe.New(pipe.DiscardOverflow(), pipe.WithSizeLimit(16*1024))
 	conn := &udpConn{
-		input: make(chan *buf.Buffer, 32),
+		reader: pReader,
+		writer: pWriter,
 		output: func(b []byte) (int, error) {
 			return w.hub.WriteTo(b, id.src)
 		},
@@ -282,13 +255,9 @@ func (w *udpWorker) callback(b *buf.Buffer, source net.Destination, originalDest
 		id.dest = originalDest
 	}
 	conn, existing := w.getConnection(id)
-	select {
-	case conn.input <- b:
-	case <-conn.done.Wait():
-		b.Release()
-	default:
-		b.Release()
-	}
+
+	// payload will be discarded in pipe is full.
+	conn.writer.WriteMultiBuffer(buf.NewMultiBufferValue(b)) // nolint: errcheck
 
 	if !existing {
 		common.Must(w.checker.Start())

+ 13 - 8
transport/pipe/impl.go

@@ -22,12 +22,13 @@ const (
 
 type pipe struct {
 	sync.Mutex
-	data        buf.MultiBuffer
-	readSignal  *signal.Notifier
-	writeSignal *signal.Notifier
-	done        *done.Instance
-	limit       int32
-	state       state
+	data            buf.MultiBuffer
+	readSignal      *signal.Notifier
+	writeSignal     *signal.Notifier
+	done            *done.Instance
+	limit           int32
+	state           state
+	discardOverflow bool
 }
 
 var errBufferFull = errors.New("buffer full")
@@ -121,10 +122,14 @@ func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
 
 	for {
 		err := p.writeMultiBufferInternal(mb)
-		if err == nil {
+		switch {
+		case err == nil:
 			p.readSignal.Signal()
 			return nil
-		} else if err != errBufferFull {
+		case err == errBufferFull && p.discardOverflow:
+			mb.Release()
+			return nil
+		case err != errBufferFull:
 			mb.Release()
 			p.readSignal.Signal()
 			return err

+ 10 - 0
transport/pipe/pipe.go

@@ -11,18 +11,28 @@ import (
 // Option for creating new Pipes.
 type Option func(*pipe)
 
+// WithoutSizeLimit returns an Option for Pipe to have no size limit.
 func WithoutSizeLimit() Option {
 	return func(p *pipe) {
 		p.limit = -1
 	}
 }
 
+// WithSizeLimit returns an Option for Pipe to have the given size limit.
 func WithSizeLimit(limit int32) Option {
 	return func(p *pipe) {
 		p.limit = limit
 	}
 }
 
+// DiscardOverflow returns an Option for Pipe to discard writes if full.
+func DiscardOverflow() Option {
+	return func(p *pipe) {
+		p.discardOverflow = true
+	}
+}
+
+// OptionsFromContext returns a list of Options from context.
 func OptionsFromContext(ctx context.Context) []Option {
 	var opt []Option