dialer.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package quic
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. quic "github.com/lucas-clemente/quic-go"
  7. "v2ray.com/core/common"
  8. "v2ray.com/core/common/net"
  9. "v2ray.com/core/transport/internet"
  10. "v2ray.com/core/transport/internet/tls"
  11. )
  12. type clientSessions struct {
  13. access sync.Mutex
  14. sessions map[net.Destination][]quic.Session
  15. }
  16. func removeInactiveSessions(sessions []quic.Session) []quic.Session {
  17. lastActive := 0
  18. for _, s := range sessions {
  19. active := true
  20. select {
  21. case <-s.Context().Done():
  22. active = false
  23. default:
  24. }
  25. if active {
  26. sessions[lastActive] = s
  27. lastActive++
  28. }
  29. }
  30. if lastActive < len(sessions) {
  31. for i := lastActive; i < len(sessions); i++ {
  32. sessions[i] = nil
  33. }
  34. sessions = sessions[:lastActive]
  35. }
  36. return sessions
  37. }
  38. func openStream(sessions []quic.Session) (quic.Stream, net.Addr, error) {
  39. for _, s := range sessions {
  40. stream, err := s.OpenStream()
  41. if err != nil {
  42. newError("failed to create stream").Base(err).WriteToLog()
  43. continue
  44. }
  45. return stream, s.LocalAddr(), nil
  46. }
  47. return nil, nil, nil
  48. }
  49. func (s *clientSessions) openConnection(destAddr net.Addr, config *Config, tlsConfig *tls.Config, sockopt *internet.SocketConfig) (internet.Connection, error) {
  50. s.access.Lock()
  51. defer s.access.Unlock()
  52. if s.sessions == nil {
  53. s.sessions = make(map[net.Destination][]quic.Session)
  54. }
  55. dest := net.DestinationFromAddr(destAddr)
  56. var sessions []quic.Session
  57. if s, found := s.sessions[dest]; found {
  58. sessions = s
  59. }
  60. sessions = removeInactiveSessions(sessions)
  61. s.sessions[dest] = sessions
  62. stream, local, err := openStream(sessions)
  63. if err != nil {
  64. return nil, err
  65. }
  66. if stream != nil {
  67. return &interConn{
  68. stream: stream,
  69. local: local,
  70. remote: destAddr,
  71. }, nil
  72. }
  73. rawConn, err := internet.ListenSystemPacket(context.Background(), &net.UDPAddr{
  74. IP: []byte{0, 0, 0, 0},
  75. Port: 0,
  76. }, sockopt)
  77. if err != nil {
  78. return nil, err
  79. }
  80. quicConfig := &quic.Config{
  81. ConnectionIDLength: 12,
  82. KeepAlive: true,
  83. HandshakeTimeout: time.Second * 4,
  84. IdleTimeout: time.Second * 60,
  85. MaxReceiveStreamFlowControlWindow: 256 * 1024,
  86. MaxReceiveConnectionFlowControlWindow: 2 * 1024 * 1024,
  87. MaxIncomingUniStreams: -1,
  88. }
  89. conn, err := wrapSysConn(rawConn, config)
  90. if err != nil {
  91. rawConn.Close()
  92. return nil, err
  93. }
  94. session, err := quic.DialContext(context.Background(), conn, destAddr, "", tlsConfig.GetTLSConfig(tls.WithDestination(dest)), quicConfig)
  95. if err != nil {
  96. rawConn.Close()
  97. return nil, err
  98. }
  99. s.sessions[dest] = append(sessions, session)
  100. stream, err = session.OpenStream()
  101. if err != nil {
  102. return nil, err
  103. }
  104. return &interConn{
  105. stream: stream,
  106. local: session.LocalAddr(),
  107. remote: destAddr,
  108. }, nil
  109. }
  110. var client clientSessions
  111. func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (internet.Connection, error) {
  112. tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
  113. if tlsConfig == nil {
  114. tlsConfig = &tls.Config{
  115. ServerName: internalDomain,
  116. AllowInsecure: true,
  117. }
  118. }
  119. destAddr, err := net.ResolveUDPAddr("udp", dest.NetAddr())
  120. if err != nil {
  121. return nil, err
  122. }
  123. config := streamSettings.ProtocolSettings.(*Config)
  124. return client.openConnection(destAddr, config, tlsConfig, streamSettings.SocketSettings)
  125. }
  126. func init() {
  127. common.Must(internet.RegisterTransportDialer(protocolName, Dial))
  128. }