| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- package inbound
- import (
- "context"
- "github.com/mustafaturan/bus"
- "github.com/v2fly/v2ray-core/v5/common"
- "github.com/v2fly/v2ray-core/v5/common/environment"
- "github.com/v2fly/v2ray-core/v5/common/environment/envctx"
- "github.com/v2fly/v2ray-core/v5/common/net"
- "github.com/v2fly/v2ray-core/v5/common/session"
- "github.com/v2fly/v2ray-core/v5/common/signal/done"
- "github.com/v2fly/v2ray-core/v5/features/routing"
- "github.com/v2fly/v2ray-core/v5/transport/internet"
- "github.com/xiaokangwang/VLite/interfaces"
- "github.com/xiaokangwang/VLite/interfaces/ibus"
- "github.com/xiaokangwang/VLite/transport"
- udpsctpserver "github.com/xiaokangwang/VLite/transport/packetsctp/sctprelay"
- "github.com/xiaokangwang/VLite/transport/packetuni/puniServer"
- "github.com/xiaokangwang/VLite/transport/udp/udpServer"
- "github.com/xiaokangwang/VLite/transport/udp/udpuni/udpunis"
- "github.com/xiaokangwang/VLite/transport/uni/uniserver"
- "github.com/xiaokangwang/VLite/workers/server"
- "io"
- gonet "net"
- "strconv"
- "sync"
- )
- //go:generate go run github.com/v2fly/v2ray-core/v5/common/errors/errorgen
- func NewUDPInboundHandler(ctx context.Context, config *UDPProtocolConfig) (*Handler, error) {
- proxyEnvironment := envctx.EnvironmentFromContext(ctx).(environment.ProxyEnvironment)
- statusInstance, err := createStatusFromConfig(config)
- if err != nil {
- return nil, newError("unable to initialize vlite").Base(err)
- }
- proxyEnvironment.TransientStorage().Put(ctx, "status", statusInstance)
- return &Handler{ctx: ctx}, nil
- }
- type Handler struct {
- ctx context.Context
- }
- func (h *Handler) Network() []net.Network {
- list := []net.Network{net.Network_UDP}
- return list
- }
- type status struct {
- config *UDPProtocolConfig
- password []byte
- msgbus *bus.Bus
- ctx context.Context
- transport transport.UnderlayTransportListener
- access sync.Mutex
- }
- func (s *status) RelayStream(conn io.ReadWriteCloser, ctx context.Context) {
- }
- func (s *status) Connection(conn gonet.Conn, connctx context.Context) context.Context {
- S_S2CTraffic := make(chan server.UDPServerTxToClientTraffic, 8)
- S_S2CDataTraffic := make(chan server.UDPServerTxToClientDataTraffic, 8)
- S_C2STraffic := make(chan server.UDPServerRxFromClientTraffic, 8)
- S_S2CTraffic2 := make(chan interfaces.TrafficWithChannelTag, 8)
- S_S2CDataTraffic2 := make(chan interfaces.TrafficWithChannelTag, 8)
- S_C2STraffic2 := make(chan interfaces.TrafficWithChannelTag, 8)
- go func(ctx context.Context) {
- for {
- select {
- case data := <-S_S2CTraffic:
- S_S2CTraffic2 <- interfaces.TrafficWithChannelTag(data)
- case <-ctx.Done():
- return
- }
- }
- }(connctx)
- go func(ctx context.Context) {
- for {
- select {
- case data := <-S_S2CDataTraffic:
- S_S2CDataTraffic2 <- interfaces.TrafficWithChannelTag(data)
- case <-ctx.Done():
- return
- }
- }
- }(connctx)
- go func(ctx context.Context) {
- for {
- select {
- case data := <-S_C2STraffic2:
- S_C2STraffic <- server.UDPServerRxFromClientTraffic(data)
- case <-ctx.Done():
- return
- }
- }
- }(connctx)
- if s.config.EnableStabilization && s.config.EnableRenegotiation {
- relay := udpsctpserver.NewPacketRelayServer(conn, S_S2CTraffic2, S_S2CDataTraffic2, S_C2STraffic2, s, s.password, connctx)
- udpserver := server.UDPServer(connctx, S_S2CTraffic, S_S2CDataTraffic, S_C2STraffic, relay)
- _ = udpserver
- } else {
- relay := puniServer.NewPacketUniServer(S_S2CTraffic2, S_S2CDataTraffic2, S_C2STraffic2, s, s.password, connctx)
- relay.OnAutoCarrier(conn, connctx)
- udpserver := server.UDPServer(connctx, S_S2CTraffic, S_S2CDataTraffic, S_C2STraffic, relay)
- _ = udpserver
- }
- return nil
- }
- func createStatusFromConfig(config *UDPProtocolConfig) (*status, error) {
- s := &status{ctx: context.Background(), config: config}
- s.password = []byte(config.Password)
- s.msgbus = ibus.NewMessageBus()
- s.ctx = context.WithValue(s.ctx, interfaces.ExtraOptionsMessageBus, s.msgbus)
- if config.ScramblePacket {
- s.ctx = context.WithValue(s.ctx, interfaces.ExtraOptionsUDPShouldMask, true)
- }
- if s.config.EnableFec {
- s.ctx = context.WithValue(s.ctx, interfaces.ExtraOptionsUDPFECEnabled, true)
- }
- s.ctx = context.WithValue(s.ctx, interfaces.ExtraOptionsUDPMask, string(s.password))
- if config.HandshakeMaskingPaddingSize != 0 {
- ctxv := &interfaces.ExtraOptionsUsePacketArmorValue{PacketArmorPaddingTo: int(config.HandshakeMaskingPaddingSize), UsePacketArmor: true}
- s.ctx = context.WithValue(s.ctx, interfaces.ExtraOptionsUsePacketArmor, ctxv)
- }
- return s, nil
- }
- func enableInterface(s *status) error {
- s.transport = s
- if s.config.EnableStabilization {
- s.transport = uniserver.NewUnifiedConnectionTransportHub(s, s.ctx)
- }
- s.transport = udpunis.NewUdpUniServer(string(s.password), s.ctx, s.transport)
- return nil
- }
- func (h *Handler) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher routing.Dispatcher) error {
- proxyEnvironment := envctx.EnvironmentFromContext(h.ctx).(environment.ProxyEnvironment)
- statusInstanceIfce, err := proxyEnvironment.TransientStorage().Get(ctx, "status")
- if err != nil {
- return newError("uninitialized handler").Base(err)
- }
- statusInstance := statusInstanceIfce.(*status)
- err = h.ensureStarted(statusInstance)
- if err != nil {
- return newError("unable to initialize").Base(err)
- }
- finish := done.New()
- conn = newUDPConnAdaptor(conn, finish)
- var initialData [1600]byte
- c, err := conn.Read(initialData[:])
- if err != nil {
- return newError("unable to read initial data").Base(err)
- }
- connID := session.IDFromContext(ctx)
- vconn, connctx := udpServer.PrepareIncomingUDPConnection(conn, statusInstance.ctx, initialData[:c], strconv.FormatInt(int64(connID), 10))
- connctx = statusInstance.transport.Connection(vconn, connctx)
- if connctx == nil {
- return newError("invalid connection discarded")
- }
- <-finish.Wait()
- return nil
- }
- func (h *Handler) ensureStarted(s *status) error {
- s.access.Lock()
- defer s.access.Unlock()
- if s.transport == nil {
- err := enableInterface(s)
- if err != nil {
- return err
- }
- }
- return nil
- }
- func init() {
- common.Must(common.RegisterConfig((*UDPProtocolConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
- return NewUDPInboundHandler(ctx, config.(*UDPProtocolConfig))
- }))
- }
|