hub.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package assembly
  2. import (
  3. "context"
  4. gonet "net"
  5. "time"
  6. "github.com/v2fly/v2ray-core/v5/transport/internet/transportcommon"
  7. "github.com/v2fly/v2ray-core/v5/common"
  8. "github.com/v2fly/v2ray-core/v5/common/net"
  9. "github.com/v2fly/v2ray-core/v5/common/serial"
  10. "github.com/v2fly/v2ray-core/v5/transport/internet"
  11. "github.com/v2fly/v2ray-core/v5/transport/internet/request"
  12. )
  13. type server struct {
  14. tripper request.RoundTripperServer
  15. assembler request.SessionAssemblerServer
  16. addConn internet.ConnHandler
  17. streamSettings *internet.MemoryStreamConfig
  18. addr net.Address
  19. port net.Port
  20. }
  21. func (s server) Listen(ctx context.Context) (net.Listener, error) {
  22. return transportcommon.ListenWithSecuritySettings(ctx, s.addr, s.port, s.streamSettings)
  23. }
  24. func (s server) AutoImplListener() request.Listener {
  25. return s
  26. }
  27. func (s server) Close() error {
  28. if err := s.tripper.Close(); err != nil {
  29. return newError("failed to close tripper").Base(err)
  30. }
  31. if runnableAssembler, ok := s.assembler.(common.Runnable); ok {
  32. if err := runnableAssembler.Close(); err != nil {
  33. return newError("failed to close assembler").Base(err)
  34. }
  35. }
  36. return nil
  37. }
  38. func (s server) Addr() net.Addr {
  39. // Unimplemented
  40. return nil
  41. }
  42. type serverConnection struct {
  43. request.Session
  44. }
  45. func (s serverConnection) LocalAddr() gonet.Addr {
  46. return &net.UnixAddr{Name: "unimplemented"}
  47. }
  48. func (s serverConnection) RemoteAddr() gonet.Addr {
  49. return &net.UnixAddr{Name: "unimplemented"}
  50. }
  51. func (s serverConnection) SetDeadline(t time.Time) error {
  52. // Unimplemented
  53. return nil
  54. }
  55. func (s serverConnection) SetReadDeadline(t time.Time) error {
  56. // Unimplemented
  57. return nil
  58. }
  59. func (s serverConnection) SetWriteDeadline(t time.Time) error {
  60. // Unimplemented
  61. return nil
  62. }
  63. func (s server) OnNewSession(ctx context.Context, sess request.Session, opts ...request.SessionOption) error {
  64. s.addConn(&serverConnection{sess})
  65. return nil
  66. }
  67. func (s server) SessionReceiver() request.SessionReceiver {
  68. return s
  69. }
  70. func (s server) TripperReceiver() request.TripperReceiver {
  71. return s.assembler
  72. }
  73. func listenRequest(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, addConn internet.ConnHandler) (internet.Listener, error) {
  74. transportConfiguration := streamSettings.ProtocolSettings.(*Config)
  75. serverAssembly := &server{addConn: addConn}
  76. assemblerConfigInstance, err := serial.GetInstanceOf(transportConfiguration.Assembler)
  77. if err != nil {
  78. return nil, newError("failed to get config instance of assembler").Base(err)
  79. }
  80. assembler, err := common.CreateObject(ctx, assemblerConfigInstance)
  81. if err != nil {
  82. return nil, newError("failed to create assembler").Base(err)
  83. }
  84. if typedAssembler, ok := assembler.(request.SessionAssemblerServer); !ok {
  85. return nil, newError("failed to type assert assembler to SessionAssemblerServer")
  86. } else {
  87. serverAssembly.assembler = typedAssembler
  88. }
  89. roundtripperConfigInstance, err := serial.GetInstanceOf(transportConfiguration.Roundtripper)
  90. if err != nil {
  91. return nil, newError("failed to get config instance of roundtripper").Base(err)
  92. }
  93. roundtripper, err := common.CreateObject(ctx, roundtripperConfigInstance)
  94. if err != nil {
  95. return nil, newError("failed to create roundtripper").Base(err)
  96. }
  97. if typedRoundtripper, ok := roundtripper.(request.RoundTripperServer); !ok {
  98. return nil, newError("failed to type assert roundtripper to RoundTripperServer")
  99. } else {
  100. serverAssembly.tripper = typedRoundtripper
  101. }
  102. serverAssembly.addr = address
  103. serverAssembly.port = port
  104. serverAssembly.streamSettings = streamSettings
  105. serverAssembly.assembler.OnTransportServerAssemblyReady(serverAssembly)
  106. serverAssembly.tripper.OnTransportServerAssemblyReady(serverAssembly)
  107. if err := serverAssembly.tripper.Start(); err != nil {
  108. return nil, newError("failed to start tripper").Base(err)
  109. }
  110. if runnableAssembler, ok := serverAssembly.assembler.(common.Runnable); ok {
  111. if err := runnableAssembler.Start(); err != nil {
  112. return nil, newError("failed to start assembler").Base(err)
  113. }
  114. }
  115. return serverAssembly, nil
  116. }
  117. func init() {
  118. common.Must(internet.RegisterTransportListener(protocolName, listenRequest))
  119. }