writer.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package mux
  2. import (
  3. "v2ray.com/core/common"
  4. "v2ray.com/core/common/buf"
  5. "v2ray.com/core/common/errors"
  6. "v2ray.com/core/common/net"
  7. "v2ray.com/core/common/protocol"
  8. "v2ray.com/core/common/serial"
  9. )
  10. type Writer struct {
  11. dest net.Destination
  12. writer buf.Writer
  13. id uint16
  14. followup bool
  15. hasError bool
  16. transferType protocol.TransferType
  17. }
  18. func NewWriter(id uint16, dest net.Destination, writer buf.Writer, transferType protocol.TransferType) *Writer {
  19. return &Writer{
  20. id: id,
  21. dest: dest,
  22. writer: writer,
  23. followup: false,
  24. transferType: transferType,
  25. }
  26. }
  27. func NewResponseWriter(id uint16, writer buf.Writer, transferType protocol.TransferType) *Writer {
  28. return &Writer{
  29. id: id,
  30. writer: writer,
  31. followup: true,
  32. transferType: transferType,
  33. }
  34. }
  35. func (w *Writer) getNextFrameMeta() FrameMetadata {
  36. meta := FrameMetadata{
  37. SessionID: w.id,
  38. Target: w.dest,
  39. }
  40. if w.followup {
  41. meta.SessionStatus = SessionStatusKeep
  42. } else {
  43. w.followup = true
  44. meta.SessionStatus = SessionStatusNew
  45. }
  46. return meta
  47. }
  48. func (w *Writer) writeMetaOnly() error {
  49. meta := w.getNextFrameMeta()
  50. b := buf.New()
  51. if err := meta.WriteTo(b); err != nil {
  52. return err
  53. }
  54. return w.writer.WriteMultiBuffer(buf.MultiBuffer{b})
  55. }
  56. func writeMetaWithFrame(writer buf.Writer, meta FrameMetadata, data buf.MultiBuffer) error {
  57. frame := buf.New()
  58. if err := meta.WriteTo(frame); err != nil {
  59. return err
  60. }
  61. if _, err := serial.WriteUint16(frame, uint16(data.Len())); err != nil {
  62. return err
  63. }
  64. if len(data)+1 > 64*1024*1024 {
  65. return errors.New("value too large")
  66. }
  67. sliceSize := len(data) + 1
  68. mb2 := make(buf.MultiBuffer, 0, sliceSize)
  69. mb2 = append(mb2, frame)
  70. mb2 = append(mb2, data...)
  71. return writer.WriteMultiBuffer(mb2)
  72. }
  73. func (w *Writer) writeData(mb buf.MultiBuffer) error {
  74. meta := w.getNextFrameMeta()
  75. meta.Option.Set(OptionData)
  76. return writeMetaWithFrame(w.writer, meta, mb)
  77. }
  78. // WriteMultiBuffer implements buf.Writer.
  79. func (w *Writer) WriteMultiBuffer(mb buf.MultiBuffer) error {
  80. defer buf.ReleaseMulti(mb)
  81. if mb.IsEmpty() {
  82. return w.writeMetaOnly()
  83. }
  84. for !mb.IsEmpty() {
  85. var chunk buf.MultiBuffer
  86. if w.transferType == protocol.TransferTypeStream {
  87. mb, chunk = buf.SplitSize(mb, 8*1024)
  88. } else {
  89. mb2, b := buf.SplitFirst(mb)
  90. mb = mb2
  91. chunk = buf.MultiBuffer{b}
  92. }
  93. if err := w.writeData(chunk); err != nil {
  94. return err
  95. }
  96. }
  97. return nil
  98. }
  99. // Close implements common.Closable.
  100. func (w *Writer) Close() error {
  101. meta := FrameMetadata{
  102. SessionID: w.id,
  103. SessionStatus: SessionStatusEnd,
  104. }
  105. if w.hasError {
  106. meta.Option.Set(OptionError)
  107. }
  108. frame := buf.New()
  109. common.Must(meta.WriteTo(frame))
  110. w.writer.WriteMultiBuffer(buf.MultiBuffer{frame})
  111. return nil
  112. }