diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index fe71f14f178..1436a248aba 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -373,7 +373,7 @@ func runQuery( enablePartialResponse, queryReplicaLabels, instantDefaultMaxSourceResolution, - rules.NewRetriever(rulesProxy), + rules.NewGRPCClient(rulesProxy), ) api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins) diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 2294dce208c..d4a641d8990 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -39,8 +39,8 @@ import ( "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/query" + thanosrules "github.com/thanos-io/thanos/pkg/rules" v1 "github.com/thanos-io/thanos/pkg/rules/api" - thanosmanager "github.com/thanos-io/thanos/pkg/rules/manager" "github.com/thanos-io/thanos/pkg/runutil" grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" httpserver "github.com/thanos-io/thanos/pkg/server/http" @@ -386,13 +386,13 @@ func runRule( alertmgrs = append(alertmgrs, alert.NewAlertmanager(logger, amClient, time.Duration(cfg.Timeout), cfg.APIVersion)) } - // Run rule evaluation and alert notifications. var ( + ruleMgr *thanosrules.Manager alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels) - ruleMgr = thanosmanager.NewManager(dataDir) ) { - notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) { + // Run rule evaluation and alert notifications. + notifyFunc := func(ctx context.Context, expr string, alerts ...*rules.Alert) { res := make([]*alert.Alert, 0, len(alerts)) for _, alrt := range alerts { // Only send actually firing alerts. @@ -414,41 +414,37 @@ func runRule( } alertQ.Push(res) } - st := tsdb.Adapter(db, 0) - - opts := rules.ManagerOptions{ - NotifyFunc: notify, - Logger: log.With(logger, "component", "rules"), - Appendable: st, - ExternalURL: nil, - TSDB: st, - ResendDelay: resendDelay, - } - - // TODO(bwplotka): Hide this behind thanos rules.Manager. - for _, strategy := range storepb.PartialResponseStrategy_value { - s := storepb.PartialResponseStrategy(strategy) - ctx, cancel := context.WithCancel(context.Background()) - ctx = tracing.ContextWithTracer(ctx, tracer) + ctx, cancel := context.WithCancel(context.Background()) - opts := opts - opts.Registerer = extprom.WrapRegistererWith(prometheus.Labels{"strategy": strings.ToLower(s.String())}, reg) - opts.Context = ctx - opts.QueryFunc = queryFunc(logger, queryClients, duplicatedQuery, ruleEvalWarnings, s) + st := tsdb.Adapter(db, 0) + logger = log.With(logger, "component", "rules") + ruleMgr = thanosrules.NewManager( + tracing.ContextWithTracer(ctx, tracer), + reg, + dataDir, + rules.ManagerOptions{ + NotifyFunc: notifyFunc, + Logger: logger, + Appendable: st, + ExternalURL: nil, + TSDB: st, + ResendDelay: resendDelay, + }, + queryFuncCreator(logger, queryClients, duplicatedQuery, ruleEvalWarnings), + ) - mgr := rules.NewManager(&opts) - ruleMgr.SetRuleManager(s, mgr) - g.Add(func() error { - mgr.Run() - <-ctx.Done() + // Schedule rule manager that evaluates rules. + g.Add(func() error { + ruleMgr.Run() + <-ctx.Done() - return nil - }, func(error) { - cancel() - mgr.Stop() - }) - } + return nil + }, func(err error) { + cancel() + ruleMgr.Stop() + }) + ruleMgr.Run() } // Run the alert sender. { @@ -539,7 +535,7 @@ func runRule( } // TODO: Add rules API implementation when ready. - s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, store, nil, + s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, store, ruleMgr, grpcserver.WithListen(grpcBindAddr), grpcserver.WithGracePeriod(grpcGracePeriod), grpcserver.WithTLSConfig(tlsCfg), @@ -578,7 +574,7 @@ func runRule( ui.NewRuleUI(logger, reg, ruleMgr, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix), ins) - api := v1.NewAPI(logger, reg, ruleMgr) + api := v1.NewAPI(logger, reg, thanosrules.NewGRPCClient(ruleMgr), ruleMgr) api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger, ins) srv := httpserver.New(logger, reg, comp, httpProbe, @@ -687,57 +683,59 @@ func removeDuplicateQueryEndpoints(logger log.Logger, duplicatedQueriers prometh return deduplicated } -// queryFunc returns query function that hits the HTTP query API of query peers in randomized order until we get a result -// back or the context get canceled. -func queryFunc( +func queryFuncCreator( logger log.Logger, queriers []*http_util.Client, duplicatedQuery prometheus.Counter, ruleEvalWarnings *prometheus.CounterVec, - partialResponseStrategy storepb.PartialResponseStrategy, -) rules.QueryFunc { - var spanID string - - switch partialResponseStrategy { - case storepb.PartialResponseStrategy_WARN: - spanID = "/rule_instant_query HTTP[client]" - case storepb.PartialResponseStrategy_ABORT: - spanID = "/rule_instant_query_part_resp_abort HTTP[client]" - default: - // Programming error will be caught by tests. - panic(errors.Errorf("unknown partial response strategy %v", partialResponseStrategy).Error()) - } +) func(partialResponseStrategy storepb.PartialResponseStrategy) rules.QueryFunc { + + // queryFunc returns query function that hits the HTTP query API of query peers in randomized order until we get a result + // back or the context get canceled. + return func(partialResponseStrategy storepb.PartialResponseStrategy) rules.QueryFunc { + var spanID string + + switch partialResponseStrategy { + case storepb.PartialResponseStrategy_WARN: + spanID = "/rule_instant_query HTTP[client]" + case storepb.PartialResponseStrategy_ABORT: + spanID = "/rule_instant_query_part_resp_abort HTTP[client]" + default: + // Programming error will be caught by tests. + panic(errors.Errorf("unknown partial response strategy %v", partialResponseStrategy).Error()) + } - promClients := make([]*promclient.Client, 0, len(queriers)) - for _, q := range queriers { - promClients = append(promClients, promclient.NewClient(q, logger, "thanos-rule")) - } + promClients := make([]*promclient.Client, 0, len(queriers)) + for _, q := range queriers { + promClients = append(promClients, promclient.NewClient(q, logger, "thanos-rule")) + } - return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) { - for _, i := range rand.Perm(len(queriers)) { - promClient := promClients[i] - endpoints := removeDuplicateQueryEndpoints(logger, duplicatedQuery, queriers[i].Endpoints()) - for _, i := range rand.Perm(len(endpoints)) { - span, ctx := tracing.StartSpan(ctx, spanID) - v, warns, err := promClient.PromqlQueryInstant(ctx, endpoints[i], q, t, promclient.QueryOptions{ - Deduplicate: true, - PartialResponseStrategy: partialResponseStrategy, - }) - span.Finish() - - if err != nil { - level.Error(logger).Log("err", err, "query", q) - continue - } - if len(warns) > 0 { - ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc() - // TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ): - level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q) + return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) { + for _, i := range rand.Perm(len(queriers)) { + promClient := promClients[i] + endpoints := removeDuplicateQueryEndpoints(logger, duplicatedQuery, queriers[i].Endpoints()) + for _, i := range rand.Perm(len(endpoints)) { + span, ctx := tracing.StartSpan(ctx, spanID) + v, warns, err := promClient.PromqlQueryInstant(ctx, endpoints[i], q, t, promclient.QueryOptions{ + Deduplicate: true, + PartialResponseStrategy: partialResponseStrategy, + }) + span.Finish() + + if err != nil { + level.Error(logger).Log("err", err, "query", q) + continue + } + if len(warns) > 0 { + ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc() + // TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ): + level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q) + } + return v, nil } - return v, nil } + return nil, errors.Errorf("no query API server reachable") } - return nil, errors.Errorf("no query API server reachable") } } diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go index 99c58180db9..7d1f22e9f57 100644 --- a/pkg/query/api/v1.go +++ b/pkg/query/api/v1.go @@ -42,8 +42,10 @@ import ( "github.com/prometheus/prometheus/storage" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/query" + "github.com/thanos-io/thanos/pkg/rules" "github.com/thanos-io/thanos/pkg/rules/rulespb" "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -98,17 +100,13 @@ func SetCORS(w http.ResponseWriter) { type ApiFunc func(r *http.Request) (interface{}, []error, *ApiError) -type rulesRetriever interface { - RuleGroups(context.Context) ([]*rulespb.RuleGroup, storage.Warnings, error) -} - // API can register a set of endpoints in a router and handle // them using the provided storage and query engine. type API struct { logger log.Logger queryableCreate query.QueryableCreator queryEngine *promql.Engine - rulesRetriever rulesRetriever + ruleGroups rules.UnaryClient enableAutodownsampling bool enablePartialResponse bool @@ -129,7 +127,7 @@ func NewAPI( enablePartialResponse bool, replicaLabels []string, defaultInstantQueryMaxSourceResolution time.Duration, - rr rulesRetriever, + ruleGroups rules.UnaryClient, ) *API { return &API{ logger: logger, @@ -140,7 +138,7 @@ func NewAPI( replicaLabels: replicaLabels, reg: reg, defaultInstantQueryMaxSourceResolution: defaultInstantQueryMaxSourceResolution, - rulesRetriever: rr, + ruleGroups: ruleGroups, now: time.Now, } @@ -178,7 +176,7 @@ func (api *API) Register(r *route.Router, tracer opentracing.Tracer, logger log. r.Get("/labels", instr("label_names", api.labelNames)) r.Post("/labels", instr("label_names", api.labelNames)) - r.Get("/rules", instr("rules", api.rules)) + r.Get("/rules", instr("rules", NewRulesHandler(api.ruleGroups))) } type queryData struct { @@ -638,55 +636,28 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) { return names, warnings, nil } -func (api *API) rules(r *http.Request) (interface{}, []error, *ApiError) { - var ( - res = &rulespb.RuleGroups{} - typeParam = strings.ToLower(r.URL.Query().Get("type")) - ) - - if typeParam != "" && typeParam != "alert" && typeParam != "record" { - return nil, nil, &ApiError{errorBadData, errors.Errorf("invalid query parameter type='%v'", typeParam)} - } - - returnAlerts := typeParam == "" || typeParam == "alert" - returnRecording := typeParam == "" || typeParam == "record" - - groups, warnings, err := api.rulesRetriever.RuleGroups(r.Context()) - if err != nil { - return nil, nil, &ApiError{ErrorInternal, fmt.Errorf("error retrieving rules: %v", err)} - } - - for _, grp := range groups { - apiRuleGroup := &rulespb.RuleGroup{ - Name: grp.Name, - File: grp.File, - Interval: grp.Interval, - EvaluationDurationSeconds: grp.EvaluationDurationSeconds, - LastEvaluation: grp.LastEvaluation, - DeprecatedPartialResponseStrategy: grp.DeprecatedPartialResponseStrategy, - PartialResponseStrategy: grp.PartialResponseStrategy, +// NewRulesHandler created handler compatible with HTTP /api/v1/rules https://prometheus.io/docs/prometheus/latest/querying/api/#rules +// which uses gRPC Unary Rules API. +func NewRulesHandler(client rules.UnaryClient) func(*http.Request) (interface{}, []error, *ApiError) { + return func(request *http.Request) (interface{}, []error, *ApiError) { + req := &rulespb.RulesRequest{ + PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + } + switch typeParam := strings.ToLower(request.URL.Query().Get("type")); typeParam { + case "": + req.Type = rulespb.RulesRequest_ALL + case "alert": + req.Type = rulespb.RulesRequest_ALERTING + case "record": + req.Type = rulespb.RulesRequest_RECORDING + default: + return nil, nil, &ApiError{errorBadData, errors.Errorf("invalid query parameter type='%v'", typeParam)} } - apiRuleGroup.Rules = make([]*rulespb.Rule, 0, len(grp.Rules)) - - for _, r := range grp.Rules { - switch { - case r.GetAlert() != nil: - if !returnAlerts { - break - } - apiRuleGroup.Rules = append(apiRuleGroup.Rules, r) - case r.GetRecording() != nil: - if !returnRecording { - break - } - apiRuleGroup.Rules = append(apiRuleGroup.Rules, r) - default: - return nil, nil, &ApiError{ErrorInternal, fmt.Errorf("rule %v: unsupported", r)} - } + groups, warnings, err := client.Rules(request.Context(), req) + if err != nil { + return nil, nil, &ApiError{ErrorInternal, errors.Errorf("error retrieving rules: %v", err)} } - res.Groups = append(res.Groups, apiRuleGroup) + return groups, warnings, nil } - - return res, warnings, nil } diff --git a/pkg/rules/api/v1.go b/pkg/rules/api/v1.go index 864a33e37b7..caaf75a97ee 100644 --- a/pkg/rules/api/v1.go +++ b/pkg/rules/api/v1.go @@ -4,43 +4,45 @@ package v1 import ( - "fmt" "net/http" - "strconv" "time" "github.com/NYTimes/gziphandler" - "github.com/prometheus/client_golang/prometheus" - "github.com/thanos-io/thanos/pkg/rules/rulespb" - "github.com/go-kit/kit/log" - opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/route" - "github.com/prometheus/prometheus/rules" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" qapi "github.com/thanos-io/thanos/pkg/query/api" - "github.com/thanos-io/thanos/pkg/rules/manager" - "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/rules" + "github.com/thanos-io/thanos/pkg/rules/rulespb" "github.com/thanos-io/thanos/pkg/tracing" ) type API struct { - logger log.Logger - now func() time.Time - ruleRetriever RulesRetriever - reg prometheus.Registerer + logger log.Logger + now func() time.Time + ruleGroups rules.UnaryClient + alerts alertsRetriever + reg prometheus.Registerer +} + +type alertsRetriever interface { + Active() []*rulespb.AlertInstance } func NewAPI( logger log.Logger, reg prometheus.Registerer, - ruleRetriever RulesRetriever, + ruleGroups rules.UnaryClient, + activeAlerts alertsRetriever, ) *API { return &API{ - logger: logger, - now: time.Now, - ruleRetriever: ruleRetriever, - reg: reg, + logger: logger, + now: time.Now, + ruleGroups: ruleGroups, + alerts: activeAlerts, + reg: reg, } } @@ -59,92 +61,8 @@ func (api *API) Register(r *route.Router, tracer opentracing.Tracer, logger log. return ins.NewHandler(name, tracing.HTTPMiddleware(tracer, name, logger, gziphandler.GzipHandler(hf))) } - r.Get("/alerts", instr("alerts", api.alerts)) - r.Get("/rules", instr("rules", api.rules)) -} - -type RulesRetriever interface { - RuleGroups() []manager.Group - AlertingRules() []manager.AlertingRule -} - -func (api *API) rules(*http.Request) (interface{}, []error, *qapi.ApiError) { - res := &rulespb.RuleGroups{} - for _, grp := range api.ruleRetriever.RuleGroups() { - apiRuleGroup := &rulespb.RuleGroup{ - Name: grp.Name(), - File: grp.OriginalFile(), - Interval: grp.Interval().Seconds(), - PartialResponseStrategy: grp.PartialResponseStrategy, - } - - for _, r := range grp.Rules() { - lastError := "" - if r.LastError() != nil { - lastError = r.LastError().Error() - } - - switch rule := r.(type) { - case *rules.AlertingRule: - apiRuleGroup.Rules = append(apiRuleGroup.Rules, &rulespb.Rule{ - Result: &rulespb.Rule_Alert{Alert: &rulespb.Alert{ - State: rulespb.AlertState(rule.State()), - Name: rule.Name(), - Query: rule.Query().String(), - DurationSeconds: rule.Duration().Seconds(), - Labels: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Labels())}, - Annotations: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Annotations())}, - Alerts: rulesAlertsToAPIAlerts(grp.PartialResponseStrategy, rule.ActiveAlerts()), - Health: string(rule.Health()), - LastError: lastError, - EvaluationDurationSeconds: rule.GetEvaluationDuration().Seconds(), - LastEvaluation: rule.GetEvaluationTimestamp(), - }}}) - case *rules.RecordingRule: - apiRuleGroup.Rules = append(apiRuleGroup.Rules, &rulespb.Rule{ - Result: &rulespb.Rule_Recording{Recording: &rulespb.RecordingRule{ - Name: rule.Name(), - Query: rule.Query().String(), - Labels: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Labels())}, - Health: string(rule.Health()), - LastError: lastError, - EvaluationDurationSeconds: rule.GetEvaluationDuration().Seconds(), - LastEvaluation: rule.GetEvaluationTimestamp(), - }}}) - default: - err := fmt.Errorf("rule %q: unsupported type %T", r.Name(), rule) - return nil, nil, &qapi.ApiError{Typ: qapi.ErrorInternal, Err: err} - } - } - res.Groups = append(res.Groups, apiRuleGroup) - } - - return res, nil, nil -} - -func (api *API) alerts(*http.Request) (interface{}, []error, *qapi.ApiError) { - var alerts []*rulespb.AlertInstance - for _, alertingRule := range api.ruleRetriever.AlertingRules() { - alerts = append( - alerts, - rulesAlertsToAPIAlerts(alertingRule.PartialResponseStrategy, alertingRule.ActiveAlerts())..., - ) - } - return struct{ Alerts []*rulespb.AlertInstance }{Alerts: alerts}, nil, nil -} - -func rulesAlertsToAPIAlerts(s storepb.PartialResponseStrategy, rulesAlerts []*rules.Alert) []*rulespb.AlertInstance { - apiAlerts := make([]*rulespb.AlertInstance, len(rulesAlerts)) - for i, ruleAlert := range rulesAlerts { - apiAlerts[i] = &rulespb.AlertInstance{ - PartialResponseStrategy: s, - Labels: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(ruleAlert.Labels)}, - Annotations: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(ruleAlert.Annotations)}, - State: rulespb.AlertState(ruleAlert.State), - ActiveAt: &ruleAlert.ActiveAt, - Value: strconv.FormatFloat(ruleAlert.Value, 'e', -1, 64), - } - } - - return apiAlerts + r.Get("/alerts", instr("alerts", func(r *http.Request) (interface{}, []error, *qapi.ApiError) { + return struct{ Alerts []*rulespb.AlertInstance }{Alerts: api.alerts.Active()}, nil, nil + })) + r.Get("/rules", instr("rules", qapi.NewRulesHandler(api.ruleGroups))) } diff --git a/pkg/rules/api/v1_test.go b/pkg/rules/api/v1_test.go index 430bc545d78..8ede4160127 100644 --- a/pkg/rules/api/v1_test.go +++ b/pkg/rules/api/v1_test.go @@ -24,7 +24,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/storage/tsdb" qapi "github.com/thanos-io/thanos/pkg/query/api" - "github.com/thanos-io/thanos/pkg/rules/manager" + thanosrules "github.com/thanos-io/thanos/pkg/rules" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/testutil" "github.com/thanos-io/thanos/pkg/testutil/testpromcompatibility" @@ -66,7 +66,7 @@ type rulesRetrieverMock struct { testing *testing.T } -func (m rulesRetrieverMock) RuleGroups() []manager.Group { +func (m rulesRetrieverMock) RuleGroups() []thanosrules.Group { storage := newStorage(m.testing) engineOpts := promql.EngineOpts{ @@ -97,18 +97,18 @@ func (m rulesRetrieverMock) RuleGroups() []manager.Group { recordingRule := rules.NewRecordingRule("recording-rule-1", recordingExpr, labels.Labels{}) r = append(r, recordingRule) - return []manager.Group{ - manager.Group{ + return []thanosrules.Group{ + thanosrules.Group{ Group: rules.NewGroup("grp", "/path/to/file", time.Second, r, false, opts), PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, }, } } -func (m rulesRetrieverMock) AlertingRules() []manager.AlertingRule { - var ars []manager.AlertingRule +func (m rulesRetrieverMock) AlertingRules() []thanosrules.AlertingRule { + var ars []thanosrules.AlertingRule for _, ar := range alertingRules(m.testing) { - ars = append(ars, manager.AlertingRule{AlertingRule: ar}) + ars = append(ars, thanosrules.AlertingRule{AlertingRule: ar}) } return ars } diff --git a/pkg/rules/manager.go b/pkg/rules/manager.go new file mode 100644 index 00000000000..51c227a974b --- /dev/null +++ b/pkg/rules/manager.go @@ -0,0 +1,349 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package rules + +import ( + "context" + "crypto/sha256" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "strconv" + "strings" + "sync" + "time" + + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/pkg/rulefmt" + "github.com/prometheus/prometheus/rules" + tsdberrors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/rules/rulespb" + "github.com/thanos-io/thanos/pkg/store/storepb" + "gopkg.in/yaml.v2" +) + +const tmpRuleDir = ".tmp-rules" + +type Group struct { + *rules.Group + originalFile string + PartialResponseStrategy storepb.PartialResponseStrategy +} + +func (g Group) toProto() *rulespb.RuleGroup { + ret := &rulespb.RuleGroup{ + Name: g.Name(), + File: g.originalFile, + Interval: g.Interval().Seconds(), + PartialResponseStrategy: g.PartialResponseStrategy, + } + + for _, r := range g.Rules() { + lastError := "" + if r.LastError() != nil { + lastError = r.LastError().Error() + } + + switch rule := r.(type) { + case *rules.AlertingRule: + ret.Rules = append(ret.Rules, &rulespb.Rule{ + Result: &rulespb.Rule_Alert{Alert: &rulespb.Alert{ + State: rulespb.AlertState(rule.State()), + Name: rule.Name(), + Query: rule.Query().String(), + DurationSeconds: rule.Duration().Seconds(), + Labels: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Labels())}, + Annotations: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Annotations())}, + Alerts: activeAlertsToProto(g.PartialResponseStrategy, rule), + Health: string(rule.Health()), + LastError: lastError, + EvaluationDurationSeconds: rule.GetEvaluationDuration().Seconds(), + LastEvaluation: rule.GetEvaluationTimestamp(), + }}}) + case *rules.RecordingRule: + ret.Rules = append(ret.Rules, &rulespb.Rule{ + Result: &rulespb.Rule_Recording{Recording: &rulespb.RecordingRule{ + Name: rule.Name(), + Query: rule.Query().String(), + Labels: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Labels())}, + Health: string(rule.Health()), + LastError: lastError, + EvaluationDurationSeconds: rule.GetEvaluationDuration().Seconds(), + LastEvaluation: rule.GetEvaluationTimestamp(), + }}}) + default: + // We cannot do much, let's panic, API will recover. + panic(fmt.Sprintf("rule %q: unsupported type %T", r.Name(), rule)) + } + } + return ret +} + +func activeAlertsToProto(s storepb.PartialResponseStrategy, a *rules.AlertingRule) []*rulespb.AlertInstance { + active := a.ActiveAlerts() + ret := make([]*rulespb.AlertInstance, len(active)) + for i, ruleAlert := range active { + ret[i] = &rulespb.AlertInstance{ + PartialResponseStrategy: s, + Labels: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(ruleAlert.Labels)}, + Annotations: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(ruleAlert.Annotations)}, + State: rulespb.AlertState(ruleAlert.State), + ActiveAt: &ruleAlert.ActiveAt, + Value: strconv.FormatFloat(ruleAlert.Value, 'e', -1, 64), + } + } + return ret +} + +// configRuleGroups is what is parsed from config. +type configRuleGroups struct { + Groups []configRuleGroup `yaml:"groups"` +} + +type configRuleGroup struct { + rulefmt.RuleGroup + PartialResponseStrategy *storepb.PartialResponseStrategy +} + +// Manager is a partial response strategy and proto compatible Manager. +// Manager also implements rulespb.Rules gRPC service. +type Manager struct { + workDir string + mgrs map[storepb.PartialResponseStrategy]*rules.Manager + + mtx sync.RWMutex + ruleFiles map[string]string +} + +// NewManager creates new Manager. +// QueryFunc from baseOpts will be rewritten. +func NewManager(ctx context.Context, reg prometheus.Registerer, dataDir string, baseOpts rules.ManagerOptions, queryFuncCreator func(partialResponseStrategy storepb.PartialResponseStrategy) rules.QueryFunc) *Manager { + m := &Manager{ + workDir: filepath.Join(dataDir, tmpRuleDir), + mgrs: make(map[storepb.PartialResponseStrategy]*rules.Manager), + ruleFiles: make(map[string]string), + } + for _, strategy := range storepb.PartialResponseStrategy_value { + s := storepb.PartialResponseStrategy(strategy) + + opts := baseOpts + opts.Registerer = extprom.WrapRegistererWith(prometheus.Labels{"strategy": strings.ToLower(s.String())}, reg) + opts.Context = ctx + opts.QueryFunc = queryFuncCreator(s) + + m.mgrs[s] = rules.NewManager(&opts) + } + + return m +} + +func (m *Manager) Run() { + for _, mgr := range m.mgrs { + mgr.Run() + } +} + +func (m *Manager) Stop() { + for _, mgr := range m.mgrs { + mgr.Stop() + } +} + +func (m *Manager) protoRuleGroups() []*rulespb.RuleGroup { + var res []*rulespb.RuleGroup + for _, g := range m.RuleGroups() { + res = append(res, g.toProto()) + } + return res +} + +func (m *Manager) RuleGroups() []Group { + m.mtx.RLock() + defer m.mtx.RUnlock() + var res []Group + for s, r := range m.mgrs { + for _, group := range r.RuleGroups() { + res = append(res, Group{ + Group: group, + originalFile: m.ruleFiles[group.File()], + PartialResponseStrategy: s, + }) + } + } + return res +} + +func (m *Manager) Active() []*rulespb.AlertInstance { + var res []*rulespb.AlertInstance + for s, r := range m.mgrs { + for _, r := range r.AlertingRules() { + res = append(res, activeAlertsToProto(s, r)...) + } + } + return res +} + +func (r *configRuleGroup) UnmarshalYAML(unmarshal func(interface{}) error) error { + rs := struct { + String string `yaml:"partial_response_strategy"` + }{} + + errMsg := fmt.Sprintf("failed to unmarshal 'partial_response_strategy'. Possible values are %s", strings.Join(storepb.PartialResponseStrategyValues, ",")) + if err := unmarshal(&rs); err != nil { + return errors.Wrap(err, errMsg) + } + + rg := rulefmt.RuleGroup{} + if err := unmarshal(&rg); err != nil { + return errors.Wrap(err, "failed to unmarshal rulefmt.configRuleGroup") + } + + p, ok := storepb.PartialResponseStrategy_value[strings.ToUpper(rs.String)] + if !ok { + if rs.String != "" { + return errors.Errorf("%s. Got: %s", errMsg, rs.String) + } + + // NOTE: For Rule default is abort as this is recommended for alerting. + p = storepb.PartialResponseStrategy_value[storepb.PartialResponseStrategy_ABORT.String()] + } + + ps := storepb.PartialResponseStrategy(p) + r.RuleGroup = rg + r.PartialResponseStrategy = &ps + return nil +} + +func (r configRuleGroup) MarshalYAML() (interface{}, error) { + var ps *string + if r.PartialResponseStrategy != nil { + str := r.PartialResponseStrategy.String() + ps = &str + } + + rs := struct { + RuleGroup rulefmt.RuleGroup `yaml:",inline"` + PartialResponseStrategy *string `yaml:"partial_response_strategy,omitempty"` + }{ + RuleGroup: r.RuleGroup, + PartialResponseStrategy: ps, + } + return rs, nil +} + +// Update updates rules from given files to all managers we hold. We decide which groups should go where, based on +// special field in configRuleGroup file. +func (m *Manager) Update(evalInterval time.Duration, files []string) error { + var ( + errs tsdberrors.MultiError + filesByStrategy = map[storepb.PartialResponseStrategy][]string{} + ruleFiles = map[string]string{} + ) + + if err := os.RemoveAll(m.workDir); err != nil { + return errors.Wrapf(err, "failed to remove %s", m.workDir) + } + if err := os.MkdirAll(m.workDir, os.ModePerm); err != nil { + return errors.Wrapf(err, "failed to create %s", m.workDir) + } + + for _, fn := range files { + b, err := ioutil.ReadFile(fn) + if err != nil { + errs = append(errs, err) + continue + } + + var rg configRuleGroups + if err := yaml.Unmarshal(b, &rg); err != nil { + errs = append(errs, errors.Wrap(err, fn)) + continue + } + + // NOTE: This is very ugly, but we need to reparse it into tmp dir without the field to have to reuse + // rules.Manager. The problem is that it uses yaml.UnmarshalStrict for some reasons. + groupsByStrategy := map[storepb.PartialResponseStrategy]*rulefmt.RuleGroups{} + for _, rg := range rg.Groups { + if _, ok := groupsByStrategy[*rg.PartialResponseStrategy]; !ok { + groupsByStrategy[*rg.PartialResponseStrategy] = &rulefmt.RuleGroups{} + } + + groupsByStrategy[*rg.PartialResponseStrategy].Groups = append( + groupsByStrategy[*rg.PartialResponseStrategy].Groups, + rg.RuleGroup, + ) + } + + for s, rg := range groupsByStrategy { + b, err := yaml.Marshal(rg) + if err != nil { + errs = append(errs, errors.Wrapf(err, "%s: failed to marshal rule groups", fn)) + continue + } + + newFn := filepath.Join(m.workDir, fmt.Sprintf("%s.%x.%s", filepath.Base(fn), sha256.Sum256([]byte(fn)), s.String())) + if err := ioutil.WriteFile(newFn, b, os.ModePerm); err != nil { + errs = append(errs, errors.Wrap(err, newFn)) + continue + } + + filesByStrategy[s] = append(filesByStrategy[s], newFn) + ruleFiles[newFn] = fn + } + } + + m.mtx.Lock() + for s, fs := range filesByStrategy { + mgr, ok := m.mgrs[s] + if !ok { + errs = append(errs, errors.Errorf("no manager found for %v", s)) + continue + } + // We add external labels in `pkg/alert.Queue`. + // TODO(bwplotka): Investigate if we should put ext labels here or not. + if err := mgr.Update(evalInterval, fs, nil); err != nil { + errs = append(errs, errors.Wrapf(err, "strategy %s", s)) + continue + } + } + m.ruleFiles = ruleFiles + m.mtx.Unlock() + + return errs.Err() +} + +// Rules returns specified rules from manager. This is used by gRPC and locally for HTTP and UI purposes. +func (m *Manager) Rules(r *rulespb.RulesRequest, s rulespb.Rules_RulesServer) error { + groups := m.protoRuleGroups() + + pgs := make([]*rulespb.RuleGroup, 0, len(groups)) + for _, g := range groups { + if r.Type == rulespb.RulesRequest_ALL { + pgs = append(pgs, g) + continue + } + + filtered := &rulespb.RuleGroup{} + for _, rule := range g.Rules { + if rule.GetAlert() != nil && r.Type == rulespb.RulesRequest_ALERTING { + filtered.Rules = append(filtered.Rules, rule) + continue + } + if rule.GetRecording() != nil && r.Type == rulespb.RulesRequest_RECORDING { + filtered.Rules = append(filtered.Rules, rule) + } + } + pgs = append(pgs, filtered) + } + + for _, pg := range pgs { + if err := s.Send(&rulespb.RulesResponse{Result: &rulespb.RulesResponse_Group{Group: pg}}); err != nil { + return err + } + } + return nil +} diff --git a/pkg/rules/manager/rule.go b/pkg/rules/manager/rule.go deleted file mode 100644 index 87c33c6e37e..00000000000 --- a/pkg/rules/manager/rule.go +++ /dev/null @@ -1,223 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package manager - -import ( - "crypto/sha256" - "fmt" - "io/ioutil" - "os" - "path/filepath" - "strings" - "sync" - "time" - - "github.com/pkg/errors" - "github.com/prometheus/prometheus/pkg/rulefmt" - "github.com/prometheus/prometheus/rules" - tsdberrors "github.com/prometheus/prometheus/tsdb/errors" - "github.com/thanos-io/thanos/pkg/store/storepb" - "gopkg.in/yaml.v2" -) - -const tmpRuleDir = ".tmp-rules" - -type Group struct { - *rules.Group - originalFile string - PartialResponseStrategy storepb.PartialResponseStrategy -} - -func (g Group) OriginalFile() string { - return g.originalFile -} - -type AlertingRule struct { - *rules.AlertingRule - PartialResponseStrategy storepb.PartialResponseStrategy -} - -type RuleGroups struct { - Groups []RuleGroup `yaml:"groups"` -} - -type RuleGroup struct { - rulefmt.RuleGroup - PartialResponseStrategy *storepb.PartialResponseStrategy -} - -type Manager struct { - workDir string - mgrs map[storepb.PartialResponseStrategy]*rules.Manager - - mtx sync.RWMutex - ruleFiles map[string]string -} - -func NewManager(dataDir string) *Manager { - return &Manager{ - workDir: filepath.Join(dataDir, tmpRuleDir), - mgrs: make(map[storepb.PartialResponseStrategy]*rules.Manager), - ruleFiles: make(map[string]string), - } -} - -func (m *Manager) SetRuleManager(s storepb.PartialResponseStrategy, mgr *rules.Manager) { - m.mgrs[s] = mgr -} - -func (m *Manager) RuleGroups() []Group { - m.mtx.RLock() - defer m.mtx.RUnlock() - var res []Group - for s, r := range m.mgrs { - for _, group := range r.RuleGroups() { - res = append(res, Group{ - Group: group, - PartialResponseStrategy: s, - originalFile: m.ruleFiles[group.File()], - }) - } - } - return res -} - -func (m *Manager) AlertingRules() []AlertingRule { - var res []AlertingRule - for s, r := range m.mgrs { - for _, r := range r.AlertingRules() { - res = append(res, AlertingRule{AlertingRule: r, PartialResponseStrategy: s}) - } - } - return res -} - -func (r *RuleGroup) UnmarshalYAML(unmarshal func(interface{}) error) error { - rs := struct { - String string `yaml:"partial_response_strategy"` - }{} - - errMsg := fmt.Sprintf("failed to unmarshal 'partial_response_strategy'. Possible values are %s", strings.Join(storepb.PartialResponseStrategyValues, ",")) - if err := unmarshal(&rs); err != nil { - return errors.Wrap(err, errMsg) - } - - rg := rulefmt.RuleGroup{} - if err := unmarshal(&rg); err != nil { - return errors.Wrap(err, "failed to unmarshal rulefmt.RuleGroup") - } - - p, ok := storepb.PartialResponseStrategy_value[strings.ToUpper(rs.String)] - if !ok { - if rs.String != "" { - return errors.Errorf("%s. Got: %s", errMsg, rs.String) - } - - // NOTE: For Rule default is abort as this is recommended for alerting. - p = storepb.PartialResponseStrategy_value[storepb.PartialResponseStrategy_ABORT.String()] - } - - ps := storepb.PartialResponseStrategy(p) - r.RuleGroup = rg - r.PartialResponseStrategy = &ps - return nil -} - -func (r RuleGroup) MarshalYAML() (interface{}, error) { - var ps *string - if r.PartialResponseStrategy != nil { - str := r.PartialResponseStrategy.String() - ps = &str - } - - rs := struct { - RuleGroup rulefmt.RuleGroup `yaml:",inline"` - PartialResponseStrategy *string `yaml:"partial_response_strategy,omitempty"` - }{ - RuleGroup: r.RuleGroup, - PartialResponseStrategy: ps, - } - return rs, nil -} - -// Update updates rules from given files to all managers we hold. We decide which groups should go where, based on -// special field in RuleGroup file. -func (m *Manager) Update(evalInterval time.Duration, files []string) error { - var ( - errs tsdberrors.MultiError - filesByStrategy = map[storepb.PartialResponseStrategy][]string{} - ruleFiles = map[string]string{} - ) - - if err := os.RemoveAll(m.workDir); err != nil { - return errors.Wrapf(err, "failed to remove %s", m.workDir) - } - if err := os.MkdirAll(m.workDir, os.ModePerm); err != nil { - return errors.Wrapf(err, "failed to create %s", m.workDir) - } - - for _, fn := range files { - b, err := ioutil.ReadFile(fn) - if err != nil { - errs = append(errs, err) - continue - } - - var rg RuleGroups - if err := yaml.Unmarshal(b, &rg); err != nil { - errs = append(errs, errors.Wrap(err, fn)) - continue - } - - // NOTE: This is very ugly, but we need to reparse it into tmp dir without the field to have to reuse - // rules.Manager. The problem is that it uses yaml.UnmarshalStrict for some reasons. - groupsByStrategy := map[storepb.PartialResponseStrategy]*rulefmt.RuleGroups{} - for _, rg := range rg.Groups { - if _, ok := groupsByStrategy[*rg.PartialResponseStrategy]; !ok { - groupsByStrategy[*rg.PartialResponseStrategy] = &rulefmt.RuleGroups{} - } - - groupsByStrategy[*rg.PartialResponseStrategy].Groups = append( - groupsByStrategy[*rg.PartialResponseStrategy].Groups, - rg.RuleGroup, - ) - } - - for s, rg := range groupsByStrategy { - b, err := yaml.Marshal(rg) - if err != nil { - errs = append(errs, errors.Wrapf(err, "%s: failed to marshal rule groups", fn)) - continue - } - - newFn := filepath.Join(m.workDir, fmt.Sprintf("%s.%x.%s", filepath.Base(fn), sha256.Sum256([]byte(fn)), s.String())) - if err := ioutil.WriteFile(newFn, b, os.ModePerm); err != nil { - errs = append(errs, errors.Wrap(err, newFn)) - continue - } - - filesByStrategy[s] = append(filesByStrategy[s], newFn) - ruleFiles[newFn] = fn - } - } - - m.mtx.Lock() - for s, fs := range filesByStrategy { - mgr, ok := m.mgrs[s] - if !ok { - errs = append(errs, errors.Errorf("no manager found for %v", s)) - continue - } - // We add external labels in `pkg/alert.Queue`. - // TODO(bwplotka): Investigate if we should put ext labels here or not. - if err := mgr.Update(evalInterval, fs, nil); err != nil { - errs = append(errs, errors.Wrapf(err, "strategy %s", s)) - continue - } - } - m.ruleFiles = ruleFiles - m.mtx.Unlock() - - return errs.Err() -} diff --git a/pkg/rules/manager/rule_test.go b/pkg/rules/manager_test.go similarity index 74% rename from pkg/rules/manager/rule_test.go rename to pkg/rules/manager_test.go index b88f87c2e77..13460c2c4fa 100644 --- a/pkg/rules/manager/rule_test.go +++ b/pkg/rules/manager_test.go @@ -1,7 +1,7 @@ // Copyright (c) The Thanos Authors. // Licensed under the Apache License 2.0. -package manager +package rules import ( "context" @@ -14,6 +14,7 @@ import ( "testing" "time" + "github.com/fortytw2/leaktest" "github.com/go-kit/kit/log" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/rulefmt" @@ -53,27 +54,29 @@ groups: queryOnce sync.Once query string ) - opts := rules.ManagerOptions{ - Logger: log.NewLogfmtLogger(os.Stderr), - Context: context.Background(), - QueryFunc: func(ctx context.Context, q string, t time.Time) (vectors promql.Vector, e error) { - queryOnce.Do(func() { - query = q - close(queryDone) - }) - return promql.Vector{}, nil + thanosRuleMgr := NewManager( + context.Background(), + nil, + dir, + rules.ManagerOptions{ + Logger: log.NewLogfmtLogger(os.Stderr), + Context: context.Background(), + Appendable: nopAppendable{}, }, - Appendable: nopAppendable{}, - } - thanosRuleMgr := NewManager(dir) - ruleMgr := rules.NewManager(&opts) - thanosRuleMgr.SetRuleManager(storepb.PartialResponseStrategy_ABORT, ruleMgr) - thanosRuleMgr.SetRuleManager(storepb.PartialResponseStrategy_WARN, ruleMgr) - + func(partialResponseStrategy storepb.PartialResponseStrategy) rules.QueryFunc { + return func(ctx context.Context, q string, t time.Time) (vectors promql.Vector, e error) { + queryOnce.Do(func() { + query = q + close(queryDone) + }) + return promql.Vector{}, nil + } + }, + ) testutil.Ok(t, thanosRuleMgr.Update(10*time.Second, []string{filepath.Join(dir, "rule.yaml")})) - ruleMgr.Run() - defer ruleMgr.Stop() + thanosRuleMgr.Run() + defer thanosRuleMgr.Stop() select { case <-time.After(2 * time.Minute): @@ -84,7 +87,7 @@ groups: testutil.Equals(t, "rate(some_metric[1h:5m] offset 1d)", query) } -func TestUpdate(t *testing.T) { +func TestUpdate_Error_UpdatePartial(t *testing.T) { dir, err := ioutil.TempDir("", "test_rule_rule_groups") testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() @@ -148,14 +151,20 @@ groups: expr: "up" `), os.ModePerm)) - opts := rules.ManagerOptions{ - Logger: log.NewLogfmtLogger(os.Stderr), - } - m := NewManager(dir) - m.SetRuleManager(storepb.PartialResponseStrategy_ABORT, rules.NewManager(&opts)) - m.SetRuleManager(storepb.PartialResponseStrategy_WARN, rules.NewManager(&opts)) - - err = m.Update(10*time.Second, []string{ + thanosRuleMgr := NewManager( + context.Background(), + nil, + dir, + rules.ManagerOptions{ + Logger: log.NewLogfmtLogger(os.Stderr), + }, + func(partialResponseStrategy storepb.PartialResponseStrategy) rules.QueryFunc { + return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) { + return nil, nil + } + }, + ) + err = thanosRuleMgr.Update(10*time.Second, []string{ filepath.Join(dir, "no_strategy.yaml"), filepath.Join(dir, "abort.yaml"), filepath.Join(dir, "warn.yaml"), @@ -169,7 +178,7 @@ groups: testutil.Assert(t, strings.Contains(err.Error(), "wrong.yaml: failed to unmarshal 'partial_response_strategy'"), err.Error()) testutil.Assert(t, strings.Contains(err.Error(), "non_existing.yaml: no such file or directory"), err.Error()) - g := m.RuleGroups() + g := thanosRuleMgr.RuleGroups() sort.Slice(g, func(i, j int) bool { return g[i].Name() < g[j].Name() }) @@ -216,11 +225,16 @@ groups: }, } testutil.Equals(t, len(exp), len(g)) + for i := range exp { t.Run(exp[i].name, func(t *testing.T) { testutil.Equals(t, exp[i].strategy, g[i].PartialResponseStrategy) testutil.Equals(t, exp[i].name, g[i].Name()) - testutil.Equals(t, exp[i].file, g[i].OriginalFile()) + + p := g[i].ToProto() + testutil.Equals(t, exp[i].strategy, p.PartialResponseStrategy) + testutil.Equals(t, exp[i].name, p.Name) + testutil.Equals(t, exp[i].file, p.File) }) } } @@ -239,8 +253,8 @@ func TestRuleGroupMarshalYAML(t *testing.T) { ` a := storepb.PartialResponseStrategy_ABORT - var input = RuleGroups{ - Groups: []RuleGroup{ + var input = configRuleGroups{ + Groups: []configRuleGroup{ { RuleGroup: rulefmt.RuleGroup{ Name: "something1", @@ -272,3 +286,33 @@ func TestRuleGroupMarshalYAML(t *testing.T) { testutil.Equals(t, expected, string(b)) } + +func TestManager_Rules(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + dir, err := ioutil.TempDir("", "test_rule_run") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + + curr, err := os.Getwd() + testutil.Ok(t, err) + + thanosRuleMgr := NewManager( + context.Background(), + nil, + dir, + rules.ManagerOptions{ + Logger: log.NewLogfmtLogger(os.Stderr), + }, + func(partialResponseStrategy storepb.PartialResponseStrategy) rules.QueryFunc { + return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) { + return nil, nil + } + }, + ) + testutil.Ok(t, thanosRuleMgr.Update(10*time.Second, []string{ + filepath.Join(curr, "../../examples/alerts/alerts.yaml"), + filepath.Join(curr, "../../examples/alerts/rules.yaml"), + })) + testRulesAgainstExamples(t, filepath.Join(curr, "examples/alerts"), thanosRuleMgr) +} diff --git a/pkg/rules/prometheus.go b/pkg/rules/prometheus.go index 34a775a5744..aaa6287554a 100644 --- a/pkg/rules/prometheus.go +++ b/pkg/rules/prometheus.go @@ -10,7 +10,7 @@ import ( "github.com/thanos-io/thanos/pkg/rules/rulespb" ) -// Prometheus implements rulespb.Rules. +// Prometheus implements rulespb.Rules gRPC that allows to fetch rules from Prometheus HTTP api/v1/rules endpoint. type Prometheus struct { base *url.URL client *promclient.Client diff --git a/pkg/rules/prometheus_test.go b/pkg/rules/prometheus_test.go index 6c523f9327b..71e19c9ff4d 100644 --- a/pkg/rules/prometheus_test.go +++ b/pkg/rules/prometheus_test.go @@ -4,33 +4,26 @@ package rules import ( - "context" "fmt" "net/url" "os" + "path/filepath" "testing" "time" "github.com/fortytw2/leaktest" "github.com/thanos-io/thanos/pkg/promclient" - "github.com/thanos-io/thanos/pkg/rules/rulespb" - "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/testutil" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) -func TestPrometheusStore_Rules_e2e(t *testing.T) { - t.Helper() - +func TestPrometheus_Rules_e2e(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() p, err := e2eutil.NewPrometheus() testutil.Ok(t, err) defer func() { testutil.Ok(t, p.Stop()) }() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - curr, err := os.Getwd() testutil.Ok(t, err) testutil.Ok(t, p.SetConfig(fmt.Sprintf(` @@ -48,131 +41,5 @@ rule_files: testutil.Ok(t, err) promRules := NewPrometheus(u, promclient.NewDefaultClient()) - - someAlert := &rulespb.Rule{Result: &rulespb.Rule_Alert{Alert: &rulespb.Alert{Name: "some"}}} - someRecording := &rulespb.Rule{Result: &rulespb.Rule_Recording{Recording: &rulespb.RecordingRule{Name: "some"}}} - - for _, tcase := range []struct { - expected []*rulespb.RuleGroup - expectedErr error - }{ - { - expected: []*rulespb.RuleGroup{ - { - Name: "thanos-bucket-replicate.rules", - File: fmt.Sprintf("%s/../../examples/alerts/alerts.yaml", curr), - Rules: []*rulespb.Rule{someAlert, someAlert, someAlert}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - { - Name: "thanos-compact.rules", - File: fmt.Sprintf("%s/../../examples/alerts/alerts.yaml", curr), - Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert, someAlert}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - { - Name: "thanos-component-absent.rules", - File: fmt.Sprintf("%s/../../examples/alerts/alerts.yaml", curr), - Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert, someAlert, someAlert}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - { - Name: "thanos-query.rules", - File: fmt.Sprintf("%s/../../examples/alerts/alerts.yaml", curr), - Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert, someAlert, someAlert, someAlert}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - { - Name: "thanos-receive.rules", - File: fmt.Sprintf("%s/../../examples/alerts/alerts.yaml", curr), - Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert, someAlert}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - { - Name: "thanos-rule.rules", - File: fmt.Sprintf("%s/../../examples/alerts/alerts.yaml", curr), - Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert, someAlert, someAlert, someAlert, someAlert, someAlert}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - { - Name: "thanos-sidecar.rules", - File: fmt.Sprintf("%s/../../examples/alerts/alerts.yaml", curr), - Rules: []*rulespb.Rule{someAlert, someAlert}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - { - Name: "thanos-store.rules", - File: fmt.Sprintf("%s/../../examples/alerts/alerts.yaml", curr), - Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - { - Name: "thanos-bucket-replicate.rules", - File: fmt.Sprintf("%s/../../examples/alerts/rules.yaml", curr), - Rules: []*rulespb.Rule{}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - { - Name: "thanos-query.rules", - File: fmt.Sprintf("%s/../../examples/alerts/rules.yaml", curr), - Rules: []*rulespb.Rule{someRecording, someRecording, someRecording, someRecording, someRecording}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - { - Name: "thanos-receive.rules", File: fmt.Sprintf("%s/../../examples/alerts/rules.yaml", curr), - Rules: []*rulespb.Rule{someRecording, someRecording, someRecording, someRecording, someRecording, someRecording}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - { - Name: "thanos-store.rules", - File: fmt.Sprintf("%s/../../examples/alerts/rules.yaml", curr), - Rules: []*rulespb.Rule{someRecording, someRecording, someRecording, someRecording}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - }, - // TODO(bwplotka): Potentially add more cases. - }, - } { - t.Run("", func(t *testing.T) { - srv := &rulesServer{ctx: ctx} - err = promRules.Rules(&rulespb.RulesRequest{}, srv) - if tcase.expectedErr != nil { - testutil.NotOk(t, err) - testutil.Equals(t, tcase.expectedErr.Error(), err.Error()) - return - } - - // We don't want to be picky, just check what number and types of rules within group are. - got := srv.groups - for i, g := range got { - for j, r := range g.Rules { - if r.GetAlert() != nil { - got[i].Rules[j] = someAlert - continue - } - if r.GetRecording() != nil { - got[i].Rules[j] = someRecording - continue - } - t.Fatalf("Found rule in group %s that is neither recording not alert.", g.Name) - } - } - - testutil.Ok(t, err) - testutil.Equals(t, []error(nil), srv.warnings) - testutil.Equals(t, tcase.expected, srv.groups) - }) - } + testRulesAgainstExamples(t, filepath.Join(curr, "examples/alerts"), promRules) } diff --git a/pkg/rules/proxy.go b/pkg/rules/proxy.go index 6cd7ecb437b..7be3756a499 100644 --- a/pkg/rules/proxy.go +++ b/pkg/rules/proxy.go @@ -18,7 +18,7 @@ import ( "google.golang.org/grpc/status" ) -// Proxy implements rulespb.Rules that fanouts requests to given rulespb.Rules and deduplication on the way. +// Proxy implements rulespb.Rules gRPC that fanouts requests to given rulespb.Rules and deduplication on the way. type Proxy struct { logger log.Logger rules func() []rulespb.RulesClient diff --git a/pkg/rules/proxy_test.go b/pkg/rules/proxy_test.go index 44c959c8ed4..6fcef874406 100644 --- a/pkg/rules/proxy_test.go +++ b/pkg/rules/proxy_test.go @@ -4,10 +4,10 @@ package rules import ( - "reflect" "testing" "github.com/thanos-io/thanos/pkg/rules/rulespb" + "github.com/thanos-io/thanos/pkg/testutil" ) func TestDedupGroups(t *testing.T) { @@ -148,10 +148,7 @@ func TestDedupGroups(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - got := dedupGroups(tc.groups) - if !reflect.DeepEqual(tc.want, got) { - t.Errorf("want groups %v, got %v", tc.want, got) - } + testutil.Equals(t, tc.want, dedupGroups(tc.groups)) }) } } diff --git a/pkg/rules/rules.go b/pkg/rules/rules.go index b8646fc17e4..ec4ed0c9eb1 100644 --- a/pkg/rules/rules.go +++ b/pkg/rules/rules.go @@ -9,27 +9,32 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/storage" "github.com/thanos-io/thanos/pkg/rules/rulespb" - "github.com/thanos-io/thanos/pkg/store/storepb" ) -// Retriever allows to retrieve rules from gRPC server implementation. +var _ UnaryClient = &GRPCClient{} + +// UnaryClient is gRPC rulespb.Rules client which expands streaming rules API. Useful for consumers that does not +// support streaming. +type UnaryClient interface { + Rules(ctx context.Context, req *rulespb.RulesRequest) ([]*rulespb.RuleGroup, storage.Warnings, error) +} + +// GRPCClient allows to retrieve rules from local gRPC streaming server implementation. // TODO(bwplotka): Switch to native gRPC transparent client->server adapter once available. -type Retriever struct { +type GRPCClient struct { proxy rulespb.RulesServer } -func NewRetriever(rs rulespb.RulesServer) *Retriever { - return &Retriever{ +func NewGRPCClient(rs rulespb.RulesServer) *GRPCClient { + return &GRPCClient{ proxy: rs, } } -func (rr *Retriever) RuleGroups(ctx context.Context) ([]*rulespb.RuleGroup, storage.Warnings, error) { +func (rr *GRPCClient) Rules(ctx context.Context, req *rulespb.RulesRequest) ([]*rulespb.RuleGroup, storage.Warnings, error) { resp := &rulesServer{ctx: ctx} - if err := rr.proxy.Rules(&rulespb.RulesRequest{ - PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, - }, resp); err != nil { + if err := rr.proxy.Rules(req, resp); err != nil { return nil, nil, errors.Wrap(err, "proxy RuleGroups()") } diff --git a/pkg/rules/rules_test.go b/pkg/rules/rules_test.go new file mode 100644 index 00000000000..3210d75df05 --- /dev/null +++ b/pkg/rules/rules_test.go @@ -0,0 +1,165 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package rules + +import ( + "context" + "path/filepath" + "testing" + "time" + + "github.com/fortytw2/leaktest" + "github.com/thanos-io/thanos/pkg/rules/rulespb" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/testutil" +) + +// testRulesAgainstExamples tests against alerts.yaml and rules.yaml examples. +func testRulesAgainstExamples(t *testing.T, dir string, server rulespb.RulesServer) { + t.Helper() + + defer leaktest.CheckTimeout(t, 10*time.Second)() + + // We don't test internals, just if groups are expected. + someAlert := &rulespb.Rule{Result: &rulespb.Rule_Alert{Alert: &rulespb.Alert{Name: "some"}}} + someRecording := &rulespb.Rule{Result: &rulespb.Rule_Recording{Recording: &rulespb.RecordingRule{Name: "some"}}} + + alerts := []*rulespb.RuleGroup{ + { + Name: "thanos-bucket-replicate.rules", + File: filepath.Join(dir, "alerts.yaml"), + Rules: []*rulespb.Rule{someAlert, someAlert, someAlert}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + { + Name: "thanos-compact.rules", + File: filepath.Join(dir, "alerts.yaml"), + Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert, someAlert}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + { + Name: "thanos-component-absent.rules", + File: filepath.Join(dir, "alerts.yaml"), + Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert, someAlert, someAlert}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + { + Name: "thanos-query.rules", + File: filepath.Join(dir, "alerts.yaml"), + Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert, someAlert, someAlert, someAlert}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + { + Name: "thanos-receive.rules", + File: filepath.Join(dir, "alerts.yaml"), + Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert, someAlert}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + { + Name: "thanos-rule.rules", + File: filepath.Join(dir, "alerts.yaml"), + Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert, someAlert, someAlert, someAlert, someAlert, someAlert}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + { + Name: "thanos-sidecar.rules", + File: filepath.Join(dir, "alerts.yaml"), + Rules: []*rulespb.Rule{someAlert, someAlert}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + { + Name: "thanos-store.rules", + File: filepath.Join(dir, "alerts.yaml"), + Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + { + Name: "thanos-bucket-replicate.rules", + File: filepath.Join(dir, "alerts.yaml"), + Rules: []*rulespb.Rule{}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + } + rules := []*rulespb.RuleGroup{ + { + Name: "thanos-query.rules", + File: filepath.Join(dir, "rules.yaml"), + Rules: []*rulespb.Rule{someRecording, someRecording, someRecording, someRecording, someRecording}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + { + Name: "thanos-receive.rules", + File: filepath.Join(dir, "rules.yaml"), + Rules: []*rulespb.Rule{someRecording, someRecording, someRecording, someRecording, someRecording, someRecording}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + { + Name: "thanos-store.rules", + File: filepath.Join(dir, "rules.yaml"), + Rules: []*rulespb.Rule{someRecording, someRecording, someRecording, someRecording}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, + }, + } + + for _, tcase := range []struct { + requestedType rulespb.RulesRequest_Type + + expected []*rulespb.RuleGroup + expectedErr error + }{ + { + requestedType: rulespb.RulesRequest_ALL, + expected: append(append([]*rulespb.RuleGroup{}, alerts...), rules...), + }, + { + requestedType: rulespb.RulesRequest_ALERTING, + expected: append([]*rulespb.RuleGroup{}, alerts...), + }, + { + requestedType: rulespb.RulesRequest_RECORDING, + expected: append([]*rulespb.RuleGroup{}, rules...), + }, + } { + t.Run("", func(t *testing.T) { + got, w, err := NewGRPCClient(server).Rules(context.Background(), &rulespb.RulesRequest{ + Type: tcase.requestedType, + }) + testutil.Equals(t, []error(nil), w) + if tcase.expectedErr != nil { + testutil.NotOk(t, err) + testutil.Equals(t, tcase.expectedErr.Error(), err.Error()) + return + } + testutil.Ok(t, err) + + // We don't want to be picky, just check what number and types of rules within group are. + for i, g := range got { + for j, r := range g.Rules { + if r.GetAlert() != nil { + got[i].Rules[j] = someAlert + continue + } + if r.GetRecording() != nil { + got[i].Rules[j] = someRecording + continue + } + t.Fatalf("Found rule in group %s that is neither recording not alert.", g.Name) + } + } + testutil.Equals(t, tcase.expected, got) + }) + } +} diff --git a/pkg/ui/rule.go b/pkg/ui/rule.go index 8333edb4673..62c8dbf8b0e 100644 --- a/pkg/ui/rule.go +++ b/pkg/ui/rule.go @@ -17,7 +17,7 @@ import ( "github.com/prometheus/common/route" "github.com/prometheus/prometheus/rules" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" - "github.com/thanos-io/thanos/pkg/rules/manager" + thanosrules "github.com/thanos-io/thanos/pkg/rules" ) type Rule struct { @@ -25,12 +25,12 @@ type Rule struct { flagsMap map[string]string - ruleManager *manager.Manager + ruleManager *thanosrules.Manager queryURL string reg prometheus.Registerer } -func NewRuleUI(logger log.Logger, reg prometheus.Registerer, ruleManager *manager.Manager, queryURL string, flagsMap map[string]string) *Rule { +func NewRuleUI(logger log.Logger, reg prometheus.Registerer, ruleManager *thanosrules.Manager, queryURL string, flagsMap map[string]string) *Rule { return &Rule{ BaseUI: NewBaseUI(logger, "rule_menu.html", ruleTmplFuncs(queryURL)), flagsMap: flagsMap, @@ -116,7 +116,7 @@ func ruleTmplFuncs(queryURL string) template.FuncMap { } func (ru *Rule) alerts(w http.ResponseWriter, r *http.Request) { - var groups []manager.Group + var groups []thanosrules.Group for _, group := range ru.ruleManager.RuleGroups() { if group.HasAlertingRules() { groups = append(groups, group) @@ -167,7 +167,7 @@ func (ru *Rule) Register(r *route.Router, ins extpromhttp.InstrumentationMiddlew // AlertStatus bundles alerting rules and the mapping of alert states to row classes. type AlertStatus struct { - Groups []manager.Group + Groups []thanosrules.Group AlertStateToRowClass map[rules.AlertState]string Counts AlertByStateCount } @@ -178,7 +178,7 @@ type AlertByStateCount struct { Firing int32 } -func alertCounts(groups []manager.Group) AlertByStateCount { +func alertCounts(groups []thanosrules.Group) AlertByStateCount { result := AlertByStateCount{} for _, group := range groups { for _, alert := range group.AlertingRules() {