dialer.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package kcp
  2. import (
  3. "crypto/tls"
  4. "net"
  5. "sync"
  6. "sync/atomic"
  7. "v2ray.com/core/common/alloc"
  8. "v2ray.com/core/common/dice"
  9. "v2ray.com/core/common/log"
  10. v2net "v2ray.com/core/common/net"
  11. "v2ray.com/core/transport/internet"
  12. "v2ray.com/core/transport/internet/internal"
  13. v2tls "v2ray.com/core/transport/internet/tls"
  14. )
  15. var (
  16. globalConv = uint32(dice.Roll(65536))
  17. globalPool = internal.NewConnectionPool()
  18. )
  19. type ClientConnection struct {
  20. sync.Mutex
  21. net.Conn
  22. id internal.ConnectionId
  23. input func([]byte)
  24. auth internet.Authenticator
  25. }
  26. func (o *ClientConnection) Read([]byte) (int, error) {
  27. panic("KCP|ClientConnection: Read should not be called.")
  28. }
  29. func (o *ClientConnection) Id() internal.ConnectionId {
  30. return o.id
  31. }
  32. func (o *ClientConnection) Close() error {
  33. return o.Conn.Close()
  34. }
  35. func (o *ClientConnection) Reset(auth internet.Authenticator, inputCallback func([]byte)) {
  36. o.Lock()
  37. o.input = inputCallback
  38. o.auth = auth
  39. o.Unlock()
  40. }
  41. func (o *ClientConnection) Run() {
  42. payload := alloc.NewSmallBuffer()
  43. defer payload.Release()
  44. for {
  45. nBytes, err := o.Conn.Read(payload.Value)
  46. if err != nil {
  47. payload.Release()
  48. return
  49. }
  50. payload.Slice(0, nBytes)
  51. o.Lock()
  52. if o.input != nil && o.auth.Open(payload) {
  53. o.input(payload.Value)
  54. }
  55. o.Unlock()
  56. payload.Reset()
  57. }
  58. }
  59. func DialKCP(src v2net.Address, dest v2net.Destination, options internet.DialerOptions) (internet.Connection, error) {
  60. dest.Network = v2net.Network_UDP
  61. log.Info("KCP|Dialer: Dialing KCP to ", dest)
  62. id := internal.NewConnectionId(src, dest)
  63. conn := globalPool.Get(id)
  64. if conn == nil {
  65. rawConn, err := internet.DialToDest(src, dest)
  66. if err != nil {
  67. log.Error("KCP|Dialer: Failed to dial to dest: ", err)
  68. return nil, err
  69. }
  70. c := &ClientConnection{
  71. Conn: rawConn,
  72. id: id,
  73. }
  74. go c.Run()
  75. conn = c
  76. }
  77. networkSettings, err := options.Stream.GetEffectiveNetworkSettings()
  78. if err != nil {
  79. log.Error("KCP|Dialer: Failed to get KCP settings: ", err)
  80. return nil, err
  81. }
  82. kcpSettings := networkSettings.(*Config)
  83. cpip, err := kcpSettings.GetAuthenticator()
  84. if err != nil {
  85. log.Error("KCP|Dialer: Failed to create authenticator: ", err)
  86. return nil, err
  87. }
  88. conv := uint16(atomic.AddUint32(&globalConv, 1))
  89. session := NewConnection(conv, conn.(*ClientConnection), globalPool, cpip, kcpSettings)
  90. var iConn internet.Connection
  91. iConn = session
  92. if options.Stream != nil && options.Stream.HasSecuritySettings() {
  93. securitySettings, err := options.Stream.GetEffectiveSecuritySettings()
  94. if err != nil {
  95. log.Error("KCP|Dialer: Failed to get security settings: ", err)
  96. return nil, err
  97. }
  98. switch securitySettings := securitySettings.(type) {
  99. case *v2tls.Config:
  100. config := securitySettings.GetTLSConfig()
  101. if dest.Address.Family().IsDomain() {
  102. config.ServerName = dest.Address.Domain()
  103. }
  104. tlsConn := tls.Client(conn, config)
  105. iConn = v2tls.NewConnection(tlsConn)
  106. }
  107. }
  108. return iConn, nil
  109. }
  110. func init() {
  111. internet.KCPDialer = DialKCP
  112. }