writer.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package mux
  2. import (
  3. "v2ray.com/core/common"
  4. "v2ray.com/core/common/buf"
  5. "v2ray.com/core/common/net"
  6. "v2ray.com/core/common/protocol"
  7. "v2ray.com/core/common/serial"
  8. )
  9. type Writer struct {
  10. dest net.Destination
  11. writer buf.Writer
  12. id uint16
  13. followup bool
  14. hasError bool
  15. transferType protocol.TransferType
  16. }
  17. func NewWriter(id uint16, dest net.Destination, writer buf.Writer, transferType protocol.TransferType) *Writer {
  18. return &Writer{
  19. id: id,
  20. dest: dest,
  21. writer: writer,
  22. followup: false,
  23. transferType: transferType,
  24. }
  25. }
  26. func NewResponseWriter(id uint16, writer buf.Writer, transferType protocol.TransferType) *Writer {
  27. return &Writer{
  28. id: id,
  29. writer: writer,
  30. followup: true,
  31. transferType: transferType,
  32. }
  33. }
  34. func (w *Writer) getNextFrameMeta() FrameMetadata {
  35. meta := FrameMetadata{
  36. SessionID: w.id,
  37. Target: w.dest,
  38. }
  39. if w.followup {
  40. meta.SessionStatus = SessionStatusKeep
  41. } else {
  42. w.followup = true
  43. meta.SessionStatus = SessionStatusNew
  44. }
  45. return meta
  46. }
  47. func (w *Writer) writeMetaOnly() error {
  48. meta := w.getNextFrameMeta()
  49. b := buf.New()
  50. if err := meta.WriteTo(b); err != nil {
  51. return err
  52. }
  53. return w.writer.WriteMultiBuffer(buf.NewMultiBufferValue(b))
  54. }
  55. func writeMetaWithFrame(writer buf.Writer, meta FrameMetadata, data buf.MultiBuffer) error {
  56. frame := buf.New()
  57. if err := meta.WriteTo(frame); err != nil {
  58. return err
  59. }
  60. if err := frame.AppendSupplier(serial.WriteUint16(uint16(data.Len()))); err != nil {
  61. return err
  62. }
  63. mb2 := buf.NewMultiBufferCap(int32(len(data)) + 1)
  64. mb2.Append(frame)
  65. mb2.AppendMulti(data)
  66. return writer.WriteMultiBuffer(mb2)
  67. }
  68. func (w *Writer) writeData(mb buf.MultiBuffer) error {
  69. meta := w.getNextFrameMeta()
  70. meta.Option.Set(OptionData)
  71. return writeMetaWithFrame(w.writer, meta, mb)
  72. }
  73. // WriteMultiBuffer implements buf.Writer.
  74. func (w *Writer) WriteMultiBuffer(mb buf.MultiBuffer) error {
  75. defer mb.Release()
  76. if mb.IsEmpty() {
  77. return w.writeMetaOnly()
  78. }
  79. for !mb.IsEmpty() {
  80. var chunk buf.MultiBuffer
  81. if w.transferType == protocol.TransferTypeStream {
  82. chunk = mb.SliceBySize(8 * 1024)
  83. } else {
  84. chunk = buf.NewMultiBufferValue(mb.SplitFirst())
  85. }
  86. if err := w.writeData(chunk); err != nil {
  87. return err
  88. }
  89. }
  90. return nil
  91. }
  92. // Close implements common.Closable.
  93. func (w *Writer) Close() error {
  94. meta := FrameMetadata{
  95. SessionID: w.id,
  96. SessionStatus: SessionStatusEnd,
  97. }
  98. if w.hasError {
  99. meta.Option.Set(OptionError)
  100. }
  101. frame := buf.New()
  102. common.Must(meta.WriteTo(frame))
  103. w.writer.WriteMultiBuffer(buf.NewMultiBufferValue(frame)) // nolint: errcheck
  104. return nil
  105. }