writer.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package mux
  2. import (
  3. "runtime"
  4. "v2ray.com/core/common/buf"
  5. "v2ray.com/core/common/net"
  6. "v2ray.com/core/common/serial"
  7. )
  8. type Writer struct {
  9. id uint16
  10. dest net.Destination
  11. writer buf.Writer
  12. followup bool
  13. }
  14. func NewWriter(id uint16, dest net.Destination, writer buf.Writer) *Writer {
  15. return &Writer{
  16. id: id,
  17. dest: dest,
  18. writer: writer,
  19. followup: false,
  20. }
  21. }
  22. func NewResponseWriter(id uint16, writer buf.Writer) *Writer {
  23. return &Writer{
  24. id: id,
  25. writer: writer,
  26. followup: true,
  27. }
  28. }
  29. func (w *Writer) getNextFrameMeta() FrameMetadata {
  30. meta := FrameMetadata{
  31. SessionID: w.id,
  32. Target: w.dest,
  33. }
  34. if w.followup {
  35. meta.SessionStatus = SessionStatusKeep
  36. } else {
  37. w.followup = true
  38. meta.SessionStatus = SessionStatusNew
  39. }
  40. return meta
  41. }
  42. func (w *Writer) writeMetaOnly() error {
  43. meta := w.getNextFrameMeta()
  44. b := buf.New()
  45. if err := b.AppendSupplier(meta.AsSupplier()); err != nil {
  46. return err
  47. }
  48. runtime.KeepAlive(meta)
  49. return w.writer.Write(buf.NewMultiBufferValue(b))
  50. }
  51. func (w *Writer) writeData(mb buf.MultiBuffer) error {
  52. meta := w.getNextFrameMeta()
  53. meta.Option.Add(OptionData)
  54. frame := buf.New()
  55. if err := frame.AppendSupplier(meta.AsSupplier()); err != nil {
  56. return err
  57. }
  58. runtime.KeepAlive(meta)
  59. mb2 := buf.NewMultiBuffer()
  60. mb2.Append(frame)
  61. if err := frame.AppendSupplier(serial.WriteUint16(uint16(mb.Len()))); err != nil {
  62. return err
  63. }
  64. mb2.AppendMulti(mb)
  65. return w.writer.Write(mb2)
  66. }
  67. func (w *Writer) Write(mb buf.MultiBuffer) error {
  68. if mb.IsEmpty() {
  69. return w.writeMetaOnly()
  70. }
  71. const chunkSize = 8 * 1024
  72. for !mb.IsEmpty() {
  73. slice := mb.SliceBySize(chunkSize)
  74. if err := w.writeData(slice); err != nil {
  75. return err
  76. }
  77. }
  78. return nil
  79. }
  80. func (w *Writer) Close() {
  81. meta := FrameMetadata{
  82. SessionID: w.id,
  83. SessionStatus: SessionStatusEnd,
  84. }
  85. frame := buf.New()
  86. frame.AppendSupplier(meta.AsSupplier())
  87. runtime.KeepAlive(meta)
  88. w.writer.Write(buf.NewMultiBufferValue(frame))
  89. }