writer.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package mux
  2. import (
  3. "v2ray.com/core/common"
  4. "v2ray.com/core/common/buf"
  5. "v2ray.com/core/common/net"
  6. "v2ray.com/core/common/protocol"
  7. "v2ray.com/core/common/serial"
  8. )
  9. type Writer struct {
  10. dest net.Destination
  11. writer buf.Writer
  12. id uint16
  13. followup bool
  14. transferType protocol.TransferType
  15. }
  16. func NewWriter(id uint16, dest net.Destination, writer buf.Writer, transferType protocol.TransferType) *Writer {
  17. return &Writer{
  18. id: id,
  19. dest: dest,
  20. writer: writer,
  21. followup: false,
  22. transferType: transferType,
  23. }
  24. }
  25. func NewResponseWriter(id uint16, writer buf.Writer, transferType protocol.TransferType) *Writer {
  26. return &Writer{
  27. id: id,
  28. writer: writer,
  29. followup: true,
  30. transferType: transferType,
  31. }
  32. }
  33. func (w *Writer) getNextFrameMeta() FrameMetadata {
  34. meta := FrameMetadata{
  35. SessionID: w.id,
  36. Target: w.dest,
  37. }
  38. if w.followup {
  39. meta.SessionStatus = SessionStatusKeep
  40. } else {
  41. w.followup = true
  42. meta.SessionStatus = SessionStatusNew
  43. }
  44. return meta
  45. }
  46. func (w *Writer) writeMetaOnly() error {
  47. meta := w.getNextFrameMeta()
  48. b := buf.New()
  49. if err := b.Reset(meta.AsSupplier()); err != nil {
  50. return err
  51. }
  52. return w.writer.Write(buf.NewMultiBufferValue(b))
  53. }
  54. func (w *Writer) writeData(mb buf.MultiBuffer) error {
  55. meta := w.getNextFrameMeta()
  56. meta.Option.Set(OptionData)
  57. frame := buf.New()
  58. if err := frame.Reset(meta.AsSupplier()); err != nil {
  59. return err
  60. }
  61. if err := frame.AppendSupplier(serial.WriteUint16(uint16(mb.Len()))); err != nil {
  62. return err
  63. }
  64. mb2 := buf.NewMultiBuffer()
  65. mb2.Append(frame)
  66. mb2.AppendMulti(mb)
  67. return w.writer.Write(mb2)
  68. }
  69. // Write implements buf.MultiBufferWriter.
  70. func (w *Writer) Write(mb buf.MultiBuffer) error {
  71. defer mb.Release()
  72. if mb.IsEmpty() {
  73. return w.writeMetaOnly()
  74. }
  75. for !mb.IsEmpty() {
  76. var chunk buf.MultiBuffer
  77. if w.transferType == protocol.TransferTypeStream {
  78. chunk = mb.SliceBySize(8 * 1024)
  79. } else {
  80. chunk = buf.NewMultiBufferValue(mb.SplitFirst())
  81. }
  82. if err := w.writeData(chunk); err != nil {
  83. return err
  84. }
  85. }
  86. return nil
  87. }
  88. func (w *Writer) Close() {
  89. meta := FrameMetadata{
  90. SessionID: w.id,
  91. SessionStatus: SessionStatusEnd,
  92. }
  93. frame := buf.New()
  94. common.Must(frame.Reset(meta.AsSupplier()))
  95. w.writer.Write(buf.NewMultiBufferValue(frame))
  96. }