Selaa lähdekoodia

Simplify channel operations

V2Ray 10 vuotta sitten
vanhempi
commit
d12737b3b8
11 muutettua tiedostoa jossa 63 lisäystä ja 176 poistoa
  1. 1 1
      config.go
  2. 0 75
      io/bufferset.go
  3. 4 3
      io/vmess/vmess.go
  4. 1 1
      net/address.go
  5. 5 23
      net/freedom/freedom.go
  6. 6 23
      net/socks/socks.go
  7. 34 0
      net/transport.go
  8. 0 5
      net/vmess/vmess.go
  9. 6 22
      net/vmess/vmessin.go
  10. 5 22
      net/vmess/vmessout.go
  11. 1 1
      release/server/main.go

+ 1 - 1
config.go

@@ -16,7 +16,7 @@ type ConnectionConfig struct {
 
 // Config is the config for Point server.
 type Config struct {
-	Port           uint16            `json:"port"` // Port of this Point server.
+	Port           uint16           `json:"port"` // Port of this Point server.
 	InboundConfig  ConnectionConfig `json:"inbound"`
 	OutboundConfig ConnectionConfig `json:"outbound"`
 }

+ 0 - 75
io/bufferset.go

@@ -1,75 +0,0 @@
-package io
-
-import (
-	"errors"
-)
-
-const (
-	SizeSmall  = 16
-	SizeMedium = 128
-	SizeLarge  = 512
-)
-
-var (
-	ErrorNoChannel = errors.New("No suitable channels found.")
-)
-
-type BufferSet struct {
-	small  chan []byte
-	medium chan []byte
-	large  chan []byte
-}
-
-func NewBufferSet() *BufferSet {
-	bSet := new(BufferSet)
-	bSet.small = make(chan []byte, 128)
-	bSet.medium = make(chan []byte, 128)
-	bSet.large = make(chan []byte, 128)
-	return bSet
-}
-
-func (bSet *BufferSet) detectBucket(size int, strict bool) (chan []byte, error) {
-	if strict {
-		if size == SizeSmall {
-			return bSet.small, nil
-		} else if size == SizeMedium {
-			return bSet.medium, nil
-		} else if size == SizeLarge {
-			return bSet.large, nil
-		}
-	} else {
-		if size <= SizeSmall {
-			return bSet.small, nil
-		} else if size <= SizeMedium {
-			return bSet.medium, nil
-		} else if size <= SizeLarge {
-			return bSet.large, nil
-		}
-	}
-	return nil, ErrorNoChannel
-}
-
-func (bSet *BufferSet) FetchBuffer(minSize int) []byte {
-	var buffer []byte
-	byteChan, err := bSet.detectBucket(minSize, false)
-	if err != nil {
-		return make([]byte, minSize)
-	}
-	select {
-	case buffer = <-byteChan:
-	default:
-		buffer = make([]byte, minSize)
-	}
-	return buffer
-}
-
-func (bSet *BufferSet) ReturnBuffer(buffer []byte) {
-	byteChan, err := bSet.detectBucket(len(buffer), true)
-	if err != nil {
-		return
-	}
-	select {
-	case byteChan <- buffer:
-	default:
-	}
-}

+ 4 - 3
io/vmess/vmess.go

@@ -9,7 +9,6 @@ import (
 	"errors"
 	"fmt"
 	"io"
-	_ "log"
 	mrand "math/rand"
 
 	"github.com/v2ray/v2ray-core"
@@ -27,6 +26,8 @@ const (
 
 var (
 	ErrorInvalidUser = errors.New("Invalid User")
+
+	emptyIV = make([]byte, blockSize)
 )
 
 // VMessRequest implements the request message of VMess protocol. It only contains
@@ -75,7 +76,7 @@ func (r *VMessRequestReader) Read(reader io.Reader) (*VMessRequest, error) {
 	}
 	request.UserId = *userId
 
-	decryptor, err := NewDecryptionReader(reader, userId.Hash([]byte("PWD")), make([]byte, blockSize))
+	decryptor, err := NewDecryptionReader(reader, userId.Hash([]byte("PWD")), emptyIV)
 	if err != nil {
 		return nil, err
 	}
@@ -224,7 +225,7 @@ func (w *VMessRequestWriter) Write(writer io.Writer, request *VMessRequest) erro
 	if err != nil {
 		return err
 	}
-	aesStream := cipher.NewCFBEncrypter(aesCipher, make([]byte, blockSize))
+	aesStream := cipher.NewCFBEncrypter(aesCipher, emptyIV)
 	cWriter := v2io.NewCryptionWriter(aesStream, writer)
 
 	_, err = writer.Write(buffer[0:encryptionBegin])

+ 1 - 1
net/vdest.go → net/address.go

@@ -1,4 +1,4 @@
-package core
+package net
 
 import (
 	"net"

+ 5 - 23
net/freedom/freedom.go

@@ -1,7 +1,6 @@
 package freedom
 
 import (
-	"io"
 	"net"
 
 	"github.com/v2ray/v2ray-core"
@@ -36,31 +35,14 @@ func (vconn *FreedomConnection) Start(ray core.OutboundRay) error {
 }
 
 func (vconn *FreedomConnection) DumpInput(conn net.Conn, input <-chan []byte, finish chan<- bool) {
-	for {
-		data, open := <-input
-		if !open {
-			finish <- true
-			log.Debug("Freedom finishing input.")
-			break
-		}
-		nBytes, err := conn.Write(data)
-		log.Debug("Freedom wrote %d bytes with error %v", nBytes, err)
-	}
+	v2net.ChanToWriter(conn, input)
+	finish <- true
 }
 
 func (vconn *FreedomConnection) DumpOutput(conn net.Conn, output chan<- []byte, finish chan<- bool) {
-	for {
-		buffer := make([]byte, 512)
-		nBytes, err := conn.Read(buffer)
-		log.Debug("Freedom reading %d bytes with error %v", nBytes, err)
-		if err == io.EOF {
-			close(output)
-			finish <- true
-			log.Debug("Freedom finishing output.")
-			break
-		}
-		output <- buffer[:nBytes]
-	}
+	v2net.ReaderToChan(output, conn)
+	close(output)
+	finish <- true
 }
 
 func (vconn *FreedomConnection) CloseConn(conn net.Conn, finish <-chan bool) {

+ 6 - 23
net/socks/socks.go

@@ -2,13 +2,13 @@ package socks
 
 import (
 	"errors"
-	"io"
 	"net"
 	"strconv"
 
 	"github.com/v2ray/v2ray-core"
 	socksio "github.com/v2ray/v2ray-core/io/socks"
 	"github.com/v2ray/v2ray-core/log"
+	v2net "github.com/v2ray/v2ray-core/net"
 )
 
 var (
@@ -127,31 +127,14 @@ func (server *SocksServer) HandleConnection(connection net.Conn) error {
 }
 
 func (server *SocksServer) dumpInput(conn net.Conn, input chan<- []byte, finish chan<- bool) {
-	for {
-		buffer := make([]byte, 512)
-		nBytes, err := conn.Read(buffer)
-		log.Debug("Reading %d bytes, with error %v", nBytes, err)
-		if err == io.EOF {
-			close(input)
-			finish <- true
-			log.Debug("Socks finishing input.")
-			break
-		}
-		input <- buffer[:nBytes]
-	}
+	v2net.ReaderToChan(input, conn)
+	close(input)
+	finish <- true
 }
 
 func (server *SocksServer) dumpOutput(conn net.Conn, output <-chan []byte, finish chan<- bool) {
-	for {
-		buffer, open := <-output
-		if !open {
-			finish <- true
-			log.Debug("Socks finishing output")
-			break
-		}
-		nBytes, err := conn.Write(buffer)
-		log.Debug("Writing %d bytes with error %v", nBytes, err)
-	}
+	v2net.ChanToWriter(conn, output)
+	finish <- true
 }
 
 func (server *SocksServer) waitForFinish(finish <-chan bool) {

+ 34 - 0
net/transport.go

@@ -0,0 +1,34 @@
+package net
+
+import (
+	"io"
+
+	"github.com/v2ray/v2ray-core/log"
+)
+
+const (
+	bufferSize = 8192
+)
+
+func ReaderToChan(stream chan<- []byte, reader io.Reader) error {
+	for {
+		buffer := make([]byte, bufferSize)
+		nBytes, err := reader.Read(buffer)
+		if err != nil {
+			return err
+		}
+		stream <- buffer[:nBytes]
+	}
+	return nil
+}
+
+func ChanToWriter(writer io.Writer, stream <-chan []byte) error {
+	for buffer := range stream {
+		nBytes, err := writer.Write(buffer)
+		log.Debug("Writing %d bytes with error %v", nBytes, err)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}

+ 0 - 5
net/vmess/vmess.go

@@ -1,5 +0,0 @@
-package vmess
-
-const (
-	BufferSize = 512
-)

+ 6 - 22
net/vmess/vmessin.go

@@ -10,6 +10,7 @@ import (
 	v2io "github.com/v2ray/v2ray-core/io"
 	vmessio "github.com/v2ray/v2ray-core/io/vmess"
 	"github.com/v2ray/v2ray-core/log"
+	v2net "github.com/v2ray/v2ray-core/net"
 )
 
 type VMessInboundHandler struct {
@@ -92,31 +93,14 @@ func (handler *VMessInboundHandler) HandleConnection(connection net.Conn) error
 }
 
 func (handler *VMessInboundHandler) dumpInput(reader io.Reader, input chan<- []byte, finish chan<- bool) {
-	for {
-		buffer := make([]byte, BufferSize)
-		nBytes, err := reader.Read(buffer)
-		log.Debug("VMessInbound: Reading %d bytes with error %v", nBytes, err)
-		if err == io.EOF {
-			close(input)
-			log.Debug("VMessInbound finishing input.")
-			finish <- true
-			break
-		}
-		input <- buffer[:nBytes]
-	}
+	v2net.ReaderToChan(input, reader)
+	close(input)
+	finish <- true
 }
 
 func (handler *VMessInboundHandler) dumpOutput(writer io.Writer, output <-chan []byte, finish chan<- bool) {
-	for {
-		buffer, open := <-output
-		if !open {
-			finish <- true
-			log.Debug("VMessInbound finishing output.")
-			break
-		}
-		nBytes, err := writer.Write(buffer)
-		log.Debug("VmessInbound: Wrote %d bytes with error %v", nBytes, err)
-	}
+	v2net.ChanToWriter(writer, output)
+	finish <- true
 }
 
 func (handler *VMessInboundHandler) waitForFinish(finish <-chan bool) {

+ 5 - 22
net/vmess/vmessout.go

@@ -118,31 +118,14 @@ func (handler *VMessOutboundHandler) startCommunicate(request *vmessio.VMessRequ
 }
 
 func (handler *VMessOutboundHandler) dumpOutput(reader io.Reader, output chan<- []byte, finish chan<- bool) {
-	for {
-		buffer := make([]byte, BufferSize)
-		nBytes, err := reader.Read(buffer)
-		log.Debug("VMessOutbound: Reading %d bytes, with error %v", nBytes, err)
-		if err == io.EOF {
-			close(output)
-			finish <- true
-			log.Debug("VMessOutbound finishing output.")
-			break
-		}
-		output <- buffer[:nBytes]
-	}
+	v2net.ReaderToChan(output, reader)
+	close(output)
+	finish <- true
 }
 
 func (handler *VMessOutboundHandler) dumpInput(writer io.Writer, input <-chan []byte, finish chan<- bool) {
-	for {
-		buffer, open := <-input
-		if !open {
-			finish <- true
-			log.Debug("VMessOutbound finishing input.")
-			break
-		}
-		nBytes, err := writer.Write(buffer)
-		log.Debug("VMessOutbound: Wrote %d bytes with error %v", nBytes, err)
-	}
+	v2net.ChanToWriter(writer, input)
+	finish <- true
 }
 
 func (handler *VMessOutboundHandler) waitForFinish(finish <-chan bool) {

+ 1 - 1
release/server/main.go

@@ -8,7 +8,7 @@ import (
 	"github.com/v2ray/v2ray-core"
 	"github.com/v2ray/v2ray-core/log"
 
-  // The following are neccesary as they register handlers in their init functions.
+	// The following are neccesary as they register handlers in their init functions.
 	_ "github.com/v2ray/v2ray-core/net/freedom"
 	_ "github.com/v2ray/v2ray-core/net/socks"
 	_ "github.com/v2ray/v2ray-core/net/vmess"