dispatch: Fix initial alerts not honoring group_wait
At initial startup of Alertmanager, old alerts will be sent to the
receivers immediately, as the start time for those alerts could be
several days old in some cases (and in any case much older than the
group_wait time).

This is problematic for alerts that are supposed to be inhibited. If the
old inhibited alert gets processed before the alert that is supposed to
inhibit it, it will get sent to the receiver and cause unwanted noise.

One approach to combat this is to always wait at least the group_wait
duration for a new alert group, even if the alert is very old. This
should make things a bit more stable, as it gives all alerts a fighting
chance to come in before we send out notifications.

Signed-off-by: Alexander Rickardsson <[email protected]>
alxric committed Dec 9, 2022
1 parent 9ae6113 commit c225982
Showing 6 changed files with 86 additions and 21 deletions.
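For context, a route opting into the new behavior sets wait_on_startup alongside its grouping options. A minimal sketch (receiver name and durations are illustrative, mirroring the test fixture added in this commit):

route:
  receiver: team-X
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 3h
  wait_on_startup: true
receivers:
- name: 'team-X'

With this set, an aggregation group created right after Alertmanager starts waits the full group_wait before its first flush, even if its alerts started long ago.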
1 change: 1 addition & 0 deletions config/config.go
@@ -765,6 +765,7 @@ type Route struct {
GroupWait *model.Duration `yaml:"group_wait,omitempty" json:"group_wait,omitempty"`
GroupInterval *model.Duration `yaml:"group_interval,omitempty" json:"group_interval,omitempty"`
RepeatInterval *model.Duration `yaml:"repeat_interval,omitempty" json:"repeat_interval,omitempty"`
+WaitOnStartup bool `yaml:"wait_on_startup" json:"wait_on_startup,omitempty"`
}

// UnmarshalYAML implements the yaml.Unmarshaler interface for Route.
13 changes: 12 additions & 1 deletion config/config_test.go
@@ -200,7 +200,7 @@ receivers:
func TestTimeIntervalHasName(t *testing.T) {
in := `
time_intervals:
- name:
- name:
time_intervals:
- times:
- start_time: '09:00'
@@ -1009,6 +1009,17 @@ func TestGroupByAll(t *testing.T) {
}
}

+func TestWaitOnStartup(t *testing.T) {
+	c, err := LoadFile("testdata/conf.wait-on-startup.yml")
+	if err != nil {
+		t.Fatalf("Error parsing %s: %s", "testdata/conf.wait-on-startup.yml", err)
+	}
+
+	if !c.Route.WaitOnStartup {
+		t.Errorf("Invalid wait on startup param: expected to be true")
+	}
+}

func TestVictorOpsDefaultAPIKey(t *testing.T) {
conf, err := LoadFile("testdata/conf.victorops-default-apikey.yml")
if err != nil {
8 changes: 8 additions & 0 deletions config/testdata/conf.wait-on-startup.yml
@@ -0,0 +1,8 @@
+route:
+  group_wait: 30s
+  group_interval: 5m
+  repeat_interval: 3h
+  receiver: team-X
+  wait_on_startup: True
+receivers:
+- name: 'team-X'
50 changes: 32 additions & 18 deletions dispatch/dispatch.go
@@ -90,7 +90,8 @@ type Dispatcher struct {
ctx context.Context
cancel func()

-	logger log.Logger
+	logger    log.Logger
+	startTime time.Time
}

// Limits describes limits used by Dispatcher.
@@ -117,13 +118,14 @@ func NewDispatcher(
}

disp := &Dispatcher{
-	alerts:  ap,
-	stage:   s,
-	route:   r,
-	timeout: to,
-	logger:  log.With(l, "component", "dispatcher"),
-	metrics: m,
-	limits:  lim,
+	alerts:    ap,
+	stage:     s,
+	route:     r,
+	timeout:   to,
+	logger:    log.With(l, "component", "dispatcher"),
+	metrics:   m,
+	limits:    lim,
+	startTime: time.Now(),
}
return disp
}
@@ -329,7 +331,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
return
}

-ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
+ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger, d.startTime)
routeGroups[fp] = ag
d.aggrGroupsNum++
d.metrics.aggrGroups.Inc()
@@ -384,20 +386,22 @@ type aggrGroup struct {

mtx sync.RWMutex
hasFlushed bool
+startTime time.Time
}

// newAggrGroup returns a new aggregation group.
-func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, logger log.Logger) *aggrGroup {
+func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, logger log.Logger, startTime time.Time) *aggrGroup {
if to == nil {
to = func(d time.Duration) time.Duration { return d }
}
ag := &aggrGroup{
-	labels:   labels,
-	routeKey: r.Key(),
-	opts:     &r.RouteOpts,
-	timeout:  to,
-	alerts:   store.NewAlerts(),
-	done:     make(chan struct{}),
+	labels:    labels,
+	routeKey:  r.Key(),
+	opts:      &r.RouteOpts,
+	timeout:   to,
+	alerts:    store.NewAlerts(),
+	done:      make(chan struct{}),
+	startTime: startTime,
}
ag.ctx, ag.cancel = context.WithCancel(ctx)

@@ -472,17 +476,27 @@ func (ag *aggrGroup) stop() {
<-ag.done
}

+// check if we want to wait on initial startup before sending notification
+func (ag *aggrGroup) shouldWaitOnStartup(now time.Time) bool {
+	return !ag.opts.WaitOnStartup || ag.startTime.Add(ag.opts.GroupWait).Before(now)
+}
+
+// check if we want alertgroup timer to reset
+func (ag *aggrGroup) shouldReset(alert *types.Alert) bool {
+	now := time.Now()
+	return !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(now) && ag.shouldWaitOnStartup(now)
+}

// insert inserts the alert into the aggregation group.
func (ag *aggrGroup) insert(alert *types.Alert) {
if err := ag.alerts.Set(alert); err != nil {
level.Error(ag.logger).Log("msg", "error on set alert", "err", err)
}

// Immediately trigger a flush if the wait duration for this
// alert is already over.
ag.mtx.Lock()
defer ag.mtx.Unlock()
-if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
+if ag.shouldReset(alert) {
ag.next.Reset(0)
}
}
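To make the timing interplay concrete, here is a standalone sketch of the decision the two new helpers implement (not code from this commit; the 30s group_wait and the alert age are made-up values): an alert that predates group_wait no longer resets the group timer to zero unless the dispatcher itself has been up for at least group_wait.

package main

import (
	"fmt"
	"time"
)

func main() {
	const groupWait = 30 * time.Second

	now := time.Now()
	dispatcherStart := now                 // startTime captured in NewDispatcher
	alertStart := now.Add(-48 * time.Hour) // an alert that is two days old
	waitOnStartup := true                  // the new wait_on_startup route option

	// Mirrors aggrGroup.shouldWaitOnStartup: true means an early flush is allowed,
	// i.e. either the option is off or the dispatcher has been up for group_wait.
	earlyFlushAllowed := !waitOnStartup || dispatcherStart.Add(groupWait).Before(now)

	// Mirrors aggrGroup.shouldReset for a group that has not flushed yet:
	// only zero the timer for an old alert if the early flush is allowed.
	reset := alertStart.Add(groupWait).Before(now) && earlyFlushAllowed

	fmt.Println("reset group timer right after startup?", reset) // false: group_wait is honored
}

Once the process has been running longer than group_wait, earlyFlushAllowed flips to true and very old alerts are again flushed immediately, matching the pre-existing behavior.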
28 changes: 26 additions & 2 deletions dispatch/dispatch_test.go
@@ -138,7 +138,7 @@ func TestAggrGroup(t *testing.T) {
}

// Test regular situation where we wait for group_wait to send out alerts.
-ag := newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger())
+ag := newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger(), time.Now())
go ag.run(ntfy)

ag.insert(a1)
@@ -192,7 +192,7 @@
// immediate flushing.
// Finally, set all alerts to be resolved. After successful notify the aggregation group
// should empty itself.
-ag = newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger())
+ag = newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger(), time.Now())
go ag.run(ntfy)

ag.insert(a1)
@@ -267,6 +267,30 @@ func TestAggrGroup(t *testing.T) {
}

ag.stop()

+	// Ensure WaitOnStartup is being honored
+	opts.WaitOnStartup = true
+	route = &Route{
+		RouteOpts: *opts,
+	}
+	ag = newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger(), time.Now())
+	go ag.run(ntfy)
+
+	ag.insert(a1)
+
+	select {
+	case <-time.After(opts.GroupWait * 2):
+		t.Fatalf("Expected alert to be dealt with after group_wait but it has not been handled yet")
+
+	case batch := <-alertsCh:
+		exp := removeEndsAt(types.AlertSlice{a1})
+		sort.Sort(batch)
+		if !reflect.DeepEqual(batch, exp) {
+			t.Fatalf("expected alert %v but got %v", exp, batch)
+		}
+	}
+
+	ag.stop()
}

func TestGroupLabels(t *testing.T) {
7 changes: 7 additions & 0 deletions dispatch/route.go
@@ -35,6 +35,7 @@ var DefaultRouteOpts = RouteOpts{
GroupBy: map[model.LabelName]struct{}{},
GroupByAll: false,
MuteTimeIntervals: []string{},
+WaitOnStartup: false,
}

// A Route is a node that contains definitions of how to handle alerts.
@@ -88,6 +89,7 @@ func NewRoute(cr *config.Route, parent *Route) *Route {
if cr.RepeatInterval != nil {
opts.RepeatInterval = time.Duration(*cr.RepeatInterval)
}
+opts.WaitOnStartup = cr.WaitOnStartup

// Build matchers.
var matchers labels.Matchers
@@ -211,6 +213,9 @@ type RouteOpts struct {

// A list of time intervals for which the route is active.
ActiveTimeIntervals []string

+// Honor the group_wait on initial startup even if incoming alerts are old
+WaitOnStartup bool
}

func (ro *RouteOpts) String() string {
@@ -231,12 +236,14 @@ func (ro *RouteOpts) MarshalJSON() ([]byte, error) {
GroupWait time.Duration `json:"groupWait"`
GroupInterval time.Duration `json:"groupInterval"`
RepeatInterval time.Duration `json:"repeatInterval"`
+WaitOnStartup bool `json:"waitOnStartup"`
}{
Receiver: ro.Receiver,
GroupByAll: ro.GroupByAll,
GroupWait: ro.GroupWait,
GroupInterval: ro.GroupInterval,
RepeatInterval: ro.RepeatInterval,
+WaitOnStartup: ro.WaitOnStartup,
}
for ln := range ro.GroupBy {
v.GroupBy = append(v.GroupBy, ln)
