chunk.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package crypto
  2. import (
  3. "io"
  4. "v2ray.com/core/common/buf"
  5. "v2ray.com/core/common/serial"
  6. )
  7. type ChunkSizeDecoder interface {
  8. SizeBytes() int
  9. Decode([]byte) (uint16, error)
  10. }
  11. type ChunkSizeEncoder interface {
  12. SizeBytes() int
  13. Encode(uint16, []byte) []byte
  14. }
  15. type PlainChunkSizeParser struct{}
  16. func (PlainChunkSizeParser) SizeBytes() int {
  17. return 2
  18. }
  19. func (PlainChunkSizeParser) Encode(size uint16, b []byte) []byte {
  20. return serial.Uint16ToBytes(size, b)
  21. }
  22. func (PlainChunkSizeParser) Decode(b []byte) (uint16, error) {
  23. return serial.BytesToUint16(b), nil
  24. }
  25. type ChunkStreamReader struct {
  26. sizeDecoder ChunkSizeDecoder
  27. reader buf.Reader
  28. buffer []byte
  29. leftOver buf.MultiBuffer
  30. leftOverSize uint16
  31. }
  32. func NewChunkStreamReader(sizeDecoder ChunkSizeDecoder, reader io.Reader) *ChunkStreamReader {
  33. return &ChunkStreamReader{
  34. sizeDecoder: sizeDecoder,
  35. reader: buf.NewReader(reader),
  36. buffer: make([]byte, sizeDecoder.SizeBytes()),
  37. }
  38. }
  39. func (r *ChunkStreamReader) readAtLeast(size int) error {
  40. mb := r.leftOver
  41. for mb.Len() < size {
  42. extra, err := r.reader.Read()
  43. if err != nil {
  44. mb.Release()
  45. return err
  46. }
  47. mb.AppendMulti(extra)
  48. }
  49. r.leftOver = mb
  50. return nil
  51. }
  52. func (r *ChunkStreamReader) readSize() (uint16, error) {
  53. if r.sizeDecoder.SizeBytes() > r.leftOver.Len() {
  54. if err := r.readAtLeast(r.sizeDecoder.SizeBytes() - r.leftOver.Len()); err != nil {
  55. return 0, err
  56. }
  57. }
  58. r.leftOver.Read(r.buffer)
  59. return r.sizeDecoder.Decode(r.buffer)
  60. }
  61. func (r *ChunkStreamReader) Read() (buf.MultiBuffer, error) {
  62. size := int(r.leftOverSize)
  63. if size == 0 {
  64. nextSize, err := r.readSize()
  65. if err != nil {
  66. return nil, err
  67. }
  68. if nextSize == 0 {
  69. return nil, io.EOF
  70. }
  71. size = int(nextSize)
  72. }
  73. if r.leftOver.IsEmpty() {
  74. if err := r.readAtLeast(1); err != nil {
  75. return nil, err
  76. }
  77. }
  78. if size >= r.leftOver.Len() {
  79. mb := r.leftOver
  80. r.leftOverSize = uint16(size - r.leftOver.Len())
  81. r.leftOver = nil
  82. return mb, nil
  83. }
  84. mb := r.leftOver.SliceBySize(size)
  85. if mb.Len() != size {
  86. b := buf.New()
  87. b.AppendSupplier(buf.ReadFullFrom(&r.leftOver, size-mb.Len()))
  88. mb.Append(b)
  89. }
  90. r.leftOverSize = 0
  91. return mb, nil
  92. }
  93. type ChunkStreamWriter struct {
  94. sizeEncoder ChunkSizeEncoder
  95. writer buf.Writer
  96. }
  97. func NewChunkStreamWriter(sizeEncoder ChunkSizeEncoder, writer io.Writer) *ChunkStreamWriter {
  98. return &ChunkStreamWriter{
  99. sizeEncoder: sizeEncoder,
  100. writer: buf.NewWriter(writer),
  101. }
  102. }
  103. func (w *ChunkStreamWriter) Write(mb buf.MultiBuffer) error {
  104. mb2Write := buf.NewMultiBuffer()
  105. const sliceSize = 8192
  106. for {
  107. slice := mb.SliceBySize(sliceSize)
  108. b := buf.New()
  109. b.AppendSupplier(func(buffer []byte) (int, error) {
  110. w.sizeEncoder.Encode(uint16(slice.Len()), buffer[:0])
  111. return w.sizeEncoder.SizeBytes(), nil
  112. })
  113. mb2Write.Append(b)
  114. mb2Write.AppendMulti(slice)
  115. if mb.IsEmpty() {
  116. break
  117. }
  118. }
  119. return w.writer.Write(mb2Write)
  120. }