server.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package mtproto
  2. import (
  3. "context"
  4. "v2ray.com/core"
  5. "v2ray.com/core/common"
  6. "v2ray.com/core/common/buf"
  7. "v2ray.com/core/common/crypto"
  8. "v2ray.com/core/common/net"
  9. "v2ray.com/core/common/protocol"
  10. "v2ray.com/core/common/task"
  11. "v2ray.com/core/transport/internet"
  12. "v2ray.com/core/transport/pipe"
  13. )
  14. var (
  15. dcList = []net.Address{
  16. net.ParseAddress("149.154.175.50"),
  17. net.ParseAddress("149.154.167.51"),
  18. net.ParseAddress("149.154.175.100"),
  19. net.ParseAddress("149.154.167.91"),
  20. net.ParseAddress("149.154.171.5"),
  21. }
  22. )
  23. type Server struct {
  24. user *protocol.User
  25. account *Account
  26. }
  27. func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) {
  28. if len(config.User) == 0 {
  29. return nil, newError("no user configured.")
  30. }
  31. user := config.User[0]
  32. rawAccount, err := config.User[0].GetTypedAccount()
  33. if err != nil {
  34. return nil, newError("invalid account").Base(err)
  35. }
  36. account, ok := rawAccount.(*Account)
  37. if !ok {
  38. return nil, newError("not a MTProto account")
  39. }
  40. return &Server{
  41. user: user,
  42. account: account,
  43. }, nil
  44. }
  45. func (s *Server) Network() net.NetworkList {
  46. return net.NetworkList{
  47. Network: []net.Network{net.Network_TCP},
  48. }
  49. }
  50. func (s *Server) Process(ctx context.Context, network net.Network, conn internet.Connection, dispatcher core.Dispatcher) error {
  51. auth, err := ReadAuthentication(conn)
  52. if err != nil {
  53. return newError("failed to read authentication header").Base(err)
  54. }
  55. auth.ApplySecret(s.account.Secret)
  56. decryptor := crypto.NewAesCTRStream(auth.DecodingKey[:], auth.DecodingNonce[:])
  57. decryptor.XORKeyStream(auth.Header[:], auth.Header[:])
  58. dcID := auth.DataCenterID()
  59. if dcID >= uint16(len(dcList)) {
  60. return newError("invalid data center id: ", dcID)
  61. }
  62. dest := net.Destination{
  63. Network: net.Network_TCP,
  64. Address: dcList[dcID],
  65. Port: net.Port(443),
  66. }
  67. link, err := dispatcher.Dispatch(ctx, dest)
  68. if err != nil {
  69. return newError("failed to dispatch request to: ", dest).Base(err)
  70. }
  71. request := func() error {
  72. reader := buf.NewReader(crypto.NewCryptionReader(decryptor, conn))
  73. return buf.Copy(reader, link.Writer)
  74. }
  75. response := func() error {
  76. encryptor := crypto.NewAesCTRStream(auth.EncodingKey[:], auth.EncodingNonce[:])
  77. writer := buf.NewWriter(crypto.NewCryptionWriter(encryptor, conn))
  78. return buf.Copy(link.Reader, writer)
  79. }
  80. var responseDoneAndCloseWriter = task.Single(response, task.OnSuccess(task.Close(link.Writer)))
  81. if err := task.Run(task.WithContext(ctx), task.Parallel(request, responseDoneAndCloseWriter))(); err != nil {
  82. pipe.CloseError(link.Reader)
  83. pipe.CloseError(link.Writer)
  84. return newError("connection ends").Base(err)
  85. }
  86. return nil
  87. }
  88. func init() {
  89. common.Must(common.RegisterConfig((*ServerConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  90. return NewServer(ctx, config.(*ServerConfig))
  91. }))
  92. }