diff --git a/alertobserver/alertobserver.go b/alertobserver/alertobserver.go index 4678d672f2..15113793db 100644 --- a/alertobserver/alertobserver.go +++ b/alertobserver/alertobserver.go @@ -15,16 +15,26 @@ package alertobserver import ( "context" - "github.com/prometheus/alertmanager/types" ) -type AlertLifeCycleObserver interface { - Received(alerts ...*types.Alert) - Rejected(reason string, alerts ...*types.Alert) - AddedAggrGroup(groupKey string, alert *types.Alert) - PipelineStart(ctx context.Context, alerts ...*types.Alert) - PipelinePassStage(ctx context.Context, stageName string, alerts ...*types.Alert) - Sent(ctx context.Context, integration string, alerts ...*types.Alert) - SendFailed(ctx context.Context, integration, reason string, alerts ...*types.Alert) +const ( + EventAlertReceived string = "received" + EventAlertRejected string = "rejected" + EventAlertAddedToAggrGroup string = "addedAggrGroup" + EventAlertPipelineStart string = "pipelineStart" + EventAlertPipelinePassStage string = "pipelinePassStage" + EventAlertSent string = "sent" + EventAlertSendFailed string = "sendFailed" +) + +type AlertEventMeta struct { + Ctx context.Context + Msg string + Integration string + StageName string +} + +type LifeCycleObserver interface { + Observe(event string, alerts []*types.Alert, meta *AlertEventMeta) } diff --git a/alertobserver/testing.go b/alertobserver/testing.go index dfd632a26f..b6bb2f85bf 100644 --- a/alertobserver/testing.go +++ b/alertobserver/testing.go @@ -14,56 +14,25 @@ package alertobserver import ( - "context" - "github.com/prometheus/alertmanager/types" ) -type FakeAlertLifeCycleObserver struct { - ReceivedAlerts []*types.Alert - RejectedAlerts []*types.Alert - SentAlerts []*types.Alert - FailedAlerts []*types.Alert - AggrGroupAlerts []*types.Alert - PipelineAlerts []*types.Alert +type FakeLifeCycleObserver struct { + AlertsPerEvent map[string][]*types.Alert PipelineStageAlerts map[string][]*types.Alert } -func (o *FakeAlertLifeCycleObserver) Received(alerts ...*types.Alert) { - o.ReceivedAlerts = append(o.ReceivedAlerts, alerts...) -} - -func (o *FakeAlertLifeCycleObserver) Rejected(reason string, alerts ...*types.Alert) { - o.RejectedAlerts = append(o.RejectedAlerts, alerts...) -} - -func (o *FakeAlertLifeCycleObserver) AddedAggrGroup(groupKey string, alert *types.Alert) { - o.AggrGroupAlerts = append(o.AggrGroupAlerts, alert) -} - -func (o *FakeAlertLifeCycleObserver) PipelineStart(ctx context.Context, alerts ...*types.Alert) { - o.PipelineAlerts = append(o.PipelineAlerts, alerts...) -} - -func (o *FakeAlertLifeCycleObserver) Sent(ctx context.Context, integration string, alerts ...*types.Alert) { - o.SentAlerts = append(o.SentAlerts, alerts...) -} - -func (o *FakeAlertLifeCycleObserver) SendFailed( - ctx context.Context, - integration string, - reason string, - alerts ...*types.Alert, -) { - o.FailedAlerts = append(o.FailedAlerts, alerts...) -} - -func (o *FakeAlertLifeCycleObserver) PipelinePassStage(ctx context.Context, stageName string, alerts ...*types.Alert) { - o.PipelineStageAlerts[stageName] = append(o.PipelineStageAlerts[stageName], alerts...) +func (o *FakeLifeCycleObserver) Observe(event string, alerts []*types.Alert, meta *AlertEventMeta) { + if event == EventAlertPipelinePassStage { + o.PipelineStageAlerts[meta.StageName] = append(o.PipelineStageAlerts[meta.StageName], alerts...) + } else { + o.AlertsPerEvent[event] = append(o.AlertsPerEvent[event], alerts...) + } } -func NewFakeAlertLifeCycleObserver() *FakeAlertLifeCycleObserver { - return &FakeAlertLifeCycleObserver{ +func NewFakeLifeCycleObserver() *FakeLifeCycleObserver { + return &FakeLifeCycleObserver{ PipelineStageAlerts: map[string][]*types.Alert{}, + AlertsPerEvent: map[string][]*types.Alert{}, } } diff --git a/api/api.go b/api/api.go index ba3f370132..db4b3b5337 100644 --- a/api/api.go +++ b/api/api.go @@ -77,7 +77,7 @@ type Options struct { GroupFunc func(func(*dispatch.Route) bool, func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string) // AlertLCObserver is used to add hooks to the different alert life cycle events. // If nil then no observer methods will be invoked in the life cycle events. - AlertLCObserver alertobserver.AlertLifeCycleObserver + AlertLCObserver alertobserver.LifeCycleObserver } func (o Options) validate() error { diff --git a/api/v1/api.go b/api/v1/api.go index 1bb41a2f6e..62f7eb3800 100644 --- a/api/v1/api.go +++ b/api/v1/api.go @@ -76,7 +76,7 @@ type API struct { peer cluster.ClusterPeer logger log.Logger m *metrics.Alerts - alertLCObserver alertobserver.AlertLifeCycleObserver + alertLCObserver alertobserver.LifeCycleObserver getAlertStatus getAlertStatusFn @@ -93,7 +93,7 @@ func New( peer cluster.ClusterPeer, l log.Logger, r prometheus.Registerer, - o alertobserver.AlertLifeCycleObserver, + o alertobserver.LifeCycleObserver, ) *API { if l == nil { l = log.NewNopLogger() @@ -452,7 +452,8 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...* validationErrs.Add(err) api.m.Invalid().Inc() if api.alertLCObserver != nil { - api.alertLCObserver.Rejected(err.Error(), a) + m := &alertobserver.AlertEventMeta{Msg: err.Error()} + api.alertLCObserver.Observe(alertobserver.EventAlertRejected, []*types.Alert{a}, m) } continue } @@ -464,12 +465,13 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...* err: err, }, nil) if api.alertLCObserver != nil { - api.alertLCObserver.Rejected(err.Error(), validAlerts...) + m := &alertobserver.AlertEventMeta{Msg: err.Error()} + api.alertLCObserver.Observe(alertobserver.EventAlertRejected, validAlerts, m) } return } if api.alertLCObserver != nil { - api.alertLCObserver.Received(validAlerts...) + api.alertLCObserver.Observe(alertobserver.EventAlertReceived, validAlerts, &alertobserver.AlertEventMeta{}) } if validationErrs.Len() > 0 { diff --git a/api/v1/api_test.go b/api/v1/api_test.go index e16ffa6215..8310c78164 100644 --- a/api/v1/api_test.go +++ b/api/v1/api_test.go @@ -155,7 +155,7 @@ func TestAddAlerts(t *testing.T) { require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, StartsAt %v, EndsAt %v, Response: %s", i, tc.start, tc.end, string(body))) - observer := alertobserver.NewFakeAlertLifeCycleObserver() + observer := alertobserver.NewFakeLifeCycleObserver() api.alertLCObserver = observer r, err = http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b)) w = httptest.NewRecorder() @@ -164,9 +164,9 @@ func TestAddAlerts(t *testing.T) { } api.addAlerts(w, r) if tc.code == 200 { - require.Equal(t, observer.ReceivedAlerts[0].Fingerprint(), alerts[0].Fingerprint()) + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertReceived][0].Fingerprint(), alerts[0].Fingerprint()) } else { - require.Equal(t, observer.RejectedAlerts[0].Fingerprint(), alerts[0].Fingerprint()) + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertRejected][0].Fingerprint(), alerts[0].Fingerprint()) } } } @@ -197,7 +197,7 @@ func TestAddAlertsWithAlertLCObserver(t *testing.T) { } alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err) - observer := alertobserver.NewFakeAlertLifeCycleObserver() + observer := alertobserver.NewFakeLifeCycleObserver() api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil, observer) defaultGlobalConfig := config.DefaultGlobalConfig() route := config.Route{} @@ -218,9 +218,9 @@ func TestAddAlertsWithAlertLCObserver(t *testing.T) { require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, StartsAt %v, EndsAt %v, Response: %s", i, tc.start, tc.end, string(body))) if tc.code == 200 { - require.Equal(t, observer.ReceivedAlerts[0].Fingerprint(), alerts[0].Fingerprint()) + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertReceived][0].Fingerprint(), alerts[0].Fingerprint()) } else { - require.Equal(t, observer.RejectedAlerts[0].Fingerprint(), alerts[0].Fingerprint()) + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertRejected][0].Fingerprint(), alerts[0].Fingerprint()) } } } diff --git a/api/v2/api.go b/api/v2/api.go index c67fe420ac..9190c38492 100644 --- a/api/v2/api.go +++ b/api/v2/api.go @@ -71,7 +71,7 @@ type API struct { logger log.Logger m *metrics.Alerts - alertLCObserver alertobserver.AlertLifeCycleObserver + alertLCObserver alertobserver.LifeCycleObserver Handler http.Handler } @@ -91,7 +91,7 @@ func NewAPI( peer cluster.ClusterPeer, l log.Logger, r prometheus.Registerer, - o alertobserver.AlertLifeCycleObserver, + o alertobserver.LifeCycleObserver, ) (*API, error) { api := API{ alerts: alerts, @@ -355,7 +355,8 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware. validationErrs.Add(err) api.m.Invalid().Inc() if api.alertLCObserver != nil { - api.alertLCObserver.Rejected(err.Error(), a) + m := &alertobserver.AlertEventMeta{Msg: err.Error()} + api.alertLCObserver.Observe(alertobserver.EventAlertRejected, []*types.Alert{a}, m) } continue } @@ -364,7 +365,8 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware. if err := api.alerts.Put(validAlerts...); err != nil { level.Error(logger).Log("msg", "Failed to create alerts", "err", err) if api.alertLCObserver != nil { - api.alertLCObserver.Rejected(err.Error(), validAlerts...) + m := &alertobserver.AlertEventMeta{Msg: err.Error()} + api.alertLCObserver.Observe(alertobserver.EventAlertRejected, validAlerts, m) } return alert_ops.NewPostAlertsInternalServerError().WithPayload(err.Error()) } @@ -374,7 +376,7 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware. return alert_ops.NewPostAlertsBadRequest().WithPayload(validationErrs.Error()) } if api.alertLCObserver != nil { - api.alertLCObserver.Received(validAlerts...) + api.alertLCObserver.Observe(alertobserver.EventAlertReceived, validAlerts, &alertobserver.AlertEventMeta{}) } return alert_ops.NewPostAlertsOK() diff --git a/api/v2/api_test.go b/api/v2/api_test.go index ed1f0d2b6c..96241e9c46 100644 --- a/api/v2/api_test.go +++ b/api/v2/api_test.go @@ -558,7 +558,7 @@ func TestPostAlertHandler(t *testing.T) { require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, response: %s", i, string(body))) - observer := alertobserver.NewFakeAlertLifeCycleObserver() + observer := alertobserver.NewFakeLifeCycleObserver() api.alertLCObserver = observer r, err = http.NewRequest("POST", "/api/v2/alerts", bytes.NewReader(alertsBytes)) require.NoError(t, err) @@ -568,9 +568,9 @@ func TestPostAlertHandler(t *testing.T) { }) amAlert := OpenAPIAlertsToAlerts(alerts) if tc.code == 200 { - require.Equal(t, observer.ReceivedAlerts[0].Fingerprint(), amAlert[0].Fingerprint()) + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertReceived][0].Fingerprint(), amAlert[0].Fingerprint()) } else { - require.Equal(t, observer.RejectedAlerts[0].Fingerprint(), amAlert[0].Fingerprint()) + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertRejected][0].Fingerprint(), amAlert[0].Fingerprint()) } } } diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index 4ee250eda8..17c0ac72a8 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -492,6 +492,7 @@ func run() int { timeIntervals, notificationLog, pipelinePeer, + nil, ) configuredReceivers.Set(float64(len(activeReceivers))) configuredIntegrations.Set(float64(integrationsNum)) diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index adc333eeb7..cc9736cbee 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -92,7 +92,7 @@ type Dispatcher struct { cancel func() logger log.Logger - alertLCObserver alertobserver.AlertLifeCycleObserver + alertLCObserver alertobserver.LifeCycleObserver } // Limits describes limits used by Dispatcher. @@ -113,7 +113,7 @@ func NewDispatcher( lim Limits, l log.Logger, m *DispatcherMetrics, - o alertobserver.AlertLifeCycleObserver, + o alertobserver.LifeCycleObserver, ) *Dispatcher { if lim == nil { lim = nilLimits{} @@ -140,11 +140,7 @@ func (d *Dispatcher) Run() { d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{} d.aggrGroupsNum = 0 d.metrics.aggrGroups.Set(0) - ctx := context.Background() - if d.alertLCObserver != nil { - ctx = notify.WithAlertLCObserver(ctx, d.alertLCObserver) - } - d.ctx, d.cancel = context.WithCancel(ctx) + d.ctx, d.cancel = context.WithCancel(context.Background()) d.mtx.Unlock() d.run(d.alerts.Subscribe()) @@ -328,7 +324,8 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { if ok { ag.insert(alert) if d.alertLCObserver != nil { - d.alertLCObserver.AddedAggrGroup(ag.GroupKey(), alert) + dummyCtx := notify.WithGroupKey(context.TODO(), ag.GroupKey()) + d.alertLCObserver.Observe(alertobserver.EventAlertAddedToAggrGroup, []*types.Alert{alert}, &alertobserver.AlertEventMeta{Ctx: dummyCtx}) } return } @@ -350,7 +347,8 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { // alert is already there. ag.insert(alert) if d.alertLCObserver != nil { - d.alertLCObserver.AddedAggrGroup(ag.GroupKey(), alert) + dummyCtx := notify.WithGroupKey(context.TODO(), ag.GroupKey()) + d.alertLCObserver.Observe(alertobserver.EventAlertAddedToAggrGroup, []*types.Alert{alert}, &alertobserver.AlertEventMeta{Ctx: dummyCtx}) } go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool { diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go index 57e20d246e..76c23d641e 100644 --- a/dispatch/dispatch_test.go +++ b/dispatch/dispatch_test.go @@ -595,7 +595,7 @@ route: timeout := func(d time.Duration) time.Duration { return time.Duration(0) } recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} m := NewDispatcherMetrics(true, prometheus.NewRegistry()) - observer := alertobserver.NewFakeAlertLifeCycleObserver() + observer := alertobserver.NewFakeLifeCycleObserver() dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, m, observer) go dispatcher.Run() defer dispatcher.Stop() @@ -613,10 +613,7 @@ route: time.Sleep(200 * time.Millisecond) } require.Equal(t, 1, len(recorder.Alerts())) - require.Equal(t, inputAlerts[0].Fingerprint(), observer.AggrGroupAlerts[0].Fingerprint()) - o, ok := notify.AlertLCObserver(dispatcher.ctx) - require.True(t, ok) - require.Equal(t, observer, o) + require.Equal(t, inputAlerts[0].Fingerprint(), observer.AlertsPerEvent[alertobserver.EventAlertAddedToAggrGroup][0].Fingerprint()) } type recordStage struct { diff --git a/notify/notify.go b/notify/notify.go index 3e0299204b..ae9407cacc 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -17,6 +17,7 @@ import ( "context" "fmt" "sort" + "strings" "sync" "time" @@ -117,7 +118,6 @@ const ( keyNow keyMuteTimeIntervals keyActiveTimeIntervals - keyAlertLCObserver ) // WithReceiverName populates a context with a receiver name. @@ -164,11 +164,6 @@ func WithActiveTimeIntervals(ctx context.Context, at []string) context.Context { return context.WithValue(ctx, keyActiveTimeIntervals, at) } -// WithAlertLCObserver populates a contex with an alert life cycle observer -func WithAlertLCObserver(ctx context.Context, o alertobserver.AlertLifeCycleObserver) context.Context { - return context.WithValue(ctx, keyAlertLCObserver, o) -} - // RepeatInterval extracts a repeat interval from the context. Iff none exists, the // second argument is false. func RepeatInterval(ctx context.Context) (time.Duration, bool) { @@ -232,13 +227,6 @@ func ActiveTimeIntervalNames(ctx context.Context) ([]string, bool) { return v, ok } -// AlertLCObserver extracts an alert life cycle observer from the context. -// Iff none exists, the second argument is false. -func AlertLCObserver(ctx context.Context) (alertobserver.AlertLifeCycleObserver, bool) { - v, ok := ctx.Value(keyAlertLCObserver).(alertobserver.AlertLifeCycleObserver) - return v, ok -} - // A Stage processes alerts under the constraints of the given context. type Stage interface { Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) @@ -346,8 +334,12 @@ func (pb *PipelineBuilder) New( times map[string][]timeinterval.TimeInterval, notificationLog NotificationLog, peer Peer, + o alertobserver.LifeCycleObserver, ) RoutingStage { - rs := make(RoutingStage, len(receivers)) + rs := RoutingStage{ + stages: make(map[string]Stage, len(receivers)), + alertLCObserver: o, + } ms := NewGossipSettleStage(peer) is := NewMuteStage(inhibitor) @@ -356,8 +348,11 @@ func (pb *PipelineBuilder) New( ss := NewMuteStage(silencer) for name := range receivers { - st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics) - rs[name] = MultiStage{ms, is, tas, tms, ss, st} + st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics, o) + rs.stages[name] = MultiStage{ + alertLCObserver: o, + stages: []Stage{ms, is, tas, tms, ss, st}, + } } return rs } @@ -369,6 +364,7 @@ func createReceiverStage( wait func() time.Duration, notificationLog NotificationLog, metrics *Metrics, + o alertobserver.LifeCycleObserver, ) Stage { var fs FanoutStage for i := range integrations { @@ -377,20 +373,23 @@ func createReceiverStage( Integration: integrations[i].Name(), Idx: uint32(integrations[i].Index()), } - var s MultiStage + var s []Stage s = append(s, NewWaitStage(wait)) s = append(s, NewDedupStage(&integrations[i], notificationLog, recv)) - s = append(s, NewRetryStage(integrations[i], name, metrics)) + s = append(s, NewRetryStage(integrations[i], name, metrics, o)) s = append(s, NewSetNotifiesStage(notificationLog, recv)) - fs = append(fs, s) + fs = append(fs, MultiStage{stages: s, alertLCObserver: o}) } return fs } // RoutingStage executes the inner stages based on the receiver specified in // the context. -type RoutingStage map[string]Stage +type RoutingStage struct { + stages map[string]Stage + alertLCObserver alertobserver.LifeCycleObserver +} // Exec implements the Stage interface. func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { @@ -399,27 +398,28 @@ func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types. return ctx, nil, errors.New("receiver missing") } - s, ok := rs[receiver] + s, ok := rs.stages[receiver] if !ok { return ctx, nil, errors.New("stage for receiver missing") } - o, ok := AlertLCObserver(ctx) - if ok { - o.PipelineStart(ctx, alerts...) + if rs.alertLCObserver != nil { + rs.alertLCObserver.Observe(alertobserver.EventAlertPipelineStart, alerts, &alertobserver.AlertEventMeta{Ctx: ctx}) } return s.Exec(ctx, l, alerts...) } // A MultiStage executes a series of stages sequentially. -type MultiStage []Stage +type MultiStage struct { + stages []Stage + alertLCObserver alertobserver.LifeCycleObserver +} // Exec implements the Stage interface. func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { var err error - observer, hasObserver := AlertLCObserver(ctx) - for _, s := range ms { + for _, s := range ms.stages { if len(alerts) == 0 { return ctx, nil, nil } @@ -428,8 +428,9 @@ func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Al if err != nil { return ctx, nil, err } - if hasObserver { - observer.PipelinePassStage(ctx, fmt.Sprintf("%T", s), alerts...) + if ms.alertLCObserver != nil { + p := strings.Split(fmt.Sprintf("%T", s), ".") + ms.alertLCObserver.Observe(alertobserver.EventAlertPipelinePassStage, alerts, &alertobserver.AlertEventMeta{Ctx: ctx, StageName: p[len(p)-1]}) } } return ctx, alerts, nil @@ -675,17 +676,19 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al // RetryStage notifies via passed integration with exponential backoff until it // succeeds. It aborts if the context is canceled or timed out. type RetryStage struct { - integration Integration - groupName string - metrics *Metrics + integration Integration + groupName string + metrics *Metrics + alertLCObserver alertobserver.LifeCycleObserver } // NewRetryStage returns a new instance of a RetryStage. -func NewRetryStage(i Integration, groupName string, metrics *Metrics) *RetryStage { +func NewRetryStage(i Integration, groupName string, metrics *Metrics, o alertobserver.LifeCycleObserver) *RetryStage { return &RetryStage{ - integration: i, - groupName: groupName, - metrics: metrics, + integration: i, + groupName: groupName, + metrics: metrics, + alertLCObserver: o, } } @@ -741,7 +744,6 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale if groupKey, ok := GroupKey(ctx); ok { l = log.With(l, "aggrGroup", groupKey) } - observer, hasObserver := AlertLCObserver(ctx) for { i++ @@ -765,8 +767,14 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale if err != nil { r.metrics.numNotificationRequestsFailedTotal.WithLabelValues(r.integration.Name()).Inc() if !retry { - if hasObserver { - observer.SendFailed(ctx, r.integration.Name(), "Unrecoverable error", alerts...) + if r.alertLCObserver != nil { + m := &alertobserver.AlertEventMeta{ + Ctx: ctx, + Msg: "Unrecoverable error", + Integration: r.integration.Name(), + StageName: "RetryStage", + } + r.alertLCObserver.Observe(alertobserver.EventAlertSendFailed, alerts, m) } return ctx, alerts, errors.Wrapf(err, "%s/%s: notify retry canceled due to unrecoverable error after %d attempts", r.groupName, r.integration.String(), i) } @@ -783,8 +791,13 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale if i <= 1 { lvl = level.Debug(log.With(l, "alerts", fmt.Sprintf("%v", alerts))) } - if hasObserver { - observer.Sent(ctx, r.integration.Name(), alerts...) + if r.alertLCObserver != nil { + m := &alertobserver.AlertEventMeta{ + Ctx: ctx, + Integration: r.integration.Name(), + StageName: "RetryStage", + } + r.alertLCObserver.Observe(alertobserver.EventAlertSent, alerts, m) } lvl.Log("msg", "Notify success", "attempts", i) @@ -794,8 +807,14 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale if iErr == nil { iErr = ctx.Err() } - if hasObserver { - observer.SendFailed(ctx, r.integration.Name(), "Retry canceled", alerts...) + if r.alertLCObserver != nil { + m := &alertobserver.AlertEventMeta{ + Ctx: ctx, + Msg: "Retry canceled", + Integration: r.integration.Name(), + StageName: "RetryStage", + } + r.alertLCObserver.Observe(alertobserver.EventAlertSendFailed, alerts, m) } return ctx, nil, errors.Wrapf(iErr, "%s/%s: notify retry canceled after %d attempts", r.groupName, r.integration.String(), i) } diff --git a/notify/notify_test.go b/notify/notify_test.go index 078f4d7820..5640abbe20 100644 --- a/notify/notify_test.go +++ b/notify/notify_test.go @@ -306,7 +306,7 @@ func TestMultiStage(t *testing.T) { alerts3 = []*types.Alert{{}, {}, {}} ) - stage := MultiStage{ + stages := []Stage{ StageFunc(func(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { if !reflect.DeepEqual(alerts, alerts1) { t.Fatal("Input not equal to input of MultiStage") @@ -326,6 +326,9 @@ func TestMultiStage(t *testing.T) { return ctx, alerts3, nil }), } + stage := MultiStage{ + stages: stages, + } _, alerts, err := stage.Exec(context.Background(), log.NewNopLogger(), alerts1...) if err != nil { @@ -337,23 +340,23 @@ func TestMultiStage(t *testing.T) { } // Rerun multistage but with alert life cycle observer - observer := alertobserver.NewFakeAlertLifeCycleObserver() + observer := alertobserver.NewFakeLifeCycleObserver() ctx := context.Background() - ctx = WithAlertLCObserver(ctx, observer) + stage.alertLCObserver = observer _, _, err = stage.Exec(ctx, log.NewNopLogger(), alerts1...) if err != nil { t.Fatalf("Exec failed: %s", err) } require.Equal(t, 1, len(observer.PipelineStageAlerts)) - require.Equal(t, 5, len(observer.PipelineStageAlerts["notify.StageFunc"])) + require.Equal(t, 5, len(observer.PipelineStageAlerts["StageFunc"])) } func TestMultiStageFailure(t *testing.T) { var ( ctx = context.Background() s1 = failStage{} - stage = MultiStage{s1} + stage = MultiStage{stages: []Stage{s1}} ) _, _, err := stage.Exec(ctx, log.NewNopLogger(), nil) @@ -368,7 +371,7 @@ func TestRoutingStage(t *testing.T) { alerts2 = []*types.Alert{{}, {}} ) - stage := RoutingStage{ + s := map[string]Stage{ "name": StageFunc(func(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { if !reflect.DeepEqual(alerts, alerts1) { t.Fatal("Input not equal to input of RoutingStage") @@ -377,6 +380,9 @@ func TestRoutingStage(t *testing.T) { }), "not": failStage{}, } + stage := RoutingStage{ + stages: s, + } ctx := WithReceiverName(context.Background(), "name") @@ -390,13 +396,13 @@ func TestRoutingStage(t *testing.T) { } // Rerun RoutingStage but with alert life cycle observer - observer := alertobserver.NewFakeAlertLifeCycleObserver() - ctx = WithAlertLCObserver(ctx, observer) + observer := alertobserver.NewFakeLifeCycleObserver() + stage.alertLCObserver = observer _, _, err = stage.Exec(ctx, log.NewNopLogger(), alerts1...) if err != nil { t.Fatalf("Exec failed: %s", err) } - require.Equal(t, len(alerts1), len(observer.PipelineAlerts)) + require.Equal(t, len(alerts1), len(observer.AlertsPerEvent[alertobserver.EventAlertPipelineStart])) } func TestRetryStageWithError(t *testing.T) { @@ -437,24 +443,27 @@ func TestRetryStageWithError(t *testing.T) { require.NotNil(t, resctx) // Rerun recoverable error but with alert life cycle observer - observer := alertobserver.NewFakeAlertLifeCycleObserver() - _, _, err = r.Exec(WithAlertLCObserver(ctx, observer), log.NewNopLogger(), alerts...) + observer := alertobserver.NewFakeLifeCycleObserver() + r.alertLCObserver = observer + _, _, err = r.Exec(ctx, log.NewNopLogger(), alerts...) require.Nil(t, err) - require.Equal(t, len(alerts), len(observer.SentAlerts)) + require.Equal(t, len(alerts), len(observer.AlertsPerEvent[alertobserver.EventAlertSent])) // Notify with an unrecoverable error should fail. sent = sent[:0] fail = true retry = false + r.alertLCObserver = nil resctx, _, err = r.Exec(ctx, log.NewNopLogger(), alerts...) require.NotNil(t, err) require.NotNil(t, resctx) // Rerun the unrecoverable error but with alert life cycle observer fail = true - _, _, err = r.Exec(WithAlertLCObserver(ctx, observer), log.NewNopLogger(), alerts...) + r.alertLCObserver = observer + _, _, err = r.Exec(ctx, log.NewNopLogger(), alerts...) require.NotNil(t, err) - require.Equal(t, len(alerts), len(observer.FailedAlerts)) + require.Equal(t, len(alerts), len(observer.AlertsPerEvent[alertobserver.EventAlertSendFailed])) } func TestRetryStageWithErrorCode(t *testing.T) {