pipe_test.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package pipe_test
  2. import (
  3. "io"
  4. "testing"
  5. "time"
  6. "v2ray.com/core/common/task"
  7. "v2ray.com/core/common/buf"
  8. . "v2ray.com/core/transport/pipe"
  9. . "v2ray.com/ext/assert"
  10. )
  11. func TestPipeReadWrite(t *testing.T) {
  12. assert := With(t)
  13. pReader, pWriter := New(WithSizeLimit(1024))
  14. payload := []byte{'a', 'b', 'c', 'd'}
  15. b := buf.New()
  16. b.Write(payload)
  17. assert(pWriter.WriteMultiBuffer(buf.NewMultiBufferValue(b)), IsNil)
  18. rb, err := pReader.ReadMultiBuffer()
  19. assert(err, IsNil)
  20. assert(rb.String(), Equals, b.String())
  21. }
  22. func TestPipeCloseError(t *testing.T) {
  23. assert := With(t)
  24. pReader, pWriter := New(WithSizeLimit(1024))
  25. payload := []byte{'a', 'b', 'c', 'd'}
  26. b := buf.New()
  27. b.Write(payload)
  28. assert(pWriter.WriteMultiBuffer(buf.NewMultiBufferValue(b)), IsNil)
  29. pWriter.CloseError()
  30. rb, err := pReader.ReadMultiBuffer()
  31. assert(err, Equals, io.ErrClosedPipe)
  32. assert(rb.IsEmpty(), IsTrue)
  33. }
  34. func TestPipeClose(t *testing.T) {
  35. assert := With(t)
  36. pReader, pWriter := New(WithSizeLimit(1024))
  37. payload := []byte{'a', 'b', 'c', 'd'}
  38. b := buf.New()
  39. b.Write(payload)
  40. assert(pWriter.WriteMultiBuffer(buf.NewMultiBufferValue(b)), IsNil)
  41. assert(pWriter.Close(), IsNil)
  42. rb, err := pReader.ReadMultiBuffer()
  43. assert(err, IsNil)
  44. assert(rb.String(), Equals, b.String())
  45. rb, err = pReader.ReadMultiBuffer()
  46. assert(err, Equals, io.EOF)
  47. assert(rb.IsEmpty(), IsTrue)
  48. }
  49. func TestPipeLimitZero(t *testing.T) {
  50. assert := With(t)
  51. pReader, pWriter := New(WithSizeLimit(0))
  52. bb := buf.New()
  53. bb.Write([]byte{'a', 'b'})
  54. assert(pWriter.WriteMultiBuffer(buf.NewMultiBufferValue(bb)), IsNil)
  55. err := task.Run(task.Parallel(func() error {
  56. b := buf.New()
  57. b.Write([]byte{'c', 'd'})
  58. return pWriter.WriteMultiBuffer(buf.NewMultiBufferValue(b))
  59. }, func() error {
  60. time.Sleep(time.Second)
  61. rb, err := pReader.ReadMultiBuffer()
  62. if err != nil {
  63. return err
  64. }
  65. assert(rb.String(), Equals, "ab")
  66. rb, err = pReader.ReadMultiBuffer()
  67. if err != nil {
  68. return err
  69. }
  70. assert(rb.String(), Equals, "cd")
  71. return nil
  72. }))()
  73. assert(err, IsNil)
  74. }