dialer.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package kcp
  2. import (
  3. "crypto/tls"
  4. "net"
  5. "sync"
  6. "sync/atomic"
  7. "crypto/cipher"
  8. "v2ray.com/core/common"
  9. "v2ray.com/core/common/buf"
  10. "v2ray.com/core/common/dice"
  11. "v2ray.com/core/common/errors"
  12. "v2ray.com/core/common/log"
  13. v2net "v2ray.com/core/common/net"
  14. "v2ray.com/core/transport/internet"
  15. "v2ray.com/core/transport/internet/internal"
  16. v2tls "v2ray.com/core/transport/internet/tls"
  17. )
  18. var (
  19. globalConv = uint32(dice.Roll(65536))
  20. globalPool = internal.NewConnectionPool()
  21. )
  22. type ClientConnection struct {
  23. sync.RWMutex
  24. net.Conn
  25. id internal.ConnectionID
  26. input func([]Segment)
  27. reader PacketReader
  28. writer PacketWriter
  29. }
  30. func (o *ClientConnection) Overhead() int {
  31. o.RLock()
  32. defer o.RUnlock()
  33. if o.writer == nil {
  34. return 0
  35. }
  36. return o.writer.Overhead()
  37. }
  38. func (o *ClientConnection) Write(b []byte) (int, error) {
  39. o.RLock()
  40. defer o.RUnlock()
  41. if o.writer == nil {
  42. return len(b), nil
  43. }
  44. return o.writer.Write(b)
  45. }
  46. func (o *ClientConnection) Read([]byte) (int, error) {
  47. panic("KCP|ClientConnection: Read should not be called.")
  48. }
  49. func (o *ClientConnection) Id() internal.ConnectionID {
  50. return o.id
  51. }
  52. func (o *ClientConnection) Close() error {
  53. return o.Conn.Close()
  54. }
  55. func (o *ClientConnection) Reset(inputCallback func([]Segment)) {
  56. o.Lock()
  57. o.input = inputCallback
  58. o.Unlock()
  59. }
  60. func (o *ClientConnection) ResetSecurity(header internet.PacketHeader, security cipher.AEAD) {
  61. o.Lock()
  62. if o.reader == nil {
  63. o.reader = new(KCPPacketReader)
  64. }
  65. o.reader.(*KCPPacketReader).Header = header
  66. o.reader.(*KCPPacketReader).Security = security
  67. if o.writer == nil {
  68. o.writer = new(KCPPacketWriter)
  69. }
  70. o.writer.(*KCPPacketWriter).Header = header
  71. o.writer.(*KCPPacketWriter).Security = security
  72. o.writer.(*KCPPacketWriter).Writer = o.Conn
  73. o.Unlock()
  74. }
  75. func (o *ClientConnection) Run() {
  76. payload := buf.NewSmall()
  77. defer payload.Release()
  78. for {
  79. err := payload.Reset(buf.ReadFrom(o.Conn))
  80. if err != nil {
  81. payload.Release()
  82. return
  83. }
  84. o.RLock()
  85. if o.input != nil {
  86. segments := o.reader.Read(payload.Bytes())
  87. if len(segments) > 0 {
  88. o.input(segments)
  89. }
  90. }
  91. o.RUnlock()
  92. }
  93. }
  94. func DialKCP(src v2net.Address, dest v2net.Destination, options internet.DialerOptions) (internet.Connection, error) {
  95. dest.Network = v2net.Network_UDP
  96. log.Info("KCP|Dialer: Dialing KCP to ", dest)
  97. id := internal.NewConnectionID(src, dest)
  98. conn := globalPool.Get(id)
  99. if conn == nil {
  100. rawConn, err := internet.DialSystem(src, dest)
  101. if err != nil {
  102. log.Error("KCP|Dialer: Failed to dial to dest: ", err)
  103. return nil, err
  104. }
  105. c := &ClientConnection{
  106. Conn: rawConn,
  107. id: id,
  108. }
  109. go c.Run()
  110. conn = c
  111. }
  112. networkSettings, err := options.Stream.GetEffectiveNetworkSettings()
  113. if err != nil {
  114. log.Error("KCP|Dialer: Failed to get KCP settings: ", err)
  115. return nil, err
  116. }
  117. kcpSettings := networkSettings.(*Config)
  118. clientConn := conn.(*ClientConnection)
  119. header, err := kcpSettings.GetPackerHeader()
  120. if err != nil {
  121. return nil, errors.Base(err).Message("KCP|Dialer: Failed to create packet header.")
  122. }
  123. security, err := kcpSettings.GetSecurity()
  124. if err != nil {
  125. return nil, errors.Base(err).Message("KCP|Dialer: Failed to create security.")
  126. }
  127. clientConn.ResetSecurity(header, security)
  128. conv := uint16(atomic.AddUint32(&globalConv, 1))
  129. session := NewConnection(conv, clientConn, globalPool, kcpSettings)
  130. var iConn internet.Connection
  131. iConn = session
  132. if options.Stream != nil && options.Stream.HasSecuritySettings() {
  133. securitySettings, err := options.Stream.GetEffectiveSecuritySettings()
  134. if err != nil {
  135. log.Error("KCP|Dialer: Failed to get security settings: ", err)
  136. return nil, err
  137. }
  138. switch securitySettings := securitySettings.(type) {
  139. case *v2tls.Config:
  140. config := securitySettings.GetTLSConfig()
  141. if dest.Address.Family().IsDomain() {
  142. config.ServerName = dest.Address.Domain()
  143. }
  144. tlsConn := tls.Client(iConn, config)
  145. iConn = UnreusableConnection{Conn: tlsConn}
  146. }
  147. }
  148. return iConn, nil
  149. }
  150. func init() {
  151. common.Must(internet.RegisterNetworkDialer(v2net.Network_KCP, DialKCP))
  152. }