writer.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package mux
  2. import (
  3. "v2ray.com/core/common/buf"
  4. "v2ray.com/core/common/net"
  5. "v2ray.com/core/common/serial"
  6. )
  7. type MuxWriter struct {
  8. id uint16
  9. dest net.Destination
  10. writer buf.Writer
  11. followup bool
  12. }
  13. func NewWriter(id uint16, dest net.Destination, writer buf.Writer) *MuxWriter {
  14. return &MuxWriter{
  15. id: id,
  16. dest: dest,
  17. writer: writer,
  18. }
  19. }
  20. func (w *MuxWriter) writeInternal(b *buf.Buffer) error {
  21. meta := FrameMetadata{
  22. SessionID: w.id,
  23. Target: w.dest,
  24. }
  25. if w.followup {
  26. meta.SessionStatus = SessionStatusKeep
  27. } else {
  28. w.followup = true
  29. meta.SessionStatus = SessionStatusNew
  30. }
  31. if b.Len() > 0 {
  32. meta.Option.Add(OptionData)
  33. }
  34. frame := buf.New()
  35. frame.AppendSupplier(meta.AsSupplier())
  36. if b.Len() > 0 {
  37. frame.AppendSupplier(serial.WriteUint16(0))
  38. lengthBytes := frame.BytesFrom(-2)
  39. nBytes, err := frame.Write(b.Bytes())
  40. if err != nil {
  41. frame.Release()
  42. return err
  43. }
  44. serial.Uint16ToBytes(uint16(nBytes), lengthBytes[:0])
  45. b.SliceFrom(nBytes)
  46. }
  47. return w.writer.Write(frame)
  48. }
  49. func (w *MuxWriter) Write(b *buf.Buffer) error {
  50. defer b.Release()
  51. if err := w.writeInternal(b); err != nil {
  52. return err
  53. }
  54. for !b.IsEmpty() {
  55. if err := w.writeInternal(b); err != nil {
  56. return err
  57. }
  58. }
  59. return nil
  60. }
  61. func (w *MuxWriter) Close() {
  62. meta := FrameMetadata{
  63. SessionID: w.id,
  64. Target: w.dest,
  65. SessionStatus: SessionStatusEnd,
  66. }
  67. frame := buf.New()
  68. frame.AppendSupplier(meta.AsSupplier())
  69. w.writer.Write(frame)
  70. }