server.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. package shadowsocks
  2. import (
  3. "context"
  4. "v2ray.com/core/app"
  5. "v2ray.com/core/app/dispatcher"
  6. "v2ray.com/core/common"
  7. "v2ray.com/core/common/buf"
  8. "v2ray.com/core/common/bufio"
  9. "v2ray.com/core/common/errors"
  10. "v2ray.com/core/common/log"
  11. v2net "v2ray.com/core/common/net"
  12. "v2ray.com/core/common/protocol"
  13. "v2ray.com/core/common/signal"
  14. "v2ray.com/core/proxy"
  15. "v2ray.com/core/transport/internet"
  16. "v2ray.com/core/transport/internet/udp"
  17. )
  18. type Server struct {
  19. packetDispatcher dispatcher.PacketDispatcher
  20. config *ServerConfig
  21. user *protocol.User
  22. account *ShadowsocksAccount
  23. meta *proxy.InboundHandlerMeta
  24. accepting bool
  25. tcpHub *internet.TCPHub
  26. udpHub *udp.Hub
  27. udpServer *udp.Server
  28. }
  29. func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) {
  30. space := app.SpaceFromContext(ctx)
  31. if space == nil {
  32. return nil, errors.New("Shadowsocks|Server: No space in context.")
  33. }
  34. meta := proxy.InboundMetaFromContext(ctx)
  35. if meta == nil {
  36. return nil, errors.New("Shadowsocks|Server: No inbound meta in context.")
  37. }
  38. if config.GetUser() == nil {
  39. return nil, protocol.ErrUserMissing
  40. }
  41. rawAccount, err := config.User.GetTypedAccount()
  42. if err != nil {
  43. return nil, errors.Base(err).Message("Shadowsocks|Server: Failed to get user account.")
  44. }
  45. account := rawAccount.(*ShadowsocksAccount)
  46. s := &Server{
  47. config: config,
  48. meta: meta,
  49. user: config.GetUser(),
  50. account: account,
  51. }
  52. space.OnInitialize(func() error {
  53. s.packetDispatcher = dispatcher.FromSpace(space)
  54. if s.packetDispatcher == nil {
  55. return errors.New("Shadowsocks|Server: Dispatcher is not found in space.")
  56. }
  57. return nil
  58. })
  59. return s, nil
  60. }
  61. func (v *Server) Port() v2net.Port {
  62. return v.meta.Port
  63. }
  64. func (v *Server) Close() {
  65. v.accepting = false
  66. // TODO: synchronization
  67. if v.tcpHub != nil {
  68. v.tcpHub.Close()
  69. v.tcpHub = nil
  70. }
  71. if v.udpHub != nil {
  72. v.udpHub.Close()
  73. v.udpHub = nil
  74. }
  75. }
  76. func (v *Server) Start() error {
  77. if v.accepting {
  78. return nil
  79. }
  80. tcpHub, err := internet.ListenTCP(v.meta.Address, v.meta.Port, v.handleConnection, v.meta.StreamSettings)
  81. if err != nil {
  82. log.Error("Shadowsocks: Failed to listen TCP on ", v.meta.Address, ":", v.meta.Port, ": ", err)
  83. return err
  84. }
  85. v.tcpHub = tcpHub
  86. if v.config.UdpEnabled {
  87. v.udpServer = udp.NewServer(v.packetDispatcher)
  88. udpHub, err := udp.ListenUDP(v.meta.Address, v.meta.Port, udp.ListenOption{Callback: v.handlerUDPPayload})
  89. if err != nil {
  90. log.Error("Shadowsocks: Failed to listen UDP on ", v.meta.Address, ":", v.meta.Port, ": ", err)
  91. return err
  92. }
  93. v.udpHub = udpHub
  94. }
  95. v.accepting = true
  96. return nil
  97. }
  98. func (v *Server) handlerUDPPayload(payload *buf.Buffer, session *proxy.SessionInfo) {
  99. source := session.Source
  100. request, data, err := DecodeUDPPacket(v.user, payload)
  101. if err != nil {
  102. log.Info("Shadowsocks|Server: Skipping invalid UDP packet from: ", source, ": ", err)
  103. log.Access(source, "", log.AccessRejected, err)
  104. payload.Release()
  105. return
  106. }
  107. if request.Option.Has(RequestOptionOneTimeAuth) && v.account.OneTimeAuth == Account_Disabled {
  108. log.Info("Shadowsocks|Server: Client payload enables OTA but server doesn't allow it.")
  109. payload.Release()
  110. return
  111. }
  112. if !request.Option.Has(RequestOptionOneTimeAuth) && v.account.OneTimeAuth == Account_Enabled {
  113. log.Info("Shadowsocks|Server: Client payload disables OTA but server forces it.")
  114. payload.Release()
  115. return
  116. }
  117. dest := request.Destination()
  118. log.Access(source, dest, log.AccessAccepted, "")
  119. log.Info("Shadowsocks|Server: Tunnelling request to ", dest)
  120. v.udpServer.Dispatch(&proxy.SessionInfo{Source: source, Destination: dest, User: request.User, Inbound: v.meta}, data, func(destination v2net.Destination, payload *buf.Buffer) {
  121. defer payload.Release()
  122. data, err := EncodeUDPPacket(request, payload)
  123. if err != nil {
  124. log.Warning("Shadowsocks|Server: Failed to encode UDP packet: ", err)
  125. return
  126. }
  127. defer data.Release()
  128. v.udpHub.WriteTo(data.Bytes(), source)
  129. })
  130. }
  131. func (v *Server) handleConnection(conn internet.Connection) {
  132. defer conn.Close()
  133. conn.SetReusable(false)
  134. timedReader := v2net.NewTimeOutReader(16, conn)
  135. bufferedReader := bufio.NewReader(timedReader)
  136. request, bodyReader, err := ReadTCPSession(v.user, bufferedReader)
  137. if err != nil {
  138. log.Access(conn.RemoteAddr(), "", log.AccessRejected, err)
  139. log.Info("Shadowsocks|Server: Failed to create request from: ", conn.RemoteAddr(), ": ", err)
  140. return
  141. }
  142. bufferedReader.SetBuffered(false)
  143. userSettings := v.user.GetSettings()
  144. timedReader.SetTimeOut(userSettings.PayloadReadTimeout)
  145. dest := request.Destination()
  146. log.Access(conn.RemoteAddr(), dest, log.AccessAccepted, "")
  147. log.Info("Shadowsocks|Server: Tunnelling request to ", dest)
  148. ray := v.packetDispatcher.DispatchToOutbound(&proxy.SessionInfo{
  149. Source: v2net.DestinationFromAddr(conn.RemoteAddr()),
  150. Destination: dest,
  151. User: request.User,
  152. Inbound: v.meta,
  153. })
  154. requestDone := signal.ExecuteAsync(func() error {
  155. bufferedWriter := bufio.NewWriter(conn)
  156. responseWriter, err := WriteTCPResponse(request, bufferedWriter)
  157. if err != nil {
  158. log.Warning("Shadowsocks|Server: Failed to write response: ", err)
  159. return err
  160. }
  161. payload, err := ray.InboundOutput().Read()
  162. if err != nil {
  163. return err
  164. }
  165. if err := responseWriter.Write(payload); err != nil {
  166. return err
  167. }
  168. payload.Release()
  169. if err := bufferedWriter.SetBuffered(false); err != nil {
  170. return err
  171. }
  172. if err := buf.PipeUntilEOF(ray.InboundOutput(), responseWriter); err != nil {
  173. log.Info("Shadowsocks|Server: Failed to transport all TCP response: ", err)
  174. return err
  175. }
  176. return nil
  177. })
  178. responseDone := signal.ExecuteAsync(func() error {
  179. defer ray.InboundInput().Close()
  180. if err := buf.PipeUntilEOF(bodyReader, ray.InboundInput()); err != nil {
  181. log.Info("Shadowsocks|Server: Failed to transport all TCP request: ", err)
  182. return err
  183. }
  184. return nil
  185. })
  186. if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
  187. log.Info("Shadowsocks|Server: Connection ends with ", err)
  188. ray.InboundInput().CloseError()
  189. ray.InboundOutput().CloseError()
  190. }
  191. }
  192. func init() {
  193. common.Must(common.RegisterConfig((*ServerConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  194. return NewServer(ctx, config.(*ServerConfig))
  195. }))
  196. }