kcp_test.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package kcp_test
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "io"
  6. "testing"
  7. "time"
  8. "github.com/v2fly/v2ray-core/v5/common/environment/deferredpersistentstorage"
  9. "github.com/v2fly/v2ray-core/v5/common/environment/filesystemimpl"
  10. "github.com/v2fly/v2ray-core/v5/common/environment"
  11. "github.com/v2fly/v2ray-core/v5/common/environment/envctx"
  12. "github.com/v2fly/v2ray-core/v5/common/environment/systemnetworkimpl"
  13. "github.com/v2fly/v2ray-core/v5/common/environment/transientstorageimpl"
  14. "github.com/google/go-cmp/cmp"
  15. "golang.org/x/sync/errgroup"
  16. "github.com/v2fly/v2ray-core/v5/common"
  17. "github.com/v2fly/v2ray-core/v5/common/errors"
  18. "github.com/v2fly/v2ray-core/v5/common/net"
  19. "github.com/v2fly/v2ray-core/v5/transport/internet"
  20. . "github.com/v2fly/v2ray-core/v5/transport/internet/kcp"
  21. )
  22. func TestDialAndListen(t *testing.T) {
  23. ctx := context.Background()
  24. defaultNetworkImpl := systemnetworkimpl.NewSystemNetworkDefault()
  25. defaultFilesystemImpl := filesystemimpl.NewDefaultFileSystemDefaultImpl()
  26. deferredPersistentStorageImpl := deferredpersistentstorage.NewDeferredPersistentStorage(ctx)
  27. rootEnv := environment.NewRootEnvImpl(ctx,
  28. transientstorageimpl.NewScopedTransientStorageImpl(), defaultNetworkImpl.Dialer(), defaultNetworkImpl.Listener(),
  29. defaultFilesystemImpl, deferredPersistentStorageImpl)
  30. proxyEnvironment := rootEnv.ProxyEnvironment("o")
  31. transportEnvironment, err := proxyEnvironment.NarrowScopeToTransport("kcp")
  32. if err != nil {
  33. t.Fatal(err)
  34. }
  35. ctx = envctx.ContextWithEnvironment(ctx, transportEnvironment)
  36. listener, err := NewListener(ctx, net.LocalHostIP, net.Port(0), &internet.MemoryStreamConfig{
  37. ProtocolName: "mkcp",
  38. ProtocolSettings: &Config{},
  39. }, func(conn internet.Connection) {
  40. go func(c internet.Connection) {
  41. payload := make([]byte, 4096)
  42. for {
  43. nBytes, err := c.Read(payload)
  44. if err != nil {
  45. break
  46. }
  47. for idx, b := range payload[:nBytes] {
  48. payload[idx] = b ^ 'c'
  49. }
  50. c.Write(payload[:nBytes])
  51. }
  52. c.Close()
  53. }(conn)
  54. })
  55. common.Must(err)
  56. defer listener.Close()
  57. port := net.Port(listener.Addr().(*net.UDPAddr).Port)
  58. var errg errgroup.Group
  59. for i := 0; i < 10; i++ {
  60. errg.Go(func() error {
  61. clientConn, err := DialKCP(ctx, net.UDPDestination(net.LocalHostIP, port), &internet.MemoryStreamConfig{
  62. ProtocolName: "mkcp",
  63. ProtocolSettings: &Config{},
  64. })
  65. if err != nil {
  66. return err
  67. }
  68. defer clientConn.Close()
  69. clientSend := make([]byte, 1024*1024)
  70. rand.Read(clientSend)
  71. go clientConn.Write(clientSend)
  72. clientReceived := make([]byte, 1024*1024)
  73. common.Must2(io.ReadFull(clientConn, clientReceived))
  74. clientExpected := make([]byte, 1024*1024)
  75. for idx, b := range clientSend {
  76. clientExpected[idx] = b ^ 'c'
  77. }
  78. if r := cmp.Diff(clientReceived, clientExpected); r != "" {
  79. return errors.New(r)
  80. }
  81. return nil
  82. })
  83. }
  84. if err := errg.Wait(); err != nil {
  85. t.Fatal(err)
  86. }
  87. for i := 0; i < 60 && listener.ActiveConnections() > 0; i++ {
  88. time.Sleep(500 * time.Millisecond)
  89. }
  90. if v := listener.ActiveConnections(); v != 0 {
  91. t.Error("active connections: ", v)
  92. }
  93. }