// pipe_test.go (2.7 KB)
  1. package pipe_test
  2. import (
  3. "io"
  4. "sync"
  5. "testing"
  6. "time"
  7. "v2ray.com/core/common/buf"
  8. "v2ray.com/core/common/task"
  9. . "v2ray.com/core/transport/pipe"
  10. . "v2ray.com/ext/assert"
  11. )
  12. func TestPipeReadWrite(t *testing.T) {
  13. assert := With(t)
  14. pReader, pWriter := New(WithSizeLimit(1024))
  15. payload := []byte{'a', 'b', 'c', 'd'}
  16. b := buf.New()
  17. b.Write(payload)
  18. assert(pWriter.WriteMultiBuffer(buf.NewMultiBufferValue(b)), IsNil)
  19. rb, err := pReader.ReadMultiBuffer()
  20. assert(err, IsNil)
  21. assert(rb.String(), Equals, b.String())
  22. }
  23. func TestPipeCloseError(t *testing.T) {
  24. assert := With(t)
  25. pReader, pWriter := New(WithSizeLimit(1024))
  26. payload := []byte{'a', 'b', 'c', 'd'}
  27. b := buf.New()
  28. b.Write(payload)
  29. assert(pWriter.WriteMultiBuffer(buf.NewMultiBufferValue(b)), IsNil)
  30. pWriter.CloseError()
  31. rb, err := pReader.ReadMultiBuffer()
  32. assert(err, Equals, io.ErrClosedPipe)
  33. assert(rb.IsEmpty(), IsTrue)
  34. }
  35. func TestPipeClose(t *testing.T) {
  36. assert := With(t)
  37. pReader, pWriter := New(WithSizeLimit(1024))
  38. payload := []byte{'a', 'b', 'c', 'd'}
  39. b := buf.New()
  40. b.Write(payload)
  41. assert(pWriter.WriteMultiBuffer(buf.NewMultiBufferValue(b)), IsNil)
  42. assert(pWriter.Close(), IsNil)
  43. rb, err := pReader.ReadMultiBuffer()
  44. assert(err, IsNil)
  45. assert(rb.String(), Equals, b.String())
  46. rb, err = pReader.ReadMultiBuffer()
  47. assert(err, Equals, io.EOF)
  48. assert(rb.IsEmpty(), IsTrue)
  49. }
  50. func TestPipeLimitZero(t *testing.T) {
  51. assert := With(t)
  52. pReader, pWriter := New(WithSizeLimit(0))
  53. bb := buf.New()
  54. bb.Write([]byte{'a', 'b'})
  55. assert(pWriter.WriteMultiBuffer(buf.NewMultiBufferValue(bb)), IsNil)
  56. err := task.Run(task.Parallel(func() error {
  57. b := buf.New()
  58. b.Write([]byte{'c', 'd'})
  59. return pWriter.WriteMultiBuffer(buf.NewMultiBufferValue(b))
  60. }, func() error {
  61. time.Sleep(time.Second)
  62. rb, err := pReader.ReadMultiBuffer()
  63. if err != nil {
  64. return err
  65. }
  66. assert(rb.String(), Equals, "ab")
  67. rb, err = pReader.ReadMultiBuffer()
  68. if err != nil {
  69. return err
  70. }
  71. assert(rb.String(), Equals, "cd")
  72. return nil
  73. }))()
  74. assert(err, IsNil)
  75. }
  76. func TestPipeWriteMultiThread(t *testing.T) {
  77. assert := With(t)
  78. pReader, pWriter := New(WithSizeLimit(0))
  79. var wg sync.WaitGroup
  80. for i := 0; i < 10; i++ {
  81. wg.Add(1)
  82. go func() {
  83. b := buf.New()
  84. b.WriteBytes('a', 'b', 'c', 'd')
  85. pWriter.WriteMultiBuffer(buf.NewMultiBufferValue(b))
  86. wg.Done()
  87. }()
  88. }
  89. time.Sleep(time.Millisecond * 100)
  90. pWriter.Close()
  91. wg.Wait()
  92. b, err := pReader.ReadMultiBuffer()
  93. assert(err, IsNil)
  94. assert(b[0].Bytes(), Equals, []byte{'a', 'b', 'c', 'd'})
  95. }
  96. func TestInterfaces(t *testing.T) {
  97. assert := With(t)
  98. assert((*Reader)(nil), Implements, (*buf.Reader)(nil))
  99. assert((*Reader)(nil), Implements, (*buf.TimeoutReader)(nil))
  100. }