diff --git a/config/config.go b/config/config.go
index 7f3602e066..c178bef500 100644
--- a/config/config.go
+++ b/config/config.go
@@ -794,6 +794,7 @@ type Route struct {
 	GroupWait      *model.Duration `yaml:"group_wait,omitempty" json:"group_wait,omitempty"`
 	GroupInterval  *model.Duration `yaml:"group_interval,omitempty" json:"group_interval,omitempty"`
 	RepeatInterval *model.Duration `yaml:"repeat_interval,omitempty" json:"repeat_interval,omitempty"`
+	WaitOnStartup  bool            `yaml:"wait_on_startup,omitempty" json:"wait_on_startup,omitempty"`
 }
 
 // UnmarshalYAML implements the yaml.Unmarshaler interface for Route.
diff --git a/config/config_test.go b/config/config_test.go
index 7631d37cb7..f558054f62 100644
--- a/config/config_test.go
+++ b/config/config_test.go
@@ -200,7 +200,7 @@ receivers:
 func TestTimeIntervalHasName(t *testing.T) {
 	in := `
 time_intervals:
-- name: 
+- name:
   time_intervals:
   - times:
     - start_time: '09:00'
@@ -1010,6 +1010,17 @@ func TestGroupByAll(t *testing.T) {
 	}
 }
 
+func TestWaitOnStartup(t *testing.T) {
+	c, err := LoadFile("testdata/conf.wait-on-startup.yml")
+	if err != nil {
+		t.Fatalf("Error parsing %s: %s", "testdata/conf.wait-on-startup.yml", err)
+	}
+
+	if !c.Route.WaitOnStartup {
+		t.Errorf("Invalid wait on startup param: expected to be true")
+	}
+}
+
 func TestVictorOpsDefaultAPIKey(t *testing.T) {
 	conf, err := LoadFile("testdata/conf.victorops-default-apikey.yml")
 	if err != nil {
diff --git a/config/testdata/conf.wait-on-startup.yml b/config/testdata/conf.wait-on-startup.yml
new file mode 100644
index 0000000000..7a4f3b520d
--- /dev/null
+++ b/config/testdata/conf.wait-on-startup.yml
@@ -0,0 +1,8 @@
+route:
+  group_wait: 30s
+  group_interval: 5m
+  repeat_interval: 3h
+  receiver: team-X
+  wait_on_startup: True
+receivers:
+  - name: 'team-X'
diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go
index 640b22abe2..527e9eed0e 100644
--- a/dispatch/dispatch.go
+++ b/dispatch/dispatch.go
@@ -91,7 +91,8 @@ type Dispatcher struct {
 	ctx    context.Context
 	cancel func()
 
-	logger log.Logger
+	logger    log.Logger
+	startTime time.Time
 }
 
 // Limits describes limits used by Dispatcher.
@@ -118,13 +119,14 @@ func NewDispatcher(
 	}
 
 	disp := &Dispatcher{
-		alerts:  ap,
-		stage:   s,
-		route:   r,
-		timeout: to,
-		logger:  log.With(l, "component", "dispatcher"),
-		metrics: m,
-		limits:  lim,
+		alerts:    ap,
+		stage:     s,
+		route:     r,
+		timeout:   to,
+		logger:    log.With(l, "component", "dispatcher"),
+		metrics:   m,
+		limits:    lim,
+		startTime: time.Now(),
 	}
 	return disp
 }
@@ -330,7 +332,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
 		return
 	}
 
-	ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
+	ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger, d.startTime)
 	routeGroups[fp] = ag
 	d.aggrGroupsNum++
 	d.metrics.aggrGroups.Inc()
@@ -385,20 +387,22 @@ type aggrGroup struct {
 
 	mtx        sync.RWMutex
 	hasFlushed bool
+	startTime  time.Time
 }
 
 // newAggrGroup returns a new aggregation group.
-func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, logger log.Logger) *aggrGroup {
+func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, logger log.Logger, startTime time.Time) *aggrGroup {
 	if to == nil {
 		to = func(d time.Duration) time.Duration { return d }
 	}
 	ag := &aggrGroup{
-		labels:   labels,
-		routeKey: r.Key(),
-		opts:     &r.RouteOpts,
-		timeout:  to,
-		alerts:   store.NewAlerts(),
-		done:     make(chan struct{}),
+		labels:    labels,
+		routeKey:  r.Key(),
+		opts:      &r.RouteOpts,
+		timeout:   to,
+		alerts:    store.NewAlerts(),
+		done:      make(chan struct{}),
+		startTime: startTime,
 	}
 	ag.ctx, ag.cancel = context.WithCancel(ctx)
 
@@ -473,17 +477,35 @@ func (ag *aggrGroup) stop() {
 	<-ag.done
 }
 
+// startupWaitElapsed reports whether the startup grace period no longer holds
+// the group back: either wait_on_startup is disabled for the route, or more
+// than group_wait has passed since startTime.
+func (ag *aggrGroup) startupWaitElapsed() bool {
+	return !ag.opts.WaitOnStartup || ag.startTime.Add(ag.opts.GroupWait).Before(time.Now())
+}
+
+// groupWaitElapsed reports whether more than group_wait has passed since the
+// alert started.
+func (ag *aggrGroup) groupWaitElapsed(alert *types.Alert) bool {
+	return alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now())
+}
+
+// shouldReset reports whether the group timer should be reset for an immediate
+// first flush. It reads hasFlushed, so insert calls it with ag.mtx held.
+func (ag *aggrGroup) shouldReset(alert *types.Alert) bool {
+	return !ag.hasFlushed && ag.groupWaitElapsed(alert) && ag.startupWaitElapsed()
+}
+
 // insert inserts the alert into the aggregation group.
 func (ag *aggrGroup) insert(alert *types.Alert) {
 	if err := ag.alerts.Set(alert); err != nil {
 		level.Error(ag.logger).Log("msg", "error on set alert", "err", err)
 	}
-
 	// Immediately trigger a flush if the wait duration for this
 	// alert is already over.
 	ag.mtx.Lock()
 	defer ag.mtx.Unlock()
-	if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
+	if ag.shouldReset(alert) {
 		ag.next.Reset(0)
 	}
 }
diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go
index 85bd62dc4d..5ed10fb955 100644
--- a/dispatch/dispatch_test.go
+++ b/dispatch/dispatch_test.go
@@ -138,7 +138,7 @@ func TestAggrGroup(t *testing.T) {
 	}
 
 	// Test regular situation where we wait for group_wait to send out alerts.
-	ag := newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger())
+	ag := newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger(), time.Now())
 	go ag.run(ntfy)
 
 	ag.insert(a1)
@@ -192,7 +192,7 @@ func TestAggrGroup(t *testing.T) {
 	// immediate flushing.
 	// Finally, set all alerts to be resolved. After successful notify the aggregation group
 	// should empty itself.
-	ag = newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger())
+	ag = newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger(), time.Now())
 	go ag.run(ntfy)
 
 	ag.insert(a1)
@@ -267,6 +267,30 @@ func TestAggrGroup(t *testing.T) {
 	}
 
 	ag.stop()
+
+	// Ensure WaitOnStartup is being honored
+	opts.WaitOnStartup = true
+	route = &Route{
+		RouteOpts: *opts,
+	}
+	ag = newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger(), time.Now())
+	go ag.run(ntfy)
+
+	ag.insert(a1)
+
+	select {
+	case <-time.After(opts.GroupWait * 2):
+		t.Fatalf("Expected alert to be dealt with after group_wait but it has not been handled yet")
+
+	case batch := <-alertsCh:
+		exp := removeEndsAt(types.AlertSlice{a1})
+		sort.Sort(batch)
+		if !reflect.DeepEqual(batch, exp) {
+			t.Fatalf("expected alert %v but got %v", exp, batch)
+		}
+	}
+
+	ag.stop()
 }
 
 func TestGroupLabels(t *testing.T) {
diff --git a/dispatch/route.go b/dispatch/route.go
index 5ada178dab..3e18d2dd35 100644
--- a/dispatch/route.go
+++ b/dispatch/route.go
@@ -35,6 +35,7 @@ var DefaultRouteOpts = RouteOpts{
 	GroupBy:           map[model.LabelName]struct{}{},
 	GroupByAll:        false,
 	MuteTimeIntervals: []string{},
+	WaitOnStartup:     false,
 }
 
 // A Route is a node that contains definitions of how to handle alerts.
@@ -88,6 +89,9 @@ func NewRoute(cr *config.Route, parent *Route) *Route {
 	if cr.RepeatInterval != nil {
 		opts.RepeatInterval = time.Duration(*cr.RepeatInterval)
 	}
+	// NOTE(review): unlike the option above, this is assigned even when the
+	// route does not set it — confirm that is intended when parent is non-nil.
+	opts.WaitOnStartup = cr.WaitOnStartup
 
 	// Build matchers.
 	var matchers labels.Matchers
@@ -234,6 +238,9 @@ type RouteOpts struct {
 
 	// A list of time intervals for which the route is active.
 	ActiveTimeIntervals []string
+
+	// Honor group_wait on initial startup even if incoming alerts are old.
+	WaitOnStartup bool
 }
 
 func (ro *RouteOpts) String() string {
@@ -254,12 +261,14 @@ func (ro *RouteOpts) MarshalJSON() ([]byte, error) {
 		GroupWait      time.Duration     `json:"groupWait"`
 		GroupInterval  time.Duration     `json:"groupInterval"`
 		RepeatInterval time.Duration     `json:"repeatInterval"`
+		WaitOnStartup  bool              `json:"waitOnStartup"`
 	}{
 		Receiver:       ro.Receiver,
 		GroupByAll:     ro.GroupByAll,
 		GroupWait:      ro.GroupWait,
 		GroupInterval:  ro.GroupInterval,
 		RepeatInterval: ro.RepeatInterval,
+		WaitOnStartup:  ro.WaitOnStartup,
 	}
 	for ln := range ro.GroupBy {
 		v.GroupBy = append(v.GroupBy, ln)