Skip to content

Commit

Permalink
types: Remove AlertSlice in favor of AlertsSnapshot
Browse files Browse the repository at this point in the history
Change the AlertGroup to use the AlertsSnapshot and the Group call
to take the snapshot time. Make changes to accomodate this.

Signed-off-by: Holger Hans Peter Freyther <[email protected]>
  • Loading branch information
zecke committed Jun 17, 2024
1 parent adfccfe commit d987ddd
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 93 deletions.
2 changes: 1 addition & 1 deletion api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re
continue
}

alert := AlertToOpenAPIAlert(a, api.getAlertStatus(a.Fingerprint()), receivers)
alert := AlertToOpenAPIAlert(types.NewAlertSnapshot(a, now), api.getAlertStatus(a.Fingerprint()), receivers)

res = append(res, alert)
}
Expand Down
2 changes: 1 addition & 1 deletion api/v2/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func TestAlertToOpenAPIAlert(t *testing.T) {
UpdatedAt: updated,
}
)
openAPIAlert := AlertToOpenAPIAlert(alert, types.AlertStatus{State: types.AlertStateActive}, receivers)
openAPIAlert := AlertToOpenAPIAlert(types.NewAlertSnapshot(alert, time.Now()), types.AlertStatus{State: types.AlertStateActive}, receivers)
require.Equal(t, &open_api_models.GettableAlert{
Annotations: open_api_models.LabelSet{},
Alert: open_api_models.Alert{
Expand Down
2 changes: 1 addition & 1 deletion api/v2/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func PostableSilenceToProto(s *open_api_models.PostableSilence) (*silencepb.Sile
}

// AlertToOpenAPIAlert converts internal alerts, alert types, and receivers to *open_api_models.GettableAlert.
func AlertToOpenAPIAlert(alert *types.Alert, status types.AlertStatus, receivers []string) *open_api_models.GettableAlert {
func AlertToOpenAPIAlert(alert *types.AlertSnapshot, status types.AlertStatus, receivers []string) *open_api_models.GettableAlert {
startsAt := strfmt.DateTime(alert.StartsAt)
updatedAt := strfmt.DateTime(alert.UpdatedAt)
endsAt := strfmt.DateTime(alert.EndsAt)
Expand Down
2 changes: 1 addition & 1 deletion cmd/alertmanager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func run() int {
}()

groupFn := func(routeFilter func(*dispatch.Route) bool, alertFilter func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string) {
return disp.Groups(routeFilter, alertFilter)
return disp.Groups(time.Now(), routeFilter, alertFilter)
}

// An interface value that holds a nil concrete value is non-nil.
Expand Down
13 changes: 6 additions & 7 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (d *Dispatcher) doMaintenance() {

// AlertGroup represents how alerts exist within an aggrGroup.
type AlertGroup struct {
Alerts types.AlertSlice
Alerts types.AlertsSnapshot
Labels model.LabelSet
Receiver string
}
Expand All @@ -220,7 +220,7 @@ func (ag AlertGroups) Less(i, j int) bool {
func (ag AlertGroups) Len() int { return len(ag) }

// Groups returns a slice of AlertGroups from the dispatcher's internal state.
func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*types.Alert, time.Time) bool) (AlertGroups, map[model.Fingerprint][]string) {
func (d *Dispatcher) Groups(now time.Time, routeFilter func(*Route) bool, alertFilter func(*types.Alert, time.Time) bool) (AlertGroups, map[model.Fingerprint][]string) {
groups := AlertGroups{}

d.mtx.RLock()
Expand All @@ -231,7 +231,6 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ
// route on ingestion.
receivers := map[model.Fingerprint][]string{}

now := time.Now()
for route, ags := range d.aggrGroupsPerRoute {
if !routeFilter(route) {
continue
Expand All @@ -245,7 +244,7 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ
}

alerts := ag.alerts.List()
filteredAlerts := make([]*types.Alert, 0, len(alerts))
filteredAlerts := make([]*types.AlertSnapshot, 0, len(alerts))
for _, a := range alerts {
if !alertFilter(a, now) {
continue
Expand All @@ -262,7 +261,7 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ
receivers[fp] = []string{receiver}
}

filteredAlerts = append(filteredAlerts, a)
filteredAlerts = append(filteredAlerts, types.NewAlertSnapshot(a, now))
}
if len(filteredAlerts) == 0 {
continue
Expand Down Expand Up @@ -508,10 +507,10 @@ func (ag *aggrGroup) flush(notify func(...*types.AlertSnapshot) bool) {
alertsSlice := types.SnapshotAlerts(ag.alerts.List(), time.Now())
sort.Stable(alertsSlice)

resolvedSlice := make(types.AlertSlice, 0, len(alertsSlice))
resolvedSlice := make(types.AlertsSnapshot, 0, len(alertsSlice))
for _, alert := range alertsSlice {
if alert.Resolved() {
resolvedSlice = append(resolvedSlice, &alert.Alert)
resolvedSlice = append(resolvedSlice, alert)
}
}

Expand Down
30 changes: 16 additions & 14 deletions dispatch/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestAggrGroup(t *testing.T) {
if s < opts.GroupWait {
t.Fatalf("received batch too early after %v", s)
}
exp := types.SnapshotAlerts(types.AlertSlice{a1}, batch[0].SnapshotAt)
exp := types.SnapshotAlerts([]*types.Alert{a1}, batch[0].SnapshotAt)
sort.Sort(batch)

require.Equal(t, exp, batch)
Expand All @@ -167,7 +167,7 @@ func TestAggrGroup(t *testing.T) {
if s < opts.GroupInterval {
t.Fatalf("received batch too early after %v", s)
}
exp := types.SnapshotAlerts(types.AlertSlice{a1, a3}, batch[0].SnapshotAt)
exp := types.SnapshotAlerts([]*types.Alert{a1, a3}, batch[0].SnapshotAt)
sort.Sort(batch)

if !reflect.DeepEqual(batch, exp) {
Expand All @@ -194,7 +194,7 @@ func TestAggrGroup(t *testing.T) {
t.Fatalf("expected immediate alert but received none")

case batch := <-alertsCh:
exp := types.SnapshotAlerts(types.AlertSlice{a1, a2}, batch[0].SnapshotAt)
exp := types.SnapshotAlerts([]*types.Alert{a1, a2}, batch[0].SnapshotAt)
sort.Sort(batch)

if !reflect.DeepEqual(batch, exp) {
Expand All @@ -217,7 +217,7 @@ func TestAggrGroup(t *testing.T) {
if s < opts.GroupInterval {
t.Fatalf("received batch too early after %v", s)
}
exp := types.SnapshotAlerts(types.AlertSlice{a1, a2, a3}, batch[0].SnapshotAt)
exp := types.SnapshotAlerts([]*types.Alert{a1, a2, a3}, batch[0].SnapshotAt)
sort.Sort(batch)

if !reflect.DeepEqual(batch, exp) {
Expand All @@ -241,7 +241,7 @@ func TestAggrGroup(t *testing.T) {
if s < opts.GroupInterval {
t.Fatalf("received batch too early after %v", s)
}
exp := types.SnapshotAlerts(types.AlertSlice{&a1r, a2, a3}, batch[0].SnapshotAt)
exp := types.SnapshotAlerts([]*types.Alert{&a1r, a2, a3}, batch[0].SnapshotAt)
sort.Sort(batch)

if !reflect.DeepEqual(batch, exp) {
Expand All @@ -252,7 +252,7 @@ func TestAggrGroup(t *testing.T) {
// Resolve all remaining alerts, they should be removed after the next batch was sent.
// Do not add a1r as it should have been deleted following the previous batch.
a2r, a3r := *a2, *a3
resolved := types.AlertSlice{&a2r, &a3r}
resolved := []*types.Alert{&a2r, &a3r}
for _, a := range resolved {
a.EndsAt = time.Now()
ag.insert(a)
Expand Down Expand Up @@ -413,7 +413,9 @@ route:
}
require.Len(t, recorder.Alerts(), 7)

now := time.Now()
alertGroups, receivers := dispatcher.Groups(
now,
func(*Route) bool {
return true
}, func(*types.Alert, time.Time) bool {
Expand All @@ -423,22 +425,22 @@ route:

require.Equal(t, AlertGroups{
&AlertGroup{
Alerts: []*types.Alert{inputAlerts[0]},
Alerts: []*types.AlertSnapshot{types.NewAlertSnapshot(inputAlerts[0], now)},
Labels: model.LabelSet{
"alertname": "OtherAlert",
},
Receiver: "prod",
},
&AlertGroup{
Alerts: []*types.Alert{inputAlerts[1]},
Alerts: []*types.AlertSnapshot{types.NewAlertSnapshot(inputAlerts[1], now)},
Labels: model.LabelSet{
"alertname": "TestingAlert",
"service": "api",
},
Receiver: "testing",
},
&AlertGroup{
Alerts: []*types.Alert{inputAlerts[2], inputAlerts[3]},
Alerts: []*types.AlertSnapshot{types.NewAlertSnapshot(inputAlerts[2], now), types.NewAlertSnapshot(inputAlerts[3], now)},
Labels: model.LabelSet{
"alertname": "HighErrorRate",
"service": "api",
Expand All @@ -447,7 +449,7 @@ route:
Receiver: "prod",
},
&AlertGroup{
Alerts: []*types.Alert{inputAlerts[4]},
Alerts: []*types.AlertSnapshot{types.NewAlertSnapshot(inputAlerts[4], now)},
Labels: model.LabelSet{
"alertname": "HighErrorRate",
"service": "api",
Expand All @@ -456,7 +458,7 @@ route:
Receiver: "prod",
},
&AlertGroup{
Alerts: []*types.Alert{inputAlerts[5]},
Alerts: []*types.AlertSnapshot{types.NewAlertSnapshot(inputAlerts[5], now)},
Labels: model.LabelSet{
"alertname": "HighLatency",
"service": "db",
Expand All @@ -465,7 +467,7 @@ route:
Receiver: "kafka",
},
&AlertGroup{
Alerts: []*types.Alert{inputAlerts[5]},
Alerts: []*types.AlertSnapshot{types.NewAlertSnapshot(inputAlerts[5], now)},
Labels: model.LabelSet{
"alertname": "HighLatency",
"service": "db",
Expand Down Expand Up @@ -559,7 +561,7 @@ route:
routeFilter := func(*Route) bool { return true }
alertFilter := func(*types.Alert, time.Time) bool { return true }

alertGroups, _ := dispatcher.Groups(routeFilter, alertFilter)
alertGroups, _ := dispatcher.Groups(time.Now(), routeFilter, alertFilter)
require.Len(t, alertGroups, 6)

require.Equal(t, 0.0, testutil.ToFloat64(m.aggrGroupLimitReached))
Expand All @@ -577,7 +579,7 @@ route:
require.Equal(t, 1.0, testutil.ToFloat64(m.aggrGroupLimitReached))

// Verify there are still only 6 groups.
alertGroups, _ = dispatcher.Groups(routeFilter, alertFilter)
alertGroups, _ = dispatcher.Groups(time.Now(), routeFilter, alertFilter)
require.Len(t, alertGroups, 6)
}

Expand Down
2 changes: 1 addition & 1 deletion store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (a *Alerts) Set(alert *types.Alert) error {

// DeleteIfNotModified deletes the slice of Alerts from the store if not
// modified.
func (a *Alerts) DeleteIfNotModified(alerts types.AlertSlice) error {
func (a *Alerts) DeleteIfNotModified(alerts types.AlertsSnapshot) error {
a.Lock()
defer a.Unlock()
for _, alert := range alerts {
Expand Down
8 changes: 4 additions & 4 deletions store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestDeleteIfNotModified(t *testing.T) {
require.NoError(t, a.Set(a1))

// a1 should be deleted as it has not been modified.
a.DeleteIfNotModified(types.AlertSlice{a1})
a.DeleteIfNotModified(types.AlertsSnapshot{types.NewAlertSnapshot(a1, time.Now())})
got, err := a.Get(a1.Fingerprint())
require.Equal(t, ErrNotFound, err)
require.Nil(t, got)
Expand Down Expand Up @@ -80,7 +80,7 @@ func TestDeleteIfNotModified(t *testing.T) {
UpdatedAt: time.Now().Add(-time.Second),
}
require.True(t, a2.UpdatedAt.Before(a1.UpdatedAt))
a.DeleteIfNotModified(types.AlertSlice{a2})
a.DeleteIfNotModified(types.AlertsSnapshot{types.NewAlertSnapshot(a2, time.Now())})
// a1 should not be deleted.
got, err := a.Get(a1.Fingerprint())
require.NoError(t, err)
Expand All @@ -97,7 +97,7 @@ func TestDeleteIfNotModified(t *testing.T) {
UpdatedAt: time.Now().Add(time.Second),
}
require.True(t, a3.UpdatedAt.After(a1.UpdatedAt))
a.DeleteIfNotModified(types.AlertSlice{a3})
a.DeleteIfNotModified(types.AlertsSnapshot{types.NewAlertSnapshot(a3, time.Now())})
// a1 should not be deleted.
got, err = a.Get(a1.Fingerprint())
require.NoError(t, err)
Expand Down Expand Up @@ -126,7 +126,7 @@ func TestDeleteIfNotModified(t *testing.T) {
require.NoError(t, a.Set(a2))

// Deleting a1 should not delete a2.
require.NoError(t, a.DeleteIfNotModified(types.AlertSlice{a1}))
require.NoError(t, a.DeleteIfNotModified(types.AlertsSnapshot{types.NewAlertSnapshot(a1, time.Now())}))
// a1 should be deleted.
got, err := a.Get(a1.Fingerprint())
require.Equal(t, ErrNotFound, err)
Expand Down
26 changes: 0 additions & 26 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,32 +415,6 @@ func (a *Alert) Validate() error {
return nil
}

// AlertSlice is a sortable slice of Alerts.
type AlertSlice []*Alert

func (as AlertSlice) Less(i, j int) bool {
// Look at labels.job, then labels.instance.
for _, overrideKey := range [...]model.LabelName{"job", "instance"} {
iVal, iOk := as[i].Labels[overrideKey]
jVal, jOk := as[j].Labels[overrideKey]
if !iOk && !jOk {
continue
}
if !iOk {
return false
}
if !jOk {
return true
}
if iVal != jVal {
return iVal < jVal
}
}
return as[i].Labels.Before(as[j].Labels)
}
func (as AlertSlice) Swap(i, j int) { as[i], as[j] = as[j], as[i] }
func (as AlertSlice) Len() int { return len(as) }

func (as AlertsSnapshot) Less(i, j int) bool {
// Look at labels.job, then labels.instance.
for _, overrideKey := range [...]model.LabelName{"job", "instance"} {
Expand Down
Loading

0 comments on commit d987ddd

Please sign in to comment.