pipe_test.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. package pipe_test
  2. import (
  3. "errors"
  4. "io"
  5. "testing"
  6. "time"
  7. "github.com/google/go-cmp/cmp"
  8. "golang.org/x/sync/errgroup"
  9. "github.com/v2fly/v2ray-core/v5/common"
  10. "github.com/v2fly/v2ray-core/v5/common/buf"
  11. . "github.com/v2fly/v2ray-core/v5/transport/pipe"
  12. )
  13. func TestPipeReadWrite(t *testing.T) {
  14. pReader, pWriter := New(WithSizeLimit(1024))
  15. b := buf.New()
  16. b.WriteString("abcd")
  17. common.Must(pWriter.WriteMultiBuffer(buf.MultiBuffer{b}))
  18. b2 := buf.New()
  19. b2.WriteString("efg")
  20. common.Must(pWriter.WriteMultiBuffer(buf.MultiBuffer{b2}))
  21. rb, err := pReader.ReadMultiBuffer()
  22. common.Must(err)
  23. if r := cmp.Diff(rb.String(), "abcdefg"); r != "" {
  24. t.Error(r)
  25. }
  26. }
  27. func TestPipeInterrupt(t *testing.T) {
  28. pReader, pWriter := New(WithSizeLimit(1024))
  29. payload := []byte{'a', 'b', 'c', 'd'}
  30. b := buf.New()
  31. b.Write(payload)
  32. common.Must(pWriter.WriteMultiBuffer(buf.MultiBuffer{b}))
  33. pWriter.Interrupt()
  34. rb, err := pReader.ReadMultiBuffer()
  35. if err != io.ErrClosedPipe {
  36. t.Fatal("expect io.ErrClosePipe, but got ", err)
  37. }
  38. if !rb.IsEmpty() {
  39. t.Fatal("expect empty buffer, but got ", rb.Len())
  40. }
  41. }
  42. func TestPipeClose(t *testing.T) {
  43. pReader, pWriter := New(WithSizeLimit(1024))
  44. payload := []byte{'a', 'b', 'c', 'd'}
  45. b := buf.New()
  46. common.Must2(b.Write(payload))
  47. common.Must(pWriter.WriteMultiBuffer(buf.MultiBuffer{b}))
  48. common.Must(pWriter.Close())
  49. rb, err := pReader.ReadMultiBuffer()
  50. common.Must(err)
  51. if rb.String() != string(payload) {
  52. t.Fatal("expect content ", string(payload), " but actually ", rb.String())
  53. }
  54. rb, err = pReader.ReadMultiBuffer()
  55. if err != io.EOF {
  56. t.Fatal("expected EOF, but got ", err)
  57. }
  58. if !rb.IsEmpty() {
  59. t.Fatal("expect empty buffer, but got ", rb.String())
  60. }
  61. }
  62. func TestPipeLimitZero(t *testing.T) {
  63. pReader, pWriter := New(WithSizeLimit(0))
  64. bb := buf.New()
  65. common.Must2(bb.Write([]byte{'a', 'b'}))
  66. common.Must(pWriter.WriteMultiBuffer(buf.MultiBuffer{bb}))
  67. var errg errgroup.Group
  68. errg.Go(func() error {
  69. b := buf.New()
  70. b.Write([]byte{'c', 'd'})
  71. return pWriter.WriteMultiBuffer(buf.MultiBuffer{b})
  72. })
  73. errg.Go(func() error {
  74. time.Sleep(time.Second)
  75. var container buf.MultiBufferContainer
  76. if err := buf.Copy(pReader, &container); err != nil {
  77. return err
  78. }
  79. if r := cmp.Diff(container.String(), "abcd"); r != "" {
  80. return errors.New(r)
  81. }
  82. return nil
  83. })
  84. errg.Go(func() error {
  85. time.Sleep(time.Second * 2)
  86. return pWriter.Close()
  87. })
  88. if err := errg.Wait(); err != nil {
  89. t.Error(err)
  90. }
  91. }
  92. func TestPipeWriteMultiThread(t *testing.T) {
  93. pReader, pWriter := New(WithSizeLimit(0))
  94. var errg errgroup.Group
  95. for i := 0; i < 10; i++ {
  96. errg.Go(func() error {
  97. b := buf.New()
  98. b.WriteString("abcd")
  99. return pWriter.WriteMultiBuffer(buf.MultiBuffer{b})
  100. })
  101. }
  102. time.Sleep(time.Millisecond * 100)
  103. pWriter.Close()
  104. errg.Wait()
  105. b, err := pReader.ReadMultiBuffer()
  106. common.Must(err)
  107. if r := cmp.Diff(b[0].Bytes(), []byte{'a', 'b', 'c', 'd'}); r != "" {
  108. t.Error(r)
  109. }
  110. }
  111. func TestInterfaces(t *testing.T) {
  112. _ = (buf.Reader)(new(Reader))
  113. _ = (buf.TimeoutReader)(new(Reader))
  114. _ = (common.Interruptible)(new(Reader))
  115. _ = (common.Interruptible)(new(Writer))
  116. _ = (common.Closable)(new(Writer))
  117. }
  118. func BenchmarkPipeReadWrite(b *testing.B) {
  119. reader, writer := New(WithoutSizeLimit())
  120. a := buf.New()
  121. a.Extend(buf.Size)
  122. c := buf.MultiBuffer{a}
  123. b.ResetTimer()
  124. for i := 0; i < b.N; i++ {
  125. common.Must(writer.WriteMultiBuffer(c))
  126. d, err := reader.ReadMultiBuffer()
  127. common.Must(err)
  128. c = d
  129. }
  130. }