transport.go 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package net
  2. import (
  3. "io"
  4. )
  5. const (
  6. minBufferSizeKilo = 2
  7. maxBufferSizeKilo = 128
  8. )
  9. func ReadFrom(reader io.Reader, sizeInKilo int) ([]byte, error) {
  10. buffer := make([]byte, sizeInKilo<<10)
  11. nBytes, err := reader.Read(buffer)
  12. if nBytes == 0 {
  13. return nil, err
  14. }
  15. return buffer[:nBytes], err
  16. }
  17. func roundUp(size int) int {
  18. if size <= minBufferSizeKilo {
  19. return minBufferSizeKilo
  20. }
  21. if size >= maxBufferSizeKilo {
  22. return maxBufferSizeKilo
  23. }
  24. size--
  25. size |= size >> 1
  26. size |= size >> 2
  27. size |= size >> 4
  28. return size + 1
  29. }
  30. // ReaderToChan dumps all content from a given reader to a chan by constantly reading it until EOF.
  31. func ReaderToChan(stream chan<- []byte, reader io.Reader) error {
  32. bufferSizeKilo := 2
  33. for {
  34. data, err := ReadFrom(reader, bufferSizeKilo)
  35. if len(data) > 0 {
  36. stream <- data
  37. }
  38. if err != nil {
  39. return err
  40. }
  41. if bufferSizeKilo == maxBufferSizeKilo {
  42. continue
  43. }
  44. dataLenKilo := len(data) >> 10
  45. if dataLenKilo == bufferSizeKilo {
  46. bufferSizeKilo <<= 1
  47. } else {
  48. bufferSizeKilo = roundUp(dataLenKilo)
  49. }
  50. }
  51. }
  52. // ChanToWriter dumps all content from a given chan to a writer until the chan is closed.
  53. func ChanToWriter(writer io.Writer, stream <-chan []byte) error {
  54. for buffer := range stream {
  55. _, err := writer.Write(buffer)
  56. if err != nil {
  57. return err
  58. }
  59. }
  60. return nil
  61. }