Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alternative fix for #3805 by creating a snapshot and using it #3827

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re
continue
}

alert := AlertToOpenAPIAlert(a, api.getAlertStatus(a.Fingerprint()), receivers)
alert := AlertToOpenAPIAlert(types.NewAlertSnapshot(a, now), api.getAlertStatus(a.Fingerprint()), receivers)

res = append(res, alert)
}
Expand Down
2 changes: 1 addition & 1 deletion api/v2/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ func TestAlertToOpenAPIAlert(t *testing.T) {
UpdatedAt: updated,
}
)
openAPIAlert := AlertToOpenAPIAlert(alert, types.AlertStatus{State: types.AlertStateActive}, receivers)
openAPIAlert := AlertToOpenAPIAlert(types.NewAlertSnapshot(alert, time.Now()), types.AlertStatus{State: types.AlertStateActive}, receivers)
require.Equal(t, &open_api_models.GettableAlert{
Annotations: open_api_models.LabelSet{},
Alert: open_api_models.Alert{
Expand Down
2 changes: 1 addition & 1 deletion api/v2/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func PostableSilenceToProto(s *open_api_models.PostableSilence) (*silencepb.Sile
}

// AlertToOpenAPIAlert converts internal alerts, alert types, and receivers to *open_api_models.GettableAlert.
func AlertToOpenAPIAlert(alert *types.Alert, status types.AlertStatus, receivers []string) *open_api_models.GettableAlert {
func AlertToOpenAPIAlert(alert *types.AlertSnapshot, status types.AlertStatus, receivers []string) *open_api_models.GettableAlert {
startsAt := strfmt.DateTime(alert.StartsAt)
updatedAt := strfmt.DateTime(alert.UpdatedAt)
endsAt := strfmt.DateTime(alert.EndsAt)
Expand Down
2 changes: 1 addition & 1 deletion cmd/alertmanager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func run() int {
}()

groupFn := func(routeFilter func(*dispatch.Route) bool, alertFilter func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string) {
return disp.Groups(routeFilter, alertFilter)
return disp.Groups(time.Now(), routeFilter, alertFilter)
}

// An interface value that holds a nil concrete value is non-nil.
Expand Down
39 changes: 15 additions & 24 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (d *Dispatcher) doMaintenance() {

// AlertGroup represents how alerts exist within an aggrGroup.
type AlertGroup struct {
Alerts types.AlertSlice
Alerts types.AlertsSnapshot
Labels model.LabelSet
Receiver string
}
Expand All @@ -220,7 +220,7 @@ func (ag AlertGroups) Less(i, j int) bool {
// Len reports the number of groups held in ag.
func (ag AlertGroups) Len() int {
	return len(ag)
}

// Groups returns a slice of AlertGroups from the dispatcher's internal state.
func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*types.Alert, time.Time) bool) (AlertGroups, map[model.Fingerprint][]string) {
func (d *Dispatcher) Groups(now time.Time, routeFilter func(*Route) bool, alertFilter func(*types.Alert, time.Time) bool) (AlertGroups, map[model.Fingerprint][]string) {
groups := AlertGroups{}

d.mtx.RLock()
Expand All @@ -231,7 +231,6 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ
// route on ingestion.
receivers := map[model.Fingerprint][]string{}

now := time.Now()
for route, ags := range d.aggrGroupsPerRoute {
if !routeFilter(route) {
continue
Expand All @@ -245,7 +244,7 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ
}

alerts := ag.alerts.List()
filteredAlerts := make([]*types.Alert, 0, len(alerts))
filteredAlerts := make([]*types.AlertSnapshot, 0, len(alerts))
for _, a := range alerts {
if !alertFilter(a, now) {
continue
Expand All @@ -262,7 +261,7 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ
receivers[fp] = []string{receiver}
}

filteredAlerts = append(filteredAlerts, a)
filteredAlerts = append(filteredAlerts, types.NewAlertSnapshot(a, now))
}
if len(filteredAlerts) == 0 {
continue
Expand Down Expand Up @@ -303,7 +302,7 @@ func (d *Dispatcher) Stop() {
// notifyFunc is a function that performs notification for the alert
// with the given fingerprint. It aborts on context cancelation.
// Returns false iff notifying failed.
type notifyFunc func(context.Context, ...*types.Alert) bool
type notifyFunc func(context.Context, ...*types.AlertSnapshot) bool

// processAlert determines in which aggregation group the alert falls
// and inserts it.
Expand Down Expand Up @@ -344,7 +343,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
// alert is already there.
ag.insert(alert)

go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
go ag.run(func(ctx context.Context, alerts ...*types.AlertSnapshot) bool {
_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
if err != nil {
lvl := level.Error(d.logger)
Expand Down Expand Up @@ -461,7 +460,7 @@ func (ag *aggrGroup) run(nf notifyFunc) {
ag.hasFlushed = true
ag.mtx.Unlock()

ag.flush(func(alerts ...*types.Alert) bool {
ag.flush(func(alerts ...*types.AlertSnapshot) bool {
return nf(ctx, alerts...)
})

Expand Down Expand Up @@ -500,28 +499,20 @@ func (ag *aggrGroup) empty() bool {
}

// flush sends notifications for all new alerts.
func (ag *aggrGroup) flush(notify func(...*types.Alert) bool) {
func (ag *aggrGroup) flush(notify func(...*types.AlertSnapshot) bool) {
if ag.empty() {
return
}

var (
alerts = ag.alerts.List()
alertsSlice = make(types.AlertSlice, 0, len(alerts))
resolvedSlice = make(types.AlertSlice, 0, len(alerts))
now = time.Now()
)
for _, alert := range alerts {
a := *alert
// Ensure that alerts don't resolve as time moves forward.
if a.ResolvedAt(now) {
resolvedSlice = append(resolvedSlice, &a)
} else {
a.EndsAt = time.Time{}
alertsSlice := types.SnapshotAlerts(ag.alerts.List(), time.Now())
sort.Stable(alertsSlice)

resolvedSlice := make(types.AlertsSnapshot, 0, len(alertsSlice))
for _, alert := range alertsSlice {
if alert.Resolved() {
resolvedSlice = append(resolvedSlice, alert)
}
alertsSlice = append(alertsSlice, &a)
}
sort.Stable(alertsSlice)

level.Debug(ag.logger).Log("msg", "flushing", "alerts", fmt.Sprintf("%v", alertsSlice))

Expand Down
Loading
Loading