Darien Raymond 9 år sedan
förälder
incheckning
4be27a6377
2 ändrade filer med 74 tillägg och 36 borttagningar
  1. 15 36
      proxy/dokodemo/dokodemo.go
  2. 59 0
      transport/hub/udp.go

+ 15 - 36
proxy/dokodemo/dokodemo.go

@@ -2,7 +2,6 @@ package dokodemo
 
 import (
 	"io"
-	"net"
 	"sync"
 
 	"github.com/v2ray/v2ray-core/app"
@@ -22,7 +21,7 @@ type DokodemoDoor struct {
 	port          v2net.Port
 	space         app.Space
 	tcpListener   *hub.TCPListener
-	udpConn       *net.UDPConn
+	udpHub        *hub.UDPHub
 	listeningPort v2net.Port
 }
 
@@ -47,10 +46,10 @@ func (this *DokodemoDoor) Close() {
 		this.tcpListener = nil
 		this.tcpMutex.Unlock()
 	}
-	if this.udpConn != nil {
-		this.udpConn.Close()
+	if this.udpHub != nil {
 		this.udpMutex.Lock()
-		this.udpConn = nil
+		this.udpHub.Close()
+		this.udpHub = nil
 		this.udpMutex.Unlock()
 	}
 }
@@ -82,52 +81,32 @@ func (this *DokodemoDoor) Listen(port v2net.Port) error {
 }
 
 func (this *DokodemoDoor) ListenUDP(port v2net.Port) error {
-	udpConn, err := net.ListenUDP("udp", &net.UDPAddr{
-		IP:   []byte{0, 0, 0, 0},
-		Port: int(port),
-		Zone: "",
-	})
+	udpHub, err := hub.ListenUDP(port, this.handleUDPPackets)
 	if err != nil {
 		log.Error("Dokodemo failed to listen on port ", port, ": ", err)
 		return err
 	}
 	this.udpMutex.Lock()
-	this.udpConn = udpConn
+	this.udpHub = udpHub
 	this.udpMutex.Unlock()
-	go this.handleUDPPackets()
 	return nil
 }
 
-func (this *DokodemoDoor) handleUDPPackets() {
-	for this.accepting {
-		buffer := alloc.NewBuffer()
+func (this *DokodemoDoor) handleUDPPackets(payload *alloc.Buffer, dest v2net.Destination) {
+	packet := v2net.NewPacket(v2net.UDPDestination(this.address, this.port), payload, false)
+	ray := this.space.PacketDispatcher().DispatchToOutbound(packet)
+	close(ray.InboundInput())
+
+	for resp := range ray.InboundOutput() {
 		this.udpMutex.RLock()
 		if !this.accepting {
 			this.udpMutex.RUnlock()
+			resp.Release()
 			return
 		}
-		nBytes, addr, err := this.udpConn.ReadFromUDP(buffer.Value)
+		this.udpHub.WriteTo(resp.Value, dest)
 		this.udpMutex.RUnlock()
-		buffer.Slice(0, nBytes)
-		if err != nil {
-			buffer.Release()
-			log.Error("Dokodemo failed to read from UDP: ", err)
-			return
-		}
-
-		packet := v2net.NewPacket(v2net.UDPDestination(this.address, this.port), buffer, false)
-		ray := this.space.PacketDispatcher().DispatchToOutbound(packet)
-		close(ray.InboundInput())
-
-		for payload := range ray.InboundOutput() {
-			this.udpMutex.RLock()
-			if !this.accepting {
-				this.udpMutex.RUnlock()
-				return
-			}
-			this.udpConn.WriteToUDP(payload.Value, addr)
-			this.udpMutex.RUnlock()
-		}
+		resp.Release()
 	}
 }
 

+ 59 - 0
transport/hub/udp.go

@@ -0,0 +1,59 @@
+package hub
+
+import (
+	"net"
+
+	"github.com/v2ray/v2ray-core/common/alloc"
+	v2net "github.com/v2ray/v2ray-core/common/net"
+)
+
+type UDPPayloadHandler func(*alloc.Buffer, v2net.Destination)
+
+type UDPHub struct {
+	conn      *net.UDPConn
+	callback  UDPPayloadHandler
+	accepting bool
+}
+
+func ListenUDP(port v2net.Port, callback UDPPayloadHandler) (*UDPHub, error) {
+	udpConn, err := net.ListenUDP("udp", &net.UDPAddr{
+		IP:   []byte{0, 0, 0, 0},
+		Port: int(port),
+	})
+	if err != nil {
+		return nil, err
+	}
+	hub := &UDPHub{
+		conn:     udpConn,
+		callback: callback,
+	}
+	go hub.start()
+	return hub, nil
+}
+
+func (this *UDPHub) Close() {
+	this.accepting = false
+	this.conn.Close()
+}
+
+func (this *UDPHub) WriteTo(payload []byte, dest v2net.Destination) (int, error) {
+	return this.conn.WriteToUDP(payload, &net.UDPAddr{
+		IP:   dest.Address().IP(),
+		Port: int(dest.Port()),
+	})
+}
+
+func (this *UDPHub) start() {
+  this.accepting = true
+	for this.accepting {
+		buffer := alloc.NewBuffer()
+		nBytes, addr, err := this.conn.ReadFromUDP(buffer.Value)
+		if err != nil {
+			buffer.Release()
+			continue
+		}
+		buffer.Slice(0, nBytes)
+		dest := v2net.UDPDestination(v2net.IPAddress(addr.IP), v2net.Port(addr.Port))
+		go this.callback(buffer, dest)
+	}
+}