Darien Raymond преди 9 години
родител
ревизия
ad3f450bce
променени са 6 файла, в които са добавени 12 реда и са изтрити 19 реда
  1. 1 1
      app/proxy/proxy.go
  2. 3 3
      common/io/buffered_writer.go
  3. 2 2
      common/io/chain_writer.go
  4. 1 0
      proxy/vmess/outbound/outbound.go
  5. 1 4
      transport/internet/ws/wsconn.go
  6. 4 9
      transport/ray/direct.go

+ 1 - 1
app/proxy/proxy.go

@@ -92,7 +92,7 @@ func (this *ProxyConnection) Read(b []byte) (int, error) {
 
 func (this *ProxyConnection) Write(b []byte) (int, error) {
 	if this.closed {
-		return 0, io.EOF
+		return 0, errors.New("Proxy|Outbound: Writing into closed connection.")
 	}
 	return this.writer.Write(b)
 }

+ 3 - 3
common/io/buffered_writer.go

@@ -27,7 +27,7 @@ func (this *BufferedWriter) ReadFrom(reader io.Reader) (int64, error) {
 	defer this.Unlock()
 
 	if this.writer == nil {
-		return 0, io.EOF
+		return 0, io.ErrClosedPipe
 	}
 
 	totalBytes := int64(0)
@@ -49,7 +49,7 @@ func (this *BufferedWriter) Write(b []byte) (int, error) {
 	defer this.Unlock()
 
 	if this.writer == nil {
-		return 0, io.EOF
+		return 0, io.ErrClosedPipe
 	}
 
 	if !this.cached {
@@ -67,7 +67,7 @@ func (this *BufferedWriter) Flush() error {
 	defer this.Unlock()
 
 	if this.writer == nil {
-		return io.EOF
+		return io.ErrClosedPipe
 	}
 
 	return this.FlushWithoutLock()

+ 2 - 2
common/io/chain_writer.go

@@ -20,7 +20,7 @@ func NewChainWriter(writer Writer) *ChainWriter {
 
 func (this *ChainWriter) Write(payload []byte) (int, error) {
 	if this.writer == nil {
-		return 0, io.EOF
+		return 0, io.ErrClosedPipe
 	}
 
 	size := len(payload)
@@ -30,7 +30,7 @@ func (this *ChainWriter) Write(payload []byte) (int, error) {
 	this.Lock()
 	defer this.Unlock()
 	if this.writer == nil {
-		return 0, io.EOF
+		return 0, io.ErrClosedPipe
 	}
 
 	err := this.writer.Write(buffer)

+ 1 - 0
proxy/vmess/outbound/outbound.go

@@ -102,6 +102,7 @@ func (this *VMessOutboundHandler) handleRequest(session *encoding.ClientSession,
 	}
 	if !payload.IsEmpty() {
 		if err := streamWriter.Write(payload); err != nil {
+			log.Info("VMess|Outbound: Failed to write payload. Disabling connection reuse.")
 			conn.SetReusable(false)
 		}
 	}

+ 1 - 4
transport/internet/ws/wsconn.go

@@ -30,9 +30,7 @@ func (ws *wsconn) Read(b []byte) (n int, err error) {
 }
 
 func (ws *wsconn) read(b []byte) (n int, err error) {
-
 	if ws.connClosing {
-
 		return 0, io.EOF
 	}
 
@@ -79,9 +77,8 @@ func (ws *wsconn) readNext(b []byte) (n int, err error) {
 
 func (ws *wsconn) Write(b []byte) (n int, err error) {
 	ws.wlock.Lock()
-
 	if ws.connClosing {
-		return 0, io.EOF
+		return 0, io.ErrClosedPipe
 	}
 
 	n, err = ws.write(b)

+ 4 - 9
transport/ray/direct.go

@@ -1,7 +1,6 @@
 package ray
 
 import (
-	"errors"
 	"io"
 	"sync"
 	"time"
@@ -13,10 +12,6 @@ const (
 	bufferSize = 128
 )
 
-var (
-	ErrIOTimeout = errors.New("IO Timeout")
-)
-
 // NewRay creates a new Ray for direct traffic transport.
 func NewRay() Ray {
 	return &directRay{
@@ -79,24 +74,24 @@ func (this *Stream) Read() (*alloc.Buffer, error) {
 func (this *Stream) Write(data *alloc.Buffer) error {
 	for !this.closed {
 		err := this.TryWriteOnce(data)
-		if err != ErrIOTimeout {
+		if err != io.ErrNoProgress {
 			return err
 		}
 	}
-	return io.EOF
+	return io.ErrClosedPipe
 }
 
 func (this *Stream) TryWriteOnce(data *alloc.Buffer) error {
 	this.access.RLock()
 	defer this.access.RUnlock()
 	if this.closed {
-		return io.EOF
+		return io.ErrClosedPipe
 	}
 	select {
 	case this.buffer <- data:
 		return nil
 	case <-time.After(2 * time.Second):
-		return ErrIOTimeout
+		return io.ErrNoProgress
 	}
 }