Skip to content

Commit

Permalink
Simplify observer AlertEventMeta struct to use map
Browse files Browse the repository at this point in the history
Signed-off-by: Emmanuel Lodovice <[email protected]>
  • Loading branch information
emanlodovice committed Sep 14, 2023
1 parent 494e304 commit a5d618e
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 39 deletions.
11 changes: 2 additions & 9 deletions alertobserver/alertobserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
package alertobserver

import (
"context"

"github.com/prometheus/alertmanager/types"
)

Expand All @@ -29,13 +27,8 @@ const (
EventAlertSendFailed string = "sendFailed"
)

type AlertEventMeta struct {
Ctx context.Context
Msg string
Integration string
StageName string
}
type AlertEventMeta map[string]interface{}

type LifeCycleObserver interface {
Observe(event string, alerts []*types.Alert, meta *AlertEventMeta)
Observe(event string, alerts []*types.Alert, meta AlertEventMeta)
}
7 changes: 5 additions & 2 deletions alertobserver/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,22 @@ import (
type FakeLifeCycleObserver struct {
AlertsPerEvent map[string][]*types.Alert
PipelineStageAlerts map[string][]*types.Alert
MetaPerEvent map[string][]AlertEventMeta
}

func (o *FakeLifeCycleObserver) Observe(event string, alerts []*types.Alert, meta *AlertEventMeta) {
func (o *FakeLifeCycleObserver) Observe(event string, alerts []*types.Alert, meta AlertEventMeta) {
if event == EventAlertPipelinePassStage {
o.PipelineStageAlerts[meta.StageName] = append(o.PipelineStageAlerts[meta.StageName], alerts...)
o.PipelineStageAlerts[meta["stageName"].(string)] = append(o.PipelineStageAlerts[meta["stageName"].(string)], alerts...)
} else {
o.AlertsPerEvent[event] = append(o.AlertsPerEvent[event], alerts...)
}
o.MetaPerEvent[event] = append(o.MetaPerEvent[event], meta)
}

func NewFakeLifeCycleObserver() *FakeLifeCycleObserver {
return &FakeLifeCycleObserver{
PipelineStageAlerts: map[string][]*types.Alert{},
AlertsPerEvent: map[string][]*types.Alert{},
MetaPerEvent: map[string][]AlertEventMeta{},
}
}
6 changes: 3 additions & 3 deletions api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*
validationErrs.Add(err)
api.m.Invalid().Inc()
if api.alertLCObserver != nil {
m := &alertobserver.AlertEventMeta{Msg: err.Error()}
m := alertobserver.AlertEventMeta{"msg": err.Error()}
api.alertLCObserver.Observe(alertobserver.EventAlertRejected, []*types.Alert{a}, m)
}
continue
Expand All @@ -465,13 +465,13 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*
err: err,
}, nil)
if api.alertLCObserver != nil {
m := &alertobserver.AlertEventMeta{Msg: err.Error()}
m := alertobserver.AlertEventMeta{"msg": err.Error()}
api.alertLCObserver.Observe(alertobserver.EventAlertRejected, validAlerts, m)
}
return
}
if api.alertLCObserver != nil {
api.alertLCObserver.Observe(alertobserver.EventAlertReceived, validAlerts, &alertobserver.AlertEventMeta{})
api.alertLCObserver.Observe(alertobserver.EventAlertReceived, validAlerts, alertobserver.AlertEventMeta{})
}

if validationErrs.Len() > 0 {
Expand Down
6 changes: 3 additions & 3 deletions api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.
validationErrs.Add(err)
api.m.Invalid().Inc()
if api.alertLCObserver != nil {
m := &alertobserver.AlertEventMeta{Msg: err.Error()}
m := alertobserver.AlertEventMeta{"msg": err.Error()}
api.alertLCObserver.Observe(alertobserver.EventAlertRejected, []*types.Alert{a}, m)
}
continue
Expand All @@ -365,7 +365,7 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.
if err := api.alerts.Put(validAlerts...); err != nil {
level.Error(logger).Log("msg", "Failed to create alerts", "err", err)
if api.alertLCObserver != nil {
m := &alertobserver.AlertEventMeta{Msg: err.Error()}
m := alertobserver.AlertEventMeta{"msg": err.Error()}
api.alertLCObserver.Observe(alertobserver.EventAlertRejected, validAlerts, m)
}
return alert_ops.NewPostAlertsInternalServerError().WithPayload(err.Error())
Expand All @@ -376,7 +376,7 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.
return alert_ops.NewPostAlertsBadRequest().WithPayload(validationErrs.Error())
}
if api.alertLCObserver != nil {
api.alertLCObserver.Observe(alertobserver.EventAlertReceived, validAlerts, &alertobserver.AlertEventMeta{})
api.alertLCObserver.Observe(alertobserver.EventAlertReceived, validAlerts, alertobserver.AlertEventMeta{})
}

return alert_ops.NewPostAlertsOK()
Expand Down
6 changes: 2 additions & 4 deletions dispatch/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
if ok {
ag.insert(alert)
if d.alertLCObserver != nil {
dummyCtx := notify.WithGroupKey(context.TODO(), ag.GroupKey())
d.alertLCObserver.Observe(alertobserver.EventAlertAddedToAggrGroup, []*types.Alert{alert}, &alertobserver.AlertEventMeta{Ctx: dummyCtx})
d.alertLCObserver.Observe(alertobserver.EventAlertAddedToAggrGroup, []*types.Alert{alert}, alertobserver.AlertEventMeta{"groupKey": ag.GroupKey()})
}
return
}
Expand All @@ -347,8 +346,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
// alert is already there.
ag.insert(alert)
if d.alertLCObserver != nil {
dummyCtx := notify.WithGroupKey(context.TODO(), ag.GroupKey())
d.alertLCObserver.Observe(alertobserver.EventAlertAddedToAggrGroup, []*types.Alert{alert}, &alertobserver.AlertEventMeta{Ctx: dummyCtx})
d.alertLCObserver.Observe(alertobserver.EventAlertAddedToAggrGroup, []*types.Alert{alert}, alertobserver.AlertEventMeta{"groupKey": ag.GroupKey()})
}

go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
Expand Down
3 changes: 3 additions & 0 deletions dispatch/dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,9 @@ route:
}
require.Equal(t, 1, len(recorder.Alerts()))
require.Equal(t, inputAlerts[0].Fingerprint(), observer.AlertsPerEvent[alertobserver.EventAlertAddedToAggrGroup][0].Fingerprint())
groupFp := getGroupLabels(inputAlerts[0], route).Fingerprint()
groupKey := dispatcher.aggrGroupsPerRoute[route][groupFp].GroupKey()
require.Equal(t, groupKey, observer.MetaPerEvent[alertobserver.EventAlertAddedToAggrGroup][0]["groupKey"].(string))
}

type recordStage struct {
Expand Down
32 changes: 16 additions & 16 deletions notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.
}

if rs.alertLCObserver != nil {
rs.alertLCObserver.Observe(alertobserver.EventAlertPipelineStart, alerts, &alertobserver.AlertEventMeta{Ctx: ctx})
rs.alertLCObserver.Observe(alertobserver.EventAlertPipelineStart, alerts, alertobserver.AlertEventMeta{"ctx": ctx})
}

return s.Exec(ctx, l, alerts...)
Expand All @@ -479,7 +479,7 @@ func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Al
}
if ms.alertLCObserver != nil {
p := strings.Split(fmt.Sprintf("%T", s), ".")
ms.alertLCObserver.Observe(alertobserver.EventAlertPipelinePassStage, alerts, &alertobserver.AlertEventMeta{Ctx: ctx, StageName: p[len(p)-1]})
ms.alertLCObserver.Observe(alertobserver.EventAlertPipelinePassStage, alerts, alertobserver.AlertEventMeta{"ctx": ctx, "stageName": p[len(p)-1]})
}
}
return ctx, alerts, nil
Expand Down Expand Up @@ -825,11 +825,11 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale
r.metrics.numNotificationRequestsFailedTotal.WithLabelValues(r.labelValues...).Inc()
if !retry {
if r.alertLCObserver != nil {
m := &alertobserver.AlertEventMeta{
Ctx: ctx,
Msg: "Unrecoverable error",
Integration: r.integration.Name(),
StageName: "RetryStage",
m := alertobserver.AlertEventMeta{
"ctx": ctx,
"msg": "Unrecoverable error",
"integration": r.integration.Name(),
"stageName": "RetryStage",
}
r.alertLCObserver.Observe(alertobserver.EventAlertSendFailed, alerts, m)
}
Expand All @@ -849,10 +849,10 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale
lvl = level.Debug(log.With(l, "alerts", fmt.Sprintf("%v", alerts)))
}
if r.alertLCObserver != nil {
m := &alertobserver.AlertEventMeta{
Ctx: ctx,
Integration: r.integration.Name(),
StageName: "RetryStage",
m := alertobserver.AlertEventMeta{
"ctx": ctx,
"integration": r.integration.Name(),
"stageName": "RetryStage",
}
r.alertLCObserver.Observe(alertobserver.EventAlertSent, alerts, m)
}
Expand All @@ -865,11 +865,11 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale
iErr = ctx.Err()
}
if r.alertLCObserver != nil {
m := &alertobserver.AlertEventMeta{
Ctx: ctx,
Msg: "Retry canceled",
Integration: r.integration.Name(),
StageName: "RetryStage",
m := alertobserver.AlertEventMeta{
"ctx": ctx,
"msg": "Retry canceled",
"integration": r.integration.Name(),
"stageName": "RetryStage",
}
r.alertLCObserver.Observe(alertobserver.EventAlertSendFailed, alerts, m)
}
Expand Down
22 changes: 20 additions & 2 deletions notify/notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,6 @@ func TestMultiStage(t *testing.T) {
stage := MultiStage{
stages: stages,
}

_, alerts, err := stage.Exec(context.Background(), log.NewNopLogger(), alerts1...)
if err != nil {
t.Fatalf("Exec failed: %s", err)
Expand All @@ -342,7 +341,7 @@ func TestMultiStage(t *testing.T) {

// Rerun multistage but with alert life cycle observer
observer := alertobserver.NewFakeLifeCycleObserver()
ctx := context.Background()
ctx := WithGroupKey(context.Background(), "test")
stage.alertLCObserver = observer
_, _, err = stage.Exec(ctx, log.NewNopLogger(), alerts1...)
if err != nil {
Expand All @@ -351,6 +350,9 @@ func TestMultiStage(t *testing.T) {

require.Equal(t, 1, len(observer.PipelineStageAlerts))
require.Equal(t, 5, len(observer.PipelineStageAlerts["StageFunc"]))
metaCtx := observer.MetaPerEvent[alertobserver.EventAlertPipelinePassStage][0]["ctx"].(context.Context)
_, ok := GroupKey(metaCtx)
require.True(t, ok)
}

func TestMultiStageFailure(t *testing.T) {
Expand Down Expand Up @@ -404,6 +406,9 @@ func TestRoutingStage(t *testing.T) {
t.Fatalf("Exec failed: %s", err)
}
require.Equal(t, len(alerts1), len(observer.AlertsPerEvent[alertobserver.EventAlertPipelineStart]))
metaCtx := observer.MetaPerEvent[alertobserver.EventAlertPipelineStart][0]["ctx"].(context.Context)
_, ok := ReceiverName(metaCtx)
require.True(t, ok)
}

func TestRetryStageWithError(t *testing.T) {
Expand Down Expand Up @@ -432,6 +437,7 @@ func TestRetryStageWithError(t *testing.T) {

ctx := context.Background()
ctx = WithFiringAlerts(ctx, []uint64{0})
ctx = WithGroupKey(ctx, "test")

// Notify with a recoverable error should retry and succeed.
resctx, res, err := r.Exec(ctx, log.NewNopLogger(), alerts...)
Expand All @@ -446,6 +452,12 @@ func TestRetryStageWithError(t *testing.T) {
_, _, err = r.Exec(ctx, log.NewNopLogger(), alerts...)
require.Nil(t, err)
require.Equal(t, len(alerts), len(observer.AlertsPerEvent[alertobserver.EventAlertSent]))
meta := observer.MetaPerEvent[alertobserver.EventAlertSent][0]
require.Equal(t, "RetryStage", meta["stageName"].(string))
require.Equal(t, i.Name(), meta["integration"].(string))
metaCtx := meta["ctx"].(context.Context)
_, ok := GroupKey(metaCtx)
require.True(t, ok)

// Notify with an unrecoverable error should fail.
sent = sent[:0]
Expand All @@ -462,6 +474,12 @@ func TestRetryStageWithError(t *testing.T) {
_, _, err = r.Exec(ctx, log.NewNopLogger(), alerts...)
require.NotNil(t, err)
require.Equal(t, len(alerts), len(observer.AlertsPerEvent[alertobserver.EventAlertSendFailed]))
meta = observer.MetaPerEvent[alertobserver.EventAlertSendFailed][0]
require.Equal(t, "RetryStage", meta["stageName"].(string))
require.Equal(t, i.Name(), meta["integration"].(string))
metaCtx = meta["ctx"].(context.Context)
_, ok = GroupKey(metaCtx)
require.True(t, ok)
}

func TestRetryStageWithErrorCode(t *testing.T) {
Expand Down

0 comments on commit a5d618e

Please sign in to comment.