From 0ed865f4e44bd6e863a920e907ea3dfd45d2effb Mon Sep 17 00:00:00 2001 From: Vladimir Stoilov Date: Wed, 19 Apr 2023 17:38:56 +0200 Subject: [PATCH] Add support for module sleep mode --- metrics/api.go | 4 +-- modules/modules.go | 63 ++++++++++++++++++++++++++++++++++++++ modules/sleepyticker.go | 66 ++++++++++++++++++++++++++++++++++++++++ modules/tasks.go | 4 +++ notifications/cleaner.go | 4 +-- utils/broadcastflag.go | 4 +-- 6 files changed, 139 insertions(+), 6 deletions(-) create mode 100644 modules/sleepyticker.go diff --git a/metrics/api.go b/metrics/api.go index 859e1bbe..9221f73c 100644 --- a/metrics/api.go +++ b/metrics/api.go @@ -121,14 +121,14 @@ func writeMetricsTo(ctx context.Context, url string) error { func metricsWriter(ctx context.Context) error { pushURL := pushOption() - ticker := time.NewTicker(1 * time.Minute) + ticker := module.NewSleepyTicker(1*time.Minute, 0) defer ticker.Stop() for { select { case <-ctx.Done(): return nil - case <-ticker.C: + case <-ticker.Read(): err := writeMetricsTo(ctx, pushURL) if err != nil { return err diff --git a/modules/modules.go b/modules/modules.go index 5f35d52d..74a04a63 100644 --- a/modules/modules.go +++ b/modules/modules.go @@ -20,6 +20,9 @@ var ( // modulesLocked locks `modules` during starting. modulesLocked = abool.New() + sleepMode = abool.NewBool(false) + taskSchedulerSleepModeExitChannel = make(chan struct{}) + moduleStartTimeout = 2 * time.Minute moduleStopTimeout = 1 * time.Minute @@ -37,6 +40,8 @@ type Module struct { enabled *abool.AtomicBool enabledAsDependency *abool.AtomicBool status uint8 + sleepMode *abool.AtomicBool + sleepWaitingChannel chan time.Time // failure status failureStatus uint8 @@ -100,6 +105,41 @@ func (m *Module) Dependencies() []*Module { return m.depModules } +// Sleep enables or disables sleep mode. +func (m *Module) Sleep(enable bool) { + set := m.sleepMode.SetToIf(!enable, enable) + if !set { + return + } + + // Notify all waiting tasks that we are not sleeping anymore. + m.Lock() + defer m.Unlock() + + if enable { + m.sleepWaitingChannel = make(chan time.Time) + } else { + close(m.sleepWaitingChannel) + } +} + +// IsSleeping returns true if sleep mode is enabled. +func (m *Module) IsSleeping() bool { + return m.sleepMode.IsSet() +} + +// WaitIfSleeping returns channel that will signal when it exits sleep mode. +func (m *Module) WaitIfSleeping() <-chan time.Time { + m.RLock() + defer m.RUnlock() + return m.sleepWaitingChannel +} + +// NewSleepyTicker returns new sleepyTicker that will respect the modules sleep mode. +func (m *Module) NewSleepyTicker(normalDuration time.Duration, sleepDuration time.Duration) *SleepyTicker { + return newSleepyTicker(m, normalDuration, sleepDuration) +} + func (m *Module) prep(reports chan *report) { // check and set intermediate status m.Lock() @@ -343,6 +383,8 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ... Name: name, enabled: abool.NewBool(false), enabledAsDependency: abool.NewBool(false), + sleepMode: abool.NewBool(false), + sleepWaitingChannel: make(chan time.Time), prepFn: prep, startFn: start, stopFn: stop, @@ -358,6 +400,8 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ... depNames: dependencies, } + newModule.Sleep(false) + return newModule } @@ -380,3 +424,22 @@ func initDependencies() error { return nil } + +// EnterSleepMode enables or disables sleep mode for all the modules. +func EnterSleepMode(enabled bool) { + // Check if differs with the old state. + set := sleepMode.SetToIf(!enabled, enabled) + if !set { + return + } + + // Update all modules + for _, m := range modules { + m.Sleep(enabled) + } + + // Send signal to the task schedular. + if !enabled { + taskSchedulerSleepModeExitChannel <- struct{}{} + } +} diff --git a/modules/sleepyticker.go b/modules/sleepyticker.go new file mode 100644 index 00000000..510c35ef --- /dev/null +++ b/modules/sleepyticker.go @@ -0,0 +1,66 @@ +package modules + +import "time" + +type SleepyTicker struct { + ticker time.Ticker + module *Module + normalDuration time.Duration + sleepDuration time.Duration + sleepMode bool +} + +// newSleepyTicker returns a new SleepyTicker. This is a wrapper of the standard time.Ticker but it respects modules.Module sleep mode. Check https://pkg.go.dev/time#Ticker. +// If sleepDuration is set to 0 ticker will not tick during sleep. +func newSleepyTicker(module *Module, normalDuration time.Duration, sleepDuration time.Duration) *SleepyTicker { + st := &SleepyTicker{ + ticker: *time.NewTicker(normalDuration), + module: module, + normalDuration: normalDuration, + sleepDuration: sleepDuration, + sleepMode: false, + } + + return st +} + +// Read waits until the module is not in sleep mode and returns time.Ticker.C channel. +func (st *SleepyTicker) Read() <-chan time.Time { + sleepModeEnabled := st.module.sleepMode.IsSet() + + // Update Sleep mode + if sleepModeEnabled != st.sleepMode { + st.enterSleepMode(sleepModeEnabled) + } + + // Wait if until sleep mode exits only if sleepDuration is set to 0. + if sleepModeEnabled { + if st.sleepDuration == 0 { + return st.module.WaitIfSleeping() + } + } + + return st.ticker.C +} + +// Stop turns off a ticker. After Stop, no more ticks will be sent. Stop does not close the channel, to prevent a concurrent goroutine reading from the channel from seeing an erroneous "tick". +func (st *SleepyTicker) Stop() { + st.ticker.Stop() +} + +// Reset stops a ticker and resets its period to the specified duration. The next tick will arrive after the new period elapses. The duration d must be greater than zero; if not, Reset will panic. +func (st *SleepyTicker) Reset(d time.Duration) { + // Reset standard ticker + st.ticker.Reset(d) +} + +func (st *SleepyTicker) enterSleepMode(enabled bool) { + st.sleepMode = enabled + if enabled { + if st.sleepDuration > 0 { + st.ticker.Reset(st.sleepDuration) + } + } else { + st.ticker.Reset(st.normalDuration) + } +} diff --git a/modules/tasks.go b/modules/tasks.go index dbc0252d..f27fd95f 100644 --- a/modules/tasks.go +++ b/modules/tasks.go @@ -443,6 +443,10 @@ func (t *Task) addToSchedule(overtime bool) { } func waitUntilNextScheduledTask() <-chan time.Time { + if sleepMode.IsSet() { + <-taskSchedulerSleepModeExitChannel + } + scheduleLock.Lock() defer scheduleLock.Unlock() diff --git a/notifications/cleaner.go b/notifications/cleaner.go index f9e2ffdb..b0ac9800 100644 --- a/notifications/cleaner.go +++ b/notifications/cleaner.go @@ -6,14 +6,14 @@ import ( ) func cleaner(ctx context.Context) error { //nolint:unparam // Conforms to worker interface - ticker := time.NewTicker(1 * time.Second) + ticker := module.NewSleepyTicker(1*time.Second, 0) defer ticker.Stop() for { select { case <-ctx.Done(): return nil - case <-ticker.C: + case <-ticker.Read(): deleteExpiredNotifs() } } diff --git a/utils/broadcastflag.go b/utils/broadcastflag.go index 51c79f2e..ea6c7a48 100644 --- a/utils/broadcastflag.go +++ b/utils/broadcastflag.go @@ -23,7 +23,7 @@ type Flag struct { } // NewBroadcastFlag returns a new BroadcastFlag. -// In the initial state, the flag is not set and the singal does not trigger. +// In the initial state, the flag is not set and the signal does not trigger. func NewBroadcastFlag() *BroadcastFlag { return &BroadcastFlag{ flag: abool.New(), @@ -33,7 +33,7 @@ func NewBroadcastFlag() *BroadcastFlag { } // NewFlag returns a new Flag that listens to this broadcasting flag. -// In the initial state, the flag is set and the singal triggers. +// In the initial state, the flag is set and the signal triggers. // You can call Refresh immediately to get the current state from the // broadcasting flag. func (bf *BroadcastFlag) NewFlag() *Flag {