transport.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package net
  2. import (
  3. "io"
  4. "github.com/v2ray/v2ray-core/common/alloc"
  5. "github.com/v2ray/v2ray-core/common/crypto"
  6. "github.com/v2ray/v2ray-core/common/serial"
  7. "github.com/v2ray/v2ray-core/transport"
  8. )
  9. // ReadFrom reads from a reader and put all content to a buffer.
  10. // If buffer is nil, ReadFrom creates a new normal buffer.
  11. func ReadFrom(reader io.Reader, buffer *alloc.Buffer) (*alloc.Buffer, error) {
  12. if buffer == nil {
  13. buffer = alloc.NewBuffer()
  14. }
  15. nBytes, err := reader.Read(buffer.Value)
  16. buffer.Slice(0, nBytes)
  17. return buffer, err
  18. }
  19. func ReadChunk(reader io.Reader, buffer *alloc.Buffer) (*alloc.Buffer, error) {
  20. if buffer == nil {
  21. buffer = alloc.NewBuffer()
  22. }
  23. if _, err := io.ReadFull(reader, buffer.Value[:2]); err != nil {
  24. alloc.Release(buffer)
  25. return nil, err
  26. }
  27. length := serial.BytesLiteral(buffer.Value[:2]).Uint16Value()
  28. if _, err := io.ReadFull(reader, buffer.Value[:length]); err != nil {
  29. alloc.Release(buffer)
  30. return nil, err
  31. }
  32. buffer.Slice(0, int(length))
  33. return buffer, nil
  34. }
  35. func ReadAuthenticatedChunk(reader io.Reader, auth crypto.Authenticator, buffer *alloc.Buffer) (*alloc.Buffer, error) {
  36. buffer, err := ReadChunk(reader, buffer)
  37. if err != nil {
  38. alloc.Release(buffer)
  39. return nil, err
  40. }
  41. authSize := auth.AuthBytes()
  42. authBytes := auth.Authenticate(nil, buffer.Value[authSize:])
  43. if !serial.BytesLiteral(authBytes).Equals(serial.BytesLiteral(buffer.Value[:authSize])) {
  44. alloc.Release(buffer)
  45. return nil, transport.CorruptedPacket
  46. }
  47. buffer.SliceFrom(authSize)
  48. return buffer, nil
  49. }
  50. // ReaderToChan dumps all content from a given reader to a chan by constantly reading it until EOF.
  51. func ReaderToChan(stream chan<- *alloc.Buffer, reader io.Reader) error {
  52. allocate := alloc.NewBuffer
  53. large := false
  54. for {
  55. buffer, err := ReadFrom(reader, allocate())
  56. if buffer.Len() > 0 {
  57. stream <- buffer
  58. } else {
  59. buffer.Release()
  60. }
  61. if err != nil {
  62. return err
  63. }
  64. if buffer.IsFull() && !large {
  65. allocate = alloc.NewLargeBuffer
  66. large = true
  67. } else if !buffer.IsFull() {
  68. allocate = alloc.NewBuffer
  69. large = false
  70. }
  71. }
  72. }
  73. // ChanToWriter dumps all content from a given chan to a writer until the chan is closed.
  74. func ChanToWriter(writer io.Writer, stream <-chan *alloc.Buffer) error {
  75. for buffer := range stream {
  76. nBytes, err := writer.Write(buffer.Value)
  77. if nBytes < buffer.Len() {
  78. _, err = writer.Write(buffer.Value[nBytes:])
  79. }
  80. buffer.Release()
  81. if err != nil {
  82. return err
  83. }
  84. }
  85. return nil
  86. }