pipe_test.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package pipe_test
  2. import (
  3. "io"
  4. "sync"
  5. "testing"
  6. "time"
  7. "v2ray.com/core/common"
  8. "v2ray.com/core/common/buf"
  9. "v2ray.com/core/common/task"
  10. . "v2ray.com/core/transport/pipe"
  11. . "v2ray.com/ext/assert"
  12. )
  13. func TestPipeReadWrite(t *testing.T) {
  14. assert := With(t)
  15. pReader, pWriter := New(WithSizeLimit(1024))
  16. b := buf.New()
  17. b.WriteString("abcd")
  18. assert(pWriter.WriteMultiBuffer(buf.MultiBuffer{b}), IsNil)
  19. b2 := buf.New()
  20. b2.WriteString("efg")
  21. assert(pWriter.WriteMultiBuffer(buf.MultiBuffer{b2}), IsNil)
  22. rb, err := pReader.ReadMultiBuffer()
  23. assert(err, IsNil)
  24. assert(rb.String(), Equals, "abcdefg")
  25. }
  26. func TestPipeCloseError(t *testing.T) {
  27. assert := With(t)
  28. pReader, pWriter := New(WithSizeLimit(1024))
  29. payload := []byte{'a', 'b', 'c', 'd'}
  30. b := buf.New()
  31. b.Write(payload)
  32. assert(pWriter.WriteMultiBuffer(buf.MultiBuffer{b}), IsNil)
  33. pWriter.CloseError()
  34. rb, err := pReader.ReadMultiBuffer()
  35. assert(err, Equals, io.ErrClosedPipe)
  36. assert(rb.IsEmpty(), IsTrue)
  37. }
  38. func TestPipeClose(t *testing.T) {
  39. assert := With(t)
  40. pReader, pWriter := New(WithSizeLimit(1024))
  41. payload := []byte{'a', 'b', 'c', 'd'}
  42. b := buf.New()
  43. b.Write(payload)
  44. assert(pWriter.WriteMultiBuffer(buf.MultiBuffer{b}), IsNil)
  45. assert(pWriter.Close(), IsNil)
  46. rb, err := pReader.ReadMultiBuffer()
  47. assert(err, IsNil)
  48. assert(rb.String(), Equals, b.String())
  49. rb, err = pReader.ReadMultiBuffer()
  50. assert(err, Equals, io.EOF)
  51. assert(rb.IsEmpty(), IsTrue)
  52. }
  53. func TestPipeLimitZero(t *testing.T) {
  54. assert := With(t)
  55. pReader, pWriter := New(WithSizeLimit(0))
  56. bb := buf.New()
  57. bb.Write([]byte{'a', 'b'})
  58. assert(pWriter.WriteMultiBuffer(buf.MultiBuffer{bb}), IsNil)
  59. err := task.Run(task.Parallel(func() error {
  60. b := buf.New()
  61. b.Write([]byte{'c', 'd'})
  62. return pWriter.WriteMultiBuffer(buf.MultiBuffer{b})
  63. }, func() error {
  64. time.Sleep(time.Second)
  65. var container buf.MultiBufferContainer
  66. if err := buf.Copy(pReader, &container); err != nil {
  67. return err
  68. }
  69. assert(container.String(), Equals, "abcd")
  70. return nil
  71. }, func() error {
  72. time.Sleep(time.Second * 2)
  73. pWriter.Close()
  74. return nil
  75. }))()
  76. assert(err, IsNil)
  77. }
  78. func TestPipeWriteMultiThread(t *testing.T) {
  79. assert := With(t)
  80. pReader, pWriter := New(WithSizeLimit(0))
  81. var wg sync.WaitGroup
  82. for i := 0; i < 10; i++ {
  83. wg.Add(1)
  84. go func() {
  85. b := buf.New()
  86. b.WriteString("abcd")
  87. pWriter.WriteMultiBuffer(buf.MultiBuffer{b})
  88. wg.Done()
  89. }()
  90. }
  91. time.Sleep(time.Millisecond * 100)
  92. pWriter.Close()
  93. wg.Wait()
  94. b, err := pReader.ReadMultiBuffer()
  95. assert(err, IsNil)
  96. assert(b[0].Bytes(), Equals, []byte{'a', 'b', 'c', 'd'})
  97. }
  98. func TestInterfaces(t *testing.T) {
  99. assert := With(t)
  100. assert((*Reader)(nil), Implements, (*buf.Reader)(nil))
  101. assert((*Reader)(nil), Implements, (*buf.TimeoutReader)(nil))
  102. }
  103. func BenchmarkPipeReadWrite(b *testing.B) {
  104. reader, writer := New(WithoutSizeLimit())
  105. a := buf.New()
  106. a.Extend(buf.Size)
  107. c := buf.MultiBuffer{a}
  108. b.ResetTimer()
  109. for i := 0; i < b.N; i++ {
  110. common.Must(writer.WriteMultiBuffer(c))
  111. d, err := reader.ReadMultiBuffer()
  112. common.Must(err)
  113. c = d
  114. }
  115. }