periodic.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. package task
  2. import (
  3. "sync"
  4. "time"
  5. )
  6. // Periodic is a task that runs periodically.
  7. type Periodic struct {
  8. // Interval of the task being run
  9. Interval time.Duration
  10. // Execute is the task function
  11. Execute func() error
  12. // OnFailure will be called when Execute returns non-nil error
  13. OnError func(error)
  14. access sync.RWMutex
  15. timer *time.Timer
  16. closed bool
  17. }
  18. func (t *Periodic) setClosed(f bool) {
  19. t.access.Lock()
  20. t.closed = f
  21. t.access.Unlock()
  22. }
  23. func (t *Periodic) hasClosed() bool {
  24. t.access.RLock()
  25. defer t.access.RUnlock()
  26. return t.closed
  27. }
  28. func (t *Periodic) checkedExecute() error {
  29. if t.hasClosed() {
  30. return nil
  31. }
  32. if err := t.Execute(); err != nil {
  33. return err
  34. }
  35. t.access.Lock()
  36. defer t.access.Unlock()
  37. if t.closed {
  38. return nil
  39. }
  40. t.timer = time.AfterFunc(t.Interval, func() {
  41. if err := t.checkedExecute(); err != nil && t.OnError != nil {
  42. t.OnError(err)
  43. }
  44. })
  45. return nil
  46. }
  47. // Start implements common.Runnable. Start must not be called multiple times without Close being called.
  48. func (t *Periodic) Start() error {
  49. t.setClosed(false)
  50. if err := t.checkedExecute(); err != nil {
  51. t.setClosed(true)
  52. return err
  53. }
  54. return nil
  55. }
  56. // Close implements common.Closable.
  57. func (t *Periodic) Close() error {
  58. t.access.Lock()
  59. defer t.access.Unlock()
  60. t.closed = true
  61. if t.timer != nil {
  62. t.timer.Stop()
  63. t.timer = nil
  64. }
  65. return nil
  66. }