dialer.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. package kcp
  2. import (
  3. "context"
  4. "crypto/cipher"
  5. "crypto/tls"
  6. "net"
  7. "sync"
  8. "sync/atomic"
  9. "v2ray.com/core/common"
  10. "v2ray.com/core/common/buf"
  11. "v2ray.com/core/common/dice"
  12. "v2ray.com/core/common/errors"
  13. "v2ray.com/core/common/log"
  14. v2net "v2ray.com/core/common/net"
  15. "v2ray.com/core/transport/internet"
  16. "v2ray.com/core/transport/internet/internal"
  17. v2tls "v2ray.com/core/transport/internet/tls"
  18. )
  19. var (
  20. globalConv = uint32(dice.Roll(65536))
  21. globalPool = internal.NewConnectionPool()
  22. )
  23. type ClientConnection struct {
  24. sync.RWMutex
  25. net.Conn
  26. id internal.ConnectionID
  27. input func([]Segment)
  28. reader PacketReader
  29. writer PacketWriter
  30. }
  31. func (o *ClientConnection) Overhead() int {
  32. o.RLock()
  33. defer o.RUnlock()
  34. if o.writer == nil {
  35. return 0
  36. }
  37. return o.writer.Overhead()
  38. }
  39. func (o *ClientConnection) Write(b []byte) (int, error) {
  40. o.RLock()
  41. defer o.RUnlock()
  42. if o.writer == nil {
  43. return len(b), nil
  44. }
  45. return o.writer.Write(b)
  46. }
  47. func (o *ClientConnection) Read([]byte) (int, error) {
  48. panic("KCP|ClientConnection: Read should not be called.")
  49. }
  50. func (o *ClientConnection) Id() internal.ConnectionID {
  51. return o.id
  52. }
  53. func (o *ClientConnection) Close() error {
  54. return o.Conn.Close()
  55. }
  56. func (o *ClientConnection) Reset(inputCallback func([]Segment)) {
  57. o.Lock()
  58. o.input = inputCallback
  59. o.Unlock()
  60. }
  61. func (o *ClientConnection) ResetSecurity(header internet.PacketHeader, security cipher.AEAD) {
  62. o.Lock()
  63. if o.reader == nil {
  64. o.reader = new(KCPPacketReader)
  65. }
  66. o.reader.(*KCPPacketReader).Header = header
  67. o.reader.(*KCPPacketReader).Security = security
  68. if o.writer == nil {
  69. o.writer = new(KCPPacketWriter)
  70. }
  71. o.writer.(*KCPPacketWriter).Header = header
  72. o.writer.(*KCPPacketWriter).Security = security
  73. o.writer.(*KCPPacketWriter).Writer = o.Conn
  74. o.Unlock()
  75. }
  76. func (o *ClientConnection) Run() {
  77. payload := buf.NewSmall()
  78. defer payload.Release()
  79. for {
  80. err := payload.Reset(buf.ReadFrom(o.Conn))
  81. if err != nil {
  82. payload.Release()
  83. return
  84. }
  85. o.RLock()
  86. if o.input != nil {
  87. segments := o.reader.Read(payload.Bytes())
  88. if len(segments) > 0 {
  89. o.input(segments)
  90. }
  91. }
  92. o.RUnlock()
  93. }
  94. }
  95. func DialKCP(ctx context.Context, dest v2net.Destination) (internet.Connection, error) {
  96. dest.Network = v2net.Network_UDP
  97. log.Info("KCP|Dialer: Dialing KCP to ", dest)
  98. src := internet.DialerSourceFromContext(ctx)
  99. id := internal.NewConnectionID(src, dest)
  100. conn := globalPool.Get(id)
  101. if conn == nil {
  102. rawConn, err := internet.DialSystem(src, dest)
  103. if err != nil {
  104. log.Error("KCP|Dialer: Failed to dial to dest: ", err)
  105. return nil, err
  106. }
  107. c := &ClientConnection{
  108. Conn: rawConn,
  109. id: id,
  110. }
  111. go c.Run()
  112. conn = c
  113. }
  114. kcpSettings := internet.TransportSettingsFromContext(ctx).(*Config)
  115. clientConn := conn.(*ClientConnection)
  116. header, err := kcpSettings.GetPackerHeader()
  117. if err != nil {
  118. return nil, errors.Base(err).Message("KCP|Dialer: Failed to create packet header.")
  119. }
  120. security, err := kcpSettings.GetSecurity()
  121. if err != nil {
  122. return nil, errors.Base(err).Message("KCP|Dialer: Failed to create security.")
  123. }
  124. clientConn.ResetSecurity(header, security)
  125. conv := uint16(atomic.AddUint32(&globalConv, 1))
  126. session := NewConnection(conv, clientConn, globalPool, kcpSettings)
  127. var iConn internet.Connection
  128. iConn = session
  129. if securitySettings := internet.SecuritySettingsFromContext(ctx); securitySettings != nil {
  130. switch securitySettings := securitySettings.(type) {
  131. case *v2tls.Config:
  132. config := securitySettings.GetTLSConfig()
  133. if dest.Address.Family().IsDomain() {
  134. config.ServerName = dest.Address.Domain()
  135. }
  136. tlsConn := tls.Client(iConn, config)
  137. iConn = UnreusableConnection{Conn: tlsConn}
  138. }
  139. }
  140. return iConn, nil
  141. }
  142. func init() {
  143. common.Must(internet.RegisterTransportDialer(internet.TransportProtocol_MKCP, DialKCP))
  144. }