pipe_test.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package pipe_test
  2. import (
  3. "io"
  4. "sync"
  5. "testing"
  6. "time"
  7. "v2ray.com/core/common"
  8. "v2ray.com/core/common/buf"
  9. "v2ray.com/core/common/task"
  10. . "v2ray.com/core/transport/pipe"
  11. . "v2ray.com/ext/assert"
  12. )
  13. func TestPipeReadWrite(t *testing.T) {
  14. assert := With(t)
  15. pReader, pWriter := New(WithSizeLimit(1024))
  16. payload := []byte{'a', 'b', 'c', 'd'}
  17. b := buf.New()
  18. b.Write(payload)
  19. assert(pWriter.WriteMultiBuffer(buf.MultiBuffer{b}), IsNil)
  20. rb, err := pReader.ReadMultiBuffer()
  21. assert(err, IsNil)
  22. assert(rb.String(), Equals, b.String())
  23. }
  24. func TestPipeCloseError(t *testing.T) {
  25. assert := With(t)
  26. pReader, pWriter := New(WithSizeLimit(1024))
  27. payload := []byte{'a', 'b', 'c', 'd'}
  28. b := buf.New()
  29. b.Write(payload)
  30. assert(pWriter.WriteMultiBuffer(buf.MultiBuffer{b}), IsNil)
  31. pWriter.CloseError()
  32. rb, err := pReader.ReadMultiBuffer()
  33. assert(err, Equals, io.ErrClosedPipe)
  34. assert(rb.IsEmpty(), IsTrue)
  35. }
  36. func TestPipeClose(t *testing.T) {
  37. assert := With(t)
  38. pReader, pWriter := New(WithSizeLimit(1024))
  39. payload := []byte{'a', 'b', 'c', 'd'}
  40. b := buf.New()
  41. b.Write(payload)
  42. assert(pWriter.WriteMultiBuffer(buf.MultiBuffer{b}), IsNil)
  43. assert(pWriter.Close(), IsNil)
  44. rb, err := pReader.ReadMultiBuffer()
  45. assert(err, IsNil)
  46. assert(rb.String(), Equals, b.String())
  47. rb, err = pReader.ReadMultiBuffer()
  48. assert(err, Equals, io.EOF)
  49. assert(rb.IsEmpty(), IsTrue)
  50. }
  51. func TestPipeLimitZero(t *testing.T) {
  52. assert := With(t)
  53. pReader, pWriter := New(WithSizeLimit(0))
  54. bb := buf.New()
  55. bb.Write([]byte{'a', 'b'})
  56. assert(pWriter.WriteMultiBuffer(buf.MultiBuffer{bb}), IsNil)
  57. err := task.Run(task.Parallel(func() error {
  58. b := buf.New()
  59. b.Write([]byte{'c', 'd'})
  60. return pWriter.WriteMultiBuffer(buf.MultiBuffer{b})
  61. }, func() error {
  62. time.Sleep(time.Second)
  63. var container buf.MultiBufferContainer
  64. if err := buf.Copy(pReader, &container); err != nil {
  65. return err
  66. }
  67. assert(container.String(), Equals, "abcd")
  68. return nil
  69. }, func() error {
  70. time.Sleep(time.Second * 2)
  71. pWriter.Close()
  72. return nil
  73. }))()
  74. assert(err, IsNil)
  75. }
  76. func TestPipeWriteMultiThread(t *testing.T) {
  77. assert := With(t)
  78. pReader, pWriter := New(WithSizeLimit(0))
  79. var wg sync.WaitGroup
  80. for i := 0; i < 10; i++ {
  81. wg.Add(1)
  82. go func() {
  83. b := buf.New()
  84. b.WriteString("abcd")
  85. pWriter.WriteMultiBuffer(buf.MultiBuffer{b})
  86. wg.Done()
  87. }()
  88. }
  89. time.Sleep(time.Millisecond * 100)
  90. pWriter.Close()
  91. wg.Wait()
  92. b, err := pReader.ReadMultiBuffer()
  93. assert(err, IsNil)
  94. assert(b[0].Bytes(), Equals, []byte{'a', 'b', 'c', 'd'})
  95. }
  96. func TestInterfaces(t *testing.T) {
  97. assert := With(t)
  98. assert((*Reader)(nil), Implements, (*buf.Reader)(nil))
  99. assert((*Reader)(nil), Implements, (*buf.TimeoutReader)(nil))
  100. }
  101. func BenchmarkPipeReadWrite(b *testing.B) {
  102. reader, writer := New(WithoutSizeLimit())
  103. a := buf.New()
  104. a.Extend(buf.Size)
  105. c := buf.MultiBuffer{a}
  106. b.ResetTimer()
  107. for i := 0; i < b.N; i++ {
  108. common.Must(writer.WriteMultiBuffer(c))
  109. d, err := reader.ReadMultiBuffer()
  110. common.Must(err)
  111. c = d
  112. }
  113. }