output.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. package kcp
  2. import (
  3. "io"
  4. "sync"
  5. "v2ray.com/core/common/alloc"
  6. v2io "v2ray.com/core/common/io"
  7. "v2ray.com/core/transport/internet"
  8. )
  9. type SegmentWriter interface {
  10. Write(seg Segment)
  11. }
  12. type BufferedSegmentWriter struct {
  13. sync.Mutex
  14. mtu uint32
  15. buffer *alloc.Buffer
  16. writer v2io.Writer
  17. }
  18. func NewSegmentWriter(writer *AuthenticationWriter) *BufferedSegmentWriter {
  19. return &BufferedSegmentWriter{
  20. mtu: writer.Mtu(),
  21. writer: writer,
  22. }
  23. }
  24. func (this *BufferedSegmentWriter) Write(seg Segment) {
  25. this.Lock()
  26. defer this.Unlock()
  27. nBytes := seg.ByteSize()
  28. if uint32(this.buffer.Len()+nBytes) > this.mtu {
  29. this.FlushWithoutLock()
  30. }
  31. if this.buffer == nil {
  32. this.buffer = alloc.NewLocalBuffer(2048).Clear()
  33. }
  34. this.buffer.Value = seg.Bytes(this.buffer.Value)
  35. }
  36. func (this *BufferedSegmentWriter) FlushWithoutLock() {
  37. go this.writer.Write(this.buffer)
  38. this.buffer = nil
  39. }
  40. func (this *BufferedSegmentWriter) Flush() {
  41. this.Lock()
  42. defer this.Unlock()
  43. if this.buffer.Len() == 0 {
  44. return
  45. }
  46. this.FlushWithoutLock()
  47. }
  48. type AuthenticationWriter struct {
  49. Authenticator internet.Authenticator
  50. Writer io.Writer
  51. Config *Config
  52. }
  53. func (this *AuthenticationWriter) Write(payload *alloc.Buffer) error {
  54. defer payload.Release()
  55. this.Authenticator.Seal(payload)
  56. _, err := this.Writer.Write(payload.Value)
  57. return err
  58. }
  59. func (this *AuthenticationWriter) Release() {}
  60. func (this *AuthenticationWriter) Mtu() uint32 {
  61. return this.Config.Mtu.GetValue() - uint32(this.Authenticator.Overhead())
  62. }