From a2495fbec7bd69663e7dba4cbcfb9c80d27ba3ab Mon Sep 17 00:00:00 2001 From: Emmanuel Lodovice Date: Wed, 15 Nov 2023 22:51:19 -0800 Subject: [PATCH] Add AlertLifeCycleObserver that allows consumers to hook into Alert life cycle Signed-off-by: Emmanuel Lodovice --- alertobserver/alertobserver.go | 36 ++++++++++ alertobserver/testing.go | 46 +++++++++++++ api/api.go | 6 ++ api/v1/api.go | 45 ++++++++----- api/v1/api_test.go | 73 +++++++++++++++++++- api/v2/api.go | 35 +++++++--- api/v2/api_test.go | 64 ++++++++++++++++++ api/v2/testing.go | 48 +++++++++++++ cmd/alertmanager/main.go | 3 +- dispatch/dispatch.go | 32 ++++++--- dispatch/dispatch_test.go | 72 ++++++++++++++++++-- notify/notify.go | 110 +++++++++++++++++++++--------- notify/notify_test.go | 120 ++++++++++++++++++++++++++++++--- 13 files changed, 606 insertions(+), 84 deletions(-) create mode 100644 alertobserver/alertobserver.go create mode 100644 alertobserver/testing.go diff --git a/alertobserver/alertobserver.go b/alertobserver/alertobserver.go new file mode 100644 index 0000000000..cf05d9210c --- /dev/null +++ b/alertobserver/alertobserver.go @@ -0,0 +1,36 @@ +// Copyright 2023 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package alertobserver + +import ( + "github.com/prometheus/alertmanager/types" +) + +const ( + EventAlertReceived string = "received" + EventAlertRejected string = "rejected" + EventAlertAddedToAggrGroup string = "addedAggrGroup" + EventAlertFailedAddToAggrGroup string = "failedAddAggrGroup" + EventAlertPipelineStart string = "pipelineStart" + EventAlertPipelinePassStage string = "pipelinePassStage" + EventAlertMuted string = "muted" + EventAlertSent string = "sent" + EventAlertSendFailed string = "sendFailed" +) + +type AlertEventMeta map[string]interface{} + +type LifeCycleObserver interface { + Observe(event string, alerts []*types.Alert, meta AlertEventMeta) +} diff --git a/alertobserver/testing.go b/alertobserver/testing.go new file mode 100644 index 0000000000..66f774fbb7 --- /dev/null +++ b/alertobserver/testing.go @@ -0,0 +1,46 @@ +// Copyright 2023 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package alertobserver + +import ( + "sync" + + "github.com/prometheus/alertmanager/types" +) + +type FakeLifeCycleObserver struct { + AlertsPerEvent map[string][]*types.Alert + PipelineStageAlerts map[string][]*types.Alert + MetaPerEvent map[string][]AlertEventMeta + Mtx sync.RWMutex +} + +func (o *FakeLifeCycleObserver) Observe(event string, alerts []*types.Alert, meta AlertEventMeta) { + o.Mtx.Lock() + defer o.Mtx.Unlock() + if event == EventAlertPipelinePassStage { + o.PipelineStageAlerts[meta["stageName"].(string)] = append(o.PipelineStageAlerts[meta["stageName"].(string)], alerts...) + } else { + o.AlertsPerEvent[event] = append(o.AlertsPerEvent[event], alerts...) + } + o.MetaPerEvent[event] = append(o.MetaPerEvent[event], meta) +} + +func NewFakeLifeCycleObserver() *FakeLifeCycleObserver { + return &FakeLifeCycleObserver{ + PipelineStageAlerts: map[string][]*types.Alert{}, + AlertsPerEvent: map[string][]*types.Alert{}, + MetaPerEvent: map[string][]AlertEventMeta{}, + } +} diff --git a/api/api.go b/api/api.go index 59bfb76da6..db4b3b5337 100644 --- a/api/api.go +++ b/api/api.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/common/route" + "github.com/prometheus/alertmanager/alertobserver" apiv1 "github.com/prometheus/alertmanager/api/v1" apiv2 "github.com/prometheus/alertmanager/api/v2" "github.com/prometheus/alertmanager/cluster" @@ -74,6 +75,9 @@ type Options struct { // according to the current active configuration. Alerts returned are // filtered by the arguments provided to the function. GroupFunc func(func(*dispatch.Route) bool, func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string) + // AlertLCObserver is used to add hooks to the different alert life cycle events. + // If nil then no observer methods will be invoked in the life cycle events. + AlertLCObserver alertobserver.LifeCycleObserver } func (o Options) validate() error { @@ -117,6 +121,7 @@ func New(opts Options) (*API, error) { opts.Peer, log.With(l, "version", "v1"), opts.Registry, + opts.AlertLCObserver, ) v2, err := apiv2.NewAPI( @@ -127,6 +132,7 @@ func New(opts Options) (*API, error) { opts.Peer, log.With(l, "version", "v2"), opts.Registry, + opts.AlertLCObserver, ) if err != nil { return nil, err diff --git a/api/v1/api.go b/api/v1/api.go index 39018a7589..d8b4ce0a19 100644 --- a/api/v1/api.go +++ b/api/v1/api.go @@ -30,6 +30,7 @@ import ( "github.com/prometheus/common/route" "github.com/prometheus/common/version" + "github.com/prometheus/alertmanager/alertobserver" "github.com/prometheus/alertmanager/api/metrics" "github.com/prometheus/alertmanager/cluster" "github.com/prometheus/alertmanager/config" @@ -67,14 +68,15 @@ func setCORS(w http.ResponseWriter) { // API provides registration of handlers for API routes. 
type API struct { - alerts provider.Alerts - silences *silence.Silences - config *config.Config - route *dispatch.Route - uptime time.Time - peer cluster.ClusterPeer - logger log.Logger - m *metrics.Alerts + alerts provider.Alerts + silences *silence.Silences + config *config.Config + route *dispatch.Route + uptime time.Time + peer cluster.ClusterPeer + logger log.Logger + m *metrics.Alerts + alertLCObserver alertobserver.LifeCycleObserver getAlertStatus getAlertStatusFn @@ -91,19 +93,21 @@ func New( peer cluster.ClusterPeer, l log.Logger, r prometheus.Registerer, + o alertobserver.LifeCycleObserver, ) *API { if l == nil { l = log.NewNopLogger() } return &API{ - alerts: alerts, - silences: silences, - getAlertStatus: sf, - uptime: time.Now(), - peer: peer, - logger: l, - m: metrics.NewAlerts("v1", r), + alerts: alerts, + silences: silences, + getAlertStatus: sf, + uptime: time.Now(), + peer: peer, + logger: l, + m: metrics.NewAlerts("v1", r), + alertLCObserver: o, } } @@ -447,6 +451,10 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...* if err := a.Validate(); err != nil { validationErrs.Add(err) api.m.Invalid().Inc() + if api.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{"msg": err.Error()} + api.alertLCObserver.Observe(alertobserver.EventAlertRejected, []*types.Alert{a}, m) + } continue } validAlerts = append(validAlerts, a) @@ -456,8 +464,15 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...* typ: errorInternal, err: err, }, nil) + if api.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{"msg": err.Error()} + api.alertLCObserver.Observe(alertobserver.EventAlertRejected, validAlerts, m) + } return } + if api.alertLCObserver != nil { + api.alertLCObserver.Observe(alertobserver.EventAlertReceived, validAlerts, alertobserver.AlertEventMeta{}) + } if validationErrs.Len() > 0 { api.respondError(w, apiError{ diff --git a/api/v1/api_test.go b/api/v1/api_test.go index 84315ef394..8310c78164 100644 --- a/api/v1/api_test.go +++ b/api/v1/api_test.go @@ -28,6 +28,7 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/prometheus/alertmanager/alertobserver" "github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/dispatch" "github.com/prometheus/alertmanager/pkg/labels" @@ -134,7 +135,7 @@ func TestAddAlerts(t *testing.T) { } alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err) - api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil) + api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil, nil) defaultGlobalConfig := config.DefaultGlobalConfig() route := config.Route{} api.Update(&config.Config{ @@ -153,6 +154,74 @@ func TestAddAlerts(t *testing.T) { body, _ := io.ReadAll(res.Body) require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, StartsAt %v, EndsAt %v, Response: %s", i, tc.start, tc.end, string(body))) + + observer := alertobserver.NewFakeLifeCycleObserver() + api.alertLCObserver = observer + r, err = http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b)) + w = httptest.NewRecorder() + if err != nil { + t.Errorf("Unexpected error %v", err) + } + api.addAlerts(w, r) + if tc.code == 200 { + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertReceived][0].Fingerprint(), alerts[0].Fingerprint()) + } else { + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertRejected][0].Fingerprint(), alerts[0].Fingerprint()) + } + } +} + +func 
TestAddAlertsWithAlertLCObserver(t *testing.T) { + now := func(offset int) time.Time { + return time.Now().Add(time.Duration(offset) * time.Second) + } + + for i, tc := range []struct { + start, end time.Time + err bool + code int + }{ + {time.Time{}, time.Time{}, false, 200}, + {now(1), now(0), false, 400}, + {now(0), time.Time{}, true, 500}, + } { + alerts := []model.Alert{{ + StartsAt: tc.start, + EndsAt: tc.end, + Labels: model.LabelSet{"label1": "test1"}, + Annotations: model.LabelSet{"annotation1": "some text"}, + }} + b, err := json.Marshal(&alerts) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + + alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err) + observer := alertobserver.NewFakeLifeCycleObserver() + api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil, observer) + defaultGlobalConfig := config.DefaultGlobalConfig() + route := config.Route{} + api.Update(&config.Config{ + Global: &defaultGlobalConfig, + Route: &route, + }) + + r, err := http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b)) + w := httptest.NewRecorder() + if err != nil { + t.Errorf("Unexpected error %v", err) + } + + api.addAlerts(w, r) + res := w.Result() + body, _ := io.ReadAll(res.Body) + + require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, StartsAt %v, EndsAt %v, Response: %s", i, tc.start, tc.end, string(body))) + if tc.code == 200 { + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertReceived][0].Fingerprint(), alerts[0].Fingerprint()) + } else { + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertRejected][0].Fingerprint(), alerts[0].Fingerprint()) + } } } @@ -267,7 +336,7 @@ func TestListAlerts(t *testing.T) { }, } { alertsProvider := newFakeAlerts(alerts, tc.err) - api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil) + api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil, nil) api.route = dispatch.NewRoute(&config.Route{Receiver: "def-receiver"}, nil) r, err := http.NewRequest("GET", "/api/v1/alerts", nil) diff --git a/api/v2/api.go b/api/v2/api.go index 1ddb2bcbae..b31d5bbfd2 100644 --- a/api/v2/api.go +++ b/api/v2/api.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/common/version" "github.com/rs/cors" + "github.com/prometheus/alertmanager/alertobserver" "github.com/prometheus/alertmanager/api/metrics" open_api_models "github.com/prometheus/alertmanager/api/v2/models" "github.com/prometheus/alertmanager/api/v2/restapi" @@ -68,8 +69,9 @@ type API struct { route *dispatch.Route setAlertStatus setAlertStatusFn - logger log.Logger - m *metrics.Alerts + logger log.Logger + m *metrics.Alerts + alertLCObserver alertobserver.LifeCycleObserver Handler http.Handler } @@ -89,16 +91,18 @@ func NewAPI( peer cluster.ClusterPeer, l log.Logger, r prometheus.Registerer, + o alertobserver.LifeCycleObserver, ) (*API, error) { api := API{ - alerts: alerts, - getAlertStatus: sf, - alertGroups: gf, - peer: peer, - silences: silences, - logger: l, - m: metrics.NewAlerts("v2", r), - uptime: time.Now(), + alerts: alerts, + getAlertStatus: sf, + alertGroups: gf, + peer: peer, + silences: silences, + logger: l, + m: metrics.NewAlerts("v2", r), + uptime: time.Now(), + alertLCObserver: o, } // Load embedded swagger file. @@ -350,12 +354,20 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware. 
if err := a.Validate(); err != nil { validationErrs.Add(err) api.m.Invalid().Inc() + if api.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{"msg": err.Error()} + api.alertLCObserver.Observe(alertobserver.EventAlertRejected, []*types.Alert{a}, m) + } continue } validAlerts = append(validAlerts, a) } if err := api.alerts.Put(validAlerts...); err != nil { level.Error(logger).Log("msg", "Failed to create alerts", "err", err) + if api.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{"msg": err.Error()} + api.alertLCObserver.Observe(alertobserver.EventAlertRejected, validAlerts, m) + } return alert_ops.NewPostAlertsInternalServerError().WithPayload(err.Error()) } @@ -363,6 +375,9 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware. level.Error(logger).Log("msg", "Failed to validate alerts", "err", validationErrs.Error()) return alert_ops.NewPostAlertsBadRequest().WithPayload(validationErrs.Error()) } + if api.alertLCObserver != nil { + api.alertLCObserver.Observe(alertobserver.EventAlertReceived, validAlerts, alertobserver.AlertEventMeta{}) + } return alert_ops.NewPostAlertsOK() } diff --git a/api/v2/api_test.go b/api/v2/api_test.go index d520dbde13..96241e9c46 100644 --- a/api/v2/api_test.go +++ b/api/v2/api_test.go @@ -28,7 +28,10 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/prometheus/alertmanager/alertobserver" + "github.com/prometheus/alertmanager/api/metrics" open_api_models "github.com/prometheus/alertmanager/api/v2/models" + alert_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alert" general_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/general" receiver_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/receiver" silence_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/silence" @@ -510,3 +513,64 @@ receivers: require.Equal(t, tc.body, string(body)) } } + +func TestPostAlertHandler(t *testing.T) { + now := time.Now() + for i, tc := range []struct { + start, end time.Time + err bool + code int + }{ + {time.Time{}, time.Time{}, false, 200}, + {now, time.Time{}, false, 200}, + {time.Time{}, now.Add(time.Duration(-1) * time.Second), false, 200}, + {time.Time{}, now, false, 200}, + {time.Time{}, now.Add(time.Duration(1) * time.Second), false, 200}, + {now.Add(time.Duration(-2) * time.Second), now.Add(time.Duration(-1) * time.Second), false, 200}, + {now.Add(time.Duration(1) * time.Second), now.Add(time.Duration(2) * time.Second), false, 200}, + {now.Add(time.Duration(1) * time.Second), now, false, 400}, + } { + alerts, alertsBytes := createAlert(t, tc.start, tc.end) + api := API{ + uptime: time.Now(), + alerts: newFakeAlerts([]*types.Alert{}), + logger: log.NewNopLogger(), + m: metrics.NewAlerts("v2", nil), + } + api.Update(&config.Config{ + Global: &config.GlobalConfig{ + ResolveTimeout: model.Duration(5), + }, + Route: &config.Route{}, + }, nil) + + r, err := http.NewRequest("POST", "/api/v2/alerts", bytes.NewReader(alertsBytes)) + require.NoError(t, err) + + w := httptest.NewRecorder() + p := runtime.TextProducer() + responder := api.postAlertsHandler(alert_ops.PostAlertsParams{ + HTTPRequest: r, + Alerts: alerts, + }) + responder.WriteResponse(w, p) + body, _ := io.ReadAll(w.Result().Body) + + require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, response: %s", i, string(body))) + + observer := alertobserver.NewFakeLifeCycleObserver() + api.alertLCObserver = observer + r, err = http.NewRequest("POST", 
"/api/v2/alerts", bytes.NewReader(alertsBytes)) + require.NoError(t, err) + api.postAlertsHandler(alert_ops.PostAlertsParams{ + HTTPRequest: r, + Alerts: alerts, + }) + amAlert := OpenAPIAlertsToAlerts(alerts) + if tc.code == 200 { + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertReceived][0].Fingerprint(), amAlert[0].Fingerprint()) + } else { + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertRejected][0].Fingerprint(), amAlert[0].Fingerprint()) + } + } +} diff --git a/api/v2/testing.go b/api/v2/testing.go index c813d350d3..9e8f31db0a 100644 --- a/api/v2/testing.go +++ b/api/v2/testing.go @@ -19,11 +19,14 @@ import ( "time" "github.com/go-openapi/strfmt" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" open_api_models "github.com/prometheus/alertmanager/api/v2/models" "github.com/prometheus/alertmanager/pkg/labels" + "github.com/prometheus/alertmanager/provider" "github.com/prometheus/alertmanager/silence/silencepb" + "github.com/prometheus/alertmanager/types" ) func createSilence(t *testing.T, ID, creator string, start, ends time.Time) (open_api_models.PostableSilence, []byte) { @@ -68,3 +71,48 @@ func createLabelMatcher(t *testing.T, name, value string, matchType labels.Match matcher, _ := labels.NewMatcher(matchType, name, value) return matcher } + +func createAlert(t *testing.T, start, ends time.Time) (open_api_models.PostableAlerts, []byte) { + startsAt := strfmt.DateTime(start) + endsAt := strfmt.DateTime(ends) + + alert := open_api_models.PostableAlert{ + StartsAt: startsAt, + EndsAt: endsAt, + Annotations: open_api_models.LabelSet{"annotation1": "some text"}, + Alert: open_api_models.Alert{ + Labels: open_api_models.LabelSet{"label1": "test1"}, + GeneratorURL: "http://localhost:3000", + }, + } + alerts := open_api_models.PostableAlerts{} + alerts = append(alerts, &alert) + b, err := json.Marshal(alerts) + require.NoError(t, err) + return alerts, b +} + +type fakeAlerts struct { + fps map[model.Fingerprint]int + alerts []*types.Alert + err error +} + +func newFakeAlerts(alerts []*types.Alert) *fakeAlerts { + fps := make(map[model.Fingerprint]int) + for i, a := range alerts { + fps[a.Fingerprint()] = i + } + f := &fakeAlerts{ + alerts: alerts, + fps: fps, + } + return f +} + +func (f *fakeAlerts) Subscribe() provider.AlertIterator { return nil } +func (f *fakeAlerts) Get(model.Fingerprint) (*types.Alert, error) { return nil, nil } +func (f *fakeAlerts) GetPending() provider.AlertIterator { return nil } +func (f *fakeAlerts) Put(alerts ...*types.Alert) error { + return f.err +} diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index 339b2b71cb..fd96542d4f 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -429,6 +429,7 @@ func run() int { intervener, notificationLog, pipelinePeer, + nil, ) configuredReceivers.Set(float64(len(activeReceivers))) @@ -439,7 +440,7 @@ func run() int { silencer.Mutes(labels) }) - disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, nil, logger, dispMetrics) + disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, nil, logger, dispMetrics, nil) routes.Walk(func(r *dispatch.Route) { if r.RouteOpts.RepeatInterval > *retention { level.Warn(configLogger).Log( diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index 853d849688..833c70a544 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + 
"github.com/prometheus/alertmanager/alertobserver" "github.com/prometheus/alertmanager/notify" "github.com/prometheus/alertmanager/provider" "github.com/prometheus/alertmanager/store" @@ -90,7 +91,8 @@ type Dispatcher struct { ctx context.Context cancel func() - logger log.Logger + logger log.Logger + alertLCObserver alertobserver.LifeCycleObserver } // Limits describes limits used by Dispatcher. @@ -111,19 +113,21 @@ func NewDispatcher( lim Limits, l log.Logger, m *DispatcherMetrics, + o alertobserver.LifeCycleObserver, ) *Dispatcher { if lim == nil { lim = nilLimits{} } disp := &Dispatcher{ - alerts: ap, - stage: s, - route: r, - timeout: to, - logger: log.With(l, "component", "dispatcher"), - metrics: m, - limits: lim, + alerts: ap, + stage: s, + route: r, + timeout: to, + logger: log.With(l, "component", "dispatcher"), + metrics: m, + limits: lim, + alertLCObserver: o, } return disp } @@ -319,13 +323,20 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { ag, ok := routeGroups[fp] if ok { ag.insert(alert) + if d.alertLCObserver != nil { + d.alertLCObserver.Observe(alertobserver.EventAlertAddedToAggrGroup, []*types.Alert{alert}, alertobserver.AlertEventMeta{"groupKey": ag.GroupKey()}) + } return } // If the group does not exist, create it. But check the limit first. if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit { d.metrics.aggrGroupLimitReached.Inc() - level.Error(d.logger).Log("msg", "Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name()) + errMsg := "Too many aggregation groups, cannot create new group for alert" + level.Error(d.logger).Log("msg", errMsg, "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name()) + if d.alertLCObserver != nil { + d.alertLCObserver.Observe(alertobserver.EventAlertFailedAddToAggrGroup, []*types.Alert{alert}, alertobserver.AlertEventMeta{"msg": errMsg}) + } return } @@ -338,6 +349,9 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { // function, to make sure that when the run() will be executed the 1st // alert is already there. ag.insert(alert) + if d.alertLCObserver != nil { + d.alertLCObserver.Observe(alertobserver.EventAlertAddedToAggrGroup, []*types.Alert{alert}, alertobserver.AlertEventMeta{"groupKey": ag.GroupKey()}) + } go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool { _, _, err := d.stage.Exec(ctx, d.logger, alerts...) 
diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go index 17ffe85a9a..e026dffb84 100644 --- a/dispatch/dispatch_test.go +++ b/dispatch/dispatch_test.go @@ -28,6 +28,7 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/prometheus/alertmanager/alertobserver" "github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/notify" "github.com/prometheus/alertmanager/provider/mem" @@ -374,7 +375,7 @@ route: timeout := func(d time.Duration) time.Duration { return time.Duration(0) } recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} - dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry())) + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()), nil) go dispatcher.Run() defer dispatcher.Stop() @@ -514,7 +515,7 @@ route: recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} lim := limits{groups: 6} m := NewDispatcherMetrics(true, prometheus.NewRegistry()) - dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, lim, logger, m) + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, lim, logger, m, nil) go dispatcher.Run() defer dispatcher.Stop() @@ -568,6 +569,69 @@ route: require.Len(t, alertGroups, 6) } +func TestGroupsAlertLCObserver(t *testing.T) { + confData := `receivers: +- name: 'testing' + +route: + group_by: ['alertname'] + group_wait: 10ms + group_interval: 10ms + receiver: 'testing'` + conf, err := config.Load(confData) + if err != nil { + t.Fatal(err) + } + + logger := log.NewNopLogger() + route := NewRoute(conf.Route, nil) + marker := types.NewMarker(prometheus.NewRegistry()) + alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil) + if err != nil { + t.Fatal(err) + } + defer alerts.Close() + + timeout := func(d time.Duration) time.Duration { return time.Duration(0) } + recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} + m := NewDispatcherMetrics(true, prometheus.NewRegistry()) + observer := alertobserver.NewFakeLifeCycleObserver() + lim := limits{groups: 1} + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, lim, logger, m, observer) + go dispatcher.Run() + defer dispatcher.Stop() + + // Create alerts. the dispatcher will automatically create the groups. + alert1 := newAlert(model.LabelSet{"alertname": "OtherAlert", "cluster": "cc", "service": "dd"}) + alert2 := newAlert(model.LabelSet{"alertname": "YetAnotherAlert", "cluster": "cc", "service": "db"}) + err = alerts.Put(alert1) + if err != nil { + t.Fatal(err) + } + // Let alerts get processed. + for i := 0; len(recorder.Alerts()) != 1 && i < 10; i++ { + time.Sleep(200 * time.Millisecond) + } + err = alerts.Put(alert2) + if err != nil { + t.Fatal(err) + } + // Let alert get processed. 
+ for i := 0; testutil.ToFloat64(m.aggrGroupLimitReached) == 0 && i < 10; i++ { + time.Sleep(200 * time.Millisecond) + } + observer.Mtx.RLock() + defer observer.Mtx.RUnlock() + require.Equal(t, 1, len(recorder.Alerts())) + require.Equal(t, alert1.Fingerprint(), observer.AlertsPerEvent[alertobserver.EventAlertAddedToAggrGroup][0].Fingerprint()) + groupFp := getGroupLabels(alert1, route).Fingerprint() + groupKey := dispatcher.aggrGroupsPerRoute[route][groupFp].GroupKey() + require.Equal(t, groupKey, observer.MetaPerEvent[alertobserver.EventAlertAddedToAggrGroup][0]["groupKey"].(string)) + + require.Equal(t, 1, len(observer.AlertsPerEvent[alertobserver.EventAlertFailedAddToAggrGroup])) + require.Equal(t, alert2.Fingerprint(), observer.AlertsPerEvent[alertobserver.EventAlertFailedAddToAggrGroup][0].Fingerprint()) +} + type recordStage struct { mtx sync.RWMutex alerts map[string]map[model.Fingerprint]*types.Alert @@ -632,7 +696,7 @@ func TestDispatcherRace(t *testing.T) { defer alerts.Close() timeout := func(d time.Duration) time.Duration { return time.Duration(0) } - dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry())) + dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()), nil) go dispatcher.Run() dispatcher.Stop() } @@ -660,7 +724,7 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T) timeout := func(d time.Duration) time.Duration { return d } recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} - dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry())) + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()), nil) go dispatcher.Run() defer dispatcher.Stop() diff --git a/notify/notify.go b/notify/notify.go index 33d499af30..73453406d7 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "sort" + "strings" "sync" "time" @@ -28,6 +29,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/prometheus/alertmanager/alertobserver" "github.com/prometheus/alertmanager/featurecontrol" "github.com/prometheus/alertmanager/inhibit" "github.com/prometheus/alertmanager/nflog" @@ -377,18 +379,25 @@ func (pb *PipelineBuilder) New( intervener *timeinterval.Intervener, notificationLog NotificationLog, peer Peer, + o alertobserver.LifeCycleObserver, ) RoutingStage { - rs := make(RoutingStage, len(receivers)) + rs := RoutingStage{ + stages: make(map[string]Stage, len(receivers)), + alertLCObserver: o, + } ms := NewGossipSettleStage(peer) - is := NewMuteStage(inhibitor) + is := NewMuteStage(inhibitor, o) tas := NewTimeActiveStage(intervener) tms := NewTimeMuteStage(intervener) - ss := NewMuteStage(silencer) + ss := NewMuteStage(silencer, o) for name := range receivers { - st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics) - rs[name] = MultiStage{ms, is, tas, tms, ss, st} + st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics, o) + rs.stages[name] = MultiStage{ + alertLCObserver: o, + stages: []Stage{ms, is, tas, tms, ss, st}, + } } pb.metrics.InitializeFor(receivers) @@ -403,6 +412,7 @@ func createReceiverStage( wait func() time.Duration, notificationLog NotificationLog, 
metrics *Metrics, + o alertobserver.LifeCycleObserver, ) Stage { var fs FanoutStage for i := range integrations { @@ -411,20 +421,23 @@ func createReceiverStage( Integration: integrations[i].Name(), Idx: uint32(integrations[i].Index()), } - var s MultiStage + var s []Stage s = append(s, NewWaitStage(wait)) s = append(s, NewDedupStage(&integrations[i], notificationLog, recv)) - s = append(s, NewRetryStage(integrations[i], name, metrics)) + s = append(s, NewRetryStage(integrations[i], name, metrics, o)) s = append(s, NewSetNotifiesStage(notificationLog, recv)) - fs = append(fs, s) + fs = append(fs, MultiStage{stages: s, alertLCObserver: o}) } return fs } // RoutingStage executes the inner stages based on the receiver specified in // the context. -type RoutingStage map[string]Stage +type RoutingStage struct { + stages map[string]Stage + alertLCObserver alertobserver.LifeCycleObserver +} // Exec implements the Stage interface. func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { @@ -433,21 +446,28 @@ func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types. return ctx, nil, errors.New("receiver missing") } - s, ok := rs[receiver] + s, ok := rs.stages[receiver] if !ok { return ctx, nil, errors.New("stage for receiver missing") } + if rs.alertLCObserver != nil { + rs.alertLCObserver.Observe(alertobserver.EventAlertPipelineStart, alerts, alertobserver.AlertEventMeta{"ctx": ctx}) + } + return s.Exec(ctx, l, alerts...) } // A MultiStage executes a series of stages sequentially. -type MultiStage []Stage +type MultiStage struct { + stages []Stage + alertLCObserver alertobserver.LifeCycleObserver +} // Exec implements the Stage interface. func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { var err error - for _, s := range ms { + for _, s := range ms.stages { if len(alerts) == 0 { return ctx, nil, nil } @@ -456,6 +476,10 @@ func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Al if err != nil { return ctx, nil, err } + if ms.alertLCObserver != nil { + p := strings.Split(fmt.Sprintf("%T", s), ".") + ms.alertLCObserver.Observe(alertobserver.EventAlertPipelinePassStage, alerts, alertobserver.AlertEventMeta{"ctx": ctx, "stageName": p[len(p)-1]}) + } } return ctx, alerts, nil } @@ -509,12 +533,13 @@ func (n *GossipSettleStage) Exec(ctx context.Context, _ log.Logger, alerts ...*t // MuteStage filters alerts through a Muter. type MuteStage struct { - muter types.Muter + muter types.Muter + alertLCObserver alertobserver.LifeCycleObserver } // NewMuteStage return a new MuteStage. -func NewMuteStage(m types.Muter) *MuteStage { - return &MuteStage{muter: m} +func NewMuteStage(m types.Muter, o alertobserver.LifeCycleObserver) *MuteStage { + return &MuteStage{muter: m, alertLCObserver: o} } // Exec implements the Stage interface. 
@@ -536,6 +561,9 @@ func (n *MuteStage) Exec(ctx context.Context, logger log.Logger, alerts ...*type if len(muted) > 0 { level.Debug(logger).Log("msg", "Notifications will not be sent for muted alerts", "alerts", fmt.Sprintf("%v", muted)) } + if n.alertLCObserver != nil { + n.alertLCObserver.Observe(alertobserver.EventAlertMuted, muted, alertobserver.AlertEventMeta{"ctx": ctx}) + } return ctx, filtered, nil } @@ -708,14 +736,15 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al // RetryStage notifies via passed integration with exponential backoff until it // succeeds. It aborts if the context is canceled or timed out. type RetryStage struct { - integration Integration - groupName string - metrics *Metrics - labelValues []string + integration Integration + groupName string + metrics *Metrics + labelValues []string + alertLCObserver alertobserver.LifeCycleObserver } // NewRetryStage returns a new instance of a RetryStage. -func NewRetryStage(i Integration, groupName string, metrics *Metrics) *RetryStage { +func NewRetryStage(i Integration, groupName string, metrics *Metrics, o alertobserver.LifeCycleObserver) *RetryStage { labelValues := []string{i.Name()} if metrics.ff.EnableReceiverNamesInMetrics() { @@ -723,16 +752,17 @@ func NewRetryStage(i Integration, groupName string, metrics *Metrics) *RetryStag } return &RetryStage{ - integration: i, - groupName: groupName, - metrics: metrics, - labelValues: labelValues, + integration: i, + groupName: groupName, + metrics: metrics, + labelValues: labelValues, + alertLCObserver: o, } } func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { r.metrics.numNotifications.WithLabelValues(r.labelValues...).Inc() - ctx, alerts, err := r.exec(ctx, l, alerts...) + ctx, alerts, sent, err := r.exec(ctx, l, alerts...) 
failureReason := DefaultReason.String() if err != nil { @@ -740,11 +770,26 @@ func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Ale failureReason = e.Reason.String() } r.metrics.numTotalFailedNotifications.WithLabelValues(append(r.labelValues, failureReason)...).Inc() + if r.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{ + "ctx": ctx, + "integration": r.integration.Name(), + "stageName": "RetryStage", + } + r.alertLCObserver.Observe(alertobserver.EventAlertSendFailed, sent, m) + } + } else if r.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{ + "ctx": ctx, + "integration": r.integration.Name(), + "stageName": "RetryStage", + } + r.alertLCObserver.Observe(alertobserver.EventAlertSent, sent, m) } return ctx, alerts, err } -func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { +func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, []*types.Alert, error) { var sent []*types.Alert // If we shouldn't send notifications for resolved alerts, but there are only @@ -753,10 +798,10 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale if !r.integration.SendResolved() { firing, ok := FiringAlerts(ctx) if !ok { - return ctx, nil, errors.New("firing alerts missing") + return ctx, nil, nil, errors.New("firing alerts missing") } if len(firing) == 0 { - return ctx, alerts, nil + return ctx, alerts, sent, nil } for _, a := range alerts { if a.Status() != model.AlertResolved { @@ -792,7 +837,7 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale iErr = ctx.Err() } - return ctx, nil, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i) + return ctx, nil, sent, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i) default: } @@ -806,7 +851,7 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale if err != nil { r.metrics.numNotificationRequestsFailedTotal.WithLabelValues(r.labelValues...).Inc() if !retry { - return ctx, alerts, errors.Wrapf(err, "%s/%s: notify retry canceled due to unrecoverable error after %d attempts", r.groupName, r.integration.String(), i) + return ctx, alerts, sent, errors.Wrapf(err, "%s/%s: notify retry canceled due to unrecoverable error after %d attempts", r.groupName, r.integration.String(), i) } if ctx.Err() == nil && (iErr == nil || err.Error() != iErr.Error()) { // Log the error if the context isn't done and the error isn't the same as before. 
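Note (illustrative aside, not part of the patch): the EventAlertSent and EventAlertSendFailed observations added above carry the pipeline context plus the "integration" and "stageName" meta keys, so an observer can attribute delivery outcomes to a receiver integration and its group. A sketch of an Observe implementation consuming just those two events; the package name and the logging shape are assumptions, while the meta keys and notify.GroupKey come from this patch and the existing notify package.

package exampleobserver // hypothetical consumer package

import (
	"context"
	"log"

	"github.com/prometheus/alertmanager/alertobserver"
	"github.com/prometheus/alertmanager/notify"
	"github.com/prometheus/alertmanager/types"
)

// NotifyOutcomeObserver logs every successful or failed notification attempt
// reported by RetryStage, keyed by integration and group key.
type NotifyOutcomeObserver struct{}

func (NotifyOutcomeObserver) Observe(event string, alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
	if event != alertobserver.EventAlertSent && event != alertobserver.EventAlertSendFailed {
		return
	}
	integration, _ := meta["integration"].(string)
	groupKey := ""
	if ctx, ok := meta["ctx"].(context.Context); ok {
		// The pipeline context carries values such as the group key.
		groupKey, _ = notify.GroupKey(ctx)
	}
	log.Printf("notification outcome: event=%s integration=%s group_key=%s alerts=%d",
		event, integration, groupKey, len(alerts))
}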
@@ -823,14 +868,13 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale } lvl.Log("msg", "Notify success", "attempts", i, "duration", dur) - return ctx, alerts, nil + return ctx, alerts, sent, nil } case <-ctx.Done(): if iErr == nil { iErr = ctx.Err() } - - return ctx, nil, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i) + return ctx, nil, sent, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i) } } } diff --git a/notify/notify_test.go b/notify/notify_test.go index d3eeb4670a..e9328b69e7 100644 --- a/notify/notify_test.go +++ b/notify/notify_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" + "github.com/prometheus/alertmanager/alertobserver" "github.com/prometheus/alertmanager/featurecontrol" "github.com/prometheus/alertmanager/nflog" "github.com/prometheus/alertmanager/nflog/nflogpb" @@ -306,7 +307,7 @@ func TestMultiStage(t *testing.T) { alerts3 = []*types.Alert{{}, {}, {}} ) - stage := MultiStage{ + stages := []Stage{ StageFunc(func(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { if !reflect.DeepEqual(alerts, alerts1) { t.Fatal("Input not equal to input of MultiStage") @@ -326,7 +327,9 @@ func TestMultiStage(t *testing.T) { return ctx, alerts3, nil }), } - + stage := MultiStage{ + stages: stages, + } _, alerts, err := stage.Exec(context.Background(), log.NewNopLogger(), alerts1...) if err != nil { t.Fatalf("Exec failed: %s", err) @@ -335,13 +338,28 @@ func TestMultiStage(t *testing.T) { if !reflect.DeepEqual(alerts, alerts3) { t.Fatal("Output of MultiStage is not equal to the output of the last stage") } + + // Rerun multistage but with alert life cycle observer + observer := alertobserver.NewFakeLifeCycleObserver() + ctx := WithGroupKey(context.Background(), "test") + stage.alertLCObserver = observer + _, _, err = stage.Exec(ctx, log.NewNopLogger(), alerts1...) + if err != nil { + t.Fatalf("Exec failed: %s", err) + } + + require.Equal(t, 1, len(observer.PipelineStageAlerts)) + require.Equal(t, 5, len(observer.PipelineStageAlerts["StageFunc"])) + metaCtx := observer.MetaPerEvent[alertobserver.EventAlertPipelinePassStage][0]["ctx"].(context.Context) + _, ok := GroupKey(metaCtx) + require.True(t, ok) } func TestMultiStageFailure(t *testing.T) { var ( ctx = context.Background() s1 = failStage{} - stage = MultiStage{s1} + stage = MultiStage{stages: []Stage{s1}} ) _, _, err := stage.Exec(ctx, log.NewNopLogger(), nil) @@ -356,7 +374,7 @@ func TestRoutingStage(t *testing.T) { alerts2 = []*types.Alert{{}, {}} ) - stage := RoutingStage{ + s := map[string]Stage{ "name": StageFunc(func(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { if !reflect.DeepEqual(alerts, alerts1) { t.Fatal("Input not equal to input of RoutingStage") @@ -365,6 +383,9 @@ func TestRoutingStage(t *testing.T) { }), "not": failStage{}, } + stage := RoutingStage{ + stages: s, + } ctx := WithReceiverName(context.Background(), "name") @@ -376,6 +397,18 @@ func TestRoutingStage(t *testing.T) { if !reflect.DeepEqual(alerts, alerts2) { t.Fatal("Output of RoutingStage is not equal to the output of the inner stage") } + + // Rerun RoutingStage but with alert life cycle observer + observer := alertobserver.NewFakeLifeCycleObserver() + stage.alertLCObserver = observer + _, _, err = stage.Exec(ctx, log.NewNopLogger(), alerts1...) 
+ if err != nil { + t.Fatalf("Exec failed: %s", err) + } + require.Equal(t, len(alerts1), len(observer.AlertsPerEvent[alertobserver.EventAlertPipelineStart])) + metaCtx := observer.MetaPerEvent[alertobserver.EventAlertPipelineStart][0]["ctx"].(context.Context) + _, ok := ReceiverName(metaCtx) + require.True(t, ok) } func TestRetryStageWithError(t *testing.T) { @@ -392,7 +425,7 @@ func TestRetryStageWithError(t *testing.T) { }), rs: sendResolved(false), } - r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{})) + r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{}), nil) alerts := []*types.Alert{ { @@ -404,6 +437,7 @@ func TestRetryStageWithError(t *testing.T) { ctx := context.Background() ctx = WithFiringAlerts(ctx, []uint64{0}) + ctx = WithGroupKey(ctx, "test") // Notify with a recoverable error should retry and succeed. resctx, res, err := r.Exec(ctx, log.NewNopLogger(), alerts...) @@ -412,13 +446,40 @@ func TestRetryStageWithError(t *testing.T) { require.Equal(t, alerts, sent) require.NotNil(t, resctx) + // Rerun recoverable error but with alert life cycle observer + observer := alertobserver.NewFakeLifeCycleObserver() + r.alertLCObserver = observer + _, _, err = r.Exec(ctx, log.NewNopLogger(), alerts...) + require.Nil(t, err) + require.Equal(t, len(alerts), len(observer.AlertsPerEvent[alertobserver.EventAlertSent])) + meta := observer.MetaPerEvent[alertobserver.EventAlertSent][0] + require.Equal(t, "RetryStage", meta["stageName"].(string)) + require.Equal(t, i.Name(), meta["integration"].(string)) + metaCtx := meta["ctx"].(context.Context) + _, ok := GroupKey(metaCtx) + require.True(t, ok) + // Notify with an unrecoverable error should fail. sent = sent[:0] fail = true retry = false + r.alertLCObserver = nil resctx, _, err = r.Exec(ctx, log.NewNopLogger(), alerts...) require.NotNil(t, err) require.NotNil(t, resctx) + + // Rerun the unrecoverable error but with alert life cycle observer + fail = true + r.alertLCObserver = observer + _, _, err = r.Exec(ctx, log.NewNopLogger(), alerts...) 
+ require.NotNil(t, err) + require.Equal(t, len(alerts), len(observer.AlertsPerEvent[alertobserver.EventAlertSendFailed])) + meta = observer.MetaPerEvent[alertobserver.EventAlertSendFailed][0] + require.Equal(t, "RetryStage", meta["stageName"].(string)) + require.Equal(t, i.Name(), meta["integration"].(string)) + metaCtx = meta["ctx"].(context.Context) + _, ok = GroupKey(metaCtx) + require.True(t, ok) } func TestRetryStageWithErrorCode(t *testing.T) { @@ -445,7 +506,7 @@ func TestRetryStageWithErrorCode(t *testing.T) { }), rs: sendResolved(false), } - r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{})) + r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{}), nil) alerts := []*types.Alert{ { @@ -478,7 +539,7 @@ func TestRetryStageNoResolved(t *testing.T) { }), rs: sendResolved(false), } - r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{})) + r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{}), nil) alerts := []*types.Alert{ { @@ -529,7 +590,7 @@ func TestRetryStageSendResolved(t *testing.T) { }), rs: sendResolved(true), } - r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{})) + r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{}), nil) alerts := []*types.Alert{ { @@ -633,7 +694,7 @@ func TestMuteStage(t *testing.T) { return ok }) - stage := NewMuteStage(muter) + stage := NewMuteStage(muter, nil) in := []model.LabelSet{ {}, @@ -689,7 +750,7 @@ func TestMuteStageWithSilences(t *testing.T) { marker := types.NewMarker(prometheus.NewRegistry()) silencer := silence.NewSilencer(silences, marker, log.NewNopLogger()) - stage := NewMuteStage(silencer) + stage := NewMuteStage(silencer, nil) in := []model.LabelSet{ {}, @@ -767,6 +828,45 @@ func TestMuteStageWithSilences(t *testing.T) { } } +func TestMuteStageWithAlertObserver(t *testing.T) { + silences, err := silence.New(silence.Options{Retention: time.Hour}) + if err != nil { + t.Fatal(err) + } + _, err = silences.Set(&silencepb.Silence{ + EndsAt: utcNow().Add(time.Hour), + Matchers: []*silencepb.Matcher{{Name: "mute", Pattern: "me"}}, + }) + if err != nil { + t.Fatal(err) + } + + marker := types.NewMarker(prometheus.NewRegistry()) + silencer := silence.NewSilencer(silences, marker, log.NewNopLogger()) + observer := alertobserver.NewFakeLifeCycleObserver() + stage := NewMuteStage(silencer, observer) + + in := []model.LabelSet{ + {"test": "set"}, + {"mute": "me"}, + {"foo": "bar", "test": "set"}, + } + + var inAlerts []*types.Alert + for _, lset := range in { + inAlerts = append(inAlerts, &types.Alert{ + Alert: model.Alert{Labels: lset}, + }) + } + + _, _, err = stage.Exec(context.Background(), log.NewNopLogger(), inAlerts...) + if err != nil { + t.Fatalf("Exec failed: %s", err) + } + require.Equal(t, 1, len(observer.AlertsPerEvent[alertobserver.EventAlertMuted])) + require.Equal(t, inAlerts[1], observer.AlertsPerEvent[alertobserver.EventAlertMuted][0]) +} + func TestTimeMuteStage(t *testing.T) { // Route mutes alerts outside business hours in November, using the +1100 timezone. muteIn := `