readv_reader.go 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. package buf
  2. import (
  3. "io"
  4. "syscall"
  5. "unsafe"
  6. )
  // ReadVReader couples an io.Reader with the raw connection behind it so that
  // ReadMultiBuffer can fill several buffers with one vectored read.
  type ReadVReader struct {
  	// Reader is embedded, so its Read method is promoted to ReadVReader.
  	io.Reader

  	// rawConn provides the file descriptor that ReadMultiBuffer hands to the
  	// readv system call.
  	rawConn syscall.RawConn

  	// nBuf is the number of buffers ReadMultiBuffer allocates for its next
  	// call; ReadMultiBuffer rewrites it after every successful read.
  	nBuf int32
  }
  12. func NewReadVReader(reader io.Reader, rawConn syscall.RawConn) *ReadVReader {
  13. return &ReadVReader{
  14. Reader: reader,
  15. rawConn: rawConn,
  16. nBuf: 1,
  17. }
  18. }
  19. func allocN(n int32) []*Buffer {
  20. bs := make([]*Buffer, 0, n)
  21. for i := int32(0); i < n; i++ {
  22. bs = append(bs, New())
  23. }
  24. return bs
  25. }
  26. func (r *ReadVReader) ReadMultiBuffer() (MultiBuffer, error) {
  27. bs := allocN(r.nBuf)
  28. iovecs := make([]syscall.Iovec, r.nBuf)
  29. for idx, b := range bs {
  30. iovecs[idx] = syscall.Iovec{
  31. Base: &(b.v[0]),
  32. }
  33. iovecs[idx].SetLen(int(Size))
  34. }
  35. var nBytes int
  36. err := r.rawConn.Read(func(fd uintptr) bool {
  37. n, _, e := syscall.Syscall(syscall.SYS_READV, fd, uintptr(unsafe.Pointer(&iovecs[0])), uintptr(len(iovecs)))
  38. if e != 0 {
  39. return false
  40. }
  41. nBytes = int(n)
  42. return true
  43. })
  44. if err != nil {
  45. mb := MultiBuffer(bs)
  46. mb.Release()
  47. return nil, err
  48. }
  49. if nBytes == 0 {
  50. mb := MultiBuffer(bs)
  51. mb.Release()
  52. return nil, io.EOF
  53. }
  54. nBuf := 0
  55. for nBuf < len(bs) {
  56. if nBytes <= 0 {
  57. break
  58. }
  59. end := int32(nBytes)
  60. if end > Size {
  61. end = Size
  62. }
  63. bs[nBuf].end = end
  64. nBytes -= int(end)
  65. nBuf++
  66. }
  67. for i := nBuf; i < len(bs); i++ {
  68. bs[i].Release()
  69. bs[i] = nil
  70. }
  71. if int32(nBuf) == r.nBuf && nBuf < 128 {
  72. r.nBuf *= 4
  73. } else {
  74. r.nBuf = int32(nBuf)
  75. }
  76. return MultiBuffer(bs[:nBuf]), nil
  77. }