diff --git a/pkg/alertmanager/alert_observer.go b/pkg/alertmanager/alert_observer.go
new file mode 100644
index 00000000000..4533daf9e53
--- /dev/null
+++ b/pkg/alertmanager/alert_observer.go
@@ -0,0 +1,225 @@
+package alertmanager
+
+import (
+	"context"
+
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+	"github.com/prometheus/alertmanager/alertobserver"
+	"github.com/prometheus/alertmanager/notify"
+	"github.com/prometheus/alertmanager/types"
+)
+
+type ObserverLimits interface {
+	AlertmanagerAlertLifeCycleObserverLevel(tenant string) int
+}
+
+type AlertLifeCycleObserverLimiter struct {
+	limits ObserverLimits
+	tenant string
+}
+
+func NewAlertLifeCycleObserverLimiter(tenant string, limits ObserverLimits) *AlertLifeCycleObserverLimiter {
+	return &AlertLifeCycleObserverLimiter{
+		tenant: tenant,
+		limits: limits,
+	}
+}
+
+func (a *AlertLifeCycleObserverLimiter) level() int {
+	return a.limits.AlertmanagerAlertLifeCycleObserverLevel(a.tenant)
+}
+
+type LogAlertLifeCycleObserver struct {
+	logger  log.Logger
+	limiter *AlertLifeCycleObserverLimiter
+}
+
+func NewLogAlertLifeCycleObserver(logger log.Logger, user string, limiter *AlertLifeCycleObserverLimiter) *LogAlertLifeCycleObserver {
+	logger = log.With(logger, "user", user)
+	logger = log.With(logger, "component", "observer")
+	return &LogAlertLifeCycleObserver{
+		logger:  logger,
+		limiter: limiter,
+	}
+}
+
+// Observe implements the alertobserver.LifeCycleObserver interface.
+func (o *LogAlertLifeCycleObserver) Observe(event string, alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
+	if alerts == nil || o.limiter == nil || o.limiter.level() <= 0 {
+		return
+	}
+	switch event {
+	case alertobserver.EventAlertReceived:
+		o.Received(alerts)
+	case alertobserver.EventAlertRejected:
+		o.Rejected(alerts, meta)
+	case alertobserver.EventAlertAddedToAggrGroup:
+		o.AddedAggrGroup(alerts, meta)
+	case alertobserver.EventAlertFailedAddToAggrGroup:
+		o.FailedAddToAggrGroup(alerts, meta)
+	case alertobserver.EventAlertPipelineStart:
+		o.PipelineStart(alerts, meta)
+	case alertobserver.EventAlertPipelinePassStage:
+		o.PipelinePassStage(meta)
+	case alertobserver.EventAlertSent:
+		o.Sent(alerts, meta)
+	case alertobserver.EventAlertSendFailed:
+		o.SendFailed(alerts, meta)
+	case alertobserver.EventAlertMuted:
+		o.Muted(alerts, meta)
+	}
+}
+
+func (o *LogAlertLifeCycleObserver) Received(alerts []*types.Alert) {
+	for _, a := range alerts {
+		o.logWithAlert(a, true, "msg", "Received")
+	}
+}
+
+func (o *LogAlertLifeCycleObserver) Rejected(alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
+	reason, ok := meta["msg"]
+	if !ok {
+		reason = "Unknown"
+	}
+	for _, a := range alerts {
+		o.logWithAlert(a, true, "msg", "Rejected", "reason", reason)
+	}
+}
+
+func (o *LogAlertLifeCycleObserver) AddedAggrGroup(alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
+	groupKey, ok := meta["groupKey"]
+	if !ok {
+		return
+	}
+	for _, a := range alerts {
+		o.logWithAlert(a, true, "msg", "Added to aggregation group", "groupKey", groupKey)
+	}
+}
+
+func (o *LogAlertLifeCycleObserver) FailedAddToAggrGroup(alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
+	reason, ok := meta["msg"]
+	if !ok {
+		reason = "Unknown"
+	}
+	for _, a := range alerts {
+		o.logWithAlert(a, true, "msg", "Failed to add to aggregation group", "reason", reason)
+	}
+}
+
+func (o *LogAlertLifeCycleObserver) PipelineStart(alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
+	ctx, ok := meta["ctx"]
+	if !ok {
+		return
+	}
+	receiver, ok := notify.ReceiverName(ctx.(context.Context))
+	if !ok {
+		return
+	}
+	groupKey, ok := notify.GroupKey(ctx.(context.Context))
+	if !ok {
+		return
+	}
+	for _, a := range alerts {
+		o.logWithAlert(a, false, "msg", "Entered the pipeline", "groupKey", groupKey, "receiver", receiver)
+	}
+}
+
+func (o *LogAlertLifeCycleObserver) PipelinePassStage(meta alertobserver.AlertEventMeta) {
+	stageName, ok := meta["stageName"]
+	if !ok {
+		return
+	}
+	if stageName == "FanoutStage" {
+		// The fanout stage is just a collection of stages, so there is no need to log it: we already know
+		// the pipeline entered the fanout stage from the logs of its substages.
+		return
+	}
+	ctx, ok := meta["ctx"]
+	if !ok {
+		return
+	}
+	receiver, ok := notify.ReceiverName(ctx.(context.Context))
+	if !ok {
+		return
+	}
+	groupKey, ok := notify.GroupKey(ctx.(context.Context))
+	if !ok {
+		return
+	}
+	level.Info(o.logger).Log("msg", "Passed stage", "groupKey", groupKey, "receiver", receiver, "stage", stageName)
+}
+
+func (o *LogAlertLifeCycleObserver) Sent(alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
+	ctx, ok := meta["ctx"]
+	if !ok {
+		return
+	}
+	integration, ok := meta["integration"]
+	if !ok {
+		return
+	}
+	receiver, ok := notify.ReceiverName(ctx.(context.Context))
+	if !ok {
+		return
+	}
+	groupKey, ok := notify.GroupKey(ctx.(context.Context))
+	if !ok {
+		return
+	}
+	for _, a := range alerts {
+		o.logWithAlert(a, false, "msg", "Sent", "groupKey", groupKey, "receiver", receiver, "integration", integration)
+	}
+}
+
+func (o *LogAlertLifeCycleObserver) SendFailed(alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
+	ctx, ok := meta["ctx"]
+	if !ok {
+		return
+	}
+	integration, ok := meta["integration"]
+	if !ok {
+		return
+	}
+	receiver, ok := notify.ReceiverName(ctx.(context.Context))
+	if !ok {
+		return
+	}
+	groupKey, ok := notify.GroupKey(ctx.(context.Context))
+	if !ok {
+		return
+	}
+	for _, a := range alerts {
+		o.logWithAlert(a, false, "msg", "Send failed", "groupKey", groupKey, "receiver", receiver, "integration", integration)
+	}
+}
+
+func (o *LogAlertLifeCycleObserver) Muted(alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
+	ctx, ok := meta["ctx"]
+	if !ok {
+		return
+	}
+	groupKey, ok := notify.GroupKey(ctx.(context.Context))
+	if !ok {
+		return
+	}
+	for _, a := range alerts {
+		o.logWithAlert(a, false, "msg", "Muted", "groupKey", groupKey)
+	}
+}
+
+func (o *LogAlertLifeCycleObserver) logWithAlert(alert *types.Alert, addLabels bool, keyvals ...interface{}) {
+	keyvals = append(
+		keyvals,
+		"fingerprint",
+		alert.Fingerprint().String(),
+		"start",
+		alert.StartsAt.Unix(),
+		"end",
+		alert.EndsAt.Unix(),
+	)
+	if addLabels {
+		keyvals = append(keyvals, "labels", alert.Labels.String())
+	}
+	level.Info(o.logger).Log(keyvals...)
+}
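The observer above only emits logs when the tenant's configured level is positive, and every line carries the alert fingerprint plus start/end timestamps (labels are added for the pre-grouping events). The following standalone sketch is not part of the patch; it shows how the pieces fit together, with the staticLimits type, tenant name, and sample alert invented purely for illustration.

package main

import (
	"os"
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/alertmanager/alertobserver"
	"github.com/prometheus/alertmanager/types"
	"github.com/prometheus/common/model"

	"github.com/cortexproject/cortex/pkg/alertmanager"
)

// staticLimits is a stand-in for the real per-tenant limits and always returns the same level.
type staticLimits struct{ level int }

func (s staticLimits) AlertmanagerAlertLifeCycleObserverLevel(_ string) int { return s.level }

func main() {
	logger := log.NewLogfmtLogger(os.Stdout)

	// Level 0 disables the observer entirely; any positive level enables logging.
	limiter := alertmanager.NewAlertLifeCycleObserverLimiter("tenant-1", staticLimits{level: 1})
	observer := alertmanager.NewLogAlertLifeCycleObserver(logger, "tenant-1", limiter)

	alert := &types.Alert{Alert: model.Alert{
		Labels:   model.LabelSet{"alertname": "HighLatency"},
		StartsAt: time.Now(),
		EndsAt:   time.Now().Add(5 * time.Minute),
	}}

	// Emits one "Received" log line per alert, with fingerprint, start/end and labels.
	observer.Observe(alertobserver.EventAlertReceived, []*types.Alert{alert}, alertobserver.AlertEventMeta{})
}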
diff --git a/pkg/alertmanager/alert_observer_test.go b/pkg/alertmanager/alert_observer_test.go
new file mode 100644
index 00000000000..5292187594e
--- /dev/null
+++ b/pkg/alertmanager/alert_observer_test.go
@@ -0,0 +1,196 @@
+package alertmanager
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"testing"
+
+	"github.com/prometheus/alertmanager/alertobserver"
+	"github.com/prometheus/alertmanager/notify"
+	"github.com/prometheus/alertmanager/types"
+	"github.com/stretchr/testify/assert"
+)
+
+func TestAlertLifeCycleObserverLimiter(t *testing.T) {
+	tenant := "fake"
+	lim := limits{
+		tenant: tenant,
+		limit:  2,
+	}
+	limiter := NewAlertLifeCycleObserverLimiter(tenant, lim)
+	assert.Equal(t, 2, limiter.level())
+}
+
+type limits struct {
+	tenant string
+	limit  int
+}
+
+func (l limits) AlertmanagerAlertLifeCycleObserverLevel(tenant string) int {
+	if tenant == l.tenant {
+		return l.limit
+	}
+	return 0
+}
+
+func TestLogAlertLifeCycleObserver(t *testing.T) {
+	logger := &FakeLogger{}
+	alerts := []*types.Alert{&types.Alert{}, &types.Alert{}}
+	ctx := context.Background()
+	ctx = notify.WithReceiverName(ctx, "rcv")
+	ctx = notify.WithGroupKey(ctx, "key")
+
+	for _, tc := range []struct {
+		event               string
+		logLvl              int
+		alerts              []*types.Alert
+		meta                alertobserver.AlertEventMeta
+		expectedMsg         string
+		expectedLogCount    int
+		expectedLoggedKeys  []string
+		expectedMissingKeys []string
+	}{
+		{
+			event:            alertobserver.EventAlertReceived,
+			alerts:           alerts,
+			meta:             alertobserver.AlertEventMeta{},
+			logLvl:           0,
+			expectedLogCount: 0,
+		},
+		{
+			event:              alertobserver.EventAlertReceived,
+			alerts:             alerts,
+			meta:               alertobserver.AlertEventMeta{},
+			logLvl:             1000,
+			expectedLogCount:   2,
+			expectedMsg:        "Received",
+			expectedLoggedKeys: []string{"labels", "fingerprint"},
+		},
+		{
+			event:              alertobserver.EventAlertRejected,
+			alerts:             alerts,
+			meta:               alertobserver.AlertEventMeta{"msg": "test"},
+			logLvl:             1000,
+			expectedLogCount:   2,
+			expectedMsg:        "Rejected",
+			expectedLoggedKeys: []string{"reason", "labels", "fingerprint"},
+		},
+		{
+			event:              alertobserver.EventAlertAddedToAggrGroup,
+			alerts:             alerts,
+			meta:               alertobserver.AlertEventMeta{"groupKey": "test"},
+			logLvl:             1000,
+			expectedLogCount:   2,
+			expectedMsg:        "Added to aggregation group",
+			expectedLoggedKeys: []string{"groupKey", "labels", "fingerprint"},
+		},
+		{
+			event:              alertobserver.EventAlertFailedAddToAggrGroup,
+			alerts:             alerts,
+			meta:               alertobserver.AlertEventMeta{"groupKey": "test"},
+			logLvl:             1000,
+			expectedLogCount:   2,
+			expectedMsg:        "Failed to add to aggregation group",
+			expectedLoggedKeys: []string{"reason", "labels", "fingerprint"},
+		},
+		{
+			event:               alertobserver.EventAlertPipelineStart,
+			alerts:              alerts,
+			meta:                alertobserver.AlertEventMeta{"ctx": ctx},
+			logLvl:              1000,
+			expectedLogCount:    2,
+			expectedMsg:         "Entered the pipeline",
+			expectedLoggedKeys:  []string{"groupKey", "receiver", "fingerprint"},
+			expectedMissingKeys: []string{"labels"},
+		},
+		{
+			event:            alertobserver.EventAlertPipelinePassStage,
+			alerts:           alerts,
+			meta:             alertobserver.AlertEventMeta{"ctx": ctx, "stageName": "FanoutStage"},
+			logLvl:           1000,
+			expectedLogCount: 0,
+		},
+		{
+			event:               alertobserver.EventAlertPipelinePassStage,
+			alerts:              alerts,
+			meta:                alertobserver.AlertEventMeta{"ctx": ctx, "stageName": "Notify"},
+			logLvl:              1000,
+			expectedLogCount:    1,
+			expectedMsg:         "Passed stage",
+			expectedLoggedKeys:  []string{"groupKey", "receiver", "stage"},
+			expectedMissingKeys: []string{"fingerprint"},
+		},
+		{
+			event:  alertobserver.EventAlertSent,
+			alerts: alerts,
alertobserver.AlertEventMeta{"ctx": ctx, "integration": "sns"}, + logLvl: 1000, + expectedLogCount: 2, + expectedMsg: "Sent", + expectedLoggedKeys: []string{"groupKey", "receiver", "fingerprint"}, + expectedMissingKeys: []string{"labels"}, + }, + { + event: alertobserver.EventAlertSendFailed, + alerts: alerts, + meta: alertobserver.AlertEventMeta{"ctx": ctx, "integration": "sns"}, + logLvl: 1000, + expectedLogCount: 2, + expectedMsg: "Send failed", + expectedLoggedKeys: []string{"groupKey", "receiver", "fingerprint"}, + expectedMissingKeys: []string{"labels"}, + }, + { + event: alertobserver.EventAlertMuted, + alerts: alerts, + meta: alertobserver.AlertEventMeta{"ctx": ctx}, + logLvl: 1000, + expectedLogCount: 2, + expectedMsg: "Muted", + expectedLoggedKeys: []string{"groupKey", "fingerprint"}, + expectedMissingKeys: []string{"labels"}, + }, + } { + tc := tc + logger.clear() + l := NewAlertLifeCycleObserverLimiter("fake", limits{tenant: "fake", limit: tc.logLvl}) + o := NewLogAlertLifeCycleObserver(logger, "fake", l) + o.Observe(tc.event, tc.alerts, tc.meta) + assert.Equal(t, tc.expectedLogCount, len(logger.loggedValues)) + for i := 0; i < tc.expectedLogCount; i++ { + loggedValues := logger.loggedValues[i] + assert.Equal(t, tc.expectedMsg, loggedValues["msg"]) + for _, v := range tc.expectedLoggedKeys { + _, ok := loggedValues[v] + assert.True(t, ok, fmt.Sprintf("'%v' is missing from the log", v)) + } + for _, v := range tc.expectedMissingKeys { + _, ok := loggedValues[v] + assert.False(t, ok, fmt.Sprintf("'%v' should be excluded from the log", v)) + } + } + } +} + +type FakeLogger struct { + loggedValues []map[string]string + mtx sync.RWMutex +} + +func (l *FakeLogger) Log(keyvals ...interface{}) error { + l.mtx.Lock() + defer l.mtx.Unlock() + params := make(map[string]string) + for i, v := range keyvals { + if i%2 == 0 { + params[fmt.Sprintf("%v", v)] = fmt.Sprintf("%v", keyvals[i+1]) + } + } + l.loggedValues = append(l.loggedValues, params) + return nil +} + +func (l *FakeLogger) clear() { + l.loggedValues = l.loggedValues[:0] +} diff --git a/pkg/alertmanager/alertmanager.go b/pkg/alertmanager/alertmanager.go index d67f26eefdc..d92d9b8fd5c 100644 --- a/pkg/alertmanager/alertmanager.go +++ b/pkg/alertmanager/alertmanager.go @@ -16,11 +16,13 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/pkg/errors" + "github.com/prometheus/alertmanager/alertobserver" "github.com/prometheus/alertmanager/api" "github.com/prometheus/alertmanager/cluster" "github.com/prometheus/alertmanager/cluster/clusterpb" "github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/dispatch" + "github.com/prometheus/alertmanager/featurecontrol" "github.com/prometheus/alertmanager/inhibit" "github.com/prometheus/alertmanager/nflog" "github.com/prometheus/alertmanager/notify" @@ -120,6 +122,8 @@ type Alertmanager struct { configHashMetric prometheus.Gauge rateLimitedNotifications *prometheus.CounterVec + + alertLCObserver alertobserver.LifeCycleObserver } var ( @@ -155,7 +159,7 @@ type Replicator interface { } // New creates a new Alertmanager. 
-func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
+func New(cfg *Config, reg *prometheus.Registry, o alertobserver.LifeCycleObserver) (*Alertmanager, error) {
 	if cfg.TenantDataDir == "" {
 		return nil, fmt.Errorf("directory for tenant-specific AlertManager is not configured")
 	}
@@ -174,6 +178,7 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
 			Help: "Number of rate-limited notifications per integration.",
 		}, []string{"integration"}), // "integration" is consistent with other alertmanager metrics.
+		alertLCObserver: o,
 	}
 
 	am.registry = reg
@@ -243,7 +248,7 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
 		}
 	}
 
-	am.pipelineBuilder = notify.NewPipelineBuilder(am.registry)
+	am.pipelineBuilder = notify.NewPipelineBuilder(am.registry, featurecontrol.NoopFlags{})
 
 	am.wg.Add(1)
 	go func() {
@@ -253,7 +258,7 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
 	var callback mem.AlertStoreCallback
 	if am.cfg.Limits != nil {
-		callback = newAlertsLimiter(am.cfg.UserID, am.cfg.Limits, reg)
+		callback = newAlertsLimiter(am.cfg.UserID, am.cfg.Limits, reg, o)
 	}
 
 	am.alerts, err = mem.NewAlerts(context.Background(), am.marker, am.cfg.GCInterval, callback, am.logger, am.registry)
 	if err != nil {
@@ -271,7 +276,8 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
 		GroupFunc: func(f1 func(*dispatch.Route) bool, f2 func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string) {
 			return am.dispatcher.Groups(f1, f2)
 		},
-		Concurrency: am.cfg.APIConcurrency,
+		Concurrency:     am.cfg.APIConcurrency,
+		AlertLCObserver: am.alertLCObserver,
 	})
 	if err != nil {
 		return nil, fmt.Errorf("failed to create api: %v", err)
 	}
@@ -393,6 +399,7 @@ func (am *Alertmanager) ApplyConfig(userID string, conf *config.Config, rawCfg s
 		timeIntervals,
 		am.nflog,
 		am.state,
+		am.alertLCObserver,
 	)
 	am.lastPipeline = pipeline
 	am.dispatcher = dispatch.NewDispatcher(
@@ -404,6 +411,7 @@ func (am *Alertmanager) ApplyConfig(userID string, conf *config.Config, rawCfg s
 		&dispatcherLimits{tenant: am.cfg.UserID, limits: am.cfg.Limits},
 		log.With(am.logger, "component", "dispatcher"),
 		am.dispatcherMetrics,
+		am.alertLCObserver,
 	)
 
 	go am.dispatcher.Run()
@@ -495,7 +503,7 @@ func buildReceiverIntegrations(nc config.Receiver, tmpl *template.Template, fire
 			return
 		}
 		n = wrapper(name, n)
-		integrations = append(integrations, notify.NewIntegration(n, rs, name, i))
+		integrations = append(integrations, notify.NewIntegration(n, rs, name, i, nc.Name))
 		}
 	)
@@ -637,9 +645,11 @@ type alertsLimiter struct {
 	sizes     map[model.Fingerprint]int
 	count     int
 	totalSize int
+
+	alertLCObserver alertobserver.LifeCycleObserver
 }
 
-func newAlertsLimiter(tenant string, limits Limits, reg prometheus.Registerer) *alertsLimiter {
+func newAlertsLimiter(tenant string, limits Limits, reg prometheus.Registerer, o alertobserver.LifeCycleObserver) *alertsLimiter {
 	limiter := &alertsLimiter{
 		tenant: tenant,
 		limits: limits,
@@ -648,6 +658,7 @@ func newAlertsLimiter(tenant string, limits Limits, reg prometheus.Registerer) *
 			Name: "alertmanager_alerts_insert_limited_total",
 			Help: "Number of failures to insert new alerts to in-memory alert store.",
 		}),
+		alertLCObserver: o,
 	}
 
 	promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
@@ -686,6 +697,10 @@ func (a *alertsLimiter) PreStore(alert *types.Alert, existing bool) error {
 	if !existing && countLimit > 0 && (a.count+1) > countLimit {
 		a.failureCounter.Inc()
+		if a.alertLCObserver != nil {
alertobserver.AlertEventMeta{"msg": "count limit"} + a.alertLCObserver.Observe(alertobserver.EventAlertRejected, []*types.Alert{alert}, m) + } return fmt.Errorf(errTooManyAlerts, countLimit, alert.Name()) } @@ -695,6 +710,10 @@ func (a *alertsLimiter) PreStore(alert *types.Alert, existing bool) error { if sizeLimit > 0 && (a.totalSize+sizeDiff) > sizeLimit { a.failureCounter.Inc() + if a.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{"msg": "size limit"} + a.alertLCObserver.Observe(alertobserver.EventAlertRejected, []*types.Alert{alert}, m) + } return fmt.Errorf(errAlertsTooBig, sizeLimit) } diff --git a/pkg/alertmanager/alertmanager_test.go b/pkg/alertmanager/alertmanager_test.go index 6859fb50867..362d48168b2 100644 --- a/pkg/alertmanager/alertmanager_test.go +++ b/pkg/alertmanager/alertmanager_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/prometheus/alertmanager/alertobserver" "github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/types" "github.com/prometheus/client_golang/prometheus" @@ -39,31 +40,9 @@ func createAlertmanagerAndSendAlerts(t *testing.T, alertGroups, groupsLimit, exp user := "test" reg := prometheus.NewPedanticRegistry() - am, err := New(&Config{ - UserID: user, - Logger: log.NewNopLogger(), - Limits: &mockAlertManagerLimits{maxDispatcherAggregationGroups: groupsLimit}, - TenantDataDir: t.TempDir(), - ExternalURL: &url.URL{Path: "/am"}, - ShardingEnabled: false, - GCInterval: 30 * time.Minute, - }, reg) - require.NoError(t, err) + am := createAlertmanager(t, user, reg, groupsLimit, nil) defer am.StopAndWait() - cfgRaw := `receivers: -- name: 'prod' - -route: - group_by: ['alertname'] - group_wait: 10ms - group_interval: 10ms - receiver: 'prod'` - - cfg, err := config.Load(cfgRaw) - require.NoError(t, err) - require.NoError(t, am.ApplyConfig(user, cfg, cfgRaw)) - now := time.Now() for i := 0; i < alertGroups; i++ { @@ -113,6 +92,33 @@ route: }) } +func createAlertmanager(t *testing.T, user string, reg *prometheus.Registry, groupsLimit int, o alertobserver.LifeCycleObserver) *Alertmanager { + am, err := New(&Config{ + UserID: user, + Logger: log.NewNopLogger(), + Limits: &mockAlertManagerLimits{maxDispatcherAggregationGroups: groupsLimit}, + TenantDataDir: t.TempDir(), + ExternalURL: &url.URL{Path: "/am"}, + ShardingEnabled: false, + GCInterval: 30 * time.Minute, + }, reg, o) + require.NoError(t, err) + + cfgRaw := `receivers: +- name: 'prod' + +route: + group_by: ['alertname'] + group_wait: 10ms + group_interval: 10ms + receiver: 'prod'` + + cfg, err := config.Load(cfgRaw) + require.NoError(t, err) + require.NoError(t, am.ApplyConfig(user, cfg, cfgRaw)) + return am +} + var ( alert1 = model.Alert{ Labels: model.LabelSet{"alert": "first", "alertname": "alert1"}, @@ -152,7 +158,7 @@ func TestAlertsLimiterWithNoLimits(t *testing.T) { {alert: &types.Alert{Alert: alert1}, delete: true, expectedCount: 0, expectedTotalSize: 0}, } - testLimiter(t, &mockAlertManagerLimits{}, ops) + testLimiter(t, &mockAlertManagerLimits{}, ops, nil) } func TestAlertsLimiterWithCountLimit(t *testing.T) { @@ -171,7 +177,15 @@ func TestAlertsLimiterWithCountLimit(t *testing.T) { {alert: &types.Alert{Alert: alert2}, delete: true, expectedCount: 0, expectedTotalSize: 0}, } - testLimiter(t, &mockAlertManagerLimits{maxAlertsCount: 1}, ops) + testLimiter(t, &mockAlertManagerLimits{maxAlertsCount: 1}, ops, nil) + // make sure we log the event if the observer is set + logger := &FakeLogger{} + l := 
NewAlertLifeCycleObserverLimiter("fake", limits{tenant: "fake", limit: 1000}) + o := NewLogAlertLifeCycleObserver(logger, "fake", l) + testLimiter(t, &mockAlertManagerLimits{maxAlertsCount: 1}, ops, o) + assert.Equal(t, 1, len(logger.loggedValues)) + assert.Equal(t, "Rejected", logger.loggedValues[0]["msg"]) + assert.Equal(t, "count limit", logger.loggedValues[0]["reason"]) } func TestAlertsLimiterWithSizeLimit(t *testing.T) { @@ -191,7 +205,15 @@ func TestAlertsLimiterWithSizeLimit(t *testing.T) { // Prerequisite for this test. We set size limit to alert2Size, but inserting alert1 first will prevent insertion of alert2. require.True(t, alert2Size > alert1Size) - testLimiter(t, &mockAlertManagerLimits{maxAlertsSizeBytes: alert2Size}, ops) + testLimiter(t, &mockAlertManagerLimits{maxAlertsSizeBytes: alert2Size}, ops, nil) + // make sure we log the event if the observer is set + logger := &FakeLogger{} + l := NewAlertLifeCycleObserverLimiter("fake", limits{tenant: "fake", limit: 1000}) + o := NewLogAlertLifeCycleObserver(logger, "fake", l) + testLimiter(t, &mockAlertManagerLimits{maxAlertsSizeBytes: alert2Size}, ops, o) + assert.Equal(t, 2, len(logger.loggedValues)) + assert.Equal(t, "Rejected", logger.loggedValues[0]["msg"]) + assert.Equal(t, "size limit", logger.loggedValues[0]["reason"]) } func TestAlertsLimiterWithSizeLimitAndAnnotationUpdate(t *testing.T) { @@ -203,21 +225,21 @@ func TestAlertsLimiterWithSizeLimitAndAnnotationUpdate(t *testing.T) { testLimiter(t, &mockAlertManagerLimits{maxAlertsSizeBytes: alert2Size}, []callbackOp{ {alert: &types.Alert{Alert: alert2}, existing: false, expectedCount: 1, expectedTotalSize: alert2Size}, {alert: &types.Alert{Alert: alert2WithMoreAnnotations}, existing: true, expectedInsertError: fmt.Errorf(errAlertsTooBig, alert2Size), expectedCount: 1, expectedTotalSize: alert2Size}, - }) + }, nil) // Updating alert with larger annotations in the limit works fine. testLimiter(t, &mockAlertManagerLimits{maxAlertsSizeBytes: alert2WithMoreAnnotationsSize}, []callbackOp{ {alert: &types.Alert{Alert: alert2}, existing: false, expectedCount: 1, expectedTotalSize: alert2Size}, {alert: &types.Alert{Alert: alert2WithMoreAnnotations}, existing: true, expectedCount: 1, expectedTotalSize: alert2WithMoreAnnotationsSize}, {alert: &types.Alert{Alert: alert2}, existing: true, expectedCount: 1, expectedTotalSize: alert2Size}, - }) + }, nil) } // testLimiter sends sequence of alerts to limiter, and checks if limiter updated reacted correctly. 
-func testLimiter(t *testing.T, limits Limits, ops []callbackOp) {
+func testLimiter(t *testing.T, limits Limits, ops []callbackOp, o alertobserver.LifeCycleObserver) {
 	reg := prometheus.NewPedanticRegistry()
 
-	limiter := newAlertsLimiter("test", limits, reg)
+	limiter := newAlertsLimiter("test", limits, reg, o)
 
 	for ix, op := range ops {
 		if op.delete {
@@ -236,3 +258,40 @@ func testLimiter(t *testing.T, limits Limits, ops []callbackOp) {
 		assert.Equal(t, op.expectedTotalSize, totalSize, "wrong total size, op %d", ix)
 	}
 }
+
+func TestRunAlertmanagerWithObserver(t *testing.T) {
+	logger := &FakeLogger{}
+	l := NewAlertLifeCycleObserverLimiter("fake", limits{tenant: "fake", limit: 1000})
+	o := NewLogAlertLifeCycleObserver(logger, "fake", l)
+
+	user := "test"
+
+	reg := prometheus.NewPedanticRegistry()
+	am := createAlertmanager(t, user, reg, 100, o)
+	defer am.StopAndWait()
+
+	now := time.Now()
+
+	inputAlerts := []*types.Alert{
+		{
+			Alert: model.Alert{
+				Labels: model.LabelSet{
+					"z": "y",
+				},
+				Annotations:  model.LabelSet{"foo": "bar"},
+				StartsAt:     now,
+				EndsAt:       now.Add(5 * time.Minute),
+				GeneratorURL: "http://example.com/prometheus",
+			},
+			UpdatedAt: now,
+			Timeout:   false,
+		},
+	}
+	require.NoError(t, am.alerts.Put(inputAlerts...))
+
+	test.Poll(t, 3*time.Second, true, func() interface{} {
+		logger.mtx.RLock()
+		defer logger.mtx.RUnlock()
+		return len(logger.loggedValues) > 10
+	})
+}
diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go
index 49a3e401ad2..6976cc1a309 100644
--- a/pkg/alertmanager/multitenant.go
+++ b/pkg/alertmanager/multitenant.go
@@ -15,6 +15,7 @@ import (
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/pkg/errors"
+	"github.com/prometheus/alertmanager/alertobserver"
 	"github.com/prometheus/alertmanager/cluster"
 	"github.com/prometheus/alertmanager/cluster/clusterpb"
 	amconfig "github.com/prometheus/alertmanager/config"
@@ -90,6 +91,8 @@ type MultitenantAlertmanagerConfig struct {
 	EnabledTenants  flagext.StringSliceCSV `yaml:"enabled_tenants"`
 	DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`
+
+	AlertLifeCycleObserverFn func(config *Config) alertobserver.LifeCycleObserver `yaml:"-"`
 }
 
 type ClusterConfig struct {
@@ -227,6 +230,9 @@ type Limits interface {
 	// AlertmanagerMaxAlertsSizeBytes returns total max size of alerts that tenant can have active at the same time. 0 = no limit.
 	// Size of the alert is computed from alert labels, annotations and generator URL.
 	AlertmanagerMaxAlertsSizeBytes(tenant string) int
+
+	// AlertmanagerAlertLifeCycleObserverLevel returns an int that controls how many logs the LifeCycleObserver will collect. 0 = no logs.
+	AlertmanagerAlertLifeCycleObserverLevel(tenant string) int
 }
 
 // A MultitenantAlertmanager manages Alertmanager instances for multiple
@@ -961,7 +967,7 @@ func (am *MultitenantAlertmanager) newAlertmanager(userID string, amConfig *amco
 		return nil, errors.Wrapf(err, "failed to create per-tenant directory %v", tenantDir)
 	}
 
-	newAM, err := New(&Config{
+	c := &Config{
 		UserID:        userID,
 		TenantDataDir: tenantDir,
 		Logger:        am.logger,
@@ -977,7 +983,14 @@ func (am *MultitenantAlertmanager) newAlertmanager(userID string, amConfig *amco
 		Limits:          am.limits,
 		APIConcurrency:  am.cfg.APIConcurrency,
 		GCInterval:      am.cfg.GCInterval,
-	}, reg)
+	}
+
+	var o alertobserver.LifeCycleObserver
+	if am.cfg.AlertLifeCycleObserverFn != nil {
+		o = am.cfg.AlertLifeCycleObserverFn(c)
+	}
+
+	newAM, err := New(c, reg, o)
 	if err != nil {
 		return nil, fmt.Errorf("unable to start Alertmanager for user %v: %v", userID, err)
 	}
diff --git a/pkg/alertmanager/multitenant_test.go b/pkg/alertmanager/multitenant_test.go
index 38057bb1447..e20deb014d8 100644
--- a/pkg/alertmanager/multitenant_test.go
+++ b/pkg/alertmanager/multitenant_test.go
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"github.com/go-kit/log"
+	"github.com/prometheus/alertmanager/alertobserver"
 	"github.com/prometheus/alertmanager/cluster/clusterpb"
 	"github.com/prometheus/alertmanager/notify"
 	"github.com/prometheus/alertmanager/pkg/labels"
@@ -188,20 +189,45 @@ receivers:
 	originalFiles, err := listFiles(storeDir)
 	require.NoError(t, err)
 	require.Equal(t, 3, len(originalFiles))
-
-	cfg := mockAlertmanagerConfig(t)
-	cfg.DataDir = storeDir
-	reg := prometheus.NewPedanticRegistry()
-	am, err := createMultitenantAlertmanager(cfg, nil, nil, store, nil, nil, log.NewNopLogger(), reg)
-	require.NoError(t, err)
-	for i := 0; i < 5; i++ {
-		err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic)
-		require.NoError(t, err)
-		require.Len(t, am.alertmanagers, 2)
-		files, err := listFiles(storeDir)
-		require.NoError(t, err)
-		// Verify if the files were not deleted
-		require.Equal(t, originalFiles, files)
+	for _, tc := range []struct {
+		name       string
+		observerFn func(config *Config) alertobserver.LifeCycleObserver
+	}{
+		{
+			name: "running with alert observer",
+			observerFn: func(config *Config) alertobserver.LifeCycleObserver {
+				return &LogAlertLifeCycleObserver{}
+			},
+		},
+		{
+			name: "running without alert observer",
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			cfg := mockAlertmanagerConfig(t)
+			cfg.AlertLifeCycleObserverFn = tc.observerFn
+			cfg.DataDir = storeDir
+			reg := prometheus.NewPedanticRegistry()
+			am, err := createMultitenantAlertmanager(cfg, nil, nil, store, nil, nil, log.NewNopLogger(), reg)
+			require.NoError(t, err)
+			for i := 0; i < 5; i++ {
+				err = am.loadAndSyncConfigs(context.Background(), reasonPeriodic)
+				require.NoError(t, err)
+				require.Len(t, am.alertmanagers, 2)
+				files, err := listFiles(storeDir)
+				require.NoError(t, err)
+				// Verify if the files were not deleted
+				require.Equal(t, originalFiles, files)
+				// Verify that alertLCObserver is set when the provider function is present
+				for _, v := range am.alertmanagers {
+					if tc.observerFn == nil {
+						require.Nil(t, v.alertLCObserver)
+					} else {
+						require.NotNil(t, v.alertLCObserver)
+					}
+				}
+			}
+		})
 	}
 }
@@ -2217,6 +2243,7 @@ type mockAlertManagerLimits struct {
 	maxDispatcherAggregationGroups int
 	maxAlertsCount                 int
 	maxAlertsSizeBytes             int
+	alertLifeCycleObserverLevel    int
 }
 
 func (m *mockAlertManagerLimits) AlertmanagerMaxConfigSize(tenant string) int {
@@ -2258,3 +2285,7 @@ func (m *mockAlertManagerLimits) AlertmanagerMaxAlertsCount(_ string) int {
 func (m *mockAlertManagerLimits) AlertmanagerMaxAlertsSizeBytes(_ string) int {
 	return m.maxAlertsSizeBytes
 }
+
+func (m *mockAlertManagerLimits) AlertmanagerAlertLifeCycleObserverLevel(_ string) int {
+	return m.alertLifeCycleObserverLevel
+}
diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go
index 64cce598703..6693dd7a490 100644
--- a/pkg/cortex/modules.go
+++ b/pkg/cortex/modules.go
@@ -11,6 +11,7 @@ import (
 	"github.com/opentracing-contrib/go-stdlib/nethttp"
 	"github.com/opentracing/opentracing-go"
 	"github.com/pkg/errors"
+	"github.com/prometheus/alertmanager/alertobserver"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/prometheus/promql"
 	"github.com/prometheus/prometheus/rules"
@@ -638,6 +639,15 @@ func (t *Cortex) initConfig() (serv services.Service, err error) {
 func (t *Cortex) initAlertManager() (serv services.Service, err error) {
 	t.Cfg.Alertmanager.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort
+	if t.Cfg.Alertmanager.AlertLifeCycleObserverFn == nil {
+		t.Cfg.Alertmanager.AlertLifeCycleObserverFn = func(config *alertmanager.Config) alertobserver.LifeCycleObserver {
+			if config.Limits == nil {
+				return nil
+			}
+			observerLimiter := alertmanager.NewAlertLifeCycleObserverLimiter(config.UserID, config.Limits)
+			return alertmanager.NewLogAlertLifeCycleObserver(config.Logger, config.UserID, observerLimiter)
+		}
+	}
 
 	// Initialise the store.
 	store, err := alertstore.NewAlertStore(context.Background(), t.Cfg.AlertmanagerStorage, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer)
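The default wiring above only kicks in when AlertLifeCycleObserverFn is unset, so an embedder can install its own observer for every per-tenant Alertmanager before the module initializes. Below is a hypothetical sketch, not part of the patch; it assumes alertobserver.LifeCycleObserver is satisfied by the single Observe method shown in alert_observer.go, and the countingObserver type and its metric are invented for illustration.

package custom

import (
	"github.com/prometheus/alertmanager/alertobserver"
	"github.com/prometheus/alertmanager/types"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/cortexproject/cortex/pkg/alertmanager"
)

// countingObserver counts life-cycle events per tenant instead of logging them.
type countingObserver struct {
	tenant string
	events *prometheus.CounterVec // labelled by tenant and event name
}

func (c *countingObserver) Observe(event string, alerts []*types.Alert, _ alertobserver.AlertEventMeta) {
	c.events.WithLabelValues(c.tenant, event).Add(float64(len(alerts)))
}

// NewObserverFn returns a provider suitable for MultitenantAlertmanagerConfig.AlertLifeCycleObserverFn;
// one observer is created per tenant Alertmanager and cfg carries that tenant's UserID.
func NewObserverFn(events *prometheus.CounterVec) func(cfg *alertmanager.Config) alertobserver.LifeCycleObserver {
	return func(cfg *alertmanager.Config) alertobserver.LifeCycleObserver {
		return &countingObserver{tenant: cfg.UserID, events: events}
	}
}

With a custom provider like this in place, the log-based observer is bypassed entirely, and the per-tenant level limit is not consulted unless the custom observer chooses to use AlertLifeCycleObserverLimiter itself.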
diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go
index d60b2ca8e65..033cb4bfcce 100644
--- a/pkg/util/validation/limits.go
+++ b/pkg/util/validation/limits.go
@@ -136,6 +136,7 @@ type Limits struct {
 	AlertmanagerMaxDispatcherAggregationGroups int `yaml:"alertmanager_max_dispatcher_aggregation_groups" json:"alertmanager_max_dispatcher_aggregation_groups"`
 	AlertmanagerMaxAlertsCount                 int `yaml:"alertmanager_max_alerts_count" json:"alertmanager_max_alerts_count"`
 	AlertmanagerMaxAlertsSizeBytes             int `yaml:"alertmanager_max_alerts_size_bytes" json:"alertmanager_max_alerts_size_bytes"`
+	AlertmanagerAlertLifeCycleObserverLevel    int `yaml:"alertmanager_alert_lifecycle_observer_level" json:"alertmanager_alert_lifecycle_observer_level"`
 
 	DisabledRuleGroups DisabledRuleGroups `yaml:"disabled_rule_groups" json:"disabled_rule_groups" doc:"nocli|description=list of rule groups to disable"`
 }
@@ -217,6 +218,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&l.AlertmanagerMaxDispatcherAggregationGroups, "alertmanager.max-dispatcher-aggregation-groups", 0, "Maximum number of aggregation groups in Alertmanager's dispatcher that a tenant can have. Each active aggregation group uses single goroutine. When the limit is reached, dispatcher will not dispatch alerts that belong to additional aggregation groups, but existing groups will keep working properly. 0 = no limit.")
 	f.IntVar(&l.AlertmanagerMaxAlertsCount, "alertmanager.max-alerts-count", 0, "Maximum number of alerts that a single user can have. Inserting more alerts will fail with a log message and metric increment. 0 = no limit.")
 	f.IntVar(&l.AlertmanagerMaxAlertsSizeBytes, "alertmanager.max-alerts-size-bytes", 0, "Maximum total size of alerts that a single user can have, alert size is the sum of the bytes of its labels, annotations and generatorURL. Inserting more alerts will fail with a log message and metric increment. 0 = no limit.")
+	f.IntVar(&l.AlertmanagerAlertLifeCycleObserverLevel, "alertmanager.alert-lifecycle-observer-level", 0, "Level of logs the LifeCycleObserver will collect. 0 = no logs.")
 }
 
 // Validate the limits config and returns an error if the validation
@@ -676,6 +678,10 @@ func (o *Overrides) AlertmanagerMaxAlertsSizeBytes(userID string) int {
 	return o.GetOverridesForUser(userID).AlertmanagerMaxAlertsSizeBytes
 }
 
+func (o *Overrides) AlertmanagerAlertLifeCycleObserverLevel(userID string) int {
+	return o.GetOverridesForUser(userID).AlertmanagerAlertLifeCycleObserverLevel
+}
+
 func (o *Overrides) DisabledRuleGroups(userID string) DisabledRuleGroups {
 	if o.tenantLimits != nil {
 		l := o.tenantLimits.ByUserID(userID)
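The new setting is a plain per-tenant integer limit, so it can be set globally via the flag above or overridden per tenant through the usual overrides mechanism and read back with the new Overrides getter. A minimal illustrative sketch, not part of the patch, assuming validation.NewOverrides keeps its current signature; the level value and tenant ID are made up.

package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/util/validation"
)

func main() {
	defaults := validation.Limits{AlertmanagerAlertLifeCycleObserverLevel: 5}

	// nil tenantLimits means every tenant falls back to the defaults above;
	// per-tenant values would normally come from the runtime overrides file.
	overrides, err := validation.NewOverrides(defaults, nil)
	if err != nil {
		panic(err)
	}

	// This is the value AlertLifeCycleObserverLimiter.level() returns for the tenant;
	// anything <= 0 disables the LogAlertLifeCycleObserver.
	fmt.Println(overrides.AlertmanagerAlertLifeCycleObserverLevel("tenant-1"))
}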