server.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. package shadowsocks
  2. import (
  3. "context"
  4. "v2ray.com/core/app"
  5. "v2ray.com/core/app/dispatcher"
  6. "v2ray.com/core/common"
  7. "v2ray.com/core/common/buf"
  8. "v2ray.com/core/common/bufio"
  9. "v2ray.com/core/common/errors"
  10. "v2ray.com/core/common/log"
  11. "v2ray.com/core/common/net"
  12. "v2ray.com/core/common/protocol"
  13. "v2ray.com/core/common/signal"
  14. "v2ray.com/core/proxy"
  15. "v2ray.com/core/transport/internet"
  16. "v2ray.com/core/transport/internet/udp"
  17. )
  18. type Server struct {
  19. packetDispatcher dispatcher.Interface
  20. config *ServerConfig
  21. user *protocol.User
  22. account *ShadowsocksAccount
  23. meta *proxy.InboundHandlerMeta
  24. accepting bool
  25. tcpHub *internet.TCPHub
  26. udpHub *udp.Hub
  27. udpServer *udp.Server
  28. }
  29. func NewServer(ctx context.Context, config *ServerConfig) (*Server, error) {
  30. space := app.SpaceFromContext(ctx)
  31. if space == nil {
  32. return nil, errors.New("Shadowsocks|Server: No space in context.")
  33. }
  34. meta := proxy.InboundMetaFromContext(ctx)
  35. if meta == nil {
  36. return nil, errors.New("Shadowsocks|Server: No inbound meta in context.")
  37. }
  38. if config.GetUser() == nil {
  39. return nil, protocol.ErrUserMissing
  40. }
  41. rawAccount, err := config.User.GetTypedAccount()
  42. if err != nil {
  43. return nil, errors.Base(err).Message("Shadowsocks|Server: Failed to get user account.")
  44. }
  45. account := rawAccount.(*ShadowsocksAccount)
  46. s := &Server{
  47. config: config,
  48. meta: meta,
  49. user: config.GetUser(),
  50. account: account,
  51. }
  52. space.OnInitialize(func() error {
  53. s.packetDispatcher = dispatcher.FromSpace(space)
  54. if s.packetDispatcher == nil {
  55. return errors.New("Shadowsocks|Server: Dispatcher is not found in space.")
  56. }
  57. return nil
  58. })
  59. return s, nil
  60. }
  61. func (v *Server) Network() net.NetworkList {
  62. list := net.NetworkList{
  63. Network: []net.Network{net.Network_TCP},
  64. }
  65. if v.config.UdpEnabled {
  66. list.Network = append(list.Network, net.Network_UDP)
  67. }
  68. return list
  69. }
  70. func (v *Server) Port() net.Port {
  71. return v.meta.Port
  72. }
  73. func (v *Server) Close() {
  74. v.accepting = false
  75. // TODO: synchronization
  76. if v.tcpHub != nil {
  77. v.tcpHub.Close()
  78. v.tcpHub = nil
  79. }
  80. if v.udpHub != nil {
  81. v.udpHub.Close()
  82. v.udpHub = nil
  83. }
  84. }
  85. func (v *Server) Start() error {
  86. if v.accepting {
  87. return nil
  88. }
  89. tcpHub, err := internet.ListenTCP(v.meta.Address, v.meta.Port, v.handleConnection, v.meta.StreamSettings)
  90. if err != nil {
  91. log.Error("Shadowsocks: Failed to listen TCP on ", v.meta.Address, ":", v.meta.Port, ": ", err)
  92. return err
  93. }
  94. v.tcpHub = tcpHub
  95. if v.config.UdpEnabled {
  96. v.udpServer = udp.NewServer(v.packetDispatcher)
  97. udpHub, err := udp.ListenUDP(v.meta.Address, v.meta.Port, udp.ListenOption{Callback: v.handlerUDPPayload})
  98. if err != nil {
  99. log.Error("Shadowsocks: Failed to listen UDP on ", v.meta.Address, ":", v.meta.Port, ": ", err)
  100. return err
  101. }
  102. v.udpHub = udpHub
  103. }
  104. v.accepting = true
  105. return nil
  106. }
  107. func (v *Server) handlerUDPPayload(payload *buf.Buffer, session *proxy.SessionInfo) {
  108. source := session.Source
  109. request, data, err := DecodeUDPPacket(v.user, payload)
  110. if err != nil {
  111. log.Info("Shadowsocks|Server: Skipping invalid UDP packet from: ", source, ": ", err)
  112. log.Access(source, "", log.AccessRejected, err)
  113. payload.Release()
  114. return
  115. }
  116. if request.Option.Has(RequestOptionOneTimeAuth) && v.account.OneTimeAuth == Account_Disabled {
  117. log.Info("Shadowsocks|Server: Client payload enables OTA but server doesn't allow it.")
  118. payload.Release()
  119. return
  120. }
  121. if !request.Option.Has(RequestOptionOneTimeAuth) && v.account.OneTimeAuth == Account_Enabled {
  122. log.Info("Shadowsocks|Server: Client payload disables OTA but server forces it.")
  123. payload.Release()
  124. return
  125. }
  126. dest := request.Destination()
  127. log.Access(source, dest, log.AccessAccepted, "")
  128. log.Info("Shadowsocks|Server: Tunnelling request to ", dest)
  129. v.udpServer.Dispatch(&proxy.SessionInfo{Source: source, Destination: dest, User: request.User, Inbound: v.meta}, data, func(destination net.Destination, payload *buf.Buffer) {
  130. defer payload.Release()
  131. data, err := EncodeUDPPacket(request, payload)
  132. if err != nil {
  133. log.Warning("Shadowsocks|Server: Failed to encode UDP packet: ", err)
  134. return
  135. }
  136. defer data.Release()
  137. v.udpHub.WriteTo(data.Bytes(), source)
  138. })
  139. }
  140. func (v *Server) handleConnection(conn internet.Connection) {
  141. defer conn.Close()
  142. conn.SetReusable(false)
  143. timedReader := net.NewTimeOutReader(16, conn)
  144. bufferedReader := bufio.NewReader(timedReader)
  145. request, bodyReader, err := ReadTCPSession(v.user, bufferedReader)
  146. if err != nil {
  147. log.Access(conn.RemoteAddr(), "", log.AccessRejected, err)
  148. log.Info("Shadowsocks|Server: Failed to create request from: ", conn.RemoteAddr(), ": ", err)
  149. return
  150. }
  151. bufferedReader.SetBuffered(false)
  152. userSettings := v.user.GetSettings()
  153. timedReader.SetTimeOut(userSettings.PayloadReadTimeout)
  154. dest := request.Destination()
  155. log.Access(conn.RemoteAddr(), dest, log.AccessAccepted, "")
  156. log.Info("Shadowsocks|Server: Tunnelling request to ", dest)
  157. ray := v.packetDispatcher.DispatchToOutbound(&proxy.SessionInfo{
  158. Source: net.DestinationFromAddr(conn.RemoteAddr()),
  159. Destination: dest,
  160. User: request.User,
  161. Inbound: v.meta,
  162. })
  163. requestDone := signal.ExecuteAsync(func() error {
  164. bufferedWriter := bufio.NewWriter(conn)
  165. responseWriter, err := WriteTCPResponse(request, bufferedWriter)
  166. if err != nil {
  167. log.Warning("Shadowsocks|Server: Failed to write response: ", err)
  168. return err
  169. }
  170. payload, err := ray.InboundOutput().Read()
  171. if err != nil {
  172. return err
  173. }
  174. if err := responseWriter.Write(payload); err != nil {
  175. return err
  176. }
  177. payload.Release()
  178. if err := bufferedWriter.SetBuffered(false); err != nil {
  179. return err
  180. }
  181. if err := buf.PipeUntilEOF(ray.InboundOutput(), responseWriter); err != nil {
  182. log.Info("Shadowsocks|Server: Failed to transport all TCP response: ", err)
  183. return err
  184. }
  185. return nil
  186. })
  187. responseDone := signal.ExecuteAsync(func() error {
  188. defer ray.InboundInput().Close()
  189. if err := buf.PipeUntilEOF(bodyReader, ray.InboundInput()); err != nil {
  190. log.Info("Shadowsocks|Server: Failed to transport all TCP request: ", err)
  191. return err
  192. }
  193. return nil
  194. })
  195. if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
  196. log.Info("Shadowsocks|Server: Connection ends with ", err)
  197. ray.InboundInput().CloseError()
  198. ray.InboundOutput().CloseError()
  199. }
  200. }
  201. func init() {
  202. common.Must(common.RegisterConfig((*ServerConfig)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
  203. return NewServer(ctx, config.(*ServerConfig))
  204. }))
  205. }