pipe_test.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package pipe_test
  2. import (
  3. "context"
  4. "io"
  5. "sync"
  6. "testing"
  7. "time"
  8. "github.com/google/go-cmp/cmp"
  9. "v2ray.com/core/common"
  10. "v2ray.com/core/common/buf"
  11. "v2ray.com/core/common/task"
  12. . "v2ray.com/core/transport/pipe"
  13. . "v2ray.com/ext/assert"
  14. )
  15. func TestPipeReadWrite(t *testing.T) {
  16. pReader, pWriter := New(WithSizeLimit(1024))
  17. b := buf.New()
  18. b.WriteString("abcd")
  19. common.Must(pWriter.WriteMultiBuffer(buf.MultiBuffer{b}))
  20. b2 := buf.New()
  21. b2.WriteString("efg")
  22. common.Must(pWriter.WriteMultiBuffer(buf.MultiBuffer{b2}))
  23. rb, err := pReader.ReadMultiBuffer()
  24. common.Must(err)
  25. if r := cmp.Diff(rb.String(), "abcdefg"); r != "" {
  26. t.Error(r)
  27. }
  28. }
  29. func TestPipeInterrupt(t *testing.T) {
  30. pReader, pWriter := New(WithSizeLimit(1024))
  31. payload := []byte{'a', 'b', 'c', 'd'}
  32. b := buf.New()
  33. b.Write(payload)
  34. common.Must(pWriter.WriteMultiBuffer(buf.MultiBuffer{b}))
  35. pWriter.Interrupt()
  36. rb, err := pReader.ReadMultiBuffer()
  37. if err != io.ErrClosedPipe {
  38. t.Fatal("expect io.ErrClosePipe, but got ", err)
  39. }
  40. if !rb.IsEmpty() {
  41. t.Fatal("expect empty buffer, but got ", rb.Len())
  42. }
  43. }
  44. func TestPipeClose(t *testing.T) {
  45. assert := With(t)
  46. pReader, pWriter := New(WithSizeLimit(1024))
  47. payload := []byte{'a', 'b', 'c', 'd'}
  48. b := buf.New()
  49. b.Write(payload)
  50. assert(pWriter.WriteMultiBuffer(buf.MultiBuffer{b}), IsNil)
  51. assert(pWriter.Close(), IsNil)
  52. rb, err := pReader.ReadMultiBuffer()
  53. assert(err, IsNil)
  54. assert(rb.String(), Equals, b.String())
  55. rb, err = pReader.ReadMultiBuffer()
  56. assert(err, Equals, io.EOF)
  57. assert(rb.IsEmpty(), IsTrue)
  58. }
  59. func TestPipeLimitZero(t *testing.T) {
  60. assert := With(t)
  61. pReader, pWriter := New(WithSizeLimit(0))
  62. bb := buf.New()
  63. bb.Write([]byte{'a', 'b'})
  64. assert(pWriter.WriteMultiBuffer(buf.MultiBuffer{bb}), IsNil)
  65. err := task.Run(context.Background(), func() error {
  66. b := buf.New()
  67. b.Write([]byte{'c', 'd'})
  68. return pWriter.WriteMultiBuffer(buf.MultiBuffer{b})
  69. }, func() error {
  70. time.Sleep(time.Second)
  71. var container buf.MultiBufferContainer
  72. if err := buf.Copy(pReader, &container); err != nil {
  73. return err
  74. }
  75. assert(container.String(), Equals, "abcd")
  76. return nil
  77. }, func() error {
  78. time.Sleep(time.Second * 2)
  79. pWriter.Close()
  80. return nil
  81. })
  82. assert(err, IsNil)
  83. }
  84. func TestPipeWriteMultiThread(t *testing.T) {
  85. assert := With(t)
  86. pReader, pWriter := New(WithSizeLimit(0))
  87. var wg sync.WaitGroup
  88. for i := 0; i < 10; i++ {
  89. wg.Add(1)
  90. go func() {
  91. b := buf.New()
  92. b.WriteString("abcd")
  93. pWriter.WriteMultiBuffer(buf.MultiBuffer{b})
  94. wg.Done()
  95. }()
  96. }
  97. time.Sleep(time.Millisecond * 100)
  98. pWriter.Close()
  99. wg.Wait()
  100. b, err := pReader.ReadMultiBuffer()
  101. assert(err, IsNil)
  102. assert(b[0].Bytes(), Equals, []byte{'a', 'b', 'c', 'd'})
  103. }
  104. func TestInterfaces(t *testing.T) {
  105. assert := With(t)
  106. assert((*Reader)(nil), Implements, (*buf.Reader)(nil))
  107. assert((*Reader)(nil), Implements, (*buf.TimeoutReader)(nil))
  108. }
  109. func BenchmarkPipeReadWrite(b *testing.B) {
  110. reader, writer := New(WithoutSizeLimit())
  111. a := buf.New()
  112. a.Extend(buf.Size)
  113. c := buf.MultiBuffer{a}
  114. b.ResetTimer()
  115. for i := 0; i < b.N; i++ {
  116. common.Must(writer.WriteMultiBuffer(c))
  117. d, err := reader.ReadMultiBuffer()
  118. common.Must(err)
  119. c = d
  120. }
  121. }