server.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. package mux
  2. import (
  3. "context"
  4. "io"
  5. "v2ray.com/core"
  6. "v2ray.com/core/common/buf"
  7. "v2ray.com/core/common/errors"
  8. "v2ray.com/core/common/log"
  9. "v2ray.com/core/common/net"
  10. "v2ray.com/core/common/protocol"
  11. "v2ray.com/core/common/session"
  12. "v2ray.com/core/common/vio"
  13. "v2ray.com/core/features/routing"
  14. "v2ray.com/core/transport/pipe"
  15. )
  16. type Server struct {
  17. dispatcher routing.Dispatcher
  18. }
  19. // NewServer creates a new mux.Server.
  20. func NewServer(ctx context.Context) *Server {
  21. s := &Server{}
  22. core.RequireFeatures(ctx, func(d routing.Dispatcher) {
  23. s.dispatcher = d
  24. })
  25. return s
  26. }
  27. // Type implements common.HasType.
  28. func (s *Server) Type() interface{} {
  29. return s.dispatcher.Type()
  30. }
  31. // Dispatch impliments routing.Dispatcher
  32. func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*vio.Link, error) {
  33. if dest.Address != muxCoolAddress {
  34. return s.dispatcher.Dispatch(ctx, dest)
  35. }
  36. opts := pipe.OptionsFromContext(ctx)
  37. uplinkReader, uplinkWriter := pipe.New(opts...)
  38. downlinkReader, downlinkWriter := pipe.New(opts...)
  39. worker := &ServerWorker{
  40. dispatcher: s.dispatcher,
  41. link: &vio.Link{
  42. Reader: uplinkReader,
  43. Writer: downlinkWriter,
  44. },
  45. sessionManager: NewSessionManager(),
  46. }
  47. go worker.run(ctx)
  48. return &vio.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil
  49. }
  50. // Start implements common.Runnable.
  51. func (s *Server) Start() error {
  52. return nil
  53. }
  54. // Close implements common.Closable.
  55. func (s *Server) Close() error {
  56. return nil
  57. }
  58. type ServerWorker struct {
  59. dispatcher routing.Dispatcher
  60. link *vio.Link
  61. sessionManager *SessionManager
  62. }
  63. func handle(ctx context.Context, s *Session, output buf.Writer) {
  64. writer := NewResponseWriter(s.ID, output, s.transferType)
  65. if err := buf.Copy(s.input, writer); err != nil {
  66. newError("session ", s.ID, " ends.").Base(err).WriteToLog(session.ExportIDToError(ctx))
  67. writer.hasError = true
  68. }
  69. writer.Close()
  70. s.Close()
  71. }
  72. func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error {
  73. if meta.Option.Has(OptionData) {
  74. return drain(NewStreamReader(reader))
  75. }
  76. return nil
  77. }
  78. func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, reader *buf.BufferedReader) error {
  79. newError("received request for ", meta.Target).WriteToLog(session.ExportIDToError(ctx))
  80. {
  81. msg := &log.AccessMessage{
  82. To: meta.Target,
  83. Status: log.AccessAccepted,
  84. Reason: "",
  85. }
  86. if inbound := session.InboundFromContext(ctx); inbound != nil && inbound.Source.IsValid() {
  87. msg.From = inbound.Source
  88. }
  89. log.Record(msg)
  90. }
  91. link, err := w.dispatcher.Dispatch(ctx, meta.Target)
  92. if err != nil {
  93. if meta.Option.Has(OptionData) {
  94. drain(NewStreamReader(reader))
  95. }
  96. return newError("failed to dispatch request.").Base(err)
  97. }
  98. s := &Session{
  99. input: link.Reader,
  100. output: link.Writer,
  101. parent: w.sessionManager,
  102. ID: meta.SessionID,
  103. transferType: protocol.TransferTypeStream,
  104. }
  105. if meta.Target.Network == net.Network_UDP {
  106. s.transferType = protocol.TransferTypePacket
  107. }
  108. w.sessionManager.Add(s)
  109. go handle(ctx, s, w.link.Writer)
  110. if !meta.Option.Has(OptionData) {
  111. return nil
  112. }
  113. rr := s.NewReader(reader)
  114. if err := buf.Copy(rr, s.output); err != nil {
  115. drain(rr)
  116. pipe.CloseError(s.input)
  117. return s.Close()
  118. }
  119. return nil
  120. }
  121. func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReader) error {
  122. if !meta.Option.Has(OptionData) {
  123. return nil
  124. }
  125. if s, found := w.sessionManager.Get(meta.SessionID); found {
  126. rr := s.NewReader(reader)
  127. if err := buf.Copy(rr, s.output); err != nil {
  128. drain(rr)
  129. pipe.CloseError(s.input)
  130. return s.Close()
  131. }
  132. return nil
  133. }
  134. return drain(NewStreamReader(reader))
  135. }
  136. func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
  137. if s, found := w.sessionManager.Get(meta.SessionID); found {
  138. if meta.Option.Has(OptionError) {
  139. pipe.CloseError(s.input)
  140. pipe.CloseError(s.output)
  141. }
  142. s.Close()
  143. }
  144. if meta.Option.Has(OptionData) {
  145. return drain(NewStreamReader(reader))
  146. }
  147. return nil
  148. }
  149. func (w *ServerWorker) handleFrame(ctx context.Context, reader *buf.BufferedReader) error {
  150. var meta FrameMetadata
  151. err := meta.Unmarshal(reader)
  152. if err != nil {
  153. return newError("failed to read metadata").Base(err)
  154. }
  155. switch meta.SessionStatus {
  156. case SessionStatusKeepAlive:
  157. err = w.handleStatusKeepAlive(&meta, reader)
  158. case SessionStatusEnd:
  159. err = w.handleStatusEnd(&meta, reader)
  160. case SessionStatusNew:
  161. err = w.handleStatusNew(ctx, &meta, reader)
  162. case SessionStatusKeep:
  163. err = w.handleStatusKeep(&meta, reader)
  164. default:
  165. status := meta.SessionStatus
  166. return newError("unknown status: ", status).AtError()
  167. }
  168. if err != nil {
  169. return newError("failed to process data").Base(err)
  170. }
  171. return nil
  172. }
  173. func (w *ServerWorker) run(ctx context.Context) {
  174. input := w.link.Reader
  175. reader := &buf.BufferedReader{Reader: input}
  176. defer w.sessionManager.Close() // nolint: errcheck
  177. for {
  178. select {
  179. case <-ctx.Done():
  180. return
  181. default:
  182. err := w.handleFrame(ctx, reader)
  183. if err != nil {
  184. if errors.Cause(err) != io.EOF {
  185. newError("unexpected EOF").Base(err).WriteToLog(session.ExportIDToError(ctx))
  186. pipe.CloseError(input)
  187. }
  188. return
  189. }
  190. }
  191. }
  192. }