Skip to content

Commit

Permalink
refactor(notifier): rescheduling delay (#1046)
Browse files Browse the repository at this point in the history
  • Loading branch information
AleksandrMatsko authored Jul 11, 2024
1 parent 9031d99 commit 2aa786d
Show file tree
Hide file tree
Showing 16 changed files with 382 additions and 125 deletions.
2 changes: 1 addition & 1 deletion cmd/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ func openFile(filePath string, mode int) (*os.File, error) {
if filePath == "" {
return nil, fmt.Errorf("file is not specified")
}
file, err := os.OpenFile(filePath, mode, 0666) //nolint:gofumpt,gomnd
file, err := os.OpenFile(filePath, mode, 0o666) //nolint:gofumpt,gomnd
if err != nil {
return nil, fmt.Errorf("cannot open file: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (config *GraphiteRemoteConfig) GetRemoteSourceSettings() *graphiteRemoteSou
}
}

// GraphiteRemoteConfig is remote prometheus settings structure.
// PrometheusRemoteConfig is remote prometheus settings structure.
type PrometheusRemoteConfig struct {
RemoteCommonConfig `yaml:",inline"`
// Timeout for prometheus api requests
Expand All @@ -277,7 +277,7 @@ func (config PrometheusRemoteConfig) getRemoteCommon() *RemoteCommonConfig {
return &config.RemoteCommonConfig
}

// GetRemoteSourceSettings returns remote config parsed from moira config files.
// GetPrometheusSourceSettings returns remote config parsed from moira config files.
func (config *PrometheusRemoteConfig) GetPrometheusSourceSettings() *prometheusRemoteSource.Config {
return &prometheusRemoteSource.Config{
URL: config.URL,
Expand Down
10 changes: 7 additions & 3 deletions cmd/notifier/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type notifierConfig struct {
SenderTimeout string `yaml:"sender_timeout"`
// Hard timeout to stop retrying to send notification after multiple failed attempts
ResendingTimeout string `yaml:"resending_timeout"`
// Delay before performing one more send attempt
ReschedulingDelay string `yaml:"rescheduling_delay"`
// Senders configuration section. See https://moira.readthedocs.io/en/latest/installation/configuration.html for more explanation
Senders []map[string]interface{} `yaml:"senders"`
// Self state monitor configuration section. Note: No inner subscriptions is required. It's own notification mechanism will be used.
Expand Down Expand Up @@ -94,15 +96,16 @@ func getDefault() config {
NotificationHistoryQueryLimit: int(notifier.NotificationsLimitUnlimited),
},
Notification: cmd.NotificationConfig{
DelayedTime: "1m",
DelayedTime: "50s",
TransactionTimeout: "100ms",
TransactionMaxRetries: 10,
TransactionHeuristicLimit: 10000,
ResaveTime: "30s",
},
Notifier: notifierConfig{
SenderTimeout: "10s",
ResendingTimeout: "1:00",
SenderTimeout: "10s",
ResendingTimeout: "1:00",
ReschedulingDelay: "60s",
SelfState: selfStateConfig{
Enabled: false,
RedisDisconnectDelay: "30s",
Expand Down Expand Up @@ -191,6 +194,7 @@ func (config *notifierConfig) getSettings(logger moira.Logger) notifier.Config {
SelfStateContacts: config.SelfState.Contacts,
SendingTimeout: to.Duration(config.SenderTimeout),
ResendingTimeout: to.Duration(config.ResendingTimeout),
ReschedulingDelay: to.Duration(config.ReschedulingDelay),
Senders: config.Senders,
FrontURL: config.FrontURI,
Location: location,
Expand Down
24 changes: 19 additions & 5 deletions cmd/notifier/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"os/signal"
"syscall"

"github.com/moira-alert/moira/clock"

"github.com/moira-alert/moira"
"github.com/moira-alert/moira/cmd"
"github.com/moira-alert/moira/database/redis"
Expand Down Expand Up @@ -91,6 +93,11 @@ func main() {

notifierConfig := config.Notifier.getSettings(logger)

systemClock := clock.NewSystemClock()
schedulerConfig := notifier.SchedulerConfig{
ReschedulingDelay: notifierConfig.ReschedulingDelay,
}

notifierMetrics := metrics.ConfigureNotifierMetrics(telemetry.Metrics, serviceName)
sender := notifier.NewNotifier(
database,
Expand All @@ -99,6 +106,8 @@ func main() {
notifierMetrics,
metricSourceProvider,
imageStoreMap,
systemClock,
schedulerConfig,
)

// Register moira senders
Expand Down Expand Up @@ -133,11 +142,16 @@ func main() {

// Start moira new events fetcher
fetchEventsWorker := &events.FetchEventsWorker{
Logger: logger,
Database: database,
Scheduler: notifier.NewScheduler(database, logger, notifierMetrics),
Metrics: notifierMetrics,
Config: notifierConfig,
Logger: logger,
Database: database,
Scheduler: notifier.NewScheduler(
database,
logger,
notifierMetrics,
schedulerConfig,
systemClock),
Metrics: notifierMetrics,
Config: notifierConfig,
}
fetchEventsWorker.Start()
defer stopFetchEvents(fetchEventsWorker)
Expand Down
13 changes: 12 additions & 1 deletion datatypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (notification *ScheduledNotification) Less(other Comparable) (bool, error)
return notification.Timestamp < otherNotification.Timestamp, nil
}

// IsDelayed checks if the notification is delayed, the difference between the send time and the create time
// IsDelayed checks if the notification is delayed, the difference between the send time and the creation time
// is greater than the delayedTime.
func (notification *ScheduledNotification) IsDelayed(delayedTime int64) bool {
return notification.CreatedAt != 0 && notification.Timestamp-notification.CreatedAt > delayedTime
Expand Down Expand Up @@ -905,3 +905,14 @@ func SetMaintenanceUserAndTime(maintenanceCheck MaintenanceCheck, maintenance in
}
maintenanceCheck.SetMaintenance(&maintenanceInfo, maintenance)
}

// SchedulerParams is the parameters for notifier.Scheduler essential for scheduling notification.
type SchedulerParams struct {
Event NotificationEvent
Trigger TriggerData
Contact ContactData
Plotting PlottingData
ThrottledOld bool
// SendFail is amount of failed send attempts
SendFail int
}
33 changes: 24 additions & 9 deletions integration_tests/notifier/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"testing"
"time"

"github.com/moira-alert/moira/clock"

"github.com/golang/mock/gomock"
metricSource "github.com/moira-alert/moira/metric_source"
"github.com/moira-alert/moira/metric_source/local"
Expand All @@ -30,11 +32,12 @@ var (
)

var notifierConfig = notifier.Config{
SendingTimeout: time.Millisecond * 10,
ResendingTimeout: time.Hour * 24,
Location: location,
DateTimeFormat: dateTimeFormat,
ReadBatchSize: notifier.NotificationsLimitUnlimited,
SendingTimeout: time.Millisecond * 10,
ResendingTimeout: time.Hour * 24,
ReschedulingDelay: time.Minute,
Location: location,
DateTimeFormat: dateTimeFormat,
ReadBatchSize: notifier.NotificationsLimitUnlimited,
}

var shutdown = make(chan struct{})
Expand Down Expand Up @@ -119,13 +122,18 @@ func TestNotifier(t *testing.T) {

metricsSourceProvider := metricSource.CreateTestMetricSourceProvider(local.Create(database), nil, nil)

systemClock := clock.NewSystemClock()
schedulerConfig := notifier.SchedulerConfig{ReschedulingDelay: notifierConfig.ReschedulingDelay}

notifierInstance := notifier.NewNotifier(
database,
logger,
notifierConfig,
notifierMetrics,
metricsSourceProvider,
map[string]moira.ImageStore{},
systemClock,
schedulerConfig,
)

sender := mock_moira_alert.NewMockSender(mockCtrl)
Expand All @@ -141,10 +149,17 @@ func TestNotifier(t *testing.T) {
notifierInstance.RegisterSender(senderSettings, sender) //nolint

fetchEventsWorker := events.FetchEventsWorker{
Database: database,
Logger: logger,
Metrics: notifierMetrics,
Scheduler: notifier.NewScheduler(database, logger, notifierMetrics),
Database: database,
Logger: logger,
Metrics: notifierMetrics,
Scheduler: notifier.NewScheduler(
database,
logger,
notifierMetrics,
notifier.SchedulerConfig{
ReschedulingDelay: notifierConfig.ReschedulingDelay,
},
systemClock),
}

fetchNotificationsWorker := notifications.FetchNotificationsWorker{
Expand Down
3 changes: 2 additions & 1 deletion local/notifier.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ prometheus_remote:
notifier:
sender_timeout: 10s
resending_timeout: "1:00"
rescheduling_delay: 60s
senders: []
moira_selfstate:
enabled: false
Expand All @@ -45,7 +46,7 @@ notification_history:
ttl: 48h
query_limit: 10000
notification:
delayed_time: 1m
delayed_time: 50s
transaction_timeout: 100ms
transaction_max_retries: 10
transaction_heuristic_limit: 10000
Expand Down
4 changes: 2 additions & 2 deletions logging/zerolog_adapter/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ func getLogWriter(logFileName string) (io.Writer, error) {
}

logDir := filepath.Dir(logFileName)
if err := os.MkdirAll(logDir, 0755); err != nil { //nolint:gofumpt,gomnd
if err := os.MkdirAll(logDir, 0o755); err != nil { //nolint:gofumpt,gomnd
return nil, fmt.Errorf("can't create log directories %s: %s", logDir, err.Error())
}
logFile, err := os.OpenFile(logFileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) //nolint:gofumpt,gomnd
logFile, err := os.OpenFile(logFileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o644) //nolint:gofumpt,gomnd
if err != nil {
return nil, fmt.Errorf("can't open log file %s: %s", logFileName, err.Error())
}
Expand Down
9 changes: 4 additions & 5 deletions mock/scheduler/scheduler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions notifier/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Config struct {
SelfStateContacts []map[string]string
SendingTimeout time.Duration
ResendingTimeout time.Duration
ReschedulingDelay time.Duration
Senders []map[string]interface{}
LogFile string
LogLevel string
Expand Down
11 changes: 9 additions & 2 deletions notifier/events/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,15 @@ func (worker *FetchEventsWorker) processEvent(event moira.NotificationEvent) err
continue
}
event.SubscriptionID = &subscription.ID
notification := worker.Scheduler.ScheduleNotification(time.Now(), event, triggerData,
contact, subscription.Plotting, false, 0, contactLogger)
params := moira.SchedulerParams{
Event: event,
Trigger: triggerData,
Contact: contact,
Plotting: subscription.Plotting,
ThrottledOld: false,
SendFail: 0,
}
notification := worker.Scheduler.ScheduleNotification(params, contactLogger)
key := notification.GetKey()
if _, exist := duplications[key]; !exist {
if err := worker.Database.AddNotification(notification); err != nil {
Expand Down
Loading

0 comments on commit 2aa786d

Please sign in to comment.