Skip to content

Commit

Permalink
Add alert lifecycle observer
Browse files Browse the repository at this point in the history
Signed-off-by: Emmanuel Lodovice <[email protected]>
  • Loading branch information
emanlodovice committed Jan 12, 2024
1 parent 2a16a63 commit 9c8a700
Show file tree
Hide file tree
Showing 13 changed files with 610 additions and 84 deletions.
36 changes: 36 additions & 0 deletions alertobserver/alertobserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2023 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package alertobserver

import (
"github.com/prometheus/alertmanager/types"
)

const (
EventAlertReceived string = "received"
EventAlertRejected string = "rejected"
EventAlertAddedToAggrGroup string = "addedAggrGroup"
EventAlertFailedAddToAggrGroup string = "failedAddAggrGroup"
EventAlertPipelineStart string = "pipelineStart"
EventAlertPipelinePassStage string = "pipelinePassStage"
EventAlertMuted string = "muted"
EventAlertSent string = "sent"
EventAlertSendFailed string = "sendFailed"
)

type AlertEventMeta map[string]interface{}

type LifeCycleObserver interface {
Observe(event string, alerts []*types.Alert, meta AlertEventMeta)
}
46 changes: 46 additions & 0 deletions alertobserver/testing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2023 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package alertobserver

import (
"sync"

"github.com/prometheus/alertmanager/types"
)

type FakeLifeCycleObserver struct {
AlertsPerEvent map[string][]*types.Alert
PipelineStageAlerts map[string][]*types.Alert
MetaPerEvent map[string][]AlertEventMeta
Mtx sync.RWMutex
}

func (o *FakeLifeCycleObserver) Observe(event string, alerts []*types.Alert, meta AlertEventMeta) {
o.Mtx.Lock()
defer o.Mtx.Unlock()
if event == EventAlertPipelinePassStage {
o.PipelineStageAlerts[meta["stageName"].(string)] = append(o.PipelineStageAlerts[meta["stageName"].(string)], alerts...)
} else {
o.AlertsPerEvent[event] = append(o.AlertsPerEvent[event], alerts...)
}
o.MetaPerEvent[event] = append(o.MetaPerEvent[event], meta)
}

func NewFakeLifeCycleObserver() *FakeLifeCycleObserver {
return &FakeLifeCycleObserver{
PipelineStageAlerts: map[string][]*types.Alert{},
AlertsPerEvent: map[string][]*types.Alert{},
MetaPerEvent: map[string][]AlertEventMeta{},
}
}
6 changes: 6 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"

"github.com/prometheus/alertmanager/alertobserver"
apiv1 "github.com/prometheus/alertmanager/api/v1"
apiv2 "github.com/prometheus/alertmanager/api/v2"
"github.com/prometheus/alertmanager/cluster"
Expand Down Expand Up @@ -81,6 +82,9 @@ type Options struct {
GroupInfoFunc func(func(*dispatch.Route) bool) dispatch.AlertGroupInfos
// APICallback define the callback function that each api call will perform before returned.
APICallback callback.Callback
// AlertLCObserver is used to add hooks to the different alert life cycle events.
// If nil then no observer methods will be invoked in the life cycle events.
AlertLCObserver alertobserver.LifeCycleObserver
}

func (o Options) validate() error {
Expand Down Expand Up @@ -124,6 +128,7 @@ func New(opts Options) (*API, error) {
opts.Peer,
log.With(l, "version", "v1"),
opts.Registry,
opts.AlertLCObserver,
)

v2, err := apiv2.NewAPI(
Expand All @@ -136,6 +141,7 @@ func New(opts Options) (*API, error) {
opts.Peer,
log.With(l, "version", "v2"),
opts.Registry,
opts.AlertLCObserver,
)
if err != nil {
return nil, err
Expand Down
45 changes: 30 additions & 15 deletions api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/prometheus/common/route"
"github.com/prometheus/common/version"

"github.com/prometheus/alertmanager/alertobserver"
"github.com/prometheus/alertmanager/api/metrics"
"github.com/prometheus/alertmanager/cluster"
"github.com/prometheus/alertmanager/config"
Expand Down Expand Up @@ -67,14 +68,15 @@ func setCORS(w http.ResponseWriter) {

// API provides registration of handlers for API routes.
type API struct {
alerts provider.Alerts
silences *silence.Silences
config *config.Config
route *dispatch.Route
uptime time.Time
peer cluster.ClusterPeer
logger log.Logger
m *metrics.Alerts
alerts provider.Alerts
silences *silence.Silences
config *config.Config
route *dispatch.Route
uptime time.Time
peer cluster.ClusterPeer
logger log.Logger
m *metrics.Alerts
alertLCObserver alertobserver.LifeCycleObserver

getAlertStatus getAlertStatusFn

Expand All @@ -91,19 +93,21 @@ func New(
peer cluster.ClusterPeer,
l log.Logger,
r prometheus.Registerer,
o alertobserver.LifeCycleObserver,
) *API {
if l == nil {
l = log.NewNopLogger()
}

return &API{
alerts: alerts,
silences: silences,
getAlertStatus: sf,
uptime: time.Now(),
peer: peer,
logger: l,
m: metrics.NewAlerts("v1", r),
alerts: alerts,
silences: silences,
getAlertStatus: sf,
uptime: time.Now(),
peer: peer,
logger: l,
m: metrics.NewAlerts("v1", r),
alertLCObserver: o,
}
}

Expand Down Expand Up @@ -447,6 +451,10 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*
if err := a.Validate(); err != nil {
validationErrs.Add(err)
api.m.Invalid().Inc()
if api.alertLCObserver != nil {
m := alertobserver.AlertEventMeta{"msg": err.Error()}
api.alertLCObserver.Observe(alertobserver.EventAlertRejected, []*types.Alert{a}, m)
}
continue
}
validAlerts = append(validAlerts, a)
Expand All @@ -456,8 +464,15 @@ func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*
typ: errorInternal,
err: err,
}, nil)
if api.alertLCObserver != nil {
m := alertobserver.AlertEventMeta{"msg": err.Error()}
api.alertLCObserver.Observe(alertobserver.EventAlertRejected, validAlerts, m)
}
return
}
if api.alertLCObserver != nil {
api.alertLCObserver.Observe(alertobserver.EventAlertReceived, validAlerts, alertobserver.AlertEventMeta{})
}

if validationErrs.Len() > 0 {
api.respondError(w, apiError{
Expand Down
73 changes: 71 additions & 2 deletions api/v1/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/prometheus/alertmanager/alertobserver"
"github.com/prometheus/alertmanager/config"
"github.com/prometheus/alertmanager/dispatch"
"github.com/prometheus/alertmanager/pkg/labels"
Expand Down Expand Up @@ -134,7 +135,7 @@ func TestAddAlerts(t *testing.T) {
}

alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err)
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil)
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil, nil)
defaultGlobalConfig := config.DefaultGlobalConfig()
route := config.Route{}
api.Update(&config.Config{
Expand All @@ -153,6 +154,74 @@ func TestAddAlerts(t *testing.T) {
body, _ := io.ReadAll(res.Body)

require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, StartsAt %v, EndsAt %v, Response: %s", i, tc.start, tc.end, string(body)))

observer := alertobserver.NewFakeLifeCycleObserver()
api.alertLCObserver = observer
r, err = http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b))
w = httptest.NewRecorder()
if err != nil {
t.Errorf("Unexpected error %v", err)
}
api.addAlerts(w, r)
if tc.code == 200 {
require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertReceived][0].Fingerprint(), alerts[0].Fingerprint())
} else {
require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertRejected][0].Fingerprint(), alerts[0].Fingerprint())
}
}
}

func TestAddAlertsWithAlertLCObserver(t *testing.T) {
now := func(offset int) time.Time {
return time.Now().Add(time.Duration(offset) * time.Second)
}

for i, tc := range []struct {
start, end time.Time
err bool
code int
}{
{time.Time{}, time.Time{}, false, 200},
{now(1), now(0), false, 400},
{now(0), time.Time{}, true, 500},
} {
alerts := []model.Alert{{
StartsAt: tc.start,
EndsAt: tc.end,
Labels: model.LabelSet{"label1": "test1"},
Annotations: model.LabelSet{"annotation1": "some text"},
}}
b, err := json.Marshal(&alerts)
if err != nil {
t.Errorf("Unexpected error %v", err)
}

alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err)
observer := alertobserver.NewFakeLifeCycleObserver()
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil, observer)
defaultGlobalConfig := config.DefaultGlobalConfig()
route := config.Route{}
api.Update(&config.Config{
Global: &defaultGlobalConfig,
Route: &route,
})

r, err := http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b))
w := httptest.NewRecorder()
if err != nil {
t.Errorf("Unexpected error %v", err)
}

api.addAlerts(w, r)
res := w.Result()
body, _ := io.ReadAll(res.Body)

require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, StartsAt %v, EndsAt %v, Response: %s", i, tc.start, tc.end, string(body)))
if tc.code == 200 {
require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertReceived][0].Fingerprint(), alerts[0].Fingerprint())
} else {
require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertRejected][0].Fingerprint(), alerts[0].Fingerprint())
}
}
}

Expand Down Expand Up @@ -267,7 +336,7 @@ func TestListAlerts(t *testing.T) {
},
} {
alertsProvider := newFakeAlerts(alerts, tc.err)
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil)
api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil, nil)
api.route = dispatch.NewRoute(&config.Route{Receiver: "def-receiver"}, nil)

r, err := http.NewRequest("GET", "/api/v1/alerts", nil)
Expand Down
19 changes: 17 additions & 2 deletions api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
alertgroupinfolist_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alertgroupinfolist"
"github.com/prometheus/alertmanager/util/callback"

"github.com/prometheus/alertmanager/alertobserver"
"github.com/prometheus/alertmanager/api/metrics"
open_api_models "github.com/prometheus/alertmanager/api/v2/models"
"github.com/prometheus/alertmanager/api/v2/restapi"
Expand Down Expand Up @@ -76,8 +77,9 @@ type API struct {
route *dispatch.Route
setAlertStatus setAlertStatusFn

logger log.Logger
m *metrics.Alerts
logger log.Logger
m *metrics.Alerts
alertLCObserver alertobserver.LifeCycleObserver

Handler http.Handler
}
Expand All @@ -100,6 +102,7 @@ func NewAPI(
peer cluster.ClusterPeer,
l log.Logger,
r prometheus.Registerer,
o alertobserver.LifeCycleObserver,
) (*API, error) {
if apiCallback == nil {
apiCallback = callback.NoopAPICallback{}
Expand All @@ -115,6 +118,7 @@ func NewAPI(
logger: l,
m: metrics.NewAlerts("v2", r),
uptime: time.Now(),
alertLCObserver: o,
}

// Load embedded swagger file.
Expand Down Expand Up @@ -402,19 +406,30 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.
if err := a.Validate(); err != nil {
validationErrs.Add(err)
api.m.Invalid().Inc()
if api.alertLCObserver != nil {
m := alertobserver.AlertEventMeta{"msg": err.Error()}
api.alertLCObserver.Observe(alertobserver.EventAlertRejected, []*types.Alert{a}, m)
}
continue
}
validAlerts = append(validAlerts, a)
}
if err := api.alerts.Put(validAlerts...); err != nil {
level.Error(logger).Log("msg", "Failed to create alerts", "err", err)
if api.alertLCObserver != nil {
m := alertobserver.AlertEventMeta{"msg": err.Error()}
api.alertLCObserver.Observe(alertobserver.EventAlertRejected, validAlerts, m)
}
return alert_ops.NewPostAlertsInternalServerError().WithPayload(err.Error())
}

if validationErrs.Len() > 0 {
level.Error(logger).Log("msg", "Failed to validate alerts", "err", validationErrs.Error())
return alert_ops.NewPostAlertsBadRequest().WithPayload(validationErrs.Error())
}
if api.alertLCObserver != nil {
api.alertLCObserver.Observe(alertobserver.EventAlertReceived, validAlerts, alertobserver.AlertEventMeta{})
}

return alert_ops.NewPostAlertsOK()
}
Expand Down
Loading

0 comments on commit 9c8a700

Please sign in to comment.