Alerting: Update scheduler to get updates only from database (#64635)
* stop using the scheduler's Update and Delete methods; all communication must go via the database
* update the scheduler's registry to calculate a diff before re-setting the cache
* update the fetcher to return the diff generated by the registry
* update processTick to update the rule eval routine if the rule was updated and is not going to be evaluated at this tick (a rough sketch follows this list)
* remove references to the scheduler from the api package
* remove unused methods in the scheduler
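
The processTick change itself is not among the hunks rendered on this page, so the following is only a rough, self-contained sketch of the behaviour described in the last bullet. All type and field names (ruleKey, ruleRoutine, updateCh, and so on) are invented stand-ins for the scheduler's real internals, not the actual implementation:

    package main

    import "fmt"

    // Hypothetical, simplified stand-ins for the scheduler's internal types.
    type ruleKey struct {
        orgID int64
        uid   string
    }

    type ruleRoutine struct {
        updateCh chan int64 // the running evaluation routine listens here for new rule versions
    }

    type diff struct {
        updated map[ruleKey]struct{}
    }

    // notifyUpdated sketches the idea: a rule that was updated since the last poll,
    // but is not due for evaluation at this tick, still gets a notification so its
    // running routine reloads the new definition instead of waiting for the next
    // scheduled evaluation.
    func notifyUpdated(d diff, dueThisTick map[ruleKey]bool, routines map[ruleKey]*ruleRoutine, versions map[ruleKey]int64) {
        for k := range d.updated {
            if dueThisTick[k] {
                continue // the evaluation at this tick picks up the new version anyway
            }
            if r, ok := routines[k]; ok {
                r.updateCh <- versions[k]
            }
        }
    }

    func main() {
        k := ruleKey{orgID: 1, uid: "abc"}
        r := &ruleRoutine{updateCh: make(chan int64, 1)}
        d := diff{updated: map[ruleKey]struct{}{k: {}}}

        notifyUpdated(d, map[ruleKey]bool{k: false}, map[ruleKey]*ruleRoutine{k: r}, map[ruleKey]int64{k: 3})
        fmt.Println("routine notified with version", <-r.updateCh)
    }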
yuri-tceretian authored Mar 14, 2023
1 parent 10c809a commit 85a954c
Showing 11 changed files with 202 additions and 298 deletions.
3 changes: 0 additions & 3 deletions pkg/services/ngalert/api/api.go
@@ -18,7 +18,6 @@ import (
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
"github.com/grafana/grafana/pkg/services/ngalert/provisioning"
"github.com/grafana/grafana/pkg/services/ngalert/schedule"
"github.com/grafana/grafana/pkg/services/ngalert/sender"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/services/ngalert/store"
@@ -66,7 +65,6 @@ type API struct {
DatasourceService datasources.DataSourceService
RouteRegister routing.RouteRegister
QuotaService quota.Service
Schedule schedule.ScheduleService
TransactionManager provisioning.TransactionManager
ProvenanceStore provisioning.ProvisioningStore
RuleStore RuleStore
@@ -116,7 +114,6 @@ func (api *API) RegisterAPIEndpoints(m *metrics.API) {
&RulerSrv{
conditionValidator: api.EvaluatorFactory,
QuotaService: api.QuotaService,
scheduleService: api.Schedule,
store: api.RuleStore,
provenanceStore: api.ProvenanceStore,
xactManager: api.TransactionManager,
23 changes: 0 additions & 23 deletions pkg/services/ngalert/api/api_ruler.go
@@ -20,7 +20,6 @@ import (
"github.com/grafana/grafana/pkg/services/ngalert/eval"
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/provisioning"
"github.com/grafana/grafana/pkg/services/ngalert/schedule"
"github.com/grafana/grafana/pkg/services/ngalert/store"
"github.com/grafana/grafana/pkg/services/quota"
"github.com/grafana/grafana/pkg/setting"
@@ -37,7 +36,6 @@ type RulerSrv struct {
provenanceStore provisioning.ProvisioningStore
store RuleStore
QuotaService quota.Service
scheduleService schedule.ScheduleService
log log.Logger
cfg *setting.UnifiedAlertingSettings
ac accesscontrol.AccessControl
@@ -144,12 +142,6 @@ func (srv RulerSrv) RouteDeleteAlertRules(c *contextmodel.ReqContext, namespaceT
}
return ErrResp(http.StatusInternalServerError, err, "failed to delete rule group")
}

logger.Debug("rules have been deleted from the store. updating scheduler")
for _, ruleKeys := range deletedGroups {
srv.scheduleService.DeleteAlertRule(ruleKeys...)
}

return response.JSON(http.StatusAccepted, util.DynMap{"message": "rules deleted"})
}

@@ -421,21 +413,6 @@ func (srv RulerSrv) updateAlertRulesInGroup(c *contextmodel.ReqContext, groupKey
return ErrResp(http.StatusInternalServerError, err, "failed to update rule group")
}

for _, rule := range finalChanges.Update {
srv.scheduleService.UpdateAlertRule(ngmodels.AlertRuleKey{
OrgID: c.SignedInUser.OrgID,
UID: rule.Existing.UID,
}, rule.Existing.Version+1, rule.New.IsPaused)
}

if len(finalChanges.Delete) > 0 {
keys := make([]ngmodels.AlertRuleKey, 0, len(finalChanges.Delete))
for _, rule := range finalChanges.Delete {
keys = append(keys, rule.GetKey())
}
srv.scheduleService.DeleteAlertRule(keys...)
}

if finalChanges.IsEmpty() {
return response.JSON(http.StatusAccepted, util.DynMap{"message": "no changes detected in the rule group"})
}
98 changes: 24 additions & 74 deletions pkg/services/ngalert/api/api_ruler_test.go

Large diffs are not rendered by default.

15 changes: 3 additions & 12 deletions pkg/services/ngalert/ngalert.go
@@ -228,7 +228,7 @@ func (ng *AlertNG) init() error {

// if it is required to include folder title to the alerts, we need to subscribe to changes of alert title
if !ng.Cfg.UnifiedAlerting.ReservedLabels.IsReservedLabelDisabled(models.FolderTitleLabel) {
subscribeToFolderChanges(context.Background(), ng.Log, ng.bus, store, scheduler)
subscribeToFolderChanges(ng.Log, ng.bus, store)
}

ng.stateManager = stateManager
@@ -248,7 +248,6 @@ func (ng *AlertNG) init() error {
DatasourceCache: ng.DataSourceCache,
DatasourceService: ng.DataSourceService,
RouteRegister: ng.RouteRegister,
Schedule: ng.schedule,
DataProxy: ng.DataProxy,
QuotaService: ng.QuotaService,
TransactionManager: store,
@@ -296,26 +295,18 @@ func (ng *AlertNG) init() error {
return DeclareFixedRoles(ng.accesscontrolService)
}

func subscribeToFolderChanges(ctx context.Context, logger log.Logger, bus bus.Bus, dbStore api.RuleStore, scheduler schedule.ScheduleService) {
func subscribeToFolderChanges(logger log.Logger, bus bus.Bus, dbStore api.RuleStore) {
// if folder title is changed, we update all alert rules in that folder to make sure that all peers (in HA mode) will update folder title and
// clean up the current state
bus.AddEventListener(func(ctx context.Context, e *events.FolderTitleUpdated) error {
// do not block the upstream execution
go func(evt *events.FolderTitleUpdated) {
logger.Info("Got folder title updated event. updating rules in the folder", "folderUID", evt.UID)
updated, err := dbStore.IncreaseVersionForAllRulesInNamespace(ctx, evt.OrgID, evt.UID)
_, err := dbStore.IncreaseVersionForAllRulesInNamespace(ctx, evt.OrgID, evt.UID)
if err != nil {
logger.Error("Failed to update alert rules in the folder after its title was changed", "error", err, "folderUID", evt.UID, "folder", evt.Title)
return
}
if len(updated) > 0 {
logger.Info("Rules that belong to the folder have been updated successfully. Clearing their status", "folderUID", evt.UID, "updatedRules", len(updated))
for _, key := range updated {
scheduler.UpdateAlertRule(key.AlertRuleKey, key.Version, key.IsPaused)
}
} else {
logger.Debug("No alert rules found in the folder. nothing to update", "folderUID", evt.UID, "folder", evt.Title)
}
}(e)
return nil
})
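
With the direct scheduler call removed here, the version bump written by IncreaseVersionForAllRulesInNamespace is the only trace a folder-title change leaves behind: every scheduler instance, including other peers in HA mode, notices it on its next poll, because the registry diff (see registry.go below) flags any rule whose stored version differs from the cached one.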
21 changes: 1 addition & 20 deletions pkg/services/ngalert/ngalert_test.go
@@ -9,7 +9,6 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/grafana/grafana/pkg/bus"
@@ -19,7 +18,6 @@ import (
"github.com/grafana/grafana/pkg/services/folder"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/schedule"
"github.com/grafana/grafana/pkg/services/ngalert/tests/fakes"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/util"
@@ -39,10 +37,7 @@ func Test_subscribeToFolderChanges(t *testing.T) {
db.Folders[orgID] = append(db.Folders[orgID], folder)
db.PutRule(context.Background(), rules...)

scheduler := &schedule.FakeScheduleService{}
scheduler.On("UpdateAlertRule", mock.Anything, mock.Anything, mock.Anything).Return()

subscribeToFolderChanges(context.Background(), log.New("test"), bus, db, scheduler)
subscribeToFolderChanges(log.New("test"), bus, db)

err := bus.Publish(context.Background(), &events.FolderTitleUpdated{
Timestamp: time.Now(),
@@ -62,20 +57,6 @@
return c, true
})) > 0
}, time.Second, 10*time.Millisecond, "expected to call db store method but nothing was called")

var calledTimes int
require.Eventuallyf(t, func() bool {
for _, call := range scheduler.Calls {
if call.Method == "UpdateAlertRule" {
calledTimes++
}
}
return calledTimes == len(rules)
}, time.Second, 10*time.Millisecond, "scheduler was expected to be called %d times but called %d", len(rules), calledTimes)

for _, rule := range rules {
scheduler.AssertCalled(t, "UpdateAlertRule", rule.GetKey(), rule.Version, false)
}
}

func TestConfigureHistorianBackend(t *testing.T) {
20 changes: 10 additions & 10 deletions pkg/services/ngalert/schedule/fetcher.go
@@ -34,9 +34,9 @@ func sortedUIDs(alertRules []*models.AlertRule) []string {
}

// updateSchedulableAlertRules updates the alert rules for the scheduler.
// It returns an error if the database is unavailable or the query returned
// an error.
func (sch *schedule) updateSchedulableAlertRules(ctx context.Context) error {
// It returns diff that contains rule keys that were updated since the last poll,
// and an error if the database query encountered problems.
func (sch *schedule) updateSchedulableAlertRules(ctx context.Context) (diff, error) {
start := time.Now()
defer func() {
sch.metrics.UpdateSchedulableAlertRulesDuration.Observe(
@@ -46,21 +46,21 @@ func (sch *schedule) updateSchedulableAlertRules(ctx context.Context) error {
if !sch.schedulableAlertRules.isEmpty() {
keys, err := sch.ruleStore.GetAlertRulesKeysForScheduling(ctx)
if err != nil {
return err
return diff{}, err
}
if !sch.schedulableAlertRules.needsUpdate(keys) {
sch.log.Debug("No changes detected. Skip updating")
return nil
return diff{}, nil
}
}

// At this point, we know we need to re-fetch rules as there are changes.
q := models.GetAlertRulesForSchedulingQuery{
PopulateFolders: !sch.disableGrafanaFolder,
}
if err := sch.ruleStore.GetAlertRulesForScheduling(ctx, &q); err != nil {
return fmt.Errorf("failed to get alert rules: %w", err)
return diff{}, fmt.Errorf("failed to get alert rules: %w", err)
}
sch.log.Debug("Alert rules fetched", "rulesCount", len(q.ResultRules), "foldersCount", len(q.ResultFoldersTitles))
sch.schedulableAlertRules.set(q.ResultRules, q.ResultFoldersTitles)
return nil
d := sch.schedulableAlertRules.set(q.ResultRules, q.ResultFoldersTitles)
sch.log.Debug("Alert rules fetched", "rulesCount", len(q.ResultRules), "foldersCount", len(q.ResultFoldersTitles), "updatedRules", len(d.updated))
return d, nil
}
50 changes: 32 additions & 18 deletions pkg/services/ngalert/schedule/registry.go
@@ -3,7 +3,6 @@ package schedule
import (
"context"
"errors"
"fmt"
"sync"
"time"

@@ -32,19 +31,6 @@ func (r *alertRuleInfoRegistry) getOrCreateInfo(context context.Context, key mod
return info, !ok
}

// get returns the channel for the specific alert rule
// if the key does not exist returns an error
func (r *alertRuleInfoRegistry) get(key models.AlertRuleKey) (*alertRuleInfo, error) {
r.mu.Lock()
defer r.mu.Unlock()

info, ok := r.alertRuleInfo[key]
if !ok {
return nil, fmt.Errorf("%v key not found", key)
}
return info, nil
}

func (r *alertRuleInfoRegistry) exists(key models.AlertRuleKey) bool {
r.mu.Lock()
defer r.mu.Unlock()
@@ -169,16 +155,19 @@ func (r *alertRulesRegistry) get(k models.AlertRuleKey) *models.AlertRule {
return r.rules[k]
}

// set replaces all rules in the registry.
func (r *alertRulesRegistry) set(rules []*models.AlertRule, folders map[string]string) {
// set replaces all rules in the registry. Returns difference between previous and the new current version of the registry
func (r *alertRulesRegistry) set(rules []*models.AlertRule, folders map[string]string) diff {
r.mu.Lock()
defer r.mu.Unlock()
r.rules = make(map[models.AlertRuleKey]*models.AlertRule)
rulesMap := make(map[models.AlertRuleKey]*models.AlertRule)
for _, rule := range rules {
r.rules[rule.GetKey()] = rule
rulesMap[rule.GetKey()] = rule
}
d := r.getDiff(rulesMap)
r.rules = rulesMap
// return the map as is without copying because it is not mutated
r.folderTitles = folders
return d
}

// update inserts or replaces a rule in the registry.
@@ -219,3 +208,28 @@ func (r *alertRulesRegistry) needsUpdate(keys []models.AlertRuleKeyWithVersion)
}
return false
}

type diff struct {
updated map[models.AlertRuleKey]struct{}
}

func (d diff) IsEmpty() bool {
return len(d.updated) == 0
}

// getDiff calculates difference between the list of rules fetched previously and provided keys. Returns diff where
// updated - a list of keys that exist in the registry but with different version,
func (r *alertRulesRegistry) getDiff(rules map[models.AlertRuleKey]*models.AlertRule) diff {
result := diff{
updated: map[models.AlertRuleKey]struct{}{},
}
for key, newRule := range rules {
oldRule, ok := r.rules[key]
if !ok || newRule.Version == oldRule.Version {
// a new rule or not updated
continue
}
result.updated[key] = struct{}{}
}
return result
}
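
Per the doc comment on getDiff, only keys that already exist in the registry but carry a different version end up in the diff; brand-new rules and rules that disappeared from the store are intentionally not reported as updated. A tiny standalone illustration of that rule, using simplified stand-in types rather than the actual registry code:

    package main

    import "fmt"

    // key is a simplified stand-in for models.AlertRuleKey.
    type key struct {
        orgID int64
        uid   string
    }

    // diffByVersion mirrors the idea of getDiff above: only keys that exist in
    // both snapshots with a different version end up in the "updated" set.
    func diffByVersion(prev, curr map[key]int64) map[key]struct{} {
        updated := map[key]struct{}{}
        for k, currVersion := range curr {
            if prevVersion, ok := prev[k]; ok && prevVersion != currVersion {
                updated[k] = struct{}{}
            }
        }
        return updated
    }

    func main() {
        prev := map[key]int64{{orgID: 1, uid: "a"}: 1, {orgID: 1, uid: "b"}: 2}
        curr := map[key]int64{{orgID: 1, uid: "a"}: 2, {orgID: 1, uid: "c"}: 1} // "a" bumped, "b" removed, "c" added
        fmt.Println(diffByVersion(prev, curr))                                  // only the bumped rule {1 a} is reported
    }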
50 changes: 50 additions & 0 deletions pkg/services/ngalert/schedule/registry_test.go
@@ -318,3 +318,53 @@ func TestSchedulableAlertRulesRegistry(t *testing.T) {
assert.False(t, ok)
assert.Nil(t, deleted)
}

func TestSchedulableAlertRulesRegistry_set(t *testing.T) {
_, initialRules := models.GenerateUniqueAlertRules(100, models.AlertRuleGen())
init := make(map[models.AlertRuleKey]*models.AlertRule, len(initialRules))
for _, rule := range initialRules {
init[rule.GetKey()] = rule
}
r := alertRulesRegistry{rules: init}
t.Run("should return empty diff if exactly the same rules", func(t *testing.T) {
newRules := make([]*models.AlertRule, 0, len(initialRules))
for _, rule := range initialRules {
newRules = append(newRules, models.CopyRule(rule))
}
diff := r.set(newRules, map[string]string{})
require.Truef(t, diff.IsEmpty(), "Diff is not empty. Probably we check something else than key + version")
})
t.Run("should return empty diff if version does not change", func(t *testing.T) {
newRules := make([]*models.AlertRule, 0, len(initialRules))
// generate random and then override rule key + version
_, randomNew := models.GenerateUniqueAlertRules(len(initialRules), models.AlertRuleGen())
for i := 0; i < len(initialRules); i++ {
rule := randomNew[i]
oldRule := initialRules[i]
rule.UID = oldRule.UID
rule.OrgID = oldRule.OrgID
rule.Version = oldRule.Version
newRules = append(newRules, rule)
}

diff := r.set(newRules, map[string]string{})
require.Truef(t, diff.IsEmpty(), "Diff is not empty. Probably we check something else than key + version")
})
t.Run("should return key in diff if version changes", func(t *testing.T) {
newRules := make([]*models.AlertRule, 0, len(initialRules))
expectedUpdated := map[models.AlertRuleKey]struct{}{}
for i, rule := range initialRules {
cp := models.CopyRule(rule)
if i%2 == 0 {
cp.Version++
expectedUpdated[cp.GetKey()] = struct{}{}
}
newRules = append(newRules, cp)
}
require.NotEmptyf(t, expectedUpdated, "Input parameters have changed. Nothing to assert")

diff := r.set(newRules, map[string]string{})
require.Falsef(t, diff.IsEmpty(), "Diff is empty but should not be")
require.Equal(t, expectedUpdated, diff.updated)
})
}
