Explorar o código

Cleanup root directory

V2Ray %!s(int64=10) %!d(string=hai) anos
pai
achega
890d185979

+ 10 - 0
app/packet_dispatcher.go

@@ -0,0 +1,10 @@
+package app
+
+import (
+	v2net "github.com/v2ray/v2ray-core/common/net"
+	"github.com/v2ray/v2ray-core/transport/ray"
+)
+
+type PacketDispatcher interface {
+	DispatchToOutbound(packet v2net.Packet) ray.InboundRay
+}

+ 14 - 45
point.go → app/point/point.go

@@ -1,34 +1,19 @@
-package core
+package point
 
 import (
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
 	"github.com/v2ray/v2ray-core/common/retry"
 	"github.com/v2ray/v2ray-core/config"
+	"github.com/v2ray/v2ray-core/proxy"
+	"github.com/v2ray/v2ray-core/transport/ray"
 )
 
-var (
-	inboundFactories  = make(map[string]InboundConnectionHandlerFactory)
-	outboundFactories = make(map[string]OutboundConnectionHandlerFactory)
-)
-
-func RegisterInboundConnectionHandlerFactory(name string, factory InboundConnectionHandlerFactory) error {
-	// TODO check name
-	inboundFactories[name] = factory
-	return nil
-}
-
-func RegisterOutboundConnectionHandlerFactory(name string, factory OutboundConnectionHandlerFactory) error {
-	// TODO check name
-	outboundFactories[name] = factory
-	return nil
-}
-
 // Point is an single server in V2Ray system.
 type Point struct {
 	port uint16
-	ich  InboundConnectionHandler
-	och  OutboundConnectionHandler
+	ich  proxy.InboundConnectionHandler
+	och  proxy.OutboundConnectionHandler
 }
 
 // NewPoint returns a new Point server based on given configuration.
@@ -37,8 +22,8 @@ func NewPoint(pConfig config.PointConfig) (*Point, error) {
 	var vpoint = new(Point)
 	vpoint.port = pConfig.Port()
 
-	ichFactory, ok := inboundFactories[pConfig.InboundConfig().Protocol()]
-	if !ok {
+	ichFactory := proxy.GetInboundConnectionHandlerFactory(pConfig.InboundConfig().Protocol())
+	if ichFactory == nil {
 		log.Error("Unknown inbound connection handler factory %s", pConfig.InboundConfig().Protocol())
 		return nil, config.BadConfiguration
 	}
@@ -50,13 +35,13 @@ func NewPoint(pConfig config.PointConfig) (*Point, error) {
 	}
 	vpoint.ich = ich
 
-	ochFactory, ok := outboundFactories[pConfig.OutboundConfig().Protocol()]
-	if !ok {
+	ochFactory := proxy.GetOutboundConnectionHandlerFactory(pConfig.OutboundConfig().Protocol())
+	if ochFactory == nil {
 		log.Error("Unknown outbound connection handler factory %s", pConfig.OutboundConfig().Protocol())
 		return nil, config.BadConfiguration
 	}
 	ochConfig := pConfig.OutboundConfig().Settings(config.TypeOutbound)
-	och, err := ochFactory.Create(vpoint, ochConfig)
+	och, err := ochFactory.Create(ochConfig)
 	if err != nil {
 		log.Error("Failed to create outbound connection handler: %v", err)
 		return nil, err
@@ -66,22 +51,6 @@ func NewPoint(pConfig config.PointConfig) (*Point, error) {
 	return vpoint, nil
 }
 
-type InboundConnectionHandlerFactory interface {
-	Create(vp *Point, config interface{}) (InboundConnectionHandler, error)
-}
-
-type InboundConnectionHandler interface {
-	Listen(port uint16) error
-}
-
-type OutboundConnectionHandlerFactory interface {
-	Create(VP *Point, config interface{}) (OutboundConnectionHandler, error)
-}
-
-type OutboundConnectionHandler interface {
-	Dispatch(firstPacket v2net.Packet, ray OutboundRay) error
-}
-
 // Start starts the Point server, and return any error during the process.
 // In the case of any errors, the state of the server is unpredicatable.
 func (vp *Point) Start() error {
@@ -100,8 +69,8 @@ func (vp *Point) Start() error {
 	})
 }
 
-func (p *Point) DispatchToOutbound(packet v2net.Packet) InboundRay {
-	ray := NewRay()
-	go p.och.Dispatch(packet, ray)
-	return ray
+func (p *Point) DispatchToOutbound(packet v2net.Packet) ray.InboundRay {
+	direct := ray.NewRay()
+	go p.och.Dispatch(packet, direct)
+	return direct
 }

+ 2 - 2
proxy/freedom/freedom.go

@@ -4,10 +4,10 @@ import (
 	"net"
 	"sync"
 
-	"github.com/v2ray/v2ray-core"
 	"github.com/v2ray/v2ray-core/common/alloc"
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
+	"github.com/v2ray/v2ray-core/transport/ray"
 )
 
 type FreedomConnection struct {
@@ -17,7 +17,7 @@ func NewFreedomConnection() *FreedomConnection {
 	return &FreedomConnection{}
 }
 
-func (vconn *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray core.OutboundRay) error {
+func (vconn *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) error {
 	conn, err := net.Dial(firstPacket.Destination().Network(), firstPacket.Destination().Address().String())
 	log.Info("Freedom: Opening connection to %s", firstPacket.Destination().String())
 	if err != nil {

+ 3 - 3
proxy/freedom/freedomfactory.go

@@ -1,16 +1,16 @@
 package freedom
 
 import (
-	"github.com/v2ray/v2ray-core"
+	"github.com/v2ray/v2ray-core/proxy"
 )
 
 type FreedomFactory struct {
 }
 
-func (factory FreedomFactory) Create(vp *core.Point, config interface{}) (core.OutboundConnectionHandler, error) {
+func (factory FreedomFactory) Create(config interface{}) (proxy.OutboundConnectionHandler, error) {
 	return NewFreedomConnection(), nil
 }
 
 func init() {
-	core.RegisterOutboundConnectionHandlerFactory("freedom", FreedomFactory{})
+	proxy.RegisterOutboundConnectionHandlerFactory("freedom", FreedomFactory{})
 }

+ 13 - 0
proxy/inbound_connection.go

@@ -0,0 +1,13 @@
+package proxy
+
+import (
+	"github.com/v2ray/v2ray-core/app"
+)
+
+type InboundConnectionHandlerFactory interface {
+	Create(dispatch app.PacketDispatcher, config interface{}) (InboundConnectionHandler, error)
+}
+
+type InboundConnectionHandler interface {
+	Listen(port uint16) error
+}

+ 14 - 0
proxy/outbound_connection.go

@@ -0,0 +1,14 @@
+package proxy
+
+import (
+	v2net "github.com/v2ray/v2ray-core/common/net"
+	"github.com/v2ray/v2ray-core/transport/ray"
+)
+
+type OutboundConnectionHandlerFactory interface {
+	Create(config interface{}) (OutboundConnectionHandler, error)
+}
+
+type OutboundConnectionHandler interface {
+	Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) error
+}

+ 34 - 0
proxy/proxy_cache.go

@@ -0,0 +1,34 @@
+package proxy
+
+var (
+	inboundFactories  = make(map[string]InboundConnectionHandlerFactory)
+	outboundFactories = make(map[string]OutboundConnectionHandlerFactory)
+)
+
+func RegisterInboundConnectionHandlerFactory(name string, factory InboundConnectionHandlerFactory) error {
+	// TODO check name
+	inboundFactories[name] = factory
+	return nil
+}
+
+func RegisterOutboundConnectionHandlerFactory(name string, factory OutboundConnectionHandlerFactory) error {
+	// TODO check name
+	outboundFactories[name] = factory
+	return nil
+}
+
+func GetInboundConnectionHandlerFactory(name string) InboundConnectionHandlerFactory {
+	factory, found := inboundFactories[name]
+	if !found {
+		return nil
+	}
+	return factory
+}
+
+func GetOutboundConnectionHandlerFactory(name string) OutboundConnectionHandlerFactory {
+	factory, found := outboundFactories[name]
+	if !found {
+		return nil
+	}
+	return factory
+}

+ 8 - 8
proxy/socks/socks.go

@@ -8,7 +8,7 @@ import (
 	"sync"
 	"time"
 
-	"github.com/v2ray/v2ray-core"
+	"github.com/v2ray/v2ray-core/app"
 	"github.com/v2ray/v2ray-core/common/alloc"
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
@@ -24,15 +24,15 @@ var (
 
 // SocksServer is a SOCKS 5 proxy server
 type SocksServer struct {
-	accepting bool
-	vPoint    *core.Point
-	config    *jsonconfig.SocksConfig
+	accepting  bool
+	dispatcher app.PacketDispatcher
+	config     *jsonconfig.SocksConfig
 }
 
-func NewSocksServer(vp *core.Point, config *jsonconfig.SocksConfig) *SocksServer {
+func NewSocksServer(dispatcher app.PacketDispatcher, config *jsonconfig.SocksConfig) *SocksServer {
 	return &SocksServer{
-		vPoint: vp,
-		config: config,
+		dispatcher: dispatcher,
+		config:     config,
 	}
 }
 
@@ -249,7 +249,7 @@ func (server *SocksServer) handleSocks4(reader io.Reader, writer io.Writer, auth
 }
 
 func (server *SocksServer) transport(reader io.Reader, writer io.Writer, firstPacket v2net.Packet) {
-	ray := server.vPoint.DispatchToOutbound(firstPacket)
+	ray := server.dispatcher.DispatchToOutbound(firstPacket)
 	input := ray.InboundInput()
 	output := ray.InboundOutput()
 

+ 5 - 4
proxy/socks/socksfactory.go

@@ -1,19 +1,20 @@
 package socks
 
 import (
-	"github.com/v2ray/v2ray-core"
+	"github.com/v2ray/v2ray-core/app"
+	"github.com/v2ray/v2ray-core/proxy"
 	"github.com/v2ray/v2ray-core/proxy/socks/config/json"
 )
 
 type SocksServerFactory struct {
 }
 
-func (factory SocksServerFactory) Create(vp *core.Point, rawConfig interface{}) (core.InboundConnectionHandler, error) {
+func (factory SocksServerFactory) Create(dispatcher app.PacketDispatcher, rawConfig interface{}) (proxy.InboundConnectionHandler, error) {
 	config := rawConfig.(*json.SocksConfig)
 	config.Initialize()
-	return NewSocksServer(vp, config), nil
+	return NewSocksServer(dispatcher, config), nil
 }
 
 func init() {
-	core.RegisterInboundConnectionHandlerFactory("socks", SocksServerFactory{})
+	proxy.RegisterInboundConnectionHandlerFactory("socks", SocksServerFactory{})
 }

+ 1 - 1
proxy/socks/udp.go

@@ -63,7 +63,7 @@ func (server *SocksServer) AcceptPackets(conn *net.UDPConn) error {
 }
 
 func (server *SocksServer) handlePacket(conn *net.UDPConn, packet v2net.Packet, clientAddr *net.UDPAddr, targetAddr v2net.Address) {
-	ray := server.vPoint.DispatchToOutbound(packet)
+	ray := server.dispatcher.DispatchToOutbound(packet)
 	close(ray.InboundInput())
 
 	for data := range ray.InboundOutput() {

+ 9 - 8
proxy/vmess/vmessin.go

@@ -6,25 +6,26 @@ import (
 	"net"
 	"sync"
 
-	"github.com/v2ray/v2ray-core"
+	"github.com/v2ray/v2ray-core/app"
 	"github.com/v2ray/v2ray-core/common/alloc"
 	v2io "github.com/v2ray/v2ray-core/common/io"
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
+	"github.com/v2ray/v2ray-core/proxy"
 	"github.com/v2ray/v2ray-core/proxy/vmess/protocol"
 	"github.com/v2ray/v2ray-core/proxy/vmess/protocol/user"
 )
 
 type VMessInboundHandler struct {
-	vPoint     *core.Point
+	dispatcher app.PacketDispatcher
 	clients    user.UserSet
 	accepting  bool
 	udpEnabled bool
 }
 
-func NewVMessInboundHandler(vp *core.Point, clients user.UserSet, udpEnabled bool) *VMessInboundHandler {
+func NewVMessInboundHandler(dispatcher app.PacketDispatcher, clients user.UserSet, udpEnabled bool) *VMessInboundHandler {
 	return &VMessInboundHandler{
-		vPoint:     vp,
+		dispatcher: dispatcher,
 		clients:    clients,
 		udpEnabled: udpEnabled,
 	}
@@ -77,7 +78,7 @@ func (handler *VMessInboundHandler) HandleConnection(connection *net.TCPConn) er
 	log.Access(connection.RemoteAddr().String(), request.Address.String(), log.AccessAccepted, "")
 	log.Debug("VMessIn: Received request for %s", request.Address.String())
 
-	ray := handler.vPoint.DispatchToOutbound(v2net.NewPacket(request.Destination(), nil, true))
+	ray := handler.dispatcher.DispatchToOutbound(v2net.NewPacket(request.Destination(), nil, true))
 	input := ray.InboundInput()
 	output := ray.InboundOutput()
 	var readFinish, writeFinish sync.Mutex
@@ -135,7 +136,7 @@ func handleOutput(request *protocol.VMessRequest, writer io.Writer, output <-cha
 type VMessInboundHandlerFactory struct {
 }
 
-func (factory *VMessInboundHandlerFactory) Create(vp *core.Point, rawConfig interface{}) (core.InboundConnectionHandler, error) {
+func (factory *VMessInboundHandlerFactory) Create(dispatcher app.PacketDispatcher, rawConfig interface{}) (proxy.InboundConnectionHandler, error) {
 	config := rawConfig.(*VMessInboundConfig)
 
 	allowedClients := user.NewTimedUserSet()
@@ -148,9 +149,9 @@ func (factory *VMessInboundHandlerFactory) Create(vp *core.Point, rawConfig inte
 		allowedClients.AddUser(user)
 	}
 
-	return NewVMessInboundHandler(vp, allowedClients, config.UDPEnabled), nil
+	return NewVMessInboundHandler(dispatcher, allowedClients, config.UDPEnabled), nil
 }
 
 func init() {
-	core.RegisterInboundConnectionHandlerFactory("vmess", &VMessInboundHandlerFactory{})
+	proxy.RegisterInboundConnectionHandlerFactory("vmess", &VMessInboundHandlerFactory{})
 }

+ 1 - 1
proxy/vmess/vmessin_udp.go

@@ -73,7 +73,7 @@ func (handler *VMessInboundHandler) AcceptPackets(conn *net.UDPConn) {
 }
 
 func (handler *VMessInboundHandler) handlePacket(conn *net.UDPConn, request *protocol.VMessRequest, packet v2net.Packet, clientAddr *net.UDPAddr) {
-	ray := handler.vPoint.DispatchToOutbound(packet)
+	ray := handler.dispatcher.DispatchToOutbound(packet)
 	close(ray.InboundInput())
 
 	responseKey := md5.Sum(request.RequestKey)

+ 8 - 9
proxy/vmess/vmessout.go

@@ -8,13 +8,14 @@ import (
 	"net"
 	"sync"
 
-	"github.com/v2ray/v2ray-core"
 	"github.com/v2ray/v2ray-core/common/alloc"
 	v2io "github.com/v2ray/v2ray-core/common/io"
 	"github.com/v2ray/v2ray-core/common/log"
 	v2net "github.com/v2ray/v2ray-core/common/net"
+	"github.com/v2ray/v2ray-core/proxy"
 	"github.com/v2ray/v2ray-core/proxy/vmess/protocol"
 	"github.com/v2ray/v2ray-core/proxy/vmess/protocol/user"
+	"github.com/v2ray/v2ray-core/transport/ray"
 )
 
 const (
@@ -28,14 +29,12 @@ type VNextServer struct {
 }
 
 type VMessOutboundHandler struct {
-	vPoint       *core.Point
 	vNextList    []*VNextServer
 	vNextListUDP []*VNextServer
 }
 
-func NewVMessOutboundHandler(vp *core.Point, vNextList, vNextListUDP []*VNextServer) *VMessOutboundHandler {
+func NewVMessOutboundHandler(vNextList, vNextListUDP []*VNextServer) *VMessOutboundHandler {
 	return &VMessOutboundHandler{
-		vPoint:       vp,
 		vNextList:    vNextList,
 		vNextListUDP: vNextListUDP,
 	}
@@ -64,7 +63,7 @@ func pickVNext(serverList []*VNextServer) (v2net.Destination, user.User) {
 	return vNext.Destination, vNextUser
 }
 
-func (handler *VMessOutboundHandler) Dispatch(firstPacket v2net.Packet, ray core.OutboundRay) error {
+func (handler *VMessOutboundHandler) Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) error {
 	vNextList := handler.vNextList
 	if firstPacket.Destination().IsUDP() {
 		vNextList = handler.vNextListUDP
@@ -91,7 +90,7 @@ func (handler *VMessOutboundHandler) Dispatch(firstPacket v2net.Packet, ray core
 	return startCommunicate(request, vNextAddress, ray, firstPacket)
 }
 
-func startCommunicate(request *protocol.VMessRequest, dest v2net.Destination, ray core.OutboundRay, firstPacket v2net.Packet) error {
+func startCommunicate(request *protocol.VMessRequest, dest v2net.Destination, ray ray.OutboundRay, firstPacket v2net.Packet) error {
 	conn, err := net.Dial(dest.Network(), dest.Address().String())
 	if err != nil {
 		log.Error("Failed to open %s: %v", dest.String(), err)
@@ -199,7 +198,7 @@ func handleResponse(conn net.Conn, request *protocol.VMessRequest, output chan<-
 type VMessOutboundHandlerFactory struct {
 }
 
-func (factory *VMessOutboundHandlerFactory) Create(vp *core.Point, rawConfig interface{}) (core.OutboundConnectionHandler, error) {
+func (factory *VMessOutboundHandlerFactory) Create(rawConfig interface{}) (proxy.OutboundConnectionHandler, error) {
 	config := rawConfig.(*VMessOutboundConfig)
 	servers := make([]*VNextServer, 0, len(config.VNextList))
 	udpServers := make([]*VNextServer, 0, len(config.VNextList))
@@ -222,9 +221,9 @@ func (factory *VMessOutboundHandlerFactory) Create(vp *core.Point, rawConfig int
 
 		}
 	}
-	return NewVMessOutboundHandler(vp, servers, udpServers), nil
+	return NewVMessOutboundHandler(servers, udpServers), nil
 }
 
 func init() {
-	core.RegisterOutboundConnectionHandlerFactory("vmess", &VMessOutboundHandlerFactory{})
+	proxy.RegisterOutboundConnectionHandlerFactory("vmess", &VMessOutboundHandlerFactory{})
 }

+ 2 - 1
release/server/main.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 
 	"github.com/v2ray/v2ray-core"
+	"github.com/v2ray/v2ray-core/app/point"
 	"github.com/v2ray/v2ray-core/common/log"
 	jsonconf "github.com/v2ray/v2ray-core/config/json"
 
@@ -58,7 +59,7 @@ func main() {
 		log.InitAccessLogger(config.LogConfig().AccessLog())
 	}
 
-	vPoint, err := core.NewPoint(config)
+	vPoint, err := point.NewPoint(config)
 	if err != nil {
 		log.Error("Failed to create Point server: %v", err)
 		return

+ 6 - 5
testing/mocks/inboundhandler.go

@@ -3,16 +3,17 @@ package mocks
 import (
 	"bytes"
 
-	"github.com/v2ray/v2ray-core"
+	"github.com/v2ray/v2ray-core/app"
 	"github.com/v2ray/v2ray-core/common/alloc"
 	v2net "github.com/v2ray/v2ray-core/common/net"
+	"github.com/v2ray/v2ray-core/proxy"
 )
 
 type InboundConnectionHandler struct {
 	Data2Send    []byte
 	DataReturned *bytes.Buffer
 	Port         uint16
-	Server       *core.Point
+	Dispatcher   app.PacketDispatcher
 }
 
 func (handler *InboundConnectionHandler) Listen(port uint16) error {
@@ -21,7 +22,7 @@ func (handler *InboundConnectionHandler) Listen(port uint16) error {
 }
 
 func (handler *InboundConnectionHandler) Communicate(packet v2net.Packet) error {
-	ray := handler.Server.DispatchToOutbound(packet)
+	ray := handler.Dispatcher.DispatchToOutbound(packet)
 
 	input := ray.InboundInput()
 	output := ray.InboundOutput()
@@ -36,7 +37,7 @@ func (handler *InboundConnectionHandler) Communicate(packet v2net.Packet) error
 	return nil
 }
 
-func (handler *InboundConnectionHandler) Create(point *core.Point, config interface{}) (core.InboundConnectionHandler, error) {
-	handler.Server = point
+func (handler *InboundConnectionHandler) Create(dispatcher app.PacketDispatcher, config interface{}) (proxy.InboundConnectionHandler, error) {
+	handler.Dispatcher = dispatcher
 	return handler, nil
 }

+ 4 - 3
testing/mocks/outboundhandler.go

@@ -3,9 +3,10 @@ package mocks
 import (
 	"bytes"
 
-	"github.com/v2ray/v2ray-core"
 	"github.com/v2ray/v2ray-core/common/alloc"
 	v2net "github.com/v2ray/v2ray-core/common/net"
+	"github.com/v2ray/v2ray-core/proxy"
+	"github.com/v2ray/v2ray-core/transport/ray"
 )
 
 type OutboundConnectionHandler struct {
@@ -14,7 +15,7 @@ type OutboundConnectionHandler struct {
 	Destination v2net.Destination
 }
 
-func (handler *OutboundConnectionHandler) Dispatch(packet v2net.Packet, ray core.OutboundRay) error {
+func (handler *OutboundConnectionHandler) Dispatch(packet v2net.Packet, ray ray.OutboundRay) error {
 	input := ray.OutboundInput()
 	output := ray.OutboundOutput()
 
@@ -42,6 +43,6 @@ func (handler *OutboundConnectionHandler) Dispatch(packet v2net.Packet, ray core
 	return nil
 }
 
-func (handler *OutboundConnectionHandler) Create(point *core.Point, config interface{}) (core.OutboundConnectionHandler, error) {
+func (handler *OutboundConnectionHandler) Create(config interface{}) (proxy.OutboundConnectionHandler, error) {
 	return handler, nil
 }

+ 18 - 13
ray.go → transport/ray/ray.go

@@ -1,4 +1,4 @@
-package core
+package ray
 
 import (
 	"github.com/v2ray/v2ray-core/common/alloc"
@@ -8,14 +8,8 @@ const (
 	bufferSize = 16
 )
 
-// Ray is an internal tranport channel bewteen inbound and outbound connection.
-type Ray struct {
-	Input  chan *alloc.Buffer
-	Output chan *alloc.Buffer
-}
-
-func NewRay() *Ray {
-	return &Ray{
+func NewRay() Ray {
+	return &directRay{
 		Input:  make(chan *alloc.Buffer, bufferSize),
 		Output: make(chan *alloc.Buffer, bufferSize),
 	}
@@ -46,18 +40,29 @@ type InboundRay interface {
 	InboundOutput() <-chan *alloc.Buffer
 }
 
-func (ray *Ray) OutboundInput() <-chan *alloc.Buffer {
+// Ray is an internal tranport channel bewteen inbound and outbound connection.
+type Ray interface {
+	InboundRay
+	OutboundRay
+}
+
+type directRay struct {
+	Input  chan *alloc.Buffer
+	Output chan *alloc.Buffer
+}
+
+func (ray *directRay) OutboundInput() <-chan *alloc.Buffer {
 	return ray.Input
 }
 
-func (ray *Ray) OutboundOutput() chan<- *alloc.Buffer {
+func (ray *directRay) OutboundOutput() chan<- *alloc.Buffer {
 	return ray.Output
 }
 
-func (ray *Ray) InboundInput() chan<- *alloc.Buffer {
+func (ray *directRay) InboundInput() chan<- *alloc.Buffer {
 	return ray.Input
 }
 
-func (ray *Ray) InboundOutput() <-chan *alloc.Buffer {
+func (ray *directRay) InboundOutput() <-chan *alloc.Buffer {
 	return ray.Output
 }