hub.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package assembly
  2. import (
  3. "context"
  4. gonet "net"
  5. "time"
  6. "github.com/v2fly/v2ray-core/v5/transport/internet/transportcommon"
  7. "github.com/v2fly/v2ray-core/v5/common"
  8. "github.com/v2fly/v2ray-core/v5/common/net"
  9. "github.com/v2fly/v2ray-core/v5/common/serial"
  10. "github.com/v2fly/v2ray-core/v5/transport/internet"
  11. "github.com/v2fly/v2ray-core/v5/transport/internet/request"
  12. )
  13. type server struct {
  14. tripper request.RoundTripperServer
  15. assembler request.SessionAssemblerServer
  16. addConn internet.ConnHandler
  17. streamSettings *internet.MemoryStreamConfig
  18. addr net.Address
  19. port net.Port
  20. }
  21. func (s server) Listen(ctx context.Context) (net.Listener, error) {
  22. return transportcommon.ListenWithSecuritySettings(ctx, s.addr, s.port, s.streamSettings)
  23. }
  24. func (s server) AutoImplListener() request.Listener {
  25. return s
  26. }
  27. func (s server) Close() error {
  28. if err := s.tripper.Close(); err != nil {
  29. return newError("failed to close tripper").Base(err)
  30. }
  31. return nil
  32. }
  33. func (s server) Addr() net.Addr {
  34. // Unimplemented
  35. return nil
  36. }
  37. type serverConnection struct {
  38. request.Session
  39. }
  40. func (s serverConnection) LocalAddr() gonet.Addr {
  41. return &net.UnixAddr{Name: "unimplemented"}
  42. }
  43. func (s serverConnection) RemoteAddr() gonet.Addr {
  44. return &net.UnixAddr{Name: "unimplemented"}
  45. }
  46. func (s serverConnection) SetDeadline(t time.Time) error {
  47. // Unimplemented
  48. return nil
  49. }
  50. func (s serverConnection) SetReadDeadline(t time.Time) error {
  51. // Unimplemented
  52. return nil
  53. }
  54. func (s serverConnection) SetWriteDeadline(t time.Time) error {
  55. // Unimplemented
  56. return nil
  57. }
  58. func (s server) OnNewSession(ctx context.Context, sess request.Session, opts ...request.SessionOption) error {
  59. s.addConn(&serverConnection{sess})
  60. return nil
  61. }
  62. func (s server) SessionReceiver() request.SessionReceiver {
  63. return s
  64. }
  65. func (s server) TripperReceiver() request.TripperReceiver {
  66. return s.assembler
  67. }
  68. func listenRequest(ctx context.Context, address net.Address, port net.Port, streamSettings *internet.MemoryStreamConfig, addConn internet.ConnHandler) (internet.Listener, error) {
  69. transportConfiguration := streamSettings.ProtocolSettings.(*Config)
  70. serverAssembly := &server{addConn: addConn}
  71. assemblerConfigInstance, err := serial.GetInstanceOf(transportConfiguration.Assembler)
  72. if err != nil {
  73. return nil, newError("failed to get config instance of assembler").Base(err)
  74. }
  75. assembler, err := common.CreateObject(ctx, assemblerConfigInstance)
  76. if err != nil {
  77. return nil, newError("failed to create assembler").Base(err)
  78. }
  79. if typedAssembler, ok := assembler.(request.SessionAssemblerServer); !ok {
  80. return nil, newError("failed to type assert assembler to SessionAssemblerServer")
  81. } else {
  82. serverAssembly.assembler = typedAssembler
  83. }
  84. roundtripperConfigInstance, err := serial.GetInstanceOf(transportConfiguration.Roundtripper)
  85. if err != nil {
  86. return nil, newError("failed to get config instance of roundtripper").Base(err)
  87. }
  88. roundtripper, err := common.CreateObject(ctx, roundtripperConfigInstance)
  89. if err != nil {
  90. return nil, newError("failed to create roundtripper").Base(err)
  91. }
  92. if typedRoundtripper, ok := roundtripper.(request.RoundTripperServer); !ok {
  93. return nil, newError("failed to type assert roundtripper to RoundTripperServer")
  94. } else {
  95. serverAssembly.tripper = typedRoundtripper
  96. }
  97. serverAssembly.addr = address
  98. serverAssembly.port = port
  99. serverAssembly.streamSettings = streamSettings
  100. serverAssembly.assembler.OnTransportServerAssemblyReady(serverAssembly)
  101. serverAssembly.tripper.OnTransportServerAssemblyReady(serverAssembly)
  102. if err := serverAssembly.tripper.Start(); err != nil {
  103. return nil, newError("failed to start tripper").Base(err)
  104. }
  105. return serverAssembly, nil
  106. }
  107. func init() {
  108. common.Must(internet.RegisterTransportListener(protocolName, listenRequest))
  109. }