|
|
@@ -10,12 +10,14 @@ import (
|
|
|
|
|
|
type SegmentWriter interface {
|
|
|
Write(seg Segment) error
|
|
|
+ Release()
|
|
|
}
|
|
|
|
|
|
type SimpleSegmentWriter struct {
|
|
|
sync.Mutex
|
|
|
buffer *buf.Buffer
|
|
|
writer io.Writer
|
|
|
+ closed bool
|
|
|
}
|
|
|
|
|
|
func NewSegmentWriter(writer io.Writer) SegmentWriter {
|
|
|
@@ -28,6 +30,9 @@ func NewSegmentWriter(writer io.Writer) SegmentWriter {
|
|
|
func (w *SimpleSegmentWriter) Write(seg Segment) error {
|
|
|
w.Lock()
|
|
|
defer w.Unlock()
|
|
|
+ if w.closed {
|
|
|
+ return io.ErrClosedPipe
|
|
|
+ }
|
|
|
|
|
|
w.buffer.Clear()
|
|
|
rawBytes := w.buffer.Extend(seg.ByteSize())
|
|
|
@@ -36,6 +41,13 @@ func (w *SimpleSegmentWriter) Write(seg Segment) error {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
+func (w *SimpleSegmentWriter) Release() {
|
|
|
+ w.Lock()
|
|
|
+ defer w.Unlock()
|
|
|
+ w.buffer.Release()
|
|
|
+ w.closed = true
|
|
|
+}
|
|
|
+
|
|
|
type RetryableWriter struct {
|
|
|
writer SegmentWriter
|
|
|
}
|
|
|
@@ -51,3 +63,7 @@ func (w *RetryableWriter) Write(seg Segment) error {
|
|
|
return w.writer.Write(seg)
|
|
|
})
|
|
|
}
|
|
|
+
|
|
|
+func (w *RetryableWriter) Release() {
|
|
|
+ w.writer.Release()
|
|
|
+}
|