@@ -19,16 +19,26 @@ var (
)
func fetchInput(ctx context.Context, input io.Reader, reader PacketReader, conn *Connection) {
- payload := buf.New()
- defer payload.Release()
-
- for {
- err := payload.Reset(buf.ReadFrom(input))
- if err != nil {
- payload.Release()
- return
+ cache := make(chan *buf.Buffer, 1024)
+ go func() {
+ for {
+ payload := buf.New()
+ if err := payload.Reset(buf.ReadFrom(input)); err != nil {
+ payload.Release()
+ close(cache)
+ return
+ }
+ select {
+ case cache <- payload:
+ default:
}
+ }()
+
+ for payload := range cache {
segments := reader.Read(payload.Bytes())
if len(segments) > 0 {
conn.Input(segments)