瀏覽代碼

reuse outbound connection handler

V2Ray 10 年之前
父節點
當前提交
cd81e5531b

+ 5 - 5
common/collect/timed_queue.go

@@ -72,16 +72,16 @@ func (queue *TimedQueue) RemovedEntries() <-chan interface{} {
 func (queue *TimedQueue) cleanup(tick <-chan time.Time) {
 	for {
 		now := <-tick
-    queue.access.RLock()
-    queueLen := queue.queue.Len()
-    queue.access.RUnlock()
+		queue.access.RLock()
+		queueLen := queue.queue.Len()
+		queue.access.RUnlock()
 		if queueLen == 0 {
 			continue
 		}
 		nowSec := now.UTC().Unix()
-    queue.access.RLock()
+		queue.access.RLock()
 		firstEntryTime := queue.queue[0].timeSec
-    queue.access.RUnlock()
+		queue.access.RUnlock()
 		if firstEntryTime > nowSec {
 			continue
 		}

+ 23 - 20
point.go

@@ -25,11 +25,9 @@ func RegisterOutboundConnectionHandlerFactory(name string, factory OutboundConne
 
 // Point is an single server in V2Ray system.
 type Point struct {
-	port       uint16
-	ichFactory InboundConnectionHandlerFactory
-	ichConfig  interface{}
-	ochFactory OutboundConnectionHandlerFactory
-	ochConfig  interface{}
+	port uint16
+	ich  InboundConnectionHandler
+	och  OutboundConnectionHandler
 }
 
 // NewPoint returns a new Point server based on given configuration.
@@ -42,16 +40,25 @@ func NewPoint(pConfig config.PointConfig) (*Point, error) {
 	if !ok {
 		panic(log.Error("Unknown inbound connection handler factory %s", pConfig.InboundConfig().Protocol()))
 	}
-	vpoint.ichFactory = ichFactory
-	vpoint.ichConfig = pConfig.InboundConfig().Settings(config.TypeInbound)
+	ichConfig := pConfig.InboundConfig().Settings(config.TypeInbound)
+	ich, err := ichFactory.Create(vpoint, ichConfig)
+	if err != nil {
+		log.Error("Failed to create inbound connection handler: %v", err)
+		return nil, err
+	}
+	vpoint.ich = ich
 
 	ochFactory, ok := outboundFactories[pConfig.OutboundConfig().Protocol()]
 	if !ok {
 		panic(log.Error("Unknown outbound connection handler factory %s", pConfig.OutboundConfig().Protocol))
 	}
-
-	vpoint.ochFactory = ochFactory
-	vpoint.ochConfig = pConfig.OutboundConfig().Settings(config.TypeOutbound)
+	ochConfig := pConfig.OutboundConfig().Settings(config.TypeOutbound)
+	och, err := ochFactory.Create(vpoint, ochConfig)
+	if err != nil {
+		log.Error("Failed to create outbound connection handler: %v", err)
+		return nil, err
+	}
+	vpoint.och = och
 
 	return vpoint, nil
 }
@@ -65,11 +72,11 @@ type InboundConnectionHandler interface {
 }
 
 type OutboundConnectionHandlerFactory interface {
-	Create(VP *Point, config interface{}, firstPacket v2net.Packet) (OutboundConnectionHandler, error)
+	Create(VP *Point, config interface{}) (OutboundConnectionHandler, error)
 }
 
 type OutboundConnectionHandler interface {
-	Start(ray OutboundRay) error
+	Dispatch(firstPacket v2net.Packet, ray OutboundRay) error
 }
 
 // Start starts the Point server, and return any error during the process.
@@ -79,18 +86,14 @@ func (vp *Point) Start() error {
 		return log.Error("Invalid port %d", vp.port)
 	}
 
-	inboundConnectionHandler, err := vp.ichFactory.Create(vp, vp.ichConfig)
-	if err != nil {
-		return err
-	}
-	err = inboundConnectionHandler.Listen(vp.port)
-	return nil
+	err := vp.ich.Listen(vp.port)
+	// TODO: handle error
+	return err
 }
 
 func (p *Point) DispatchToOutbound(packet v2net.Packet) InboundRay {
 	ray := NewRay()
 	// TODO: handle error
-	och, _ := p.ochFactory.Create(p, p.ochConfig, packet)
-	_ = och.Start(ray)
+	p.och.Dispatch(packet, ray)
 	return ray
 }

+ 9 - 12
proxy/freedom/freedom.go

@@ -10,23 +10,20 @@ import (
 )
 
 type FreedomConnection struct {
-	packet v2net.Packet
 }
 
-func NewFreedomConnection(firstPacket v2net.Packet) *FreedomConnection {
-	return &FreedomConnection{
-		packet: firstPacket,
-	}
+func NewFreedomConnection() *FreedomConnection {
+	return &FreedomConnection{}
 }
 
-func (vconn *FreedomConnection) Start(ray core.OutboundRay) error {
-	conn, err := net.Dial(vconn.packet.Destination().Network(), vconn.packet.Destination().Address().String())
-	log.Info("Freedom: Opening connection to %s", vconn.packet.Destination().String())
+func (vconn *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray core.OutboundRay) error {
+	conn, err := net.Dial(firstPacket.Destination().Network(), firstPacket.Destination().Address().String())
+	log.Info("Freedom: Opening connection to %s", firstPacket.Destination().String())
 	if err != nil {
 		if ray != nil {
 			close(ray.OutboundOutput())
 		}
-		return log.Error("Freedom: Failed to open connection: %s : %v", vconn.packet.Destination().String(), err)
+		return log.Error("Freedom: Failed to open connection: %s : %v", firstPacket.Destination().String(), err)
 	}
 
 	input := ray.OutboundInput()
@@ -35,17 +32,17 @@ func (vconn *FreedomConnection) Start(ray core.OutboundRay) error {
 	readMutex.Lock()
 	writeMutex.Lock()
 
-	if chunk := vconn.packet.Chunk(); chunk != nil {
+	if chunk := firstPacket.Chunk(); chunk != nil {
 		conn.Write(chunk)
 	}
 
-	if !vconn.packet.MoreChunks() {
+	if !firstPacket.MoreChunks() {
 		writeMutex.Unlock()
 	} else {
 		go dumpInput(conn, input, &writeMutex)
 	}
 
-	go dumpOutput(conn, output, &readMutex, vconn.packet.Destination().IsUDP())
+	go dumpOutput(conn, output, &readMutex, firstPacket.Destination().IsUDP())
 
 	go func() {
 		writeMutex.Lock()

+ 2 - 3
proxy/freedom/freedomfactory.go

@@ -2,14 +2,13 @@ package freedom
 
 import (
 	"github.com/v2ray/v2ray-core"
-	v2net "github.com/v2ray/v2ray-core/common/net"
 )
 
 type FreedomFactory struct {
 }
 
-func (factory FreedomFactory) Create(vp *core.Point, config interface{}, firstPacket v2net.Packet) (core.OutboundConnectionHandler, error) {
-	return NewFreedomConnection(firstPacket), nil
+func (factory FreedomFactory) Create(vp *core.Point, config interface{}) (core.OutboundConnectionHandler, error) {
+	return NewFreedomConnection(), nil
 }
 
 func init() {

+ 8 - 10
proxy/vmess/vmessout.go

@@ -28,15 +28,13 @@ type VNextServer struct {
 
 type VMessOutboundHandler struct {
 	vPoint       *core.Point
-	packet       v2net.Packet
 	vNextList    []VNextServer
 	vNextListUDP []VNextServer
 }
 
-func NewVMessOutboundHandler(vp *core.Point, vNextList, vNextListUDP []VNextServer, firstPacket v2net.Packet) *VMessOutboundHandler {
+func NewVMessOutboundHandler(vp *core.Point, vNextList, vNextListUDP []VNextServer) *VMessOutboundHandler {
 	return &VMessOutboundHandler{
 		vPoint:       vp,
-		packet:       firstPacket,
 		vNextList:    vNextList,
 		vNextListUDP: vNextListUDP,
 	}
@@ -65,28 +63,28 @@ func pickVNext(serverList []VNextServer) (v2net.Destination, user.User) {
 	return vNext.Destination, vNextUser
 }
 
-func (handler *VMessOutboundHandler) Start(ray core.OutboundRay) error {
+func (handler *VMessOutboundHandler) Dispatch(firstPacket v2net.Packet, ray core.OutboundRay) error {
 	vNextList := handler.vNextList
-	if handler.packet.Destination().IsUDP() {
+	if firstPacket.Destination().IsUDP() {
 		vNextList = handler.vNextListUDP
 	}
 	vNextAddress, vNextUser := pickVNext(vNextList)
 
 	command := protocol.CmdTCP
-	if handler.packet.Destination().IsUDP() {
+	if firstPacket.Destination().IsUDP() {
 		command = protocol.CmdUDP
 	}
 	request := &protocol.VMessRequest{
 		Version: protocol.Version,
 		UserId:  vNextUser.Id,
 		Command: command,
-		Address: handler.packet.Destination().Address(),
+		Address: firstPacket.Destination().Address(),
 	}
 	rand.Read(request.RequestIV[:])
 	rand.Read(request.RequestKey[:])
 	rand.Read(request.ResponseHeader[:])
 
-	go startCommunicate(request, vNextAddress, ray, handler.packet)
+	go startCommunicate(request, vNextAddress, ray, firstPacket)
 	return nil
 }
 
@@ -195,7 +193,7 @@ func handleResponse(conn net.Conn, request *protocol.VMessRequest, output chan<-
 type VMessOutboundHandlerFactory struct {
 }
 
-func (factory *VMessOutboundHandlerFactory) Create(vp *core.Point, rawConfig interface{}, firstPacket v2net.Packet) (core.OutboundConnectionHandler, error) {
+func (factory *VMessOutboundHandlerFactory) Create(vp *core.Point, rawConfig interface{}) (core.OutboundConnectionHandler, error) {
 	config := rawConfig.(*VMessOutboundConfig)
 	servers := make([]VNextServer, 0, len(config.VNextList))
 	udpServers := make([]VNextServer, 0, len(config.VNextList))
@@ -207,7 +205,7 @@ func (factory *VMessOutboundHandlerFactory) Create(vp *core.Point, rawConfig int
 			udpServers = append(udpServers, server.ToVNextServer("udp"))
 		}
 	}
-	return NewVMessOutboundHandler(vp, servers, udpServers, firstPacket), nil
+	return NewVMessOutboundHandler(vp, servers, udpServers), nil
 }
 
 func init() {

+ 1 - 1
release/server/main.go

@@ -9,8 +9,8 @@ import (
 	jsonconf "github.com/v2ray/v2ray-core/config/json"
 
 	// The following are neccesary as they register handlers in their init functions.
+	_ "github.com/v2ray/v2ray-core/proxy/freedom"
 	_ "github.com/v2ray/v2ray-core/proxy/freedom/config/json"
-  _ "github.com/v2ray/v2ray-core/proxy/freedom"
 	_ "github.com/v2ray/v2ray-core/proxy/socks"
 	_ "github.com/v2ray/v2ray-core/proxy/vmess"
 )

+ 7 - 7
testing/mocks/outboundhandler.go

@@ -13,10 +13,15 @@ type OutboundConnectionHandler struct {
 	Destination v2net.Destination
 }
 
-func (handler *OutboundConnectionHandler) Start(ray core.OutboundRay) error {
+func (handler *OutboundConnectionHandler) Dispatch(packet v2net.Packet, ray core.OutboundRay) error {
 	input := ray.OutboundInput()
 	output := ray.OutboundOutput()
 
+	handler.Destination = packet.Destination()
+	if packet.Chunk() != nil {
+		handler.Data2Send.Write(packet.Chunk())
+	}
+
 	go func() {
 		for {
 			data, open := <-input
@@ -34,11 +39,6 @@ func (handler *OutboundConnectionHandler) Start(ray core.OutboundRay) error {
 	return nil
 }
 
-func (handler *OutboundConnectionHandler) Create(point *core.Point, config interface{}, packet v2net.Packet) (core.OutboundConnectionHandler, error) {
-	handler.Destination = packet.Destination()
-	if packet.Chunk() != nil {
-		handler.Data2Send.Write(packet.Chunk())
-	}
-
+func (handler *OutboundConnectionHandler) Create(point *core.Point, config interface{}) (core.OutboundConnectionHandler, error) {
 	return handler, nil
 }