| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- package hub
- import (
- "sync"
- "github.com/v2ray/v2ray-core/app/dispatcher"
- "github.com/v2ray/v2ray-core/common/alloc"
- v2net "github.com/v2ray/v2ray-core/common/net"
- "github.com/v2ray/v2ray-core/transport/ray"
- )
- type UDPResponseCallback func(destination v2net.Destination, payload *alloc.Buffer)
- type connEntry struct {
- inboundRay ray.InboundRay
- callback UDPResponseCallback
- }
- type UDPServer struct {
- sync.RWMutex
- conns map[string]*connEntry
- packetDispatcher dispatcher.PacketDispatcher
- }
- func NewUDPServer(packetDispatcher dispatcher.PacketDispatcher) *UDPServer {
- return &UDPServer{
- conns: make(map[string]*connEntry),
- packetDispatcher: packetDispatcher,
- }
- }
- func (this *UDPServer) locateExistingAndDispatch(dest string, payload *alloc.Buffer) bool {
- this.RLock()
- defer this.RUnlock()
- if entry, found := this.conns[dest]; found {
- entry.inboundRay.InboundInput().Write(payload)
- return true
- }
- return false
- }
- func (this *UDPServer) Dispatch(source v2net.Destination, destination v2net.Destination, payload *alloc.Buffer, callback UDPResponseCallback) {
- destString := source.String() + "-" + destination.NetAddr()
- if this.locateExistingAndDispatch(destString, payload) {
- return
- }
- this.Lock()
- inboundRay := this.packetDispatcher.DispatchToOutbound(destination)
- inboundRay.InboundInput().Write(payload)
- this.conns[destString] = &connEntry{
- inboundRay: inboundRay,
- callback: callback,
- }
- this.Unlock()
- go this.handleConnection(destString, inboundRay, source, callback)
- }
- func (this *UDPServer) handleConnection(destString string, inboundRay ray.InboundRay, source v2net.Destination, callback UDPResponseCallback) {
- for {
- data, err := inboundRay.InboundOutput().Read()
- if err != nil {
- break
- }
- callback(source, data)
- }
- this.Lock()
- inboundRay.InboundInput().Release()
- inboundRay.InboundOutput().Release()
- delete(this.conns, destString)
- this.Unlock()
- }
|