writer.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. package mux
  2. import (
  3. "runtime"
  4. "v2ray.com/core/common/buf"
  5. "v2ray.com/core/common/net"
  6. "v2ray.com/core/common/protocol"
  7. "v2ray.com/core/common/serial"
  8. )
  9. type Writer struct {
  10. dest net.Destination
  11. writer buf.Writer
  12. id uint16
  13. followup bool
  14. transferType protocol.TransferType
  15. }
  16. func NewWriter(id uint16, dest net.Destination, writer buf.Writer, transferType protocol.TransferType) *Writer {
  17. return &Writer{
  18. id: id,
  19. dest: dest,
  20. writer: writer,
  21. followup: false,
  22. transferType: transferType,
  23. }
  24. }
  25. func NewResponseWriter(id uint16, writer buf.Writer, transferType protocol.TransferType) *Writer {
  26. return &Writer{
  27. id: id,
  28. writer: writer,
  29. followup: true,
  30. transferType: transferType,
  31. }
  32. }
  33. func (w *Writer) getNextFrameMeta() FrameMetadata {
  34. meta := FrameMetadata{
  35. SessionID: w.id,
  36. Target: w.dest,
  37. }
  38. if w.followup {
  39. meta.SessionStatus = SessionStatusKeep
  40. } else {
  41. w.followup = true
  42. meta.SessionStatus = SessionStatusNew
  43. }
  44. return meta
  45. }
  46. func (w *Writer) writeMetaOnly() error {
  47. meta := w.getNextFrameMeta()
  48. b := buf.New()
  49. if err := b.AppendSupplier(meta.AsSupplier()); err != nil {
  50. return err
  51. }
  52. runtime.KeepAlive(meta)
  53. return w.writer.Write(buf.NewMultiBufferValue(b))
  54. }
  55. func (w *Writer) writeData(mb buf.MultiBuffer) error {
  56. meta := w.getNextFrameMeta()
  57. meta.Option.Set(OptionData)
  58. frame := buf.New()
  59. if err := frame.AppendSupplier(meta.AsSupplier()); err != nil {
  60. return err
  61. }
  62. runtime.KeepAlive(meta)
  63. if err := frame.AppendSupplier(serial.WriteUint16(uint16(mb.Len()))); err != nil {
  64. return err
  65. }
  66. mb2 := buf.NewMultiBuffer()
  67. mb2.Append(frame)
  68. mb2.AppendMulti(mb)
  69. return w.writer.Write(mb2)
  70. }
  71. // Write implements buf.MultiBufferWriter.
  72. func (w *Writer) Write(mb buf.MultiBuffer) error {
  73. if mb.IsEmpty() {
  74. return w.writeMetaOnly()
  75. }
  76. if w.transferType == protocol.TransferTypeStream {
  77. const chunkSize = 8 * 1024
  78. for !mb.IsEmpty() {
  79. slice := mb.SliceBySize(chunkSize)
  80. if err := w.writeData(slice); err != nil {
  81. return err
  82. }
  83. }
  84. } else {
  85. for _, b := range mb {
  86. if err := w.writeData(buf.NewMultiBufferValue(b)); err != nil {
  87. return err
  88. }
  89. }
  90. }
  91. return nil
  92. }
  93. func (w *Writer) Close() {
  94. meta := FrameMetadata{
  95. SessionID: w.id,
  96. SessionStatus: SessionStatusEnd,
  97. }
  98. frame := buf.New()
  99. frame.AppendSupplier(meta.AsSupplier())
  100. runtime.KeepAlive(meta)
  101. w.writer.Write(buf.NewMultiBufferValue(frame))
  102. }