kcp_test.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package kcp_test
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "io"
  6. "testing"
  7. "time"
  8. "github.com/v2fly/v2ray-core/v5/common/environment"
  9. "github.com/v2fly/v2ray-core/v5/common/environment/envctx"
  10. "github.com/v2fly/v2ray-core/v5/common/environment/systemnetworkimpl"
  11. "github.com/v2fly/v2ray-core/v5/common/environment/transientstorageimpl"
  12. "github.com/google/go-cmp/cmp"
  13. "golang.org/x/sync/errgroup"
  14. "github.com/v2fly/v2ray-core/v5/common"
  15. "github.com/v2fly/v2ray-core/v5/common/errors"
  16. "github.com/v2fly/v2ray-core/v5/common/net"
  17. "github.com/v2fly/v2ray-core/v5/transport/internet"
  18. . "github.com/v2fly/v2ray-core/v5/transport/internet/kcp"
  19. )
  20. func TestDialAndListen(t *testing.T) {
  21. ctx := context.Background()
  22. defaultNetworkImpl := systemnetworkimpl.NewSystemNetworkDefault()
  23. rootEnv := environment.NewRootEnvImpl(ctx, transientstorageimpl.NewScopedTransientStorageImpl(), defaultNetworkImpl.Dialer(), defaultNetworkImpl.Listener())
  24. proxyEnvironment := rootEnv.ProxyEnvironment("o")
  25. transportEnvironment, err := proxyEnvironment.NarrowScopeToTransport("kcp")
  26. if err != nil {
  27. t.Fatal(err)
  28. }
  29. ctx = envctx.ContextWithEnvironment(ctx, transportEnvironment)
  30. listener, err := NewListener(ctx, net.LocalHostIP, net.Port(0), &internet.MemoryStreamConfig{
  31. ProtocolName: "mkcp",
  32. ProtocolSettings: &Config{},
  33. }, func(conn internet.Connection) {
  34. go func(c internet.Connection) {
  35. payload := make([]byte, 4096)
  36. for {
  37. nBytes, err := c.Read(payload)
  38. if err != nil {
  39. break
  40. }
  41. for idx, b := range payload[:nBytes] {
  42. payload[idx] = b ^ 'c'
  43. }
  44. c.Write(payload[:nBytes])
  45. }
  46. c.Close()
  47. }(conn)
  48. })
  49. common.Must(err)
  50. defer listener.Close()
  51. port := net.Port(listener.Addr().(*net.UDPAddr).Port)
  52. var errg errgroup.Group
  53. for i := 0; i < 10; i++ {
  54. errg.Go(func() error {
  55. clientConn, err := DialKCP(ctx, net.UDPDestination(net.LocalHostIP, port), &internet.MemoryStreamConfig{
  56. ProtocolName: "mkcp",
  57. ProtocolSettings: &Config{},
  58. })
  59. if err != nil {
  60. return err
  61. }
  62. defer clientConn.Close()
  63. clientSend := make([]byte, 1024*1024)
  64. rand.Read(clientSend)
  65. go clientConn.Write(clientSend)
  66. clientReceived := make([]byte, 1024*1024)
  67. common.Must2(io.ReadFull(clientConn, clientReceived))
  68. clientExpected := make([]byte, 1024*1024)
  69. for idx, b := range clientSend {
  70. clientExpected[idx] = b ^ 'c'
  71. }
  72. if r := cmp.Diff(clientReceived, clientExpected); r != "" {
  73. return errors.New(r)
  74. }
  75. return nil
  76. })
  77. }
  78. if err := errg.Wait(); err != nil {
  79. t.Fatal(err)
  80. }
  81. for i := 0; i < 60 && listener.ActiveConnections() > 0; i++ {
  82. time.Sleep(500 * time.Millisecond)
  83. }
  84. if v := listener.ActiveConnections(); v != 0 {
  85. t.Error("active connections: ", v)
  86. }
  87. }