- package hysteria2
- import (
- 	"context"
- 	"sync"
- 	hyClient "github.com/apernet/hysteria/core/v2/client"
- 	hyProtocol "github.com/apernet/hysteria/core/v2/international/protocol"
- 	"github.com/apernet/quic-go/quicvarint"
- 	"github.com/v2fly/v2ray-core/v5/common"
- 	"github.com/v2fly/v2ray-core/v5/common/net"
- 	"github.com/v2fly/v2ray-core/v5/common/session"
- 	"github.com/v2fly/v2ray-core/v5/transport/internet"
- 	"github.com/v2fly/v2ray-core/v5/transport/internet/tls"
- )
- type dialerConf struct {
- 	net.Destination
- 	*internet.MemoryStreamConfig
- }
- var RunningClient map[dialerConf](hyClient.Client)
- var ClientMutex sync.Mutex
- var MBps uint64 = 1000000 / 8 // bytes per second in 1 Mbps (10^6 bits / 8)
- func GetClientTLSConfig(dest net.Destination, streamSettings *internet.MemoryStreamConfig) (*hyClient.TLSConfig, error) {
- 	config := tls.ConfigFromStreamSettings(streamSettings)
- 	if config == nil {
- 		return nil, newError(Hy2MustNeedTLS)
- 	}
- 	tlsConfig := config.GetTLSConfig(tls.WithDestination(dest))
- 	return &hyClient.TLSConfig{
- 		RootCAs:               tlsConfig.RootCAs,
- 		ServerName:            tlsConfig.ServerName,
- 		InsecureSkipVerify:    tlsConfig.InsecureSkipVerify,
- 		VerifyPeerCertificate: tlsConfig.VerifyPeerCertificate,
- 	}, nil
- }
- func ResolveAddress(dest net.Destination) (net.Addr, error) {
- 	var destAddr *net.UDPAddr
- 	if dest.Address.Family().IsIP() {
- 		destAddr = &net.UDPAddr{
- 			IP:   dest.Address.IP(),
- 			Port: int(dest.Port),
- 		}
- 	} else {
- 		addr, err := net.ResolveUDPAddr("udp", dest.NetAddr())
- 		if err != nil {
- 			return nil, err
- 		}
- 		destAddr = addr
- 	}
- 	return destAddr, nil
- }
- type connFactory struct {
- 	hyClient.ConnFactory
- 	NewFunc func(addr net.Addr) (net.PacketConn, error)
- }
- func (f *connFactory) New(addr net.Addr) (net.PacketConn, error) {
- 	return f.NewFunc(addr)
- }
- func NewHyClient(dest net.Destination, streamSettings *internet.MemoryStreamConfig) (hyClient.Client, error) {
- 	tlsConfig, err := GetClientTLSConfig(dest, streamSettings)
- 	if err != nil {
- 		return nil, err
- 	}
- 	serverAddr, err := ResolveAddress(dest)
- 	if err != nil {
- 		return nil, err
- 	}
- 	config := streamSettings.ProtocolSettings.(*Config)
- 	client, _, err := hyClient.NewClient(&hyClient.Config{
- 		Auth:       config.GetPassword(),
- 		TLSConfig:  *tlsConfig,
- 		ServerAddr: serverAddr,
- 		ConnFactory: &connFactory{
- 			NewFunc: func(addr net.Addr) (net.PacketConn, error) {
- 				rawConn, err := internet.ListenSystemPacket(context.Background(), &net.UDPAddr{
- 					IP:   []byte{0, 0, 0, 0},
- 					Port: 0,
- 				}, streamSettings.SocketSettings)
- 				if err != nil {
- 					return nil, err
- 				}
- 				return rawConn.(*net.UDPConn), nil
- 			},
- 		},
- 		BandwidthConfig: hyClient.BandwidthConfig{
- 			MaxTx: config.GetCongestion().GetUpMbps() * MBps,
- 			MaxRx: config.GetCongestion().GetDownMbps() * MBps,
- 		},
- 	})
- 	if err != nil {
- 		return nil, err
- 	}
- 	return client, nil
- }
- func CloseHyClient(dest net.Destination, streamSettings *internet.MemoryStreamConfig) error {
- 	ClientMutex.Lock()
- 	defer ClientMutex.Unlock()
- 	client, found := RunningClient[dialerConf{dest, streamSettings}]
- 	if found {
- 		delete(RunningClient, dialerConf{dest, streamSettings})
- 		return client.Close()
- 	}
- 	return nil
- }
- func GetHyClient(dest net.Destination, streamSettings *internet.MemoryStreamConfig) (hyClient.Client, error) {
- 	var err error
- 	var client hyClient.Client
- 	ClientMutex.Lock()
- 	client, found := RunningClient[dialerConf{dest, streamSettings}]
- 	ClientMutex.Unlock()
- 	if !found || !CheckHyClientHealthy(client) {
- 		if found {
- 			// The cached client is no longer healthy: close and evict it before redialing.
- 			CloseHyClient(dest, streamSettings)
- 		}
- 		// The mutex is not held while dialing, so concurrent callers may each
- 		// create a client; the one stored last stays in the cache.
- 		client, err = NewHyClient(dest, streamSettings)
- 		if err != nil {
- 			return nil, err
- 		}
- 		ClientMutex.Lock()
- 		RunningClient[dialerConf{dest, streamSettings}] = client
- 		ClientMutex.Unlock()
- 	}
- 	return client, nil
- }
- func CheckHyClientHealthy(client hyClient.Client) bool {
- 	quicConn := client.GetQuicConn()
- 	if quicConn == nil {
- 		return false
- 	}
- 	select {
- 	case <-quicConn.Context().Done():
- 		return false
- 	default:
- 	}
- 	return true
- }
- func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (internet.Connection, error) {
- 	config := streamSettings.ProtocolSettings.(*Config)
- 	client, err := GetHyClient(dest, streamSettings)
- 	if err != nil {
- 		CloseHyClient(dest, streamSettings)
- 		return nil, err
- 	}
- 	quicConn := client.GetQuicConn()
- 	conn := &HyConn{
- 		local:  quicConn.LocalAddr(),
- 		remote: quicConn.RemoteAddr(),
- 	}
- 	outbound := session.OutboundFromContext(ctx)
- 	network := net.Network_TCP
- 	if outbound != nil {
- 		network = outbound.Target.Network
- 	}
- 	if network == net.Network_UDP && config.GetUseUdpExtension() { // only hysteria2 can use udpExtension
- 		conn.IsUDPExtension = true
- 		conn.IsServer = false
- 		conn.ClientUDPSession, err = client.UDP()
- 		if err != nil {
- 			CloseHyClient(dest, streamSettings)
- 			return nil, err
- 		}
- 		return conn, nil
- 	}
- 	conn.stream, err = client.OpenStream()
- 	if err != nil {
- 		CloseHyClient(dest, streamSettings)
- 		return nil, err
- 	}
- 	// write TCP frame type
- 	frameSize := int(quicvarint.Len(hyProtocol.FrameTypeTCPRequest))
- 	buf := make([]byte, frameSize)
- 	hyProtocol.VarintPut(buf, hyProtocol.FrameTypeTCPRequest)
- 	_, err = conn.stream.Write(buf)
- 	if err != nil {
- 		CloseHyClient(dest, streamSettings)
- 		return nil, err
- 	}
- 	return conn, nil
- }
- func init() {
- 	RunningClient = make(map[dialerConf]hyClient.Client)
- 	common.Must(internet.RegisterTransportDialer(protocolName, Dial))
- }