dialer.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package quic
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. quic "github.com/lucas-clemente/quic-go"
  7. "v2ray.com/core/common"
  8. "v2ray.com/core/common/net"
  9. "v2ray.com/core/transport/internet"
  10. "v2ray.com/core/transport/internet/tls"
  11. )
  12. type clientSessions struct {
  13. access sync.Mutex
  14. sessions map[net.Destination][]quic.Session
  15. }
  16. func removeInactiveSessions(sessions []quic.Session) []quic.Session {
  17. lastActive := 0
  18. for _, s := range sessions {
  19. active := true
  20. select {
  21. case <-s.Context().Done():
  22. active = false
  23. default:
  24. }
  25. if active {
  26. sessions[lastActive] = s
  27. lastActive++
  28. }
  29. }
  30. if lastActive < len(sessions) {
  31. for i := lastActive; i < len(sessions); i++ {
  32. sessions[i] = nil
  33. }
  34. sessions = sessions[:lastActive]
  35. }
  36. return sessions
  37. }
  38. func openStream(sessions []quic.Session) (quic.Stream, net.Addr, error) {
  39. for _, s := range sessions {
  40. stream, err := s.OpenStream()
  41. if err != nil {
  42. newError("failed to create stream").Base(err).WriteToLog()
  43. continue
  44. }
  45. return stream, s.LocalAddr(), nil
  46. }
  47. return nil, nil, nil
  48. }
  49. func (s *clientSessions) openConnection(destAddr net.Addr, config *Config, tlsConfig *tls.Config, sockopt *internet.SocketConfig) (internet.Connection, error) {
  50. s.access.Lock()
  51. defer s.access.Unlock()
  52. if s.sessions == nil {
  53. s.sessions = make(map[net.Destination][]quic.Session)
  54. }
  55. dest := net.DestinationFromAddr(destAddr)
  56. var sessions []quic.Session
  57. if s, found := s.sessions[dest]; found {
  58. sessions = s
  59. }
  60. sessions = removeInactiveSessions(sessions)
  61. s.sessions[dest] = sessions
  62. stream, local, err := openStream(sessions)
  63. if err != nil {
  64. return nil, err
  65. }
  66. if stream != nil {
  67. return &interConn{
  68. stream: stream,
  69. local: local,
  70. remote: destAddr,
  71. }, nil
  72. }
  73. rawConn, err := internet.ListenSystemPacket(context.Background(), &net.UDPAddr{
  74. IP: []byte{0, 0, 0, 0},
  75. Port: 0,
  76. }, sockopt)
  77. if err != nil {
  78. return nil, err
  79. }
  80. quicConfig := &quic.Config{
  81. ConnectionIDLength: 12,
  82. HandshakeTimeout: time.Second * 4,
  83. IdleTimeout: time.Second * 60,
  84. MaxReceiveStreamFlowControlWindow: 512 * 1024,
  85. MaxReceiveConnectionFlowControlWindow: 2 * 1024 * 1024,
  86. MaxIncomingUniStreams: -1,
  87. }
  88. conn, err := wrapSysConn(rawConn, config)
  89. if err != nil {
  90. rawConn.Close()
  91. return nil, err
  92. }
  93. session, err := quic.DialContext(context.Background(), conn, destAddr, "", tlsConfig.GetTLSConfig(tls.WithDestination(dest)), quicConfig)
  94. if err != nil {
  95. rawConn.Close()
  96. return nil, err
  97. }
  98. s.sessions[dest] = append(sessions, session)
  99. stream, err = session.OpenStream()
  100. if err != nil {
  101. return nil, err
  102. }
  103. return &interConn{
  104. stream: stream,
  105. local: session.LocalAddr(),
  106. remote: destAddr,
  107. }, nil
  108. }
  109. var client clientSessions
  110. func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (internet.Connection, error) {
  111. tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
  112. if tlsConfig == nil {
  113. tlsConfig = &tls.Config{
  114. ServerName: internalDomain,
  115. AllowInsecure: true,
  116. }
  117. }
  118. destAddr, err := net.ResolveUDPAddr("udp", dest.NetAddr())
  119. if err != nil {
  120. return nil, err
  121. }
  122. config := streamSettings.ProtocolSettings.(*Config)
  123. return client.openConnection(destAddr, config, tlsConfig, streamSettings.SocketSettings)
  124. }
  125. func init() {
  126. common.Must(internet.RegisterTransportDialer(protocolName, Dial))
  127. }