io.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package buf
  2. import (
  3. "io"
  4. "net"
  5. "os"
  6. "syscall"
  7. "time"
  8. )
  9. // Reader extends io.Reader with MultiBuffer.
  10. type Reader interface {
  11. // ReadMultiBuffer reads content from underlying reader, and put it into a MultiBuffer.
  12. ReadMultiBuffer() (MultiBuffer, error)
  13. }
  14. // ErrReadTimeout is an error that happens with IO timeout.
  15. var ErrReadTimeout = newError("IO timeout")
  16. // TimeoutReader is a reader that returns error if Read() operation takes longer than the given timeout.
  17. type TimeoutReader interface {
  18. ReadMultiBufferTimeout(time.Duration) (MultiBuffer, error)
  19. }
  20. // Writer extends io.Writer with MultiBuffer.
  21. type Writer interface {
  22. // WriteMultiBuffer writes a MultiBuffer into underlying writer.
  23. // Caller relinquish the ownership of MultiBuffer after calling this method.
  24. WriteMultiBuffer(MultiBuffer) error
  25. }
  26. // WriteAllBytes ensures all bytes are written into the given writer.
  27. func WriteAllBytes(writer io.Writer, payload []byte) error {
  28. for len(payload) > 0 {
  29. n, err := writer.Write(payload)
  30. if err != nil {
  31. return err
  32. }
  33. payload = payload[n:]
  34. }
  35. return nil
  36. }
  37. func isPacketReader(reader io.Reader) bool {
  38. _, ok := reader.(net.PacketConn)
  39. return ok
  40. }
  41. // NewReader creates a new Reader.
  42. // The Reader instance doesn't take the ownership of reader.
  43. func NewReader(reader io.Reader) Reader {
  44. if mr, ok := reader.(Reader); ok {
  45. return mr
  46. }
  47. if isPacketReader(reader) {
  48. return &PacketReader{
  49. Reader: reader,
  50. }
  51. }
  52. _, isFile := reader.(*os.File)
  53. if !isFile && useReadv {
  54. if sc, ok := reader.(syscall.Conn); ok {
  55. rawConn, err := sc.SyscallConn()
  56. if err != nil {
  57. newError("failed to get sysconn").Base(err).WriteToLog()
  58. } else {
  59. return NewReadVReader(reader, rawConn)
  60. }
  61. }
  62. }
  63. return &SingleReader{
  64. Reader: reader,
  65. }
  66. }
  67. // NewPacketReader creates a new PacketReader based on the given reader.
  68. func NewPacketReader(reader io.Reader) Reader {
  69. if mr, ok := reader.(Reader); ok {
  70. return mr
  71. }
  72. return &PacketReader{
  73. Reader: reader,
  74. }
  75. }
  76. func isPacketWriter(writer io.Writer) bool {
  77. if _, ok := writer.(net.PacketConn); ok {
  78. return true
  79. }
  80. // If the writer doesn't implement syscall.Conn, it is probably not a TCP connection.
  81. if _, ok := writer.(syscall.Conn); !ok {
  82. return true
  83. }
  84. return false
  85. }
  86. // NewWriter creates a new Writer.
  87. func NewWriter(writer io.Writer) Writer {
  88. if mw, ok := writer.(Writer); ok {
  89. return mw
  90. }
  91. if isPacketWriter(writer) {
  92. return &SequentialWriter{
  93. Writer: writer,
  94. }
  95. }
  96. return &BufferToBytesWriter{
  97. Writer: writer,
  98. }
  99. }