dialer.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package kcp
  2. import (
  3. "crypto/tls"
  4. "net"
  5. "sync"
  6. "sync/atomic"
  7. "crypto/cipher"
  8. "v2ray.com/core/common/buf"
  9. "v2ray.com/core/common/dice"
  10. "v2ray.com/core/common/errors"
  11. "v2ray.com/core/common/log"
  12. v2net "v2ray.com/core/common/net"
  13. "v2ray.com/core/transport/internet"
  14. "v2ray.com/core/transport/internet/internal"
  15. v2tls "v2ray.com/core/transport/internet/tls"
  16. )
  17. var (
  18. globalConv = uint32(dice.Roll(65536))
  19. globalPool = internal.NewConnectionPool()
  20. )
  21. type ClientConnection struct {
  22. sync.RWMutex
  23. net.Conn
  24. id internal.ConnectionID
  25. input func([]Segment)
  26. reader PacketReader
  27. writer PacketWriter
  28. }
  29. func (o *ClientConnection) Overhead() int {
  30. o.RLock()
  31. defer o.RUnlock()
  32. if o.writer == nil {
  33. return 0
  34. }
  35. return o.writer.Overhead()
  36. }
  37. func (o *ClientConnection) Write(b []byte) (int, error) {
  38. o.RLock()
  39. defer o.RUnlock()
  40. if o.writer == nil {
  41. return len(b), nil
  42. }
  43. return o.writer.Write(b)
  44. }
  45. func (o *ClientConnection) Read([]byte) (int, error) {
  46. panic("KCP|ClientConnection: Read should not be called.")
  47. }
  48. func (o *ClientConnection) Id() internal.ConnectionID {
  49. return o.id
  50. }
  51. func (o *ClientConnection) Close() error {
  52. return o.Conn.Close()
  53. }
  54. func (o *ClientConnection) Reset(inputCallback func([]Segment)) {
  55. o.Lock()
  56. o.input = inputCallback
  57. o.Unlock()
  58. }
  59. func (o *ClientConnection) ResetSecurity(header internet.PacketHeader, security cipher.AEAD) {
  60. o.Lock()
  61. if o.reader == nil {
  62. o.reader = new(KCPPacketReader)
  63. }
  64. o.reader.(*KCPPacketReader).Header = header
  65. o.reader.(*KCPPacketReader).Security = security
  66. if o.writer == nil {
  67. o.writer = new(KCPPacketWriter)
  68. }
  69. o.writer.(*KCPPacketWriter).Header = header
  70. o.writer.(*KCPPacketWriter).Security = security
  71. o.writer.(*KCPPacketWriter).Writer = o.Conn
  72. o.Unlock()
  73. }
  74. func (o *ClientConnection) Run() {
  75. payload := buf.NewSmall()
  76. defer payload.Release()
  77. for {
  78. err := payload.Reset(buf.ReadFrom(o.Conn))
  79. if err != nil {
  80. payload.Release()
  81. return
  82. }
  83. o.RLock()
  84. if o.input != nil {
  85. segments := o.reader.Read(payload.Bytes())
  86. if len(segments) > 0 {
  87. o.input(segments)
  88. }
  89. }
  90. o.RUnlock()
  91. }
  92. }
  93. func DialKCP(src v2net.Address, dest v2net.Destination, options internet.DialerOptions) (internet.Connection, error) {
  94. dest.Network = v2net.Network_UDP
  95. log.Info("KCP|Dialer: Dialing KCP to ", dest)
  96. id := internal.NewConnectionID(src, dest)
  97. conn := globalPool.Get(id)
  98. if conn == nil {
  99. rawConn, err := internet.DialToDest(src, dest)
  100. if err != nil {
  101. log.Error("KCP|Dialer: Failed to dial to dest: ", err)
  102. return nil, err
  103. }
  104. c := &ClientConnection{
  105. Conn: rawConn,
  106. id: id,
  107. }
  108. go c.Run()
  109. conn = c
  110. }
  111. networkSettings, err := options.Stream.GetEffectiveNetworkSettings()
  112. if err != nil {
  113. log.Error("KCP|Dialer: Failed to get KCP settings: ", err)
  114. return nil, err
  115. }
  116. kcpSettings := networkSettings.(*Config)
  117. clientConn := conn.(*ClientConnection)
  118. header, err := kcpSettings.GetPackerHeader()
  119. if err != nil {
  120. return nil, errors.Base(err).Message("KCP|Dialer: Failed to create packet header.")
  121. }
  122. security, err := kcpSettings.GetSecurity()
  123. if err != nil {
  124. return nil, errors.Base(err).Message("KCP|Dialer: Failed to create security.")
  125. }
  126. clientConn.ResetSecurity(header, security)
  127. conv := uint16(atomic.AddUint32(&globalConv, 1))
  128. session := NewConnection(conv, clientConn, globalPool, kcpSettings)
  129. var iConn internet.Connection
  130. iConn = session
  131. if options.Stream != nil && options.Stream.HasSecuritySettings() {
  132. securitySettings, err := options.Stream.GetEffectiveSecuritySettings()
  133. if err != nil {
  134. log.Error("KCP|Dialer: Failed to get security settings: ", err)
  135. return nil, err
  136. }
  137. switch securitySettings := securitySettings.(type) {
  138. case *v2tls.Config:
  139. config := securitySettings.GetTLSConfig()
  140. if dest.Address.Family().IsDomain() {
  141. config.ServerName = dest.Address.Domain()
  142. }
  143. tlsConn := tls.Client(iConn, config)
  144. iConn = v2tls.NewConnection(tlsConn)
  145. }
  146. }
  147. return iConn, nil
  148. }
  149. func init() {
  150. internet.KCPDialer = DialKCP
  151. }