// reader.go (2.4 KB)
  1. package io
  2. import (
  3. "errors"
  4. "hash"
  5. "hash/fnv"
  6. "io"
  7. "github.com/v2ray/v2ray-core/common/alloc"
  8. "github.com/v2ray/v2ray-core/common/serial"
  9. "github.com/v2ray/v2ray-core/transport"
  10. )
  11. var (
  12. ErrorStreamCompleted = errors.New("Stream completed.")
  13. )
  14. // @Private
  15. type Validator struct {
  16. actualAuth hash.Hash32
  17. expectedAuth uint32
  18. }
  19. func NewValidator(expectedAuth uint32) *Validator {
  20. return &Validator{
  21. actualAuth: fnv.New32a(),
  22. expectedAuth: expectedAuth,
  23. }
  24. }
  25. func (this *Validator) Consume(b []byte) {
  26. this.actualAuth.Write(b)
  27. }
  28. func (this *Validator) Validate() bool {
  29. return this.actualAuth.Sum32() == this.expectedAuth
  30. }
// AuthChunkReader reads checksummed chunks from an underlying io.Reader and
// keeps the state needed to continue a chunk across several Read calls.
type AuthChunkReader struct {
	// reader is the source the raw chunk bytes are pulled from.
	reader io.Reader
	// last holds bytes that were read past the end of the previous chunk;
	// the next Read starts from them instead of a fresh buffer.
	last *alloc.Buffer
	// chunkLength is the number of payload bytes still expected for the
	// current chunk, or -1 when the next bytes are a chunk header.
	chunkLength int
	// validator checks the payload of the current chunk; it is nil between
	// chunks.
	validator *Validator
}
  37. func NewAuthChunkReader(reader io.Reader) *AuthChunkReader {
  38. return &AuthChunkReader{
  39. reader: reader,
  40. chunkLength: -1,
  41. }
  42. }
  43. func (this *AuthChunkReader) Read() (*alloc.Buffer, error) {
  44. var buffer *alloc.Buffer
  45. if this.last != nil {
  46. buffer = this.last
  47. this.last = nil
  48. } else {
  49. buffer = alloc.NewBufferWithSize(4096).Clear()
  50. }
  51. if this.chunkLength == -1 {
  52. for buffer.Len() < 6 {
  53. _, err := buffer.FillFrom(this.reader)
  54. if err != nil {
  55. buffer.Release()
  56. return nil, err
  57. }
  58. }
  59. length := serial.BytesToUint16(buffer.Value[:2])
  60. this.chunkLength = int(length) - 4
  61. this.validator = NewValidator(serial.BytesToUint32(buffer.Value[2:6]))
  62. buffer.SliceFrom(6)
  63. } else if buffer.Len() < this.chunkLength {
  64. _, err := buffer.FillFrom(this.reader)
  65. if err != nil {
  66. buffer.Release()
  67. return nil, err
  68. }
  69. }
  70. if this.chunkLength == 0 {
  71. buffer.Release()
  72. return nil, ErrorStreamCompleted
  73. }
  74. if buffer.Len() < this.chunkLength {
  75. this.validator.Consume(buffer.Value)
  76. this.chunkLength -= buffer.Len()
  77. } else {
  78. this.validator.Consume(buffer.Value[:this.chunkLength])
  79. if !this.validator.Validate() {
  80. buffer.Release()
  81. return nil, transport.ErrorCorruptedPacket
  82. }
  83. leftLength := buffer.Len() - this.chunkLength
  84. if leftLength > 0 {
  85. this.last = alloc.NewBufferWithSize(leftLength + 4096).Clear()
  86. this.last.Append(buffer.Value[this.chunkLength:])
  87. buffer.Slice(0, this.chunkLength)
  88. }
  89. this.chunkLength = -1
  90. this.validator = nil
  91. }
  92. return buffer, nil
  93. }
  94. func (this *AuthChunkReader) Release() {
  95. this.reader = nil
  96. this.last.Release()
  97. this.last = nil
  98. this.validator = nil
  99. }