Skip to content

Commit

Permalink
Add support for module sleep mode
Browse files Browse the repository at this point in the history
  • Loading branch information
vlabo committed Apr 19, 2023
1 parent 98574e4 commit 0ed865f
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 6 deletions.
4 changes: 2 additions & 2 deletions metrics/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,14 @@ func writeMetricsTo(ctx context.Context, url string) error {

func metricsWriter(ctx context.Context) error {
pushURL := pushOption()
ticker := time.NewTicker(1 * time.Minute)
ticker := module.NewSleepyTicker(1*time.Minute, 0)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
case <-ticker.Read():
err := writeMetricsTo(ctx, pushURL)
if err != nil {
return err
Expand Down
63 changes: 63 additions & 0 deletions modules/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ var (
// modulesLocked locks `modules` during starting.
modulesLocked = abool.New()

sleepMode = abool.NewBool(false)
taskSchedulerSleepModeExitChannel = make(chan struct{})

moduleStartTimeout = 2 * time.Minute
moduleStopTimeout = 1 * time.Minute

Expand All @@ -37,6 +40,8 @@ type Module struct {
enabled *abool.AtomicBool
enabledAsDependency *abool.AtomicBool
status uint8
sleepMode *abool.AtomicBool
sleepWaitingChannel chan time.Time

// failure status
failureStatus uint8
Expand Down Expand Up @@ -100,6 +105,41 @@ func (m *Module) Dependencies() []*Module {
return m.depModules
}

// Sleep enables or disables sleep mode.
func (m *Module) Sleep(enable bool) {
set := m.sleepMode.SetToIf(!enable, enable)
if !set {
return
}

// Notify all waiting tasks that we are not sleeping anymore.
m.Lock()
defer m.Unlock()

if enable {
m.sleepWaitingChannel = make(chan time.Time)
} else {
close(m.sleepWaitingChannel)
}
}

// IsSleeping returns true if sleep mode is enabled.
func (m *Module) IsSleeping() bool {
return m.sleepMode.IsSet()
}

// WaitIfSleeping returns channel that will signal when it exits sleep mode.
func (m *Module) WaitIfSleeping() <-chan time.Time {
m.RLock()
defer m.RUnlock()
return m.sleepWaitingChannel
}

// NewSleepyTicker returns new sleepyTicker that will respect the modules sleep mode.
func (m *Module) NewSleepyTicker(normalDuration time.Duration, sleepDuration time.Duration) *SleepyTicker {
return newSleepyTicker(m, normalDuration, sleepDuration)
}

func (m *Module) prep(reports chan *report) {
// check and set intermediate status
m.Lock()
Expand Down Expand Up @@ -343,6 +383,8 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ...
Name: name,
enabled: abool.NewBool(false),
enabledAsDependency: abool.NewBool(false),
sleepMode: abool.NewBool(false),
sleepWaitingChannel: make(chan time.Time),
prepFn: prep,
startFn: start,
stopFn: stop,
Expand All @@ -358,6 +400,8 @@ func initNewModule(name string, prep, start, stop func() error, dependencies ...
depNames: dependencies,
}

newModule.Sleep(false)

return newModule
}

Expand All @@ -380,3 +424,22 @@ func initDependencies() error {

return nil
}

// EnterSleepMode enables or disables sleep mode for all the modules.
func EnterSleepMode(enabled bool) {
// Check if differs with the old state.
set := sleepMode.SetToIf(!enabled, enabled)
if !set {
return
}

// Update all modules
for _, m := range modules {
m.Sleep(enabled)
}

// Send signal to the task schedular.
if !enabled {
taskSchedulerSleepModeExitChannel <- struct{}{}
}
}
66 changes: 66 additions & 0 deletions modules/sleepyticker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package modules

import "time"

type SleepyTicker struct {
ticker time.Ticker
module *Module
normalDuration time.Duration
sleepDuration time.Duration
sleepMode bool
}

// newSleepyTicker returns a new SleepyTicker. This is a wrapper of the standard time.Ticker but it respects modules.Module sleep mode. Check https://pkg.go.dev/time#Ticker.
// If sleepDuration is set to 0 ticker will not tick during sleep.
func newSleepyTicker(module *Module, normalDuration time.Duration, sleepDuration time.Duration) *SleepyTicker {
st := &SleepyTicker{
ticker: *time.NewTicker(normalDuration),
module: module,
normalDuration: normalDuration,
sleepDuration: sleepDuration,
sleepMode: false,
}

return st
}

// Read waits until the module is not in sleep mode and returns time.Ticker.C channel.
func (st *SleepyTicker) Read() <-chan time.Time {
sleepModeEnabled := st.module.sleepMode.IsSet()

// Update Sleep mode
if sleepModeEnabled != st.sleepMode {
st.enterSleepMode(sleepModeEnabled)
}

// Wait if until sleep mode exits only if sleepDuration is set to 0.
if sleepModeEnabled {
if st.sleepDuration == 0 {
return st.module.WaitIfSleeping()
}
}

return st.ticker.C
}

// Stop turns off a ticker. After Stop, no more ticks will be sent. Stop does not close the channel, to prevent a concurrent goroutine reading from the channel from seeing an erroneous "tick".
func (st *SleepyTicker) Stop() {
st.ticker.Stop()
}

// Reset stops a ticker and resets its period to the specified duration. The next tick will arrive after the new period elapses. The duration d must be greater than zero; if not, Reset will panic.
func (st *SleepyTicker) Reset(d time.Duration) {
// Reset standard ticker
st.ticker.Reset(d)
}

func (st *SleepyTicker) enterSleepMode(enabled bool) {
st.sleepMode = enabled
if enabled {
if st.sleepDuration > 0 {
st.ticker.Reset(st.sleepDuration)
}
} else {
st.ticker.Reset(st.normalDuration)
}
}
4 changes: 4 additions & 0 deletions modules/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,10 @@ func (t *Task) addToSchedule(overtime bool) {
}

func waitUntilNextScheduledTask() <-chan time.Time {
if sleepMode.IsSet() {
<-taskSchedulerSleepModeExitChannel
}

scheduleLock.Lock()
defer scheduleLock.Unlock()

Expand Down
4 changes: 2 additions & 2 deletions notifications/cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import (
)

func cleaner(ctx context.Context) error { //nolint:unparam // Conforms to worker interface
ticker := time.NewTicker(1 * time.Second)
ticker := module.NewSleepyTicker(1*time.Second, 0)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
case <-ticker.Read():
deleteExpiredNotifs()
}
}
Expand Down
4 changes: 2 additions & 2 deletions utils/broadcastflag.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Flag struct {
}

// NewBroadcastFlag returns a new BroadcastFlag.
// In the initial state, the flag is not set and the singal does not trigger.
// In the initial state, the flag is not set and the signal does not trigger.
func NewBroadcastFlag() *BroadcastFlag {
return &BroadcastFlag{
flag: abool.New(),
Expand All @@ -33,7 +33,7 @@ func NewBroadcastFlag() *BroadcastFlag {
}

// NewFlag returns a new Flag that listens to this broadcasting flag.
// In the initial state, the flag is set and the singal triggers.
// In the initial state, the flag is set and the signal triggers.
// You can call Refresh immediately to get the current state from the
// broadcasting flag.
func (bf *BroadcastFlag) NewFlag() *Flag {
Expand Down

0 comments on commit 0ed865f

Please sign in to comment.