server.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package simple
  2. import (
  3. "bytes"
  4. "context"
  5. "sync"
  6. "github.com/v2fly/v2ray-core/v5/common"
  7. "github.com/v2fly/v2ray-core/v5/transport/internet/request"
  8. )
  9. func newServer(config *ServerConfig) request.SessionAssemblerServer {
  10. return &simpleAssemblerServer{}
  11. }
  12. type simpleAssemblerServer struct {
  13. sessions sync.Map
  14. assembly request.TransportServerAssembly
  15. }
  16. func (s *simpleAssemblerServer) OnTransportServerAssemblyReady(assembly request.TransportServerAssembly) {
  17. s.assembly = assembly
  18. }
  19. func (s *simpleAssemblerServer) OnRoundTrip(ctx context.Context, req request.Request, opts ...request.RoundTripperOption,
  20. ) (resp request.Response, err error) {
  21. connectionID := req.ConnectionTag
  22. session := newSimpleAssemblerServerSession(ctx)
  23. loadedSession, loaded := s.sessions.LoadOrStore(string(connectionID), session)
  24. if loaded {
  25. session = loadedSession.(*simpleAssemblerServerSession)
  26. } else {
  27. if err := s.assembly.SessionReceiver().OnNewSession(ctx, session); err != nil {
  28. return request.Response{}, newError("failed to create new session").Base(err)
  29. }
  30. }
  31. return session.OnRoundTrip(ctx, req, opts...)
  32. }
  33. func newSimpleAssemblerServerSession(ctx context.Context) *simpleAssemblerServerSession {
  34. sessionCtx, finish := context.WithCancel(ctx)
  35. return &simpleAssemblerServerSession{
  36. readBuffer: bytes.NewBuffer(nil),
  37. readChan: make(chan []byte, 16),
  38. requestProcessed: make(chan struct{}),
  39. writeLock: new(sync.Mutex),
  40. writeBuffer: bytes.NewBuffer(nil),
  41. maxWriteSize: 4096,
  42. ctx: sessionCtx,
  43. finish: finish,
  44. }
  45. }
  // simpleAssemblerServerSession is the byte-stream view of one connection that
  // is carried over a sequence of request/response round trips.
  type simpleAssemblerServerSession struct {
  	// maxWriteSize is the number of buffered outbound bytes above which Write
  	// waits for a round trip; OnRoundTrip also drains at most this many bytes
  	// per response.
  	maxWriteSize int

  	// readBuffer stages the inbound payload currently being consumed by Read.
  	readBuffer *bytes.Buffer
  	// readChan carries inbound payloads from OnRoundTrip to Read.
  	readChan chan []byte

  	// requestProcessed is signalled by OnRoundTrip after it has drained the
  	// write buffer; a Write that exceeded maxWriteSize waits on it.
  	requestProcessed chan struct{}
  	// writeLock guards writeBuffer.
  	writeLock *sync.Mutex
  	// writeBuffer accumulates outbound bytes until a round trip picks them up.
  	writeBuffer *bytes.Buffer

  	// ctx is cancelled when the session is closed; finish cancels it.
  	ctx    context.Context
  	finish func()
  }

  // Read copies inbound data into p. When no staged data is left it blocks until
  // a payload arrives on readChan or the session context is cancelled, in which
  // case it returns 0 and the context error.
  //
  // NOTE(review): readBuffer is accessed without a lock, so Read presumably has
  // a single caller at a time — confirm.
  func (s *simpleAssemblerServerSession) Read(p []byte) (n int, err error) {
  	if s.readBuffer.Len() == 0 {
  		select {
  		case <-s.ctx.Done():
  			return 0, s.ctx.Err()
  		case data := <-s.readChan:
  			s.readBuffer.Write(data)
  		}
  	}
  	return s.readBuffer.Read(p)
  }

  // Write appends p to the outbound buffer. If the buffer then holds more than
  // maxWriteSize bytes, Write blocks until one round trip has been processed or
  // the session context is cancelled.
  //
  // Writing to a session that is already closed fails immediately with the
  // context error and buffers nothing.
  func (s *simpleAssemblerServerSession) Write(p []byte) (n int, err error) {
  	// Fail fast instead of silently accepting data after Close.
  	if cerr := s.ctx.Err(); cerr != nil {
  		return 0, cerr
  	}

  	s.writeLock.Lock()
  	n, err = s.writeBuffer.Write(p)
  	pending := s.writeBuffer.Len()
  	s.writeLock.Unlock()
  	if err != nil {
  		return 0, err
  	}

  	if pending > s.maxWriteSize {
  		// Back-pressure: wait for a single drain signal before accepting more.
  		select {
  		case <-s.requestProcessed:
  		case <-s.ctx.Done():
  			return 0, s.ctx.Err()
  		}
  	}
  	return n, nil
  }

  // Close cancels the session context, which unblocks pending Read and Write
  // calls. It always returns nil and may be called more than once.
  func (s *simpleAssemblerServerSession) Close() error {
  	s.finish()
  	return nil
  }
  88. func (s *simpleAssemblerServerSession) OnRoundTrip(ctx context.Context, req request.Request, opts ...request.RoundTripperOption,
  89. ) (resp request.Response, err error) {
  90. if req.Data != nil && len(req.Data) > 0 {
  91. select {
  92. case <-s.ctx.Done():
  93. return request.Response{}, s.ctx.Err()
  94. case s.readChan <- req.Data:
  95. }
  96. }
  97. s.writeLock.Lock()
  98. nextWrite := s.writeBuffer.Next(s.maxWriteSize)
  99. data := make([]byte, len(nextWrite))
  100. copy(data, nextWrite)
  101. s.writeLock.Unlock()
  102. select {
  103. case s.requestProcessed <- struct{}{}:
  104. case <-s.ctx.Done():
  105. return request.Response{}, s.ctx.Err()
  106. default:
  107. }
  108. return request.Response{Data: data}, nil
  109. }
  110. func init() {
  111. common.Must(common.RegisterConfig((*ServerConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  112. serverConfig, ok := config.(*ServerConfig)
  113. if !ok {
  114. return nil, newError("not a SimpleServerConfig")
  115. }
  116. return newServer(serverConfig), nil
  117. }))
  118. }