浏览代码

Merge branch 'master' of https://github.com/v2ray/v2ray-core

Darien Raymond 8 年之前
父节点
当前提交
e62d649322

+ 6 - 19
app/proxyman/mux/mux.go

@@ -15,7 +15,6 @@ import (
 	"v2ray.com/core/common/buf"
 	"v2ray.com/core/common/errors"
 	"v2ray.com/core/common/net"
-	"v2ray.com/core/common/signal"
 	"v2ray.com/core/proxy"
 	"v2ray.com/core/transport/ray"
 )
@@ -147,7 +146,7 @@ func fetchInput(ctx context.Context, s *Session, output buf.Writer) {
 		log.Trace(newError("failed to write first payload").Base(err))
 		return
 	}
-	if err := buf.Copy(signal.BackgroundTimer(), s.input, writer); err != nil {
+	if err := buf.Copy(s.input, writer); err != nil {
 		log.Trace(newError("failed to fetch all input").Base(err))
 	}
 }
@@ -175,22 +174,10 @@ func (m *Client) Dispatch(ctx context.Context, outboundRay ray.OutboundRay) bool
 }
 
 func drain(reader *Reader) error {
-	data, err := reader.Read()
-	if err != nil {
-		return err
-	}
-	data.Release()
+	buf.Copy(reader, buf.Discard)
 	return nil
 }
 
-func pipe(reader *Reader, writer buf.Writer) error {
-	data, err := reader.Read()
-	if err != nil {
-		return err
-	}
-	return writer.Write(data)
-}
-
 func (m *Client) handleStatueKeepAlive(meta *FrameMetadata, reader *Reader) error {
 	if meta.Option.Has(OptionData) {
 		return drain(reader)
@@ -211,7 +198,7 @@ func (m *Client) handleStatusKeep(meta *FrameMetadata, reader *Reader) error {
 	}
 
 	if s, found := m.sessionManager.Get(meta.SessionID); found {
-		return pipe(reader, s.output)
+		return buf.Copy(reader, s.output, buf.IgnoreWriterError())
 	}
 	return drain(reader)
 }
@@ -303,7 +290,7 @@ type ServerWorker struct {
 
 func handle(ctx context.Context, s *Session, output buf.Writer) {
 	writer := NewResponseWriter(s.ID, output)
-	if err := buf.Copy(signal.BackgroundTimer(), s.input, writer); err != nil {
+	if err := buf.Copy(s.input, writer); err != nil {
 		log.Trace(newError("session ", s.ID, " ends: ").Base(err))
 	}
 	writer.Close()
@@ -335,7 +322,7 @@ func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata,
 	w.sessionManager.Add(s)
 	go handle(ctx, s, w.outboundRay.OutboundOutput())
 	if meta.Option.Has(OptionData) {
-		return pipe(reader, s.output)
+		return buf.Copy(reader, s.output, buf.IgnoreWriterError())
 	}
 	return nil
 }
@@ -345,7 +332,7 @@ func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *Reader) err
 		return nil
 	}
 	if s, found := w.sessionManager.Get(meta.SessionID); found {
-		return pipe(reader, s.output)
+		return buf.Copy(reader, s.output, buf.IgnoreWriterError())
 	}
 	return drain(reader)
 }

+ 34 - 12
app/proxyman/mux/reader.go

@@ -8,18 +8,22 @@ import (
 )
 
 type Reader struct {
-	reader io.Reader
-	buffer *buf.Buffer
+	reader   io.Reader
+	buffer   *buf.Buffer
+	leftOver int
 }
 
 func NewReader(reader buf.Reader) *Reader {
 	return &Reader{
-		reader: buf.ToBytesReader(reader),
-		buffer: buf.NewLocal(1024),
+		reader:   buf.ToBytesReader(reader),
+		buffer:   buf.NewLocal(1024),
+		leftOver: -1,
 	}
 }
 
 func (r *Reader) ReadMetadata() (*FrameMetadata, error) {
+	r.leftOver = -1
+
 	b := r.buffer
 	b.Clear()
 
@@ -37,25 +41,43 @@ func (r *Reader) ReadMetadata() (*FrameMetadata, error) {
 	return ReadFrameFrom(b.Bytes())
 }
 
-func (r *Reader) Read() (buf.MultiBuffer, error) {
+func (r *Reader) readSize() error {
 	if err := r.buffer.Reset(buf.ReadFullFrom(r.reader, 2)); err != nil {
-		return nil, err
+		return err
+	}
+	r.leftOver = int(serial.BytesToUint16(r.buffer.Bytes()))
+	return nil
+}
+
+func (r *Reader) Read() (buf.MultiBuffer, error) {
+	if r.leftOver == 0 {
+		r.leftOver = -1
+		return nil, io.EOF
+	}
+	if r.leftOver == -1 {
+		if err := r.readSize(); err != nil {
+			return nil, err
+		}
 	}
 
-	dataLen := int(serial.BytesToUint16(r.buffer.Bytes()))
 	mb := buf.NewMultiBuffer()
-	for dataLen > 0 {
+	for r.leftOver > 0 {
 		readLen := buf.Size
-		if dataLen < readLen {
-			readLen = dataLen
+		if r.leftOver < readLen {
+			readLen = r.leftOver
 		}
 		b := buf.New()
-		if err := b.AppendSupplier(buf.ReadFullFrom(r.reader, readLen)); err != nil {
+		if err := b.AppendSupplier(func(bb []byte) (int, error) {
+			return r.reader.Read(bb[:readLen])
+		}); err != nil {
 			mb.Release()
 			return nil, err
 		}
-		dataLen -= readLen
+		r.leftOver -= b.Len()
 		mb.Append(b)
+		if b.Len() < readLen {
+			break
+		}
 	}
 
 	return mb, nil

+ 61 - 7
common/buf/io.go

@@ -47,21 +47,71 @@ func ReadAtLeastFrom(reader io.Reader, size int) Supplier {
 	}
 }
 
-func copyInternal(timer signal.ActivityTimer, reader Reader, writer Writer) error {
+type copyHandler struct {
+	onReadError  func(error) error
+	onData       func()
+	onWriteError func(error) error
+}
+
+func (h *copyHandler) readFrom(reader Reader) (MultiBuffer, error) {
+	mb, err := reader.Read()
+	if err != nil && h.onReadError != nil {
+		err = h.onReadError(err)
+	}
+	return mb, err
+}
+
+func (h *copyHandler) writeTo(writer Writer, mb MultiBuffer) error {
+	err := writer.Write(mb)
+	if err != nil && h.onWriteError != nil {
+		err = h.onWriteError(err)
+	}
+	return err
+}
+
+type CopyOption func(*copyHandler)
+
+func IgnoreReaderError() CopyOption {
+	return func(handler *copyHandler) {
+		handler.onReadError = func(err error) error {
+			return nil
+		}
+	}
+}
+
+func IgnoreWriterError() CopyOption {
+	return func(handler *copyHandler) {
+		handler.onWriteError = func(err error) error {
+			return nil
+		}
+	}
+}
+
+func UpdateActivity(timer signal.ActivityTimer) CopyOption {
+	return func(handler *copyHandler) {
+		handler.onData = func() {
+			timer.Update()
+		}
+	}
+}
+
+func copyInternal(reader Reader, writer Writer, handler *copyHandler) error {
 	for {
-		buffer, err := reader.Read()
+		buffer, err := handler.readFrom(reader)
 		if err != nil {
 			return err
 		}
 
-		timer.Update()
-
 		if buffer.IsEmpty() {
 			buffer.Release()
 			continue
 		}
 
-		if err := writer.Write(buffer); err != nil {
+		if handler.onData != nil {
+			handler.onData()
+		}
+
+		if err := handler.writeTo(writer, buffer); err != nil {
 			buffer.Release()
 			return err
 		}
@@ -70,8 +120,12 @@ func copyInternal(timer signal.ActivityTimer, reader Reader, writer Writer) erro
 
 // Copy dumps all payload from reader to writer or stops when an error occurs.
 // ActivityTimer gets updated as soon as there is a payload.
-func Copy(timer signal.ActivityTimer, reader Reader, writer Writer) error {
-	err := copyInternal(timer, reader, writer)
+func Copy(reader Reader, writer Writer, options ...CopyOption) error {
+	handler := new(copyHandler)
+	for _, option := range options {
+		option(handler)
+	}
+	err := copyInternal(reader, writer, handler)
 	if err != nil && errors.Cause(err) != io.EOF {
 		return err
 	}

+ 11 - 0
common/buf/writer.go

@@ -106,3 +106,14 @@ func (w *bytesToBufferWriter) ReadFrom(reader io.Reader) (int64, error) {
 	}
 	return totalBytes, nil
 }
+
+type noOpWriter struct{}
+
+func (noOpWriter) Write(b MultiBuffer) error {
+	b.Release()
+	return nil
+}
+
+var (
+	Discard Writer = noOpWriter{}
+)

+ 1 - 1
core.go

@@ -18,7 +18,7 @@ import (
 )
 
 var (
-	version  = "2.25"
+	version  = "2.26"
 	build    = "Custom"
 	codename = "One for all"
 	intro    = "An unified platform for anti-censorship."

+ 2 - 2
proxy/dokodemo/dokodemo.go

@@ -76,7 +76,7 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
 
 		chunkReader := buf.NewReader(conn)
 
-		if err := buf.Copy(timer, chunkReader, inboundRay.InboundInput()); err != nil {
+		if err := buf.Copy(chunkReader, inboundRay.InboundInput(), buf.UpdateActivity(timer)); err != nil {
 			return newError("failed to transport request").Base(err)
 		}
 
@@ -86,7 +86,7 @@ func (d *DokodemoDoor) Process(ctx context.Context, network net.Network, conn in
 	responseDone := signal.ExecuteAsync(func() error {
 		v2writer := buf.NewWriter(conn)
 
-		if err := buf.Copy(timer, inboundRay.InboundOutput(), v2writer); err != nil {
+		if err := buf.Copy(inboundRay.InboundOutput(), v2writer, buf.UpdateActivity(timer)); err != nil {
 			return newError("failed to transport response").Base(err)
 		}
 		return nil

+ 2 - 2
proxy/freedom/freedom.go

@@ -118,7 +118,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
 		} else {
 			writer = buf.NewSequentialWriter(conn)
 		}
-		if err := buf.Copy(timer, input, writer); err != nil {
+		if err := buf.Copy(input, writer, buf.UpdateActivity(timer)); err != nil {
 			return newError("failed to process request").Base(err)
 		}
 		return nil
@@ -128,7 +128,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
 		defer output.Close()
 
 		v2reader := buf.NewReader(conn)
-		if err := buf.Copy(timer, v2reader, output); err != nil {
+		if err := buf.Copy(v2reader, output, buf.UpdateActivity(timer)); err != nil {
 			return newError("failed to process response").Base(err)
 		}
 		return nil

+ 2 - 2
proxy/http/server.go

@@ -141,7 +141,7 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade
 		defer ray.InboundInput().Close()
 
 		v2reader := buf.NewReader(reader)
-		if err := buf.Copy(timer, v2reader, ray.InboundInput()); err != nil {
+		if err := buf.Copy(v2reader, ray.InboundInput(), buf.UpdateActivity(timer)); err != nil {
 			return err
 		}
 		return nil
@@ -149,7 +149,7 @@ func (s *Server) handleConnect(ctx context.Context, request *http.Request, reade
 
 	responseDone := signal.ExecuteAsync(func() error {
 		v2writer := buf.NewWriter(writer)
-		if err := buf.Copy(timer, ray.InboundOutput(), v2writer); err != nil {
+		if err := buf.Copy(ray.InboundOutput(), v2writer, buf.UpdateActivity(timer)); err != nil {
 			return err
 		}
 		return nil

+ 4 - 4
proxy/shadowsocks/client.go

@@ -105,7 +105,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale
 		}
 
 		requestDone := signal.ExecuteAsync(func() error {
-			if err := buf.Copy(timer, outboundRay.OutboundInput(), bodyWriter); err != nil {
+			if err := buf.Copy(outboundRay.OutboundInput(), bodyWriter, buf.UpdateActivity(timer)); err != nil {
 				return err
 			}
 			return nil
@@ -119,7 +119,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale
 				return err
 			}
 
-			if err := buf.Copy(timer, responseReader, outboundRay.OutboundOutput()); err != nil {
+			if err := buf.Copy(responseReader, outboundRay.OutboundOutput(), buf.UpdateActivity(timer)); err != nil {
 				return err
 			}
 
@@ -141,7 +141,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale
 		})
 
 		requestDone := signal.ExecuteAsync(func() error {
-			if err := buf.Copy(timer, outboundRay.OutboundInput(), writer); err != nil {
+			if err := buf.Copy(outboundRay.OutboundInput(), writer, buf.UpdateActivity(timer)); err != nil {
 				return newError("failed to transport all UDP request").Base(err)
 			}
 			return nil
@@ -155,7 +155,7 @@ func (v *Client) Process(ctx context.Context, outboundRay ray.OutboundRay, diale
 				User:   user,
 			}
 
-			if err := buf.Copy(timer, reader, outboundRay.OutboundOutput()); err != nil {
+			if err := buf.Copy(reader, outboundRay.OutboundOutput(), buf.UpdateActivity(timer)); err != nil {
 				return newError("failed to transport all UDP response").Base(err)
 			}
 			return nil

+ 2 - 2
proxy/shadowsocks/server.go

@@ -173,7 +173,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection,
 			return err
 		}
 
-		if err := buf.Copy(timer, ray.InboundOutput(), responseWriter); err != nil {
+		if err := buf.Copy(ray.InboundOutput(), responseWriter, buf.UpdateActivity(timer)); err != nil {
 			return newError("failed to transport all TCP response").Base(err)
 		}
 
@@ -183,7 +183,7 @@ func (s *Server) handleConnection(ctx context.Context, conn internet.Connection,
 	requestDone := signal.ExecuteAsync(func() error {
 		defer ray.InboundInput().Close()
 
-		if err := buf.Copy(timer, bodyReader, ray.InboundInput()); err != nil {
+		if err := buf.Copy(bodyReader, ray.InboundInput(), buf.UpdateActivity(timer)); err != nil {
 			return newError("failed to transport all TCP request").Base(err)
 		}
 		return nil

+ 4 - 4
proxy/socks/client.go

@@ -90,11 +90,11 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy.
 	var responseFunc func() error
 	if request.Command == protocol.RequestCommandTCP {
 		requestFunc = func() error {
-			return buf.Copy(timer, ray.OutboundInput(), buf.NewWriter(conn))
+			return buf.Copy(ray.OutboundInput(), buf.NewWriter(conn), buf.UpdateActivity(timer))
 		}
 		responseFunc = func() error {
 			defer ray.OutboundOutput().Close()
-			return buf.Copy(timer, buf.NewReader(conn), ray.OutboundOutput())
+			return buf.Copy(buf.NewReader(conn), ray.OutboundOutput(), buf.UpdateActivity(timer))
 		}
 	} else if request.Command == protocol.RequestCommandUDP {
 		udpConn, err := dialer.Dial(ctx, udpRequest.Destination())
@@ -103,12 +103,12 @@ func (c *Client) Process(ctx context.Context, ray ray.OutboundRay, dialer proxy.
 		}
 		defer udpConn.Close()
 		requestFunc = func() error {
-			return buf.Copy(timer, ray.OutboundInput(), buf.NewSequentialWriter(NewUDPWriter(request, udpConn)))
+			return buf.Copy(ray.OutboundInput(), buf.NewSequentialWriter(NewUDPWriter(request, udpConn)), buf.UpdateActivity(timer))
 		}
 		responseFunc = func() error {
 			defer ray.OutboundOutput().Close()
 			reader := &UDPReader{reader: udpConn}
-			return buf.Copy(timer, reader, ray.OutboundOutput())
+			return buf.Copy(reader, ray.OutboundOutput(), buf.UpdateActivity(timer))
 		}
 	}
 

+ 2 - 2
proxy/socks/server.go

@@ -124,7 +124,7 @@ func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ
 		defer input.Close()
 
 		v2reader := buf.NewReader(reader)
-		if err := buf.Copy(timer, v2reader, input); err != nil {
+		if err := buf.Copy(v2reader, input, buf.UpdateActivity(timer)); err != nil {
 			return newError("failed to transport all TCP request").Base(err)
 		}
 		return nil
@@ -132,7 +132,7 @@ func (v *Server) transport(ctx context.Context, reader io.Reader, writer io.Writ
 
 	responseDone := signal.ExecuteAsync(func() error {
 		v2writer := buf.NewWriter(writer)
-		if err := buf.Copy(timer, output, v2writer); err != nil {
+		if err := buf.Copy(output, v2writer, buf.UpdateActivity(timer)); err != nil {
 			return newError("failed to transport all TCP response").Base(err)
 		}
 		return nil

+ 2 - 2
proxy/vmess/inbound/inbound.go

@@ -129,7 +129,7 @@ func transferRequest(timer signal.ActivityTimer, session *encoding.ServerSession
 	defer output.Close()
 
 	bodyReader := session.DecodeRequestBody(request, input)
-	if err := buf.Copy(timer, bodyReader, output); err != nil {
+	if err := buf.Copy(bodyReader, output, buf.UpdateActivity(timer)); err != nil {
 		return err
 	}
 	return nil
@@ -157,7 +157,7 @@ func transferResponse(timer signal.ActivityTimer, session *encoding.ServerSessio
 		}
 	}
 
-	if err := buf.Copy(timer, input, bodyWriter); err != nil {
+	if err := buf.Copy(input, bodyWriter, buf.UpdateActivity(timer)); err != nil {
 		return err
 	}
 

+ 2 - 2
proxy/vmess/outbound/outbound.go

@@ -123,7 +123,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
 			return err
 		}
 
-		if err := buf.Copy(timer, input, bodyWriter); err != nil {
+		if err := buf.Copy(input, bodyWriter, buf.UpdateActivity(timer)); err != nil {
 			return err
 		}
 
@@ -147,7 +147,7 @@ func (v *Handler) Process(ctx context.Context, outboundRay ray.OutboundRay, dial
 
 		reader.SetBuffered(false)
 		bodyReader := session.DecodeResponseBody(request, reader)
-		if err := buf.Copy(timer, bodyReader, output); err != nil {
+		if err := buf.Copy(bodyReader, output, buf.UpdateActivity(timer)); err != nil {
 			return err
 		}