reader.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package mux
  2. import (
  3. "io"
  4. "v2ray.com/core/common/buf"
  5. "v2ray.com/core/common/serial"
  6. )
  7. // ReadMetadata reads FrameMetadata from the given reader.
  8. func ReadMetadata(reader io.Reader) (*FrameMetadata, error) {
  9. metaLen, err := serial.ReadUint16(reader)
  10. if err != nil {
  11. return nil, err
  12. }
  13. if metaLen > 512 {
  14. return nil, newError("invalid metalen ", metaLen).AtWarning()
  15. }
  16. b := buf.New()
  17. defer b.Release()
  18. if err := b.Reset(buf.ReadFullFrom(reader, int(metaLen))); err != nil {
  19. return nil, err
  20. }
  21. return ReadFrameFrom(b.Bytes())
  22. }
  // PacketReader is an io.Reader that reads whole chunk of Mux frames every time.
  // A PacketReader yields at most one chunk: after the first successful Read
  // it reports io.EOF.
  type PacketReader struct {
  	reader io.Reader // underlying source of the size-prefixed chunk
  	eof    bool      // set once a chunk has been returned; later reads return io.EOF
  }
  28. // NewPacketReader creates a new PacketReader.
  29. func NewPacketReader(reader io.Reader) *PacketReader {
  30. return &PacketReader{
  31. reader: reader,
  32. eof: false,
  33. }
  34. }
  35. // Read implements buf.Reader.
  36. func (r *PacketReader) Read() (buf.MultiBuffer, error) {
  37. if r.eof {
  38. return nil, io.EOF
  39. }
  40. size, err := serial.ReadUint16(r.reader)
  41. if err != nil {
  42. return nil, err
  43. }
  44. var b *buf.Buffer
  45. if size <= buf.Size {
  46. b = buf.New()
  47. } else {
  48. b = buf.NewLocal(int(size))
  49. }
  50. if err := b.AppendSupplier(buf.ReadFullFrom(r.reader, int(size))); err != nil {
  51. b.Release()
  52. return nil, err
  53. }
  54. r.eof = true
  55. return buf.NewMultiBufferValue(b), nil
  56. }
  // StreamReader reads Mux frame as a stream.
  type StreamReader struct {
  	reader io.Reader // underlying source of size-prefixed frame data

  	// leftOver is the number of payload bytes still to be read for the
  	// current frame. -1 means no size has been read yet; 0 means the
  	// current frame is exhausted and the next Read reports io.EOF.
  	leftOver int
  }
  62. // NewStreamReader creates a new StreamReader.
  63. func NewStreamReader(reader io.Reader) *StreamReader {
  64. return &StreamReader{
  65. reader: reader,
  66. leftOver: -1,
  67. }
  68. }
  69. // Read implmenets buf.Reader.
  70. func (r *StreamReader) Read() (buf.MultiBuffer, error) {
  71. if r.leftOver == 0 {
  72. r.leftOver = -1
  73. return nil, io.EOF
  74. }
  75. if r.leftOver == -1 {
  76. size, err := serial.ReadUint16(r.reader)
  77. if err != nil {
  78. return nil, err
  79. }
  80. r.leftOver = int(size)
  81. }
  82. mb := buf.NewMultiBuffer()
  83. for r.leftOver > 0 {
  84. readLen := buf.Size
  85. if r.leftOver < readLen {
  86. readLen = r.leftOver
  87. }
  88. b := buf.New()
  89. if err := b.AppendSupplier(func(bb []byte) (int, error) {
  90. return r.reader.Read(bb[:readLen])
  91. }); err != nil {
  92. b.Release()
  93. mb.Release()
  94. return nil, err
  95. }
  96. r.leftOver -= b.Len()
  97. mb.Append(b)
  98. if b.Len() < readLen {
  99. break
  100. }
  101. }
  102. return mb, nil
  103. }