| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 |
- package mux
- import (
- "context"
- "io"
- "v2ray.com/core"
- "v2ray.com/core/common/buf"
- "v2ray.com/core/common/errors"
- "v2ray.com/core/common/log"
- "v2ray.com/core/common/net"
- "v2ray.com/core/common/protocol"
- "v2ray.com/core/common/session"
- "v2ray.com/core/common/vio"
- "v2ray.com/core/features/routing"
- "v2ray.com/core/transport/pipe"
- )
- type Server struct {
- dispatcher routing.Dispatcher
- }
- // NewServer creates a new mux.Server.
- func NewServer(ctx context.Context) *Server {
- s := &Server{}
- core.RequireFeatures(ctx, func(d routing.Dispatcher) {
- s.dispatcher = d
- })
- return s
- }
- // Type implements common.HasType.
- func (s *Server) Type() interface{} {
- return s.dispatcher.Type()
- }
- // Dispatch impliments routing.Dispatcher
- func (s *Server) Dispatch(ctx context.Context, dest net.Destination) (*vio.Link, error) {
- if dest.Address != muxCoolAddress {
- return s.dispatcher.Dispatch(ctx, dest)
- }
- opts := pipe.OptionsFromContext(ctx)
- uplinkReader, uplinkWriter := pipe.New(opts...)
- downlinkReader, downlinkWriter := pipe.New(opts...)
- worker := &ServerWorker{
- dispatcher: s.dispatcher,
- link: &vio.Link{
- Reader: uplinkReader,
- Writer: downlinkWriter,
- },
- sessionManager: NewSessionManager(),
- }
- go worker.run(ctx)
- return &vio.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil
- }
- // Start implements common.Runnable.
- func (s *Server) Start() error {
- return nil
- }
- // Close implements common.Closable.
- func (s *Server) Close() error {
- return nil
- }
- type ServerWorker struct {
- dispatcher routing.Dispatcher
- link *vio.Link
- sessionManager *SessionManager
- }
- func handle(ctx context.Context, s *Session, output buf.Writer) {
- writer := NewResponseWriter(s.ID, output, s.transferType)
- if err := buf.Copy(s.input, writer); err != nil {
- newError("session ", s.ID, " ends.").Base(err).WriteToLog(session.ExportIDToError(ctx))
- writer.hasError = true
- }
- writer.Close()
- s.Close()
- }
- func (w *ServerWorker) handleStatusKeepAlive(meta *FrameMetadata, reader *buf.BufferedReader) error {
- if meta.Option.Has(OptionData) {
- return drain(NewStreamReader(reader))
- }
- return nil
- }
- func (w *ServerWorker) handleStatusNew(ctx context.Context, meta *FrameMetadata, reader *buf.BufferedReader) error {
- newError("received request for ", meta.Target).WriteToLog(session.ExportIDToError(ctx))
- {
- msg := &log.AccessMessage{
- To: meta.Target,
- Status: log.AccessAccepted,
- Reason: "",
- }
- if inbound := session.InboundFromContext(ctx); inbound != nil && inbound.Source.IsValid() {
- msg.From = inbound.Source
- }
- log.Record(msg)
- }
- link, err := w.dispatcher.Dispatch(ctx, meta.Target)
- if err != nil {
- if meta.Option.Has(OptionData) {
- drain(NewStreamReader(reader))
- }
- return newError("failed to dispatch request.").Base(err)
- }
- s := &Session{
- input: link.Reader,
- output: link.Writer,
- parent: w.sessionManager,
- ID: meta.SessionID,
- transferType: protocol.TransferTypeStream,
- }
- if meta.Target.Network == net.Network_UDP {
- s.transferType = protocol.TransferTypePacket
- }
- w.sessionManager.Add(s)
- go handle(ctx, s, w.link.Writer)
- if !meta.Option.Has(OptionData) {
- return nil
- }
- rr := s.NewReader(reader)
- if err := buf.Copy(rr, s.output); err != nil {
- drain(rr)
- pipe.CloseError(s.input)
- return s.Close()
- }
- return nil
- }
- func (w *ServerWorker) handleStatusKeep(meta *FrameMetadata, reader *buf.BufferedReader) error {
- if !meta.Option.Has(OptionData) {
- return nil
- }
- if s, found := w.sessionManager.Get(meta.SessionID); found {
- rr := s.NewReader(reader)
- if err := buf.Copy(rr, s.output); err != nil {
- drain(rr)
- pipe.CloseError(s.input)
- return s.Close()
- }
- return nil
- }
- return drain(NewStreamReader(reader))
- }
- func (w *ServerWorker) handleStatusEnd(meta *FrameMetadata, reader *buf.BufferedReader) error {
- if s, found := w.sessionManager.Get(meta.SessionID); found {
- if meta.Option.Has(OptionError) {
- pipe.CloseError(s.input)
- pipe.CloseError(s.output)
- }
- s.Close()
- }
- if meta.Option.Has(OptionData) {
- return drain(NewStreamReader(reader))
- }
- return nil
- }
- func (w *ServerWorker) handleFrame(ctx context.Context, reader *buf.BufferedReader) error {
- var meta FrameMetadata
- err := meta.Unmarshal(reader)
- if err != nil {
- return newError("failed to read metadata").Base(err)
- }
- switch meta.SessionStatus {
- case SessionStatusKeepAlive:
- err = w.handleStatusKeepAlive(&meta, reader)
- case SessionStatusEnd:
- err = w.handleStatusEnd(&meta, reader)
- case SessionStatusNew:
- err = w.handleStatusNew(ctx, &meta, reader)
- case SessionStatusKeep:
- err = w.handleStatusKeep(&meta, reader)
- default:
- status := meta.SessionStatus
- return newError("unknown status: ", status).AtError()
- }
- if err != nil {
- return newError("failed to process data").Base(err)
- }
- return nil
- }
- func (w *ServerWorker) run(ctx context.Context) {
- input := w.link.Reader
- reader := &buf.BufferedReader{Reader: input}
- defer w.sessionManager.Close() // nolint: errcheck
- for {
- select {
- case <-ctx.Done():
- return
- default:
- err := w.handleFrame(ctx, reader)
- if err != nil {
- if errors.Cause(err) != io.EOF {
- newError("unexpected EOF").Base(err).WriteToLog(session.ExportIDToError(ctx))
- pipe.CloseError(input)
- }
- return
- }
- }
- }
- }
|