output.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package kcp
  2. import (
  3. "io"
  4. "sync"
  5. "github.com/v2ray/v2ray-core/common/alloc"
  6. v2io "github.com/v2ray/v2ray-core/common/io"
  7. )
  8. type SegmentWriter interface {
  9. Write(seg Segment)
  10. }
  11. type BufferedSegmentWriter struct {
  12. sync.Mutex
  13. mtu uint32
  14. buffer *alloc.Buffer
  15. writer v2io.Writer
  16. }
  17. func NewSegmentWriter(writer *AuthenticationWriter) *BufferedSegmentWriter {
  18. return &BufferedSegmentWriter{
  19. mtu: writer.Mtu(),
  20. writer: writer,
  21. }
  22. }
  23. func (this *BufferedSegmentWriter) Write(seg Segment) {
  24. this.Lock()
  25. defer this.Unlock()
  26. nBytes := seg.ByteSize()
  27. if uint32(this.buffer.Len()+nBytes) > this.mtu {
  28. this.FlushWithoutLock()
  29. }
  30. if this.buffer == nil {
  31. this.buffer = alloc.NewSmallBuffer().Clear()
  32. }
  33. this.buffer.Value = seg.Bytes(this.buffer.Value)
  34. }
  35. func (this *BufferedSegmentWriter) FlushWithoutLock() {
  36. go this.writer.Write(this.buffer)
  37. this.buffer = nil
  38. }
  39. func (this *BufferedSegmentWriter) Flush() {
  40. this.Lock()
  41. defer this.Unlock()
  42. if this.buffer.Len() == 0 {
  43. return
  44. }
  45. this.FlushWithoutLock()
  46. }
  47. type AuthenticationWriter struct {
  48. Authenticator Authenticator
  49. Writer io.Writer
  50. }
  51. func (this *AuthenticationWriter) Write(payload *alloc.Buffer) error {
  52. defer payload.Release()
  53. this.Authenticator.Seal(payload)
  54. _, err := this.Writer.Write(payload.Value)
  55. return err
  56. }
  57. func (this *AuthenticationWriter) Release() {}
  58. func (this *AuthenticationWriter) Mtu() uint32 {
  59. return effectiveConfig.Mtu - uint32(this.Authenticator.HeaderSize())
  60. }