Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dispatch: Fix initial alerts not honoring group_wait #3167

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@ type Route struct {
GroupWait *model.Duration `yaml:"group_wait,omitempty" json:"group_wait,omitempty"`
GroupInterval *model.Duration `yaml:"group_interval,omitempty" json:"group_interval,omitempty"`
RepeatInterval *model.Duration `yaml:"repeat_interval,omitempty" json:"repeat_interval,omitempty"`
WaitOnStartup bool `yaml:"wait_on_startup" json:"wait_on_startup,omitempty"`
}

// UnmarshalYAML implements the yaml.Unmarshaler interface for Route.
Expand Down
13 changes: 12 additions & 1 deletion config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ receivers:
func TestTimeIntervalHasName(t *testing.T) {
in := `
time_intervals:
- name:
- name:
time_intervals:
- times:
- start_time: '09:00'
Expand Down Expand Up @@ -1009,6 +1009,17 @@ func TestGroupByAll(t *testing.T) {
}
}

func TestWaitOnStartup(t *testing.T) {
c, err := LoadFile("testdata/conf.wait-on-startup.yml")
if err != nil {
t.Fatalf("Error parsing %s: %s", "testdata/conf.wait-on-startup.yml", err)
}

if !c.Route.WaitOnStartup {
t.Errorf("Invalid wait on startup param: expected to be true")
}
}

func TestVictorOpsDefaultAPIKey(t *testing.T) {
conf, err := LoadFile("testdata/conf.victorops-default-apikey.yml")
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions config/testdata/conf.wait-on-startup.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
route:
group_wait: 30s
group_interval: 5m
repeat_interval: 3h
receiver: team-X
wait_on_startup: True
receivers:
- name: 'team-X'
55 changes: 37 additions & 18 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ type Dispatcher struct {
ctx context.Context
cancel func()

logger log.Logger
logger log.Logger
startTime time.Time
}

// Limits describes limits used by Dispatcher.
Expand All @@ -117,13 +118,14 @@ func NewDispatcher(
}

disp := &Dispatcher{
alerts: ap,
stage: s,
route: r,
timeout: to,
logger: log.With(l, "component", "dispatcher"),
metrics: m,
limits: lim,
alerts: ap,
stage: s,
route: r,
timeout: to,
logger: log.With(l, "component", "dispatcher"),
metrics: m,
limits: lim,
startTime: time.Now(),
}
return disp
}
Expand Down Expand Up @@ -329,7 +331,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
return
}

ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger, d.startTime)
routeGroups[fp] = ag
d.aggrGroupsNum++
d.metrics.aggrGroups.Inc()
Expand Down Expand Up @@ -384,20 +386,22 @@ type aggrGroup struct {

mtx sync.RWMutex
hasFlushed bool
startTime time.Time
}

// newAggrGroup returns a new aggregation group.
func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, logger log.Logger) *aggrGroup {
func newAggrGroup(ctx context.Context, labels model.LabelSet, r *Route, to func(time.Duration) time.Duration, logger log.Logger, startTime time.Time) *aggrGroup {
if to == nil {
to = func(d time.Duration) time.Duration { return d }
}
ag := &aggrGroup{
labels: labels,
routeKey: r.Key(),
opts: &r.RouteOpts,
timeout: to,
alerts: store.NewAlerts(),
done: make(chan struct{}),
labels: labels,
routeKey: r.Key(),
opts: &r.RouteOpts,
timeout: to,
alerts: store.NewAlerts(),
done: make(chan struct{}),
startTime: startTime,
}
ag.ctx, ag.cancel = context.WithCancel(ctx)

Expand Down Expand Up @@ -472,17 +476,32 @@ func (ag *aggrGroup) stop() {
<-ag.done
}

// check if we want to wait on initial startup before sending notification
func (ag *aggrGroup) shouldWaitOnStartup() bool {
now := time.Now()
return !ag.opts.WaitOnStartup || ag.startTime.Add(ag.opts.GroupWait).Before(now)
}

func (ag *aggrGroup) shouldWaitForGroup(alert *types.Alert) bool {
now := time.Now()
return alert.StartsAt.Add(ag.opts.GroupWait).Before(now)
}

// check if we want alertgroup timer to reset
func (ag *aggrGroup) shouldReset(alert *types.Alert) bool {
return !ag.hasFlushed && ag.shouldWaitForGroup(alert) && ag.shouldWaitOnStartup()
}

// insert inserts the alert into the aggregation group.
func (ag *aggrGroup) insert(alert *types.Alert) {
if err := ag.alerts.Set(alert); err != nil {
level.Error(ag.logger).Log("msg", "error on set alert", "err", err)
}

// Immediately trigger a flush if the wait duration for this
// alert is already over.
ag.mtx.Lock()
defer ag.mtx.Unlock()
if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) {
if ag.shouldReset(alert) {
ag.next.Reset(0)
}
}
Expand Down
28 changes: 26 additions & 2 deletions dispatch/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestAggrGroup(t *testing.T) {
}

// Test regular situation where we wait for group_wait to send out alerts.
ag := newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger())
ag := newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger(), time.Now())
go ag.run(ntfy)

ag.insert(a1)
Expand Down Expand Up @@ -192,7 +192,7 @@ func TestAggrGroup(t *testing.T) {
// immediate flushing.
// Finally, set all alerts to be resolved. After successful notify the aggregation group
// should empty itself.
ag = newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger())
ag = newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger(), time.Now())
go ag.run(ntfy)

ag.insert(a1)
Expand Down Expand Up @@ -267,6 +267,30 @@ func TestAggrGroup(t *testing.T) {
}

ag.stop()

// Ensure WaitOnStartup is being honored
opts.WaitOnStartup = true
route = &Route{
RouteOpts: *opts,
}
ag = newAggrGroup(context.Background(), lset, route, nil, log.NewNopLogger(), time.Now())
go ag.run(ntfy)

ag.insert(a1)

select {
case <-time.After(opts.GroupWait * 2):
t.Fatalf("Expected alert to be dealt with after group_wait but it has not been handled yet")

case batch := <-alertsCh:
exp := removeEndsAt(types.AlertSlice{a1})
sort.Sort(batch)
if !reflect.DeepEqual(batch, exp) {
t.Fatalf("expected alert %v but got %v", exp, batch)
}
}

ag.stop()
}

func TestGroupLabels(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions dispatch/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var DefaultRouteOpts = RouteOpts{
GroupBy: map[model.LabelName]struct{}{},
GroupByAll: false,
MuteTimeIntervals: []string{},
WaitOnStartup: false,
}

// A Route is a node that contains definitions of how to handle alerts.
Expand Down Expand Up @@ -88,6 +89,7 @@ func NewRoute(cr *config.Route, parent *Route) *Route {
if cr.RepeatInterval != nil {
opts.RepeatInterval = time.Duration(*cr.RepeatInterval)
}
opts.WaitOnStartup = cr.WaitOnStartup

// Build matchers.
var matchers labels.Matchers
Expand Down Expand Up @@ -211,6 +213,9 @@ type RouteOpts struct {

// A list of time intervals for which the route is active.
ActiveTimeIntervals []string

// Honor the group_wait on initial startup even if incoming alerts are old
WaitOnStartup bool
}

func (ro *RouteOpts) String() string {
Expand All @@ -231,12 +236,14 @@ func (ro *RouteOpts) MarshalJSON() ([]byte, error) {
GroupWait time.Duration `json:"groupWait"`
GroupInterval time.Duration `json:"groupInterval"`
RepeatInterval time.Duration `json:"repeatInterval"`
WaitOnStartup bool `json:"waitOnStartup"`
}{
Receiver: ro.Receiver,
GroupByAll: ro.GroupByAll,
GroupWait: ro.GroupWait,
GroupInterval: ro.GroupInterval,
RepeatInterval: ro.RepeatInterval,
WaitOnStartup: ro.WaitOnStartup,
}
for ln := range ro.GroupBy {
v.GroupBy = append(v.GroupBy, ln)
Expand Down