From 77269f6fcd4d77ded83573265c63d123d9903d65 Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Tue, 5 May 2020 07:10:09 +0100 Subject: [PATCH] Added Ruler support for RulesAPI; Refactored Manager. As per: https://thanos.io/proposals/202003_thanos_rules_federation.md/ Signed-off-by: Bartlomiej Plotka --- .circleci/config.yml | 2 +- CHANGELOG.md | 2 + Makefile | 4 +- cmd/thanos/query.go | 2 +- cmd/thanos/rule.go | 153 ++++---- cmd/thanos/sidecar.go | 2 +- pkg/promclient/promclient.go | 4 + pkg/query/api/v1.go | 82 ++-- pkg/rules/api/v1.go | 129 ++----- pkg/rules/manager.go | 357 ++++++++++++++++++ pkg/rules/manager/rule.go | 238 ------------ .../{manager/rule_test.go => manager_test.go} | 162 ++++---- pkg/rules/prometheus.go | 41 +- pkg/rules/prometheus_test.go | 152 +------- pkg/rules/proxy.go | 2 +- pkg/rules/rules.go | 23 +- pkg/rules/rules_test.go | 188 +++++++++ pkg/rules/rulespb/custom.go | 6 + pkg/rules/rulespb/rpc.pb.go | 151 ++++---- pkg/rules/rulespb/rpc.proto | 11 +- pkg/store/prometheus.go | 6 +- pkg/store/storepb/custom.go | 38 +- pkg/store/storepb/custom_test.go | 10 + pkg/testutil/e2eutil/prometheus.go | 2 +- pkg/ui/rule.go | 12 +- test/e2e/e2ethanos/services.go | 2 +- test/e2e/query_test.go | 68 ---- test/e2e/rules_api_test.go | 103 +++++ 28 files changed, 1080 insertions(+), 872 deletions(-) create mode 100644 pkg/rules/manager.go delete mode 100644 pkg/rules/manager/rule.go rename pkg/rules/{manager/rule_test.go => manager_test.go} (71%) create mode 100644 pkg/rules/rules_test.go create mode 100644 test/e2e/rules_api_test.go diff --git a/.circleci/config.yml b/.circleci/config.yml index 0133c5d6b9a..5d076366c4f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -4,7 +4,7 @@ version: 2 defaults: &defaults docker: # Built by Thanos make docker-ci - - image: quay.io/thanos/thanos-ci:go1.14.2-node + - image: quay.io/thanos/thanos-ci:v1.1-go1.14.2-node jobs: test: <<: *defaults diff --git a/CHANGELOG.md b/CHANGELOG.md index b9ad0f628b6..b4dcaa2b84d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel ## Unreleased +TODO: Add rules API, also now StoreAPI consistently solves clashes in external labels vs metrics - by choosing external always. + ### Fixed - [#2536](https://github.com/thanos-io/thanos/pull/2536) minio-go: Fixed AWS STS endpoint url to https for Web Identity providers on AWS EKS diff --git a/Makefile b/Makefile index a9deed30417..fa70b3c8899 100644 --- a/Makefile +++ b/Makefile @@ -70,8 +70,8 @@ ME ?= $(shell whoami) # Referenced by github.com/thanos-io/thanos/blob/master/docs/getting_started.md#prometheus # Limited prom version, because testing was not possible. This should fix it: https://github.com/thanos-io/thanos/issues/758 -PROM_VERSIONS ?= v2.4.3 v2.5.0 v2.8.1 v2.9.2 v2.13.0 -PROMS ?= $(GOBIN)/prometheus-v2.4.3 $(GOBIN)/prometheus-v2.5.0 $(GOBIN)/prometheus-v2.8.1 $(GOBIN)/prometheus-v2.9.2 $(GOBIN)/prometheus-v2.13.0 +PROM_VERSIONS ?= v2.4.3 v2.5.0 v2.8.1 v2.9.2 v2.13.0 v2.18.0 +PROMS ?= $(GOBIN)/prometheus-v2.4.3 $(GOBIN)/prometheus-v2.5.0 $(GOBIN)/prometheus-v2.8.1 $(GOBIN)/prometheus-v2.9.2 $(GOBIN)/prometheus-v2.13.0 $(GOBIN)/prometheus-v2.18.0 ALERTMANAGER_VERSION ?= v0.20.0 ALERTMANAGER ?= $(GOBIN)/alertmanager-$(ALERTMANAGER_VERSION) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index b6db0cf4c84..ab9847fc04b 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -388,7 +388,7 @@ func runQuery( enablePartialResponse, queryReplicaLabels, instantDefaultMaxSourceResolution, - rules.NewRetriever(rulesProxy), + rules.NewGRPCClient(rulesProxy), ) api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins) diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index b128215a12c..05aa15e2983 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -40,8 +40,8 @@ import ( "github.com/thanos-io/thanos/pkg/prober" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/query" + thanosrules "github.com/thanos-io/thanos/pkg/rules" v1 "github.com/thanos-io/thanos/pkg/rules/api" - rulesmanager "github.com/thanos-io/thanos/pkg/rules/manager" "github.com/thanos-io/thanos/pkg/runutil" grpcserver "github.com/thanos-io/thanos/pkg/server/grpc" httpserver "github.com/thanos-io/thanos/pkg/server/http" @@ -397,13 +397,13 @@ func runRule( alertmgrs = append(alertmgrs, alert.NewAlertmanager(logger, amClient, time.Duration(cfg.Timeout), cfg.APIVersion)) } - // Run rule evaluation and alert notifications. var ( + ruleMgr *thanosrules.Manager alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels) - ruleMgr = rulesmanager.NewManager(dataDir) ) { - notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) { + // Run rule evaluation and alert notifications. + notifyFunc := func(ctx context.Context, expr string, alerts ...*rules.Alert) { res := make([]*alert.Alert, 0, len(alerts)) for _, alrt := range alerts { // Only send actually firing alerts. @@ -425,41 +425,38 @@ func runRule( } alertQ.Push(res) } - st := tsdb.Adapter(db, 0) - - opts := rules.ManagerOptions{ - NotifyFunc: notify, - Logger: log.With(logger, "component", "rules"), - Appendable: st, - ExternalURL: nil, - TSDB: st, - ResendDelay: resendDelay, - } - - // TODO(bwplotka): Hide this behind thanos rules.Manager. - for _, strategy := range storepb.PartialResponseStrategy_value { - s := storepb.PartialResponseStrategy(strategy) - ctx, cancel := context.WithCancel(context.Background()) - ctx = tracing.ContextWithTracer(ctx, tracer) + ctx, cancel := context.WithCancel(context.Background()) - opts := opts - opts.Registerer = extprom.WrapRegistererWith(prometheus.Labels{"strategy": strings.ToLower(s.String())}, reg) - opts.Context = ctx - opts.QueryFunc = queryFunc(logger, queryClients, metrics.duplicatedQuery, metrics.ruleEvalWarnings, s) + st := tsdb.Adapter(db, 0) + logger = log.With(logger, "component", "rules") + ruleMgr = thanosrules.NewManager( + tracing.ContextWithTracer(ctx, tracer), + reg, + dataDir, + rules.ManagerOptions{ + NotifyFunc: notifyFunc, + Logger: logger, + Appendable: st, + ExternalURL: nil, + TSDB: st, + ResendDelay: resendDelay, + }, + queryFuncCreator(logger, queryClients, metrics.duplicatedQuery, metrics.ruleEvalWarnings), + lset, + ) - mgr := rules.NewManager(&opts) - ruleMgr.SetRuleManager(s, mgr) - g.Add(func() error { - mgr.Run() - <-ctx.Done() + // Schedule rule manager that evaluates rules. + g.Add(func() error { + ruleMgr.Run() + <-ctx.Done() - return nil - }, func(error) { - cancel() - mgr.Stop() - }) - } + return nil + }, func(err error) { + cancel() + ruleMgr.Stop() + }) + ruleMgr.Run() } // Run the alert sender. { @@ -533,7 +530,7 @@ func runRule( } // TODO: Add rules API implementation when ready. - s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, store, nil, + s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, store, ruleMgr, grpcserver.WithListen(grpcBindAddr), grpcserver.WithGracePeriod(grpcGracePeriod), grpcserver.WithTLSConfig(tlsCfg), @@ -575,7 +572,7 @@ func runRule( // TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting. ui.NewRuleUI(logger, reg, ruleMgr, alertQueryURL.String(), webExternalPrefix, webPrefixHeaderName).Register(router, ins) - api := v1.NewAPI(logger, reg, ruleMgr) + api := v1.NewAPI(logger, reg, thanosrules.NewGRPCClient(ruleMgr), ruleMgr) api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins) srv := httpserver.New(logger, reg, comp, httpProbe, @@ -684,57 +681,59 @@ func removeDuplicateQueryEndpoints(logger log.Logger, duplicatedQueriers prometh return deduplicated } -// queryFunc returns query function that hits the HTTP query API of query peers in randomized order until we get a result -// back or the context get canceled. -func queryFunc( +func queryFuncCreator( logger log.Logger, queriers []*http_util.Client, duplicatedQuery prometheus.Counter, ruleEvalWarnings *prometheus.CounterVec, - partialResponseStrategy storepb.PartialResponseStrategy, -) rules.QueryFunc { - var spanID string - - switch partialResponseStrategy { - case storepb.PartialResponseStrategy_WARN: - spanID = "/rule_instant_query HTTP[client]" - case storepb.PartialResponseStrategy_ABORT: - spanID = "/rule_instant_query_part_resp_abort HTTP[client]" - default: - // Programming error will be caught by tests. - panic(errors.Errorf("unknown partial response strategy %v", partialResponseStrategy).Error()) - } +) func(partialResponseStrategy storepb.PartialResponseStrategy) rules.QueryFunc { + + // queryFunc returns query function that hits the HTTP query API of query peers in randomized order until we get a result + // back or the context get canceled. + return func(partialResponseStrategy storepb.PartialResponseStrategy) rules.QueryFunc { + var spanID string + + switch partialResponseStrategy { + case storepb.PartialResponseStrategy_WARN: + spanID = "/rule_instant_query HTTP[client]" + case storepb.PartialResponseStrategy_ABORT: + spanID = "/rule_instant_query_part_resp_abort HTTP[client]" + default: + // Programming error will be caught by tests. + panic(errors.Errorf("unknown partial response strategy %v", partialResponseStrategy).Error()) + } - promClients := make([]*promclient.Client, 0, len(queriers)) - for _, q := range queriers { - promClients = append(promClients, promclient.NewClient(q, logger, "thanos-rule")) - } + promClients := make([]*promclient.Client, 0, len(queriers)) + for _, q := range queriers { + promClients = append(promClients, promclient.NewClient(q, logger, "thanos-rule")) + } - return func(ctx context.Context, q string, t time.Time) (v promql.Vector, err error) { - for _, i := range rand.Perm(len(queriers)) { - promClient := promClients[i] - endpoints := removeDuplicateQueryEndpoints(logger, duplicatedQuery, queriers[i].Endpoints()) - for _, i := range rand.Perm(len(endpoints)) { - var warns []string - tracing.DoInSpan(ctx, spanID, func(ctx context.Context) { - v, warns, err = promClient.PromqlQueryInstant(ctx, endpoints[i], q, t, promclient.QueryOptions{ + return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) { + for _, i := range rand.Perm(len(queriers)) { + promClient := promClients[i] + endpoints := removeDuplicateQueryEndpoints(logger, duplicatedQuery, queriers[i].Endpoints()) + for _, i := range rand.Perm(len(endpoints)) { + span, ctx := tracing.StartSpan(ctx, spanID) + v, warns, err := promClient.PromqlQueryInstant(ctx, endpoints[i], q, t, promclient.QueryOptions{ Deduplicate: true, PartialResponseStrategy: partialResponseStrategy, }) - }) - if err != nil { - level.Error(logger).Log("err", err, "query", q) - continue - } - if len(warns) > 0 { - ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc() - // TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ): - level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q) + span.Finish() + + if err != nil { + level.Error(logger).Log("err", err, "query", q) + continue + } + if len(warns) > 0 { + ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc() + // TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ): + level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q) + } + return v, nil } - return v, nil } + return nil, errors.Errorf("no query API server reachable") } - return nil, errors.New("no query API server reachable") } } @@ -759,7 +758,7 @@ func addDiscoveryGroups(g *run.Group, c *http_util.Client, interval time.Duratio func reloadRules(logger log.Logger, ruleFiles []string, - ruleMgr *rulesmanager.Manager, + ruleMgr *thanosrules.Manager, evalInterval time.Duration, metrics *RuleMetrics) error { level.Debug(logger).Log("msg", "configured rule files", "files", strings.Join(ruleFiles, ",")) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 07afebf8405..9a7595172c4 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -223,7 +223,7 @@ func runSidecar( return errors.Wrap(err, "setup gRPC server") } - s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore, rules.NewPrometheus(conf.prometheus.url, c), + s := grpcserver.New(logger, reg, tracer, comp, grpcProbe, promStore, rules.NewPrometheus(conf.prometheus.url, c, m.Labels), grpcserver.WithListen(conf.grpc.bindAddress), grpcserver.WithGracePeriod(time.Duration(conf.grpc.gracePeriod)), grpcserver.WithTLSConfig(tlsCfg), diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go index 9ae81d63e39..3d1c9949f5e 100644 --- a/pkg/promclient/promclient.go +++ b/pkg/promclient/promclient.go @@ -628,5 +628,9 @@ func (c *Client) RulesInGRPC(ctx context.Context, base *url.URL, typeRules strin return nil, err } + // Prometheus does not support PartialResponseStrategy, and probably would never do. Make it Abort by default. + for _, g := range m.Data.Groups { + g.PartialResponseStrategy = storepb.PartialResponseStrategy_ABORT + } return m.Data.Groups, nil } diff --git a/pkg/query/api/v1.go b/pkg/query/api/v1.go index de79c0f7f31..2b977b2a608 100644 --- a/pkg/query/api/v1.go +++ b/pkg/query/api/v1.go @@ -42,8 +42,10 @@ import ( "github.com/prometheus/prometheus/storage" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/query" + "github.com/thanos-io/thanos/pkg/rules" "github.com/thanos-io/thanos/pkg/rules/rulespb" "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tracing" ) @@ -98,17 +100,13 @@ func SetCORS(w http.ResponseWriter) { type ApiFunc func(r *http.Request) (interface{}, []error, *ApiError) -type rulesRetriever interface { - RuleGroups(context.Context) ([]*rulespb.RuleGroup, storage.Warnings, error) -} - // API can register a set of endpoints in a router and handle // them using the provided storage and query engine. type API struct { logger log.Logger queryableCreate query.QueryableCreator queryEngine *promql.Engine - rulesRetriever rulesRetriever + ruleGroups rules.UnaryClient enableAutodownsampling bool enablePartialResponse bool @@ -131,7 +129,7 @@ func NewAPI( enablePartialResponse bool, replicaLabels []string, defaultInstantQueryMaxSourceResolution time.Duration, - rr rulesRetriever, + ruleGroups rules.UnaryClient, ) *API { return &API{ logger: logger, @@ -143,7 +141,7 @@ func NewAPI( reg: reg, storeSet: storeSet, defaultInstantQueryMaxSourceResolution: defaultInstantQueryMaxSourceResolution, - rulesRetriever: rr, + ruleGroups: ruleGroups, now: time.Now, } @@ -181,9 +179,9 @@ func (api *API) Register(r *route.Router, tracer opentracing.Tracer, logger log. r.Get("/labels", instr("label_names", api.labelNames)) r.Post("/labels", instr("label_names", api.labelNames)) - r.Get("/rules", instr("rules", api.rules)) - r.Get("/stores", instr("stores", api.stores)) + + r.Get("/rules", instr("rules", NewRulesHandler(api.ruleGroups))) } type queryData struct { @@ -643,57 +641,29 @@ func (api *API) labelNames(r *http.Request) (interface{}, []error, *ApiError) { return names, warnings, nil } -func (api *API) rules(r *http.Request) (interface{}, []error, *ApiError) { - var ( - res = &rulespb.RuleGroups{} - typeParam = strings.ToLower(r.URL.Query().Get("type")) - ) - - if typeParam != "" && typeParam != "alert" && typeParam != "record" { - return nil, nil, &ApiError{errorBadData, errors.Errorf("invalid query parameter type='%v'", typeParam)} - } - - returnAlerts := typeParam == "" || typeParam == "alert" - returnRecording := typeParam == "" || typeParam == "record" - - groups, warnings, err := api.rulesRetriever.RuleGroups(r.Context()) - if err != nil { - return nil, nil, &ApiError{ErrorInternal, fmt.Errorf("error retrieving rules: %v", err)} - } - - for _, grp := range groups { - apiRuleGroup := &rulespb.RuleGroup{ - Name: grp.Name, - File: grp.File, - Interval: grp.Interval, - EvaluationDurationSeconds: grp.EvaluationDurationSeconds, - LastEvaluation: grp.LastEvaluation, - DeprecatedPartialResponseStrategy: grp.DeprecatedPartialResponseStrategy, - PartialResponseStrategy: grp.PartialResponseStrategy, +// NewRulesHandler created handler compatible with HTTP /api/v1/rules https://prometheus.io/docs/prometheus/latest/querying/api/#rules +// which uses gRPC Unary Rules API. +func NewRulesHandler(client rules.UnaryClient) func(*http.Request) (interface{}, []error, *ApiError) { + return func(request *http.Request) (interface{}, []error, *ApiError) { + typeParam := request.URL.Query().Get("type") + typ, ok := rulespb.RulesRequest_Type_value[strings.ToUpper(typeParam)] + if !ok { + if typeParam != "" { + return nil, nil, &ApiError{errorBadData, errors.Errorf("invalid rules parameter type='%v'", typeParam)} + } + typ = int32(rulespb.RulesRequest_ALL) } - apiRuleGroup.Rules = make([]*rulespb.Rule, 0, len(grp.Rules)) - - for _, r := range grp.Rules { - switch { - case r.GetAlert() != nil: - if !returnAlerts { - break - } - apiRuleGroup.Rules = append(apiRuleGroup.Rules, r) - case r.GetRecording() != nil: - if !returnRecording { - break - } - apiRuleGroup.Rules = append(apiRuleGroup.Rules, r) - default: - return nil, nil, &ApiError{ErrorInternal, fmt.Errorf("rule %v: unsupported", r)} - } + req := &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_Type(typ), + PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + } + groups, warnings, err := client.Rules(request.Context(), req) + if err != nil { + return nil, nil, &ApiError{ErrorInternal, errors.Errorf("error retrieving rules: %v", err)} } - res.Groups = append(res.Groups, apiRuleGroup) + return groups, warnings, nil } - - return res, warnings, nil } func (api *API) stores(r *http.Request) (interface{}, []error, *ApiError) { diff --git a/pkg/rules/api/v1.go b/pkg/rules/api/v1.go index d44bf480f62..754fd04c059 100644 --- a/pkg/rules/api/v1.go +++ b/pkg/rules/api/v1.go @@ -5,42 +5,49 @@ package v1 import ( "net/http" - "strconv" "time" "github.com/NYTimes/gziphandler" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/thanos/pkg/rules/rulespb" "github.com/go-kit/kit/log" - opentracing "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/route" - "github.com/prometheus/prometheus/rules" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" qapi "github.com/thanos-io/thanos/pkg/query/api" - "github.com/thanos-io/thanos/pkg/rules/manager" - "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/rules" + "github.com/thanos-io/thanos/pkg/rules/rulespb" "github.com/thanos-io/thanos/pkg/tracing" ) +// API is a very simple API used by Thanos Ruler. type API struct { - logger log.Logger - now func() time.Time - ruleRetriever RulesRetriever - reg prometheus.Registerer + logger log.Logger + now func() time.Time + ruleGroups rules.UnaryClient + alerts alertsRetriever + reg prometheus.Registerer +} + +type alertsRetriever interface { + Active() []*rulespb.AlertInstance } +// NewAPI creates an Thanos ruler API. func NewAPI( logger log.Logger, reg prometheus.Registerer, - ruleRetriever RulesRetriever, + ruleGroups rules.UnaryClient, + activeAlerts alertsRetriever, ) *API { return &API{ - logger: logger, - now: time.Now, - ruleRetriever: ruleRetriever, - reg: reg, + logger: logger, + now: time.Now, + ruleGroups: ruleGroups, + alerts: activeAlerts, + reg: reg, } } @@ -59,92 +66,8 @@ func (api *API) Register(r *route.Router, tracer opentracing.Tracer, logger log. return ins.NewHandler(name, tracing.HTTPMiddleware(tracer, name, logger, gziphandler.GzipHandler(hf))) } - r.Get("/alerts", instr("alerts", api.alerts)) - r.Get("/rules", instr("rules", api.rules)) -} - -type RulesRetriever interface { - RuleGroups() []manager.Group - AlertingRules() []manager.AlertingRule -} - -func (api *API) rules(*http.Request) (interface{}, []error, *qapi.ApiError) { - res := &rulespb.RuleGroups{} - for _, grp := range api.ruleRetriever.RuleGroups() { - apiRuleGroup := &rulespb.RuleGroup{ - Name: grp.Name(), - File: grp.OriginalFile(), - Interval: grp.Interval().Seconds(), - PartialResponseStrategy: grp.PartialResponseStrategy, - } - - for _, r := range grp.Rules() { - lastError := "" - if r.LastError() != nil { - lastError = r.LastError().Error() - } - - switch rule := r.(type) { - case *rules.AlertingRule: - apiRuleGroup.Rules = append(apiRuleGroup.Rules, &rulespb.Rule{ - Result: &rulespb.Rule_Alert{Alert: &rulespb.Alert{ - State: rulespb.AlertState(rule.State()), - Name: rule.Name(), - Query: rule.Query().String(), - DurationSeconds: rule.Duration().Seconds(), - Labels: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Labels())}, - Annotations: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Annotations())}, - Alerts: rulesAlertsToAPIAlerts(grp.PartialResponseStrategy, rule.ActiveAlerts()), - Health: string(rule.Health()), - LastError: lastError, - EvaluationDurationSeconds: rule.GetEvaluationDuration().Seconds(), - LastEvaluation: rule.GetEvaluationTimestamp(), - }}}) - case *rules.RecordingRule: - apiRuleGroup.Rules = append(apiRuleGroup.Rules, &rulespb.Rule{ - Result: &rulespb.Rule_Recording{Recording: &rulespb.RecordingRule{ - Name: rule.Name(), - Query: rule.Query().String(), - Labels: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Labels())}, - Health: string(rule.Health()), - LastError: lastError, - EvaluationDurationSeconds: rule.GetEvaluationDuration().Seconds(), - LastEvaluation: rule.GetEvaluationTimestamp(), - }}}) - default: - err := errors.Errorf("rule %q: unsupported type %T", r.Name(), rule) - return nil, nil, &qapi.ApiError{Typ: qapi.ErrorInternal, Err: err} - } - } - res.Groups = append(res.Groups, apiRuleGroup) - } - - return res, nil, nil -} - -func (api *API) alerts(*http.Request) (interface{}, []error, *qapi.ApiError) { - var alerts []*rulespb.AlertInstance - for _, alertingRule := range api.ruleRetriever.AlertingRules() { - alerts = append( - alerts, - rulesAlertsToAPIAlerts(alertingRule.PartialResponseStrategy, alertingRule.ActiveAlerts())..., - ) - } - return struct{ Alerts []*rulespb.AlertInstance }{Alerts: alerts}, nil, nil -} - -func rulesAlertsToAPIAlerts(s storepb.PartialResponseStrategy, rulesAlerts []*rules.Alert) []*rulespb.AlertInstance { - apiAlerts := make([]*rulespb.AlertInstance, len(rulesAlerts)) - for i, ruleAlert := range rulesAlerts { - apiAlerts[i] = &rulespb.AlertInstance{ - PartialResponseStrategy: s, - Labels: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(ruleAlert.Labels)}, - Annotations: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(ruleAlert.Annotations)}, - State: rulespb.AlertState(ruleAlert.State), - ActiveAt: &ruleAlert.ActiveAt, - Value: strconv.FormatFloat(ruleAlert.Value, 'e', -1, 64), - } - } - - return apiAlerts + r.Get("/alerts", instr("alerts", func(r *http.Request) (interface{}, []error, *qapi.ApiError) { + return struct{ Alerts []*rulespb.AlertInstance }{Alerts: api.alerts.Active()}, nil, nil + })) + r.Get("/rules", instr("rules", qapi.NewRulesHandler(api.ruleGroups))) } diff --git a/pkg/rules/manager.go b/pkg/rules/manager.go new file mode 100644 index 00000000000..5a5ea45ad7b --- /dev/null +++ b/pkg/rules/manager.go @@ -0,0 +1,357 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package rules + +import ( + "context" + "crypto/sha256" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "strconv" + "strings" + "sync" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/pkg/rulefmt" + "github.com/prometheus/prometheus/rules" + tsdberrors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/rules/rulespb" + "github.com/thanos-io/thanos/pkg/store/storepb" + "gopkg.in/yaml.v2" +) + +const tmpRuleDir = ".tmp-rules" + +type Group struct { + *rules.Group + originalFile string + PartialResponseStrategy storepb.PartialResponseStrategy +} + +func (g Group) toProto() *rulespb.RuleGroup { + ret := &rulespb.RuleGroup{ + Name: g.Name(), + File: g.originalFile, + Interval: g.Interval().Seconds(), + PartialResponseStrategy: g.PartialResponseStrategy, + } + + for _, r := range g.Rules() { + lastError := "" + if r.LastError() != nil { + lastError = r.LastError().Error() + } + + switch rule := r.(type) { + case *rules.AlertingRule: + ret.Rules = append(ret.Rules, &rulespb.Rule{ + Result: &rulespb.Rule_Alert{Alert: &rulespb.Alert{ + State: rulespb.AlertState(rule.State()), + Name: rule.Name(), + Query: rule.Query().String(), + DurationSeconds: rule.Duration().Seconds(), + Labels: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Labels())}, + Annotations: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Annotations())}, + Alerts: ActiveAlertsToProto(g.PartialResponseStrategy, rule), + Health: string(rule.Health()), + LastError: lastError, + EvaluationDurationSeconds: rule.GetEvaluationDuration().Seconds(), + LastEvaluation: rule.GetEvaluationTimestamp(), + }}}) + case *rules.RecordingRule: + ret.Rules = append(ret.Rules, &rulespb.Rule{ + Result: &rulespb.Rule_Recording{Recording: &rulespb.RecordingRule{ + Name: rule.Name(), + Query: rule.Query().String(), + Labels: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(rule.Labels())}, + Health: string(rule.Health()), + LastError: lastError, + EvaluationDurationSeconds: rule.GetEvaluationDuration().Seconds(), + LastEvaluation: rule.GetEvaluationTimestamp(), + }}}) + default: + // We cannot do much, let's panic, API will recover. + panic(fmt.Sprintf("rule %q: unsupported type %T", r.Name(), rule)) + } + } + return ret +} + +func ActiveAlertsToProto(s storepb.PartialResponseStrategy, a *rules.AlertingRule) []*rulespb.AlertInstance { + active := a.ActiveAlerts() + ret := make([]*rulespb.AlertInstance, len(active)) + for i, ruleAlert := range active { + ret[i] = &rulespb.AlertInstance{ + PartialResponseStrategy: s, + Labels: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(ruleAlert.Labels)}, + Annotations: &rulespb.PromLabels{Labels: storepb.PromLabelsToLabels(ruleAlert.Annotations)}, + State: rulespb.AlertState(ruleAlert.State), + ActiveAt: &ruleAlert.ActiveAt, + Value: strconv.FormatFloat(ruleAlert.Value, 'e', -1, 64), + } + } + return ret +} + +// configRuleGroups is what is parsed from config. +type configRuleGroups struct { + Groups []configRuleGroup `yaml:"groups"` +} + +type configRuleGroup struct { + rulefmt.RuleGroup + PartialResponseStrategy *storepb.PartialResponseStrategy +} + +// Manager is a partial response strategy and proto compatible Manager. +// Manager also implements rulespb.Rules gRPC service. +type Manager struct { + workDir string + mgrs map[storepb.PartialResponseStrategy]*rules.Manager + extLset labels.Labels + + mtx sync.RWMutex + ruleFiles map[string]string +} + +// NewManager creates new Manager. +// QueryFunc from baseOpts will be rewritten. +func NewManager( + ctx context.Context, + reg prometheus.Registerer, + dataDir string, + baseOpts rules.ManagerOptions, + queryFuncCreator func(partialResponseStrategy storepb.PartialResponseStrategy) rules.QueryFunc, + extLset labels.Labels, +) *Manager { + m := &Manager{ + workDir: filepath.Join(dataDir, tmpRuleDir), + mgrs: make(map[storepb.PartialResponseStrategy]*rules.Manager), + extLset: extLset, + ruleFiles: make(map[string]string), + } + for _, strategy := range storepb.PartialResponseStrategy_value { + s := storepb.PartialResponseStrategy(strategy) + + opts := baseOpts + opts.Registerer = extprom.WrapRegistererWith(prometheus.Labels{"strategy": strings.ToLower(s.String())}, reg) + opts.Context = ctx + opts.QueryFunc = queryFuncCreator(s) + + m.mgrs[s] = rules.NewManager(&opts) + } + + return m +} + +func (m *Manager) Run() { + for _, mgr := range m.mgrs { + mgr.Run() + } +} + +func (m *Manager) Stop() { + for _, mgr := range m.mgrs { + mgr.Stop() + } +} + +func (m *Manager) protoRuleGroups() []*rulespb.RuleGroup { + rg := m.RuleGroups() + res := make([]*rulespb.RuleGroup, 0, len(rg)) + for _, g := range rg { + res = append(res, g.toProto()) + } + return res +} + +func (m *Manager) RuleGroups() []Group { + m.mtx.RLock() + defer m.mtx.RUnlock() + var res []Group + for s, r := range m.mgrs { + for _, group := range r.RuleGroups() { + res = append(res, Group{ + Group: group, + originalFile: m.ruleFiles[group.File()], + PartialResponseStrategy: s, + }) + } + } + return res +} + +func (m *Manager) Active() []*rulespb.AlertInstance { + var res []*rulespb.AlertInstance + for s, r := range m.mgrs { + for _, r := range r.AlertingRules() { + res = append(res, ActiveAlertsToProto(s, r)...) + } + } + return res +} + +func (r *configRuleGroup) UnmarshalYAML(unmarshal func(interface{}) error) error { + rs := struct { + Strategy string `yaml:"partial_response_strategy"` + }{} + + if err := unmarshal(&rs); err != nil { + return err + } + + r.PartialResponseStrategy = new(storepb.PartialResponseStrategy) + + // Same as YAMl. Quote as JSON unmarshal expects raw JSON field. + if err := r.PartialResponseStrategy.UnmarshalJSON([]byte("\"" + rs.Strategy + "\"")); err != nil { + return err + } + + rg := rulefmt.RuleGroup{} + if err := unmarshal(&rg); err != nil { + return errors.Wrap(err, "failed to unmarshal rulefmt.configRuleGroup") + } + r.RuleGroup = rg + return nil +} + +func (r configRuleGroup) MarshalYAML() (interface{}, error) { + var ps *string + if r.PartialResponseStrategy != nil { + str := r.PartialResponseStrategy.String() + ps = &str + } + + rs := struct { + RuleGroup rulefmt.RuleGroup `yaml:",inline"` + PartialResponseStrategy *string `yaml:"partial_response_strategy,omitempty"` + }{ + RuleGroup: r.RuleGroup, + PartialResponseStrategy: ps, + } + return rs, nil +} + +// Update updates rules from given files to all managers we hold. We decide which groups should go where, based on +// special field in configRuleGroup file. +func (m *Manager) Update(evalInterval time.Duration, files []string) error { + var ( + errs tsdberrors.MultiError + filesByStrategy = map[storepb.PartialResponseStrategy][]string{} + ruleFiles = map[string]string{} + ) + + if err := os.RemoveAll(m.workDir); err != nil { + return errors.Wrapf(err, "failed to remove %s", m.workDir) + } + if err := os.MkdirAll(m.workDir, os.ModePerm); err != nil { + return errors.Wrapf(err, "failed to create %s", m.workDir) + } + + for _, fn := range files { + b, err := ioutil.ReadFile(fn) + if err != nil { + errs = append(errs, err) + continue + } + + var rg configRuleGroups + if err := yaml.Unmarshal(b, &rg); err != nil { + errs = append(errs, errors.Wrap(err, fn)) + continue + } + + // NOTE: This is very ugly, but we need to reparse it into tmp dir without the field to have to reuse + // rules.Manager. The problem is that it uses yaml.UnmarshalStrict for some reasons. + groupsByStrategy := map[storepb.PartialResponseStrategy]*rulefmt.RuleGroups{} + for _, rg := range rg.Groups { + if _, ok := groupsByStrategy[*rg.PartialResponseStrategy]; !ok { + groupsByStrategy[*rg.PartialResponseStrategy] = &rulefmt.RuleGroups{} + } + + groupsByStrategy[*rg.PartialResponseStrategy].Groups = append( + groupsByStrategy[*rg.PartialResponseStrategy].Groups, + rg.RuleGroup, + ) + } + + for s, rg := range groupsByStrategy { + b, err := yaml.Marshal(rg) + if err != nil { + errs = append(errs, errors.Wrapf(err, "%s: failed to marshal rule groups", fn)) + continue + } + + newFn := filepath.Join(m.workDir, fmt.Sprintf("%s.%x.%s", filepath.Base(fn), sha256.Sum256([]byte(fn)), s.String())) + if err := ioutil.WriteFile(newFn, b, os.ModePerm); err != nil { + errs = append(errs, errors.Wrap(err, newFn)) + continue + } + + filesByStrategy[s] = append(filesByStrategy[s], newFn) + ruleFiles[newFn] = fn + } + } + + m.mtx.Lock() + for s, fs := range filesByStrategy { + mgr, ok := m.mgrs[s] + if !ok { + errs = append(errs, errors.Errorf("no manager found for %v", s)) + continue + } + // We add external labels in `pkg/alert.Queue`. + // TODO(bwplotka): Investigate if we should put ext labels here or not. + if err := mgr.Update(evalInterval, fs, nil); err != nil { + errs = append(errs, errors.Wrapf(err, "strategy %s", s)) + continue + } + } + m.ruleFiles = ruleFiles + m.mtx.Unlock() + + return errs.Err() +} + +// Rules returns specified rules from manager. This is used by gRPC and locally for HTTP and UI purposes. +func (m *Manager) Rules(r *rulespb.RulesRequest, s rulespb.Rules_RulesServer) error { + groups := m.protoRuleGroups() + + pgs := make([]*rulespb.RuleGroup, 0, len(groups)) + for _, g := range groups { + if r.Type == rulespb.RulesRequest_ALL { + pgs = append(pgs, g) + continue + } + + filtered := proto.Clone(g).(*rulespb.RuleGroup) + filtered.Rules = nil + for _, rule := range g.Rules { + if rule.GetAlert() != nil && r.Type == rulespb.RulesRequest_ALERT { + filtered.Rules = append(filtered.Rules, rule) + continue + } + if rule.GetRecording() != nil && r.Type == rulespb.RulesRequest_RECORD { + filtered.Rules = append(filtered.Rules, rule) + } + } + pgs = append(pgs, filtered) + } + + enrichRulesWithExtLabels(pgs, m.extLset) + + for _, pg := range pgs { + if err := s.Send(&rulespb.RulesResponse{Result: &rulespb.RulesResponse_Group{Group: pg}}); err != nil { + return err + } + } + return nil +} diff --git a/pkg/rules/manager/rule.go b/pkg/rules/manager/rule.go deleted file mode 100644 index 50c8decf001..00000000000 --- a/pkg/rules/manager/rule.go +++ /dev/null @@ -1,238 +0,0 @@ -// Copyright (c) The Thanos Authors. -// Licensed under the Apache License 2.0. - -package manager - -import ( - "crypto/sha256" - "fmt" - "io/ioutil" - "os" - "path/filepath" - "strings" - "sync" - "time" - - "github.com/pkg/errors" - "github.com/prometheus/prometheus/pkg/rulefmt" - "github.com/prometheus/prometheus/rules" - tsdberrors "github.com/prometheus/prometheus/tsdb/errors" - "github.com/thanos-io/thanos/pkg/store/storepb" - "gopkg.in/yaml.v2" -) - -const tmpRuleDir = ".tmp-rules" - -type Group struct { - *rules.Group - originalFile string - PartialResponseStrategy storepb.PartialResponseStrategy -} - -func (g Group) OriginalFile() string { - return g.originalFile -} - -type AlertingRule struct { - *rules.AlertingRule - PartialResponseStrategy storepb.PartialResponseStrategy -} - -type RuleGroups struct { - Groups []RuleGroup `yaml:"groups"` -} - -type RuleGroup struct { - rulefmt.RuleGroup - PartialResponseStrategy *storepb.PartialResponseStrategy -} - -type Manager struct { - workDir string - mgrs map[storepb.PartialResponseStrategy]*rules.Manager - - mtx sync.RWMutex - ruleFiles map[string]string -} - -func NewManager(dataDir string) *Manager { - return &Manager{ - workDir: filepath.Join(dataDir, tmpRuleDir), - mgrs: make(map[storepb.PartialResponseStrategy]*rules.Manager), - ruleFiles: make(map[string]string), - } -} - -func (m *Manager) SetRuleManager(s storepb.PartialResponseStrategy, mgr *rules.Manager) { - m.mgrs[s] = mgr -} - -func (m *Manager) RuleGroups() []Group { - m.mtx.RLock() - defer m.mtx.RUnlock() - var groups []Group - for s, r := range m.mgrs { - for _, group := range r.RuleGroups() { - groups = append(groups, Group{ - Group: group, - PartialResponseStrategy: s, - originalFile: m.ruleFiles[group.File()], - }) - } - } - return groups -} - -func (m *Manager) AlertingRules() []AlertingRule { - var res []AlertingRule - for s, r := range m.mgrs { - for _, r := range r.AlertingRules() { - res = append(res, AlertingRule{AlertingRule: r, PartialResponseStrategy: s}) - } - } - return res -} - -func (r *RuleGroup) UnmarshalYAML(unmarshal func(interface{}) error) error { - rs := struct { - String string `yaml:"partial_response_strategy"` - }{} - - errMsg := fmt.Sprintf("failed to unmarshal 'partial_response_strategy'. Possible values are %s", strings.Join(storepb.PartialResponseStrategyValues, ",")) - if err := unmarshal(&rs); err != nil { - return errors.Wrap(err, errMsg) - } - - rg := rulefmt.RuleGroup{} - if err := unmarshal(&rg); err != nil { - return errors.Wrap(err, "failed to unmarshal rulefmt.RuleGroup") - } - - p, ok := storepb.PartialResponseStrategy_value[strings.ToUpper(rs.String)] - if !ok { - if rs.String != "" { - return errors.Errorf("%s. Got: %s", errMsg, rs.String) - } - - // NOTE: For Rule default is abort as this is recommended for alerting. - p = storepb.PartialResponseStrategy_value[storepb.PartialResponseStrategy_ABORT.String()] - } - - ps := storepb.PartialResponseStrategy(p) - r.RuleGroup = rg - r.PartialResponseStrategy = &ps - return nil -} - -func (r RuleGroup) MarshalYAML() (interface{}, error) { - var ps *string - if r.PartialResponseStrategy != nil { - str := r.PartialResponseStrategy.String() - ps = &str - } - - rs := struct { - RuleGroup rulefmt.RuleGroup `yaml:",inline"` - PartialResponseStrategy *string `yaml:"partial_response_strategy,omitempty"` - }{ - RuleGroup: r.RuleGroup, - PartialResponseStrategy: ps, - } - return rs, nil -} - -// Update updates rules from given files to all managers we hold. We decide which groups should go where, based on -// special field in RuleGroup file. -func (m *Manager) Update(evalInterval time.Duration, files []string) error { - var ( - errs tsdberrors.MultiError - filesByStrategy = map[storepb.PartialResponseStrategy][]string{} - ruleFiles = map[string]string{} - ) - - if err := os.RemoveAll(m.workDir); err != nil { - return errors.Wrapf(err, "failed to remove %s", m.workDir) - } - if err := os.MkdirAll(m.workDir, os.ModePerm); err != nil { - return errors.Wrapf(err, "failed to create %s", m.workDir) - } - - for _, fn := range files { - b, err := ioutil.ReadFile(fn) - if err != nil { - errs = append(errs, err) - continue - } - - var rg RuleGroups - if err := yaml.Unmarshal(b, &rg); err != nil { - errs = append(errs, errors.Wrap(err, fn)) - continue - } - - // NOTE: This is very ugly, but we need to reparse it into tmp dir without the field to have to reuse - // rules.Manager. The problem is that it uses yaml.UnmarshalStrict for some reasons. - groupsByStrategy := map[storepb.PartialResponseStrategy]*rulefmt.RuleGroups{} - for _, rg := range rg.Groups { - if _, ok := groupsByStrategy[*rg.PartialResponseStrategy]; !ok { - groupsByStrategy[*rg.PartialResponseStrategy] = &rulefmt.RuleGroups{} - } - - groupsByStrategy[*rg.PartialResponseStrategy].Groups = append( - groupsByStrategy[*rg.PartialResponseStrategy].Groups, - rg.RuleGroup, - ) - } - - for s, rg := range groupsByStrategy { - b, err := yaml.Marshal(rg) - if err != nil { - errs = append(errs, errors.Wrapf(err, "%s: failed to marshal rule groups", fn)) - continue - } - - newFn := filepath.Join(m.workDir, fmt.Sprintf("%s.%x.%s", filepath.Base(fn), sha256.Sum256([]byte(fn)), s.String())) - if err := ioutil.WriteFile(newFn, b, os.ModePerm); err != nil { - errs = append(errs, errors.Wrap(err, newFn)) - continue - } - - filesByStrategy[s] = append(filesByStrategy[s], newFn) - ruleFiles[newFn] = fn - } - } - - m.mtx.Lock() - for s, fs := range filesByStrategy { - mgr, ok := m.mgrs[s] - if !ok { - errs = append(errs, errors.Errorf("no manager found for %v", s)) - continue - } - // We add external labels in `pkg/alert.Queue`. - // TODO(bwplotka): Investigate if we should put ext labels here or not. - if err := mgr.Update(evalInterval, fs, nil); err != nil { - errs = append(errs, errors.Wrapf(err, "strategy %s", s)) - continue - } - } - - // Removes the rules from a manager when a strategy has no more rule. - for s, mgr := range m.mgrs { - if _, ok := filesByStrategy[s]; ok { - continue - } - - if len(mgr.RuleGroups()) == 0 { - continue - } - - if err := mgr.Update(evalInterval, []string{}, nil); err != nil { - errs = append(errs, err) - } - } - m.ruleFiles = ruleFiles - m.mtx.Unlock() - - return errs.Err() -} diff --git a/pkg/rules/manager/rule_test.go b/pkg/rules/manager_test.go similarity index 71% rename from pkg/rules/manager/rule_test.go rename to pkg/rules/manager_test.go index b9018e3a170..13f1c407a12 100644 --- a/pkg/rules/manager/rule_test.go +++ b/pkg/rules/manager_test.go @@ -1,7 +1,7 @@ // Copyright (c) The Thanos Authors. // Licensed under the Apache License 2.0. -package manager +package rules import ( "context" @@ -14,6 +14,7 @@ import ( "testing" "time" + "github.com/fortytw2/leaktest" "github.com/go-kit/kit/log" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/rulefmt" @@ -53,31 +54,31 @@ groups: queryOnce sync.Once query string ) - opts := rules.ManagerOptions{ - Logger: log.NewLogfmtLogger(os.Stderr), - Context: context.Background(), - QueryFunc: func(ctx context.Context, q string, t time.Time) (vectors promql.Vector, e error) { - queryOnce.Do(func() { - query = q - close(queryDone) - }) - return promql.Vector{}, nil + thanosRuleMgr := NewManager( + context.Background(), + nil, + dir, + rules.ManagerOptions{ + Logger: log.NewLogfmtLogger(os.Stderr), + Context: context.Background(), + Appendable: nopAppendable{}, }, - Appendable: nopAppendable{}, - } - thanosRuleMgr := NewManager(dir) - ruleMgrAbort := rules.NewManager(&opts) - ruleMgrWarn := rules.NewManager(&opts) - thanosRuleMgr.SetRuleManager(storepb.PartialResponseStrategy_ABORT, ruleMgrAbort) - thanosRuleMgr.SetRuleManager(storepb.PartialResponseStrategy_WARN, ruleMgrWarn) - - ruleMgrAbort.Run() - ruleMgrWarn.Run() - defer ruleMgrAbort.Stop() - defer ruleMgrWarn.Stop() - + func(partialResponseStrategy storepb.PartialResponseStrategy) rules.QueryFunc { + return func(ctx context.Context, q string, t time.Time) (vectors promql.Vector, e error) { + queryOnce.Do(func() { + query = q + close(queryDone) + }) + return promql.Vector{}, nil + } + }, + labels.FromStrings("replica", "1"), + ) testutil.Ok(t, thanosRuleMgr.Update(10*time.Second, []string{filepath.Join(dir, "rule.yaml")})) + thanosRuleMgr.Run() + defer thanosRuleMgr.Stop() + select { case <-time.After(2 * time.Minute): t.Fatal("timeout while waiting on rule manager query evaluation") @@ -87,7 +88,7 @@ groups: testutil.Equals(t, "rate(some_metric[1h:5m] offset 1d)", query) } -func TestUpdate(t *testing.T) { +func TestUpdate_Error_UpdatePartial(t *testing.T) { dir, err := ioutil.TempDir("", "test_rule_rule_groups") testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() @@ -151,14 +152,21 @@ groups: expr: "up" `), os.ModePerm)) - opts := rules.ManagerOptions{ - Logger: log.NewLogfmtLogger(os.Stderr), - } - m := NewManager(dir) - m.SetRuleManager(storepb.PartialResponseStrategy_ABORT, rules.NewManager(&opts)) - m.SetRuleManager(storepb.PartialResponseStrategy_WARN, rules.NewManager(&opts)) - - err = m.Update(10*time.Second, []string{ + thanosRuleMgr := NewManager( + context.Background(), + nil, + dir, + rules.ManagerOptions{ + Logger: log.NewLogfmtLogger(os.Stderr), + }, + func(partialResponseStrategy storepb.PartialResponseStrategy) rules.QueryFunc { + return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) { + return nil, nil + } + }, + labels.FromStrings("replica", "1"), + ) + err = thanosRuleMgr.Update(10*time.Second, []string{ filepath.Join(dir, "no_strategy.yaml"), filepath.Join(dir, "abort.yaml"), filepath.Join(dir, "warn.yaml"), @@ -169,10 +177,10 @@ groups: }) testutil.NotOk(t, err) - testutil.Assert(t, strings.Contains(err.Error(), "wrong.yaml: failed to unmarshal 'partial_response_strategy'"), err.Error()) + testutil.Assert(t, strings.Contains(err.Error(), "wrong.yaml: failed to unmarshal \"afafsdgsdgs\" as 'partial_response_strategy'"), err.Error()) testutil.Assert(t, strings.Contains(err.Error(), "non_existing.yaml: no such file or directory"), err.Error()) - g := m.RuleGroups() + g := thanosRuleMgr.RuleGroups() sort.Slice(g, func(i, j int) bool { return g[i].Name() < g[j].Name() }) @@ -219,51 +227,18 @@ groups: }, } testutil.Equals(t, len(exp), len(g)) + for i := range exp { t.Run(exp[i].name, func(t *testing.T) { testutil.Equals(t, exp[i].strategy, g[i].PartialResponseStrategy) testutil.Equals(t, exp[i].name, g[i].Name()) - testutil.Equals(t, exp[i].file, g[i].OriginalFile()) - }) - } -} - -func TestUpdateAfterClear(t *testing.T) { - dir, err := ioutil.TempDir("", "test_rule_rule_groups") - testutil.Ok(t, err) - defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - testutil.Ok(t, ioutil.WriteFile(filepath.Join(dir, "no_strategy.yaml"), []byte(` -groups: -- name: "something1" - rules: - - alert: "some" - expr: "up" -`), os.ModePerm)) - - opts := rules.ManagerOptions{ - Logger: log.NewLogfmtLogger(os.Stderr), + p := g[i].toProto() + testutil.Equals(t, exp[i].strategy, p.PartialResponseStrategy) + testutil.Equals(t, exp[i].name, p.Name) + testutil.Equals(t, exp[i].file, p.File) + }) } - m := NewManager(dir) - ruleMgrAbort := rules.NewManager(&opts) - ruleMgrWarn := rules.NewManager(&opts) - m.SetRuleManager(storepb.PartialResponseStrategy_ABORT, ruleMgrAbort) - m.SetRuleManager(storepb.PartialResponseStrategy_WARN, ruleMgrWarn) - - ruleMgrAbort.Run() - ruleMgrWarn.Run() - defer ruleMgrAbort.Stop() - defer ruleMgrWarn.Stop() - - err = m.Update(1*time.Second, []string{ - filepath.Join(dir, "no_strategy.yaml"), - }) - testutil.Ok(t, err) - testutil.Equals(t, 1, len(m.RuleGroups())) - - err = m.Update(1*time.Second, []string{}) - testutil.Ok(t, err) - testutil.Equals(t, 0, len(m.RuleGroups())) } func TestRuleGroupMarshalYAML(t *testing.T) { @@ -280,8 +255,8 @@ func TestRuleGroupMarshalYAML(t *testing.T) { ` a := storepb.PartialResponseStrategy_ABORT - var input = RuleGroups{ - Groups: []RuleGroup{ + var input = configRuleGroups{ + Groups: []configRuleGroup{ { RuleGroup: rulefmt.RuleGroup{ Name: "something1", @@ -313,3 +288,40 @@ func TestRuleGroupMarshalYAML(t *testing.T) { testutil.Equals(t, expected, string(b)) } + +func TestManager_Rules(t *testing.T) { + defer leaktest.CheckTimeout(t, 10*time.Second)() + + dir, err := ioutil.TempDir("", "test_rule_run") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + + curr, err := os.Getwd() + testutil.Ok(t, err) + + thanosRuleMgr := NewManager( + context.Background(), + nil, + dir, + rules.ManagerOptions{ + Logger: log.NewLogfmtLogger(os.Stderr), + }, + func(partialResponseStrategy storepb.PartialResponseStrategy) rules.QueryFunc { + return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) { + return nil, nil + } + }, + labels.FromStrings("replica", "test1"), + ) + testutil.Ok(t, thanosRuleMgr.Update(60*time.Second, []string{ + filepath.Join(curr, "../../examples/alerts/alerts.yaml"), + filepath.Join(curr, "../../examples/alerts/rules.yaml"), + })) + defer func() { + // Update creates go routines. We don't need rules mngrs to run, just to parse things, but let it start and stop + // at the end to correctly test leaked go routines. + thanosRuleMgr.Run() + thanosRuleMgr.Stop() + }() + testRulesAgainstExamples(t, filepath.Join(curr, "../../examples/alerts"), thanosRuleMgr) +} diff --git a/pkg/rules/prometheus.go b/pkg/rules/prometheus.go index 34a775a5744..6089d547258 100644 --- a/pkg/rules/prometheus.go +++ b/pkg/rules/prometheus.go @@ -5,22 +5,28 @@ package rules import ( "net/url" + "strings" + "github.com/prometheus/prometheus/pkg/labels" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/rules/rulespb" + "github.com/thanos-io/thanos/pkg/store/storepb" ) -// Prometheus implements rulespb.Rules. +// Prometheus implements rulespb.Rules gRPC that allows to fetch rules from Prometheus HTTP api/v1/rules endpoint. type Prometheus struct { base *url.URL client *promclient.Client + + extLabels func() labels.Labels } // NewPrometheus creates new rules.Prometheus. -func NewPrometheus(base *url.URL, client *promclient.Client) *Prometheus { +func NewPrometheus(base *url.URL, client *promclient.Client, extLabels func() labels.Labels) *Prometheus { return &Prometheus{ - base: base, - client: client, + base: base, + client: client, + extLabels: extLabels, } } @@ -28,13 +34,16 @@ func NewPrometheus(base *url.URL, client *promclient.Client) *Prometheus { func (p *Prometheus) Rules(r *rulespb.RulesRequest, s rulespb.Rules_RulesServer) error { var typeRules string if r.Type != rulespb.RulesRequest_ALL { - typeRules = r.Type.String() + typeRules = strings.ToLower(r.Type.String()) } groups, err := p.client.RulesInGRPC(s.Context(), p.base, typeRules) if err != nil { return err } + // Prometheus does not add external labels, so we need to add on our own. + enrichRulesWithExtLabels(groups, p.extLabels()) + for _, g := range groups { if err := s.Send(&rulespb.RulesResponse{Result: &rulespb.RulesResponse_Group{Group: g}}); err != nil { return err @@ -42,3 +51,25 @@ func (p *Prometheus) Rules(r *rulespb.RulesRequest, s rulespb.Rules_RulesServer) } return nil } + +func enrichRulesWithExtLabels(groups []*rulespb.RuleGroup, extLset labels.Labels) { + for _, g := range groups { + for i, r := range g.Rules { + if a := r.GetAlert(); a != nil { + if a.Labels == nil { + a.Labels = &rulespb.PromLabels{} + } + a.Labels.Labels = storepb.ExtendLabels(a.Labels.Labels, extLset) + g.Rules[i] = rulespb.NewAlertingRule(a) + continue + } + if ru := r.GetRecording(); ru != nil { + if ru.Labels == nil { + ru.Labels = &rulespb.PromLabels{} + } + ru.Labels.Labels = storepb.ExtendLabels(ru.Labels.Labels, extLset) + g.Rules[i] = rulespb.NewRecordingRule(ru) + } + } + } +} diff --git a/pkg/rules/prometheus_test.go b/pkg/rules/prometheus_test.go index 125c3e07e25..49b7134d09c 100644 --- a/pkg/rules/prometheus_test.go +++ b/pkg/rules/prometheus_test.go @@ -4,175 +4,47 @@ package rules import ( - "context" "fmt" "net/url" "os" + "path/filepath" "testing" "time" "github.com/fortytw2/leaktest" + "github.com/prometheus/prometheus/pkg/labels" "github.com/thanos-io/thanos/pkg/promclient" - "github.com/thanos-io/thanos/pkg/rules/rulespb" - "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/testutil" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" ) -func TestPrometheusStore_Rules_e2e(t *testing.T) { - t.Helper() - +func TestPrometheus_Rules_e2e(t *testing.T) { defer leaktest.CheckTimeout(t, 10*time.Second)() p, err := e2eutil.NewPrometheus() testutil.Ok(t, err) defer func() { testutil.Ok(t, p.Stop()) }() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - curr, err := os.Getwd() testutil.Ok(t, err) + root := filepath.Join(curr, "../../") + testutil.Ok(t, p.SetConfig(fmt.Sprintf(` global: external_labels: region: eu-west rule_files: - - %s/../../examples/alerts/alerts.yaml - - %s/../../examples/alerts/rules.yaml -`, curr, curr))) + - %s/examples/alerts/alerts.yaml + - %s/examples/alerts/rules.yaml +`, root, root))) testutil.Ok(t, p.Start()) u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) testutil.Ok(t, err) - promRules := NewPrometheus(u, promclient.NewDefaultClient()) - - someAlert := &rulespb.Rule{Result: &rulespb.Rule_Alert{Alert: &rulespb.Alert{Name: "some"}}} - someRecording := &rulespb.Rule{Result: &rulespb.Rule_Recording{Recording: &rulespb.RecordingRule{Name: "some"}}} - - for _, tcase := range []struct { - expected []*rulespb.RuleGroup - expectedErr error - }{ - { - expected: []*rulespb.RuleGroup{ - { - Name: "thanos-bucket-replicate.rules", - File: fmt.Sprintf("%s/../../examples/alerts/alerts.yaml", curr), - Rules: []*rulespb.Rule{someAlert, someAlert, someAlert}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - { - Name: "thanos-compact.rules", - File: fmt.Sprintf("%s/../../examples/alerts/alerts.yaml", curr), - Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert, someAlert}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - { - Name: "thanos-component-absent.rules", - File: fmt.Sprintf("%s/../../examples/alerts/alerts.yaml", curr), - Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert, someAlert, someAlert}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - { - Name: "thanos-query.rules", - File: fmt.Sprintf("%s/../../examples/alerts/alerts.yaml", curr), - Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert, someAlert, someAlert, someAlert}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - { - Name: "thanos-receive.rules", - File: fmt.Sprintf("%s/../../examples/alerts/alerts.yaml", curr), - Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert, someAlert, someAlert}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - { - Name: "thanos-rule.rules", - File: fmt.Sprintf("%s/../../examples/alerts/alerts.yaml", curr), - Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert, someAlert, someAlert, someAlert, someAlert, someAlert, someAlert, someAlert}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - { - Name: "thanos-sidecar.rules", - File: fmt.Sprintf("%s/../../examples/alerts/alerts.yaml", curr), - Rules: []*rulespb.Rule{someAlert, someAlert}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - { - Name: "thanos-store.rules", - File: fmt.Sprintf("%s/../../examples/alerts/alerts.yaml", curr), - Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - { - Name: "thanos-bucket-replicate.rules", - File: fmt.Sprintf("%s/../../examples/alerts/rules.yaml", curr), - Rules: []*rulespb.Rule{}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - { - Name: "thanos-query.rules", - File: fmt.Sprintf("%s/../../examples/alerts/rules.yaml", curr), - Rules: []*rulespb.Rule{someRecording, someRecording, someRecording, someRecording, someRecording}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - { - Name: "thanos-receive.rules", File: fmt.Sprintf("%s/../../examples/alerts/rules.yaml", curr), - Rules: []*rulespb.Rule{someRecording, someRecording, someRecording, someRecording, someRecording, someRecording}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - { - Name: "thanos-store.rules", - File: fmt.Sprintf("%s/../../examples/alerts/rules.yaml", curr), - Rules: []*rulespb.Rule{someRecording, someRecording, someRecording, someRecording}, - Interval: 60, - DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_WARN, - }, - }, - // TODO(bwplotka): Potentially add more cases. - }, - } { - t.Run("", func(t *testing.T) { - srv := &rulesServer{ctx: ctx} - err = promRules.Rules(&rulespb.RulesRequest{}, srv) - if tcase.expectedErr != nil { - testutil.NotOk(t, err) - testutil.Equals(t, tcase.expectedErr.Error(), err.Error()) - return - } - - // We don't want to be picky, just check what number and types of rules within group are. - got := srv.groups - for i, g := range got { - for j, r := range g.Rules { - if r.GetAlert() != nil { - got[i].Rules[j] = someAlert - continue - } - if r.GetRecording() != nil { - got[i].Rules[j] = someRecording - continue - } - t.Fatalf("Found rule in group %s that is neither recording not alert.", g.Name) - } - } - - testutil.Ok(t, err) - testutil.Equals(t, []error(nil), srv.warnings) - testutil.Equals(t, tcase.expected, srv.groups) - }) - } + promRules := NewPrometheus(u, promclient.NewDefaultClient(), func() labels.Labels { + return labels.FromStrings("replica", "test1") + }) + testRulesAgainstExamples(t, filepath.Join(root, "examples/alerts"), promRules) } diff --git a/pkg/rules/proxy.go b/pkg/rules/proxy.go index 72e4e8a9ba5..0dfebec8078 100644 --- a/pkg/rules/proxy.go +++ b/pkg/rules/proxy.go @@ -18,7 +18,7 @@ import ( "google.golang.org/grpc/status" ) -// Proxy implements rulespb.Rules that fanouts requests to given rulespb.Rules and deduplication on the way. +// Proxy implements rulespb.Rules gRPC that fanouts requests to given rulespb.Rules and deduplication on the way. type Proxy struct { logger log.Logger rules func() []rulespb.RulesClient diff --git a/pkg/rules/rules.go b/pkg/rules/rules.go index b8646fc17e4..ec4ed0c9eb1 100644 --- a/pkg/rules/rules.go +++ b/pkg/rules/rules.go @@ -9,27 +9,32 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/storage" "github.com/thanos-io/thanos/pkg/rules/rulespb" - "github.com/thanos-io/thanos/pkg/store/storepb" ) -// Retriever allows to retrieve rules from gRPC server implementation. +var _ UnaryClient = &GRPCClient{} + +// UnaryClient is gRPC rulespb.Rules client which expands streaming rules API. Useful for consumers that does not +// support streaming. +type UnaryClient interface { + Rules(ctx context.Context, req *rulespb.RulesRequest) ([]*rulespb.RuleGroup, storage.Warnings, error) +} + +// GRPCClient allows to retrieve rules from local gRPC streaming server implementation. // TODO(bwplotka): Switch to native gRPC transparent client->server adapter once available. -type Retriever struct { +type GRPCClient struct { proxy rulespb.RulesServer } -func NewRetriever(rs rulespb.RulesServer) *Retriever { - return &Retriever{ +func NewGRPCClient(rs rulespb.RulesServer) *GRPCClient { + return &GRPCClient{ proxy: rs, } } -func (rr *Retriever) RuleGroups(ctx context.Context) ([]*rulespb.RuleGroup, storage.Warnings, error) { +func (rr *GRPCClient) Rules(ctx context.Context, req *rulespb.RulesRequest) ([]*rulespb.RuleGroup, storage.Warnings, error) { resp := &rulesServer{ctx: ctx} - if err := rr.proxy.Rules(&rulespb.RulesRequest{ - PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, - }, resp); err != nil { + if err := rr.proxy.Rules(req, resp); err != nil { return nil, nil, errors.Wrap(err, "proxy RuleGroups()") } diff --git a/pkg/rules/rules_test.go b/pkg/rules/rules_test.go new file mode 100644 index 00000000000..04284dd76a0 --- /dev/null +++ b/pkg/rules/rules_test.go @@ -0,0 +1,188 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package rules + +import ( + "context" + "path/filepath" + "testing" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/prometheus/prometheus/storage" + "github.com/thanos-io/thanos/pkg/rules/rulespb" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/thanos-io/thanos/pkg/testutil" +) + +// testRulesAgainstExamples tests against alerts.yaml and rules.yaml examples. +func testRulesAgainstExamples(t *testing.T, dir string, server rulespb.RulesServer) { + t.Helper() + + // We don't test internals, just if groups are expected. + // TODO(bwplotka): Test internals as well, especially labels! + someAlert := &rulespb.Rule{Result: &rulespb.Rule_Alert{Alert: &rulespb.Alert{Name: "some"}}} + someRecording := &rulespb.Rule{Result: &rulespb.Rule_Recording{Recording: &rulespb.RecordingRule{Name: "some"}}} + + expected := []*rulespb.RuleGroup{ + { + Name: "thanos-bucket-replicate.rules", + File: filepath.Join(dir, "alerts.yaml"), + Rules: []*rulespb.Rule{someAlert, someAlert, someAlert}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, + { + Name: "thanos-compact.rules", + File: filepath.Join(dir, "alerts.yaml"), + Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert, someAlert}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, + { + Name: "thanos-component-absent.rules", + File: filepath.Join(dir, "alerts.yaml"), + Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert, someAlert, someAlert}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, + { + Name: "thanos-query.rules", + File: filepath.Join(dir, "alerts.yaml"), + Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert, someAlert, someAlert, someAlert}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, + { + Name: "thanos-receive.rules", + File: filepath.Join(dir, "alerts.yaml"), + Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert, someAlert}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, + { + Name: "thanos-rule.rules", + File: filepath.Join(dir, "alerts.yaml"), + Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert, someAlert, someAlert, someAlert, someAlert, someAlert}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, + { + Name: "thanos-sidecar.rules", + File: filepath.Join(dir, "alerts.yaml"), + Rules: []*rulespb.Rule{someAlert, someAlert}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, + { + Name: "thanos-store.rules", + File: filepath.Join(dir, "alerts.yaml"), + Rules: []*rulespb.Rule{someAlert, someAlert, someAlert, someAlert}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, + { + Name: "thanos-bucket-replicate.rules", + File: filepath.Join(dir, "rules.yaml"), + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, + { + Name: "thanos-query.rules", + File: filepath.Join(dir, "rules.yaml"), + Rules: []*rulespb.Rule{someRecording, someRecording, someRecording, someRecording, someRecording}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, + { + Name: "thanos-receive.rules", + File: filepath.Join(dir, "rules.yaml"), + Rules: []*rulespb.Rule{someRecording, someRecording, someRecording, someRecording, someRecording, someRecording}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, + { + Name: "thanos-store.rules", + File: filepath.Join(dir, "rules.yaml"), + Rules: []*rulespb.Rule{someRecording, someRecording, someRecording, someRecording}, + Interval: 60, + DeprecatedPartialResponseStrategy: storepb.PartialResponseStrategy_WARN, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT, + }, + } + + for _, tcase := range []struct { + requestedType rulespb.RulesRequest_Type + expectedErr error + }{ + { + requestedType: rulespb.RulesRequest_ALL, + }, + { + requestedType: rulespb.RulesRequest_ALERT, + }, + { + requestedType: rulespb.RulesRequest_RECORD, + }, + } { + t.Run(tcase.requestedType.String(), func(t *testing.T) { + got, w, err := NewGRPCClient(server).Rules(context.Background(), &rulespb.RulesRequest{ + Type: tcase.requestedType, + }) + testutil.Equals(t, storage.Warnings(nil), w) + if tcase.expectedErr != nil { + testutil.NotOk(t, err) + testutil.Equals(t, tcase.expectedErr.Error(), err.Error()) + return + } + testutil.Ok(t, err) + + expectedForType := expected + if tcase.requestedType != rulespb.RulesRequest_ALL { + expectedForType = make([]*rulespb.RuleGroup, len(expected)) + for i, g := range expected { + expectedForType[i] = proto.Clone(g).(*rulespb.RuleGroup) + expectedForType[i].Rules = nil + + for _, r := range g.Rules { + switch tcase.requestedType { + case rulespb.RulesRequest_ALERT: + if r.GetAlert() != nil { + expectedForType[i].Rules = append(expectedForType[i].Rules, someAlert) + } + case rulespb.RulesRequest_RECORD: + if r.GetRecording() != nil { + expectedForType[i].Rules = append(expectedForType[i].Rules, someRecording) + } + } + } + } + } + + // We don't want to be picky, just check what number and types of rules within group are. + for i := range got { + for j, r := range got[i].Rules { + if r.GetAlert() != nil { + got[i].Rules[j] = someAlert + continue + } + if r.GetRecording() != nil { + got[i].Rules[j] = someRecording + continue + } + t.Fatalf("Found rule in group %s that is neither recording not alert.", got[i].Name) + } + if len(got[i].Rules) == 0 { + // Fix, for test purposes. + got[i].Rules = nil + } + // Mask nondeterministic fields. + got[i].EvaluationDurationSeconds = 0 + got[i].LastEvaluation = time.Time{} + + testutil.Equals(t, expectedForType[i], got[i]) + } + testutil.Equals(t, expectedForType, got) + }) + } +} diff --git a/pkg/rules/rulespb/custom.go b/pkg/rules/rulespb/custom.go index a77bc411211..5ffaa21bc4b 100644 --- a/pkg/rules/rulespb/custom.go +++ b/pkg/rules/rulespb/custom.go @@ -41,6 +41,12 @@ func NewRecordingRule(r *RecordingRule) *Rule { } } +func NewAlertingRule(a *Alert) *Rule { + return &Rule{ + Result: &Rule_Alert{Alert: a}, + } +} + func (m *Rule) UnmarshalJSON(entry []byte) error { decider := struct { Type string `json:"type"` diff --git a/pkg/rules/rulespb/rpc.pb.go b/pkg/rules/rulespb/rpc.pb.go index 413bacf45cc..8b1e2201a29 100644 --- a/pkg/rules/rulespb/rpc.pb.go +++ b/pkg/rules/rulespb/rpc.pb.go @@ -76,21 +76,24 @@ func (AlertState) EnumDescriptor() ([]byte, []int) { type RulesRequest_Type int32 const ( - RulesRequest_ALL RulesRequest_Type = 0 - RulesRequest_ALERTING RulesRequest_Type = 1 - RulesRequest_RECORDING RulesRequest_Type = 2 + RulesRequest_ALL RulesRequest_Type = 0 + /// This will make sure strings.ToLower(.String()) will match 'alert' and 'record' values for + /// Prometheus HTTP API. + /// NOTE: The implementation has to return empty rule groups as well. + RulesRequest_ALERT RulesRequest_Type = 1 + RulesRequest_RECORD RulesRequest_Type = 2 ) var RulesRequest_Type_name = map[int32]string{ 0: "ALL", - 1: "ALERTING", - 2: "RECORDING", + 1: "ALERT", + 2: "RECORD", } var RulesRequest_Type_value = map[string]int32{ - "ALL": 0, - "ALERTING": 1, - "RECORDING": 2, + "ALL": 0, + "ALERT": 1, + "RECORD": 2, } func (x RulesRequest_Type) String() string { @@ -587,72 +590,72 @@ func init() { func init() { proto.RegisterFile("rules/rulespb/rpc.proto", fileDescriptor_91b1d28f30eb5efb) } var fileDescriptor_91b1d28f30eb5efb = []byte{ - // 1026 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0x41, 0x4f, 0xe3, 0x46, - 0x14, 0xb6, 0x49, 0xec, 0xc4, 0x0f, 0x58, 0xe8, 0x74, 0x11, 0x26, 0xdb, 0xc6, 0x34, 0xd2, 0x4a, - 0xb4, 0xdd, 0x4d, 0x2a, 0xd0, 0x52, 0xed, 0xa9, 0x22, 0x90, 0x2e, 0x48, 0x88, 0xae, 0x06, 0xd4, - 0x43, 0x7b, 0x48, 0x87, 0x30, 0x1b, 0x22, 0x39, 0xb6, 0x77, 0x66, 0x42, 0x95, 0xff, 0xd0, 0xc3, - 0xde, 0xfa, 0x3b, 0xfa, 0x2f, 0x38, 0xee, 0xb1, 0x87, 0xca, 0x6d, 0xe1, 0xe6, 0x3f, 0xd1, 0x6a, - 0x66, 0xec, 0x38, 0xd0, 0xa4, 0xb0, 0x6d, 0xf6, 0xe2, 0x99, 0x79, 0xef, 0x7b, 0xf3, 0x66, 0xde, - 0xfb, 0xde, 0xf3, 0xc0, 0x2a, 0x1b, 0xf8, 0x94, 0x37, 0xd4, 0x37, 0x3a, 0x6d, 0xb0, 0xa8, 0x53, - 0x8f, 0x58, 0x28, 0x42, 0x64, 0x8b, 0x73, 0x12, 0x84, 0xbc, 0xb2, 0xc6, 0x45, 0xc8, 0x68, 0x43, - 0x7d, 0xa3, 0xd3, 0x86, 0x18, 0x46, 0x94, 0x6b, 0x48, 0xe5, 0x61, 0x37, 0xec, 0x86, 0x6a, 0xda, - 0x90, 0xb3, 0x54, 0xea, 0x75, 0xc3, 0xb0, 0xeb, 0xd3, 0x86, 0x5a, 0x9d, 0x0e, 0x5e, 0x35, 0x44, - 0xaf, 0x4f, 0xb9, 0x20, 0xfd, 0x48, 0x03, 0x6a, 0x97, 0x26, 0x2c, 0x60, 0xe9, 0x0f, 0xd3, 0xd7, - 0x03, 0xca, 0x05, 0x7a, 0x0a, 0x45, 0xb9, 0xad, 0x6b, 0xae, 0x9b, 0x1b, 0x0f, 0x36, 0xd7, 0xea, - 0xda, 0x73, 0x7d, 0x1c, 0x53, 0x3f, 0x19, 0x46, 0x14, 0x2b, 0x18, 0xfa, 0x1e, 0xd6, 0x22, 0xc2, - 0x44, 0x8f, 0xf8, 0x6d, 0x46, 0x79, 0x14, 0x06, 0x9c, 0xb6, 0xb9, 0x60, 0x44, 0xd0, 0xee, 0xd0, - 0x9d, 0x53, 0x7b, 0x78, 0xd9, 0x1e, 0x2f, 0x35, 0x10, 0xa7, 0xb8, 0xe3, 0x14, 0x86, 0x57, 0xa3, - 0xc9, 0x8a, 0xda, 0x13, 0x28, 0x4a, 0x57, 0xa8, 0x04, 0x85, 0x9d, 0xc3, 0xc3, 0x65, 0x03, 0x2d, - 0x40, 0x79, 0xe7, 0xb0, 0x85, 0x4f, 0x0e, 0x8e, 0x5e, 0x2c, 0x9b, 0x68, 0x11, 0x1c, 0xdc, 0xda, - 0xfd, 0x06, 0xef, 0xc9, 0xe5, 0x5c, 0xed, 0x07, 0x58, 0x4c, 0x4f, 0xa9, 0xb7, 0x41, 0x9f, 0x82, - 0xd5, 0x65, 0xe1, 0x20, 0x52, 0x77, 0x99, 0xdf, 0xfc, 0x60, 0xfc, 0x2e, 0x2f, 0xa4, 0x62, 0xdf, - 0xc0, 0x1a, 0x81, 0x2a, 0x50, 0xfa, 0x91, 0xb0, 0xa0, 0x17, 0x74, 0xd5, 0xa1, 0x9d, 0x7d, 0x03, - 0x67, 0x82, 0x66, 0x19, 0x6c, 0x46, 0xf9, 0xc0, 0x17, 0xb5, 0x5d, 0x80, 0x91, 0x2d, 0x47, 0xcf, - 0xc0, 0x56, 0xc6, 0xdc, 0x35, 0xd7, 0x0b, 0x13, 0xf7, 0x6f, 0x42, 0x12, 0x7b, 0x29, 0x08, 0xa7, - 0x63, 0xed, 0xb7, 0x22, 0x38, 0x23, 0x04, 0xfa, 0x08, 0x8a, 0x01, 0xe9, 0xeb, 0x70, 0x3b, 0xcd, - 0x72, 0x12, 0x7b, 0x6a, 0x8d, 0xd5, 0x57, 0x6a, 0x5f, 0xf5, 0x7c, 0xaa, 0xcf, 0xa4, 0xb5, 0x72, - 0x8d, 0xd5, 0x17, 0x3d, 0x05, 0x4b, 0x51, 0xc5, 0x2d, 0x28, 0xff, 0x0b, 0xe3, 0xfe, 0x9b, 0x4e, - 0x12, 0x7b, 0x5a, 0x8d, 0xf5, 0x80, 0x36, 0xa0, 0xdc, 0x0b, 0x04, 0x65, 0x17, 0xc4, 0x77, 0x8b, - 0xeb, 0xe6, 0x86, 0xd9, 0x5c, 0x48, 0x62, 0x6f, 0x24, 0xc3, 0xa3, 0x19, 0xc2, 0xf0, 0x88, 0x5e, - 0x10, 0x7f, 0x40, 0x44, 0x2f, 0x0c, 0xda, 0x67, 0x03, 0xa6, 0x27, 0x9c, 0x76, 0xc2, 0xe0, 0x8c, - 0xbb, 0x96, 0x32, 0x46, 0x49, 0xec, 0x3d, 0xc8, 0x61, 0x27, 0xbd, 0x3e, 0xc5, 0x6b, 0xf9, 0x7a, - 0x2f, 0xb5, 0x3a, 0xd6, 0x46, 0xa8, 0x0d, 0x4b, 0x3e, 0xe1, 0xa2, 0x9d, 0x23, 0x5c, 0x5b, 0xa5, - 0xa5, 0x52, 0xd7, 0x1c, 0xad, 0x67, 0x1c, 0xad, 0x9f, 0x64, 0x1c, 0x6d, 0x56, 0x2e, 0x63, 0xcf, - 0x90, 0x7e, 0xa4, 0x69, 0x6b, 0x64, 0xf9, 0xe6, 0x77, 0xcf, 0xc4, 0xb7, 0x64, 0xe8, 0x27, 0x13, - 0x3e, 0xd9, 0xa3, 0x11, 0xa3, 0x1d, 0x22, 0xe8, 0xd9, 0x14, 0xae, 0xb9, 0xa5, 0x7b, 0x51, 0xb2, - 0xf9, 0x71, 0x12, 0x7b, 0xd3, 0x89, 0x8d, 0xef, 0x76, 0x84, 0x2e, 0x60, 0x75, 0xda, 0x19, 0xca, - 0xf7, 0x3b, 0xc3, 0xa3, 0x24, 0xf6, 0xa6, 0x95, 0x06, 0x9e, 0xb6, 0x79, 0x2d, 0x80, 0xa2, 0xcc, - 0x3f, 0x7a, 0x06, 0x0e, 0xa3, 0x9d, 0x90, 0x9d, 0x49, 0x4e, 0xeb, 0x02, 0x58, 0x19, 0x11, 0x24, - 0x53, 0x48, 0xe4, 0xbe, 0x81, 0x73, 0x24, 0x7a, 0x0c, 0x16, 0xf1, 0x29, 0x13, 0x8a, 0x72, 0xf3, - 0x9b, 0x8b, 0x99, 0xc9, 0x8e, 0x14, 0xca, 0x7a, 0x51, 0xda, 0xb1, 0x9a, 0xf8, 0xa5, 0x00, 0x8b, - 0x4a, 0x79, 0x10, 0x70, 0x41, 0x82, 0x0e, 0x45, 0xdb, 0x60, 0xfb, 0xe4, 0x94, 0xfa, 0x3c, 0x75, - 0x8b, 0x46, 0x17, 0x65, 0x61, 0xff, 0x50, 0x69, 0x74, 0x61, 0x68, 0x14, 0x4e, 0x47, 0xd4, 0x82, - 0x79, 0x12, 0x04, 0xa1, 0x50, 0xe9, 0xe4, 0xe9, 0x01, 0x26, 0x19, 0x2f, 0x25, 0xb1, 0x37, 0x0e, - 0xc5, 0xe3, 0x0b, 0xb4, 0x05, 0x16, 0x17, 0x44, 0x50, 0xb7, 0xa0, 0xc2, 0x8c, 0x6e, 0xdc, 0xe0, - 0x58, 0x6a, 0x74, 0x6d, 0x28, 0x10, 0xd6, 0x03, 0x3a, 0x06, 0x87, 0x74, 0x44, 0xef, 0x82, 0xb6, - 0x89, 0x50, 0xc5, 0x71, 0x07, 0x2f, 0x93, 0xd8, 0x43, 0xda, 0x60, 0x47, 0x3c, 0x09, 0xfb, 0x3d, - 0x41, 0xfb, 0x91, 0x18, 0x2a, 0x5e, 0x96, 0x33, 0x39, 0xf2, 0xc0, 0x92, 0xf4, 0xa4, 0xaa, 0x60, - 0x1c, 0xed, 0x55, 0x09, 0xb0, 0x1e, 0xfe, 0x8d, 0x23, 0xf6, 0xfb, 0xe4, 0xc8, 0x5f, 0x45, 0xb0, - 0x54, 0x38, 0xf2, 0x60, 0x99, 0xef, 0x10, 0xac, 0xac, 0x67, 0xcd, 0x4d, 0xec, 0x59, 0x1e, 0x58, - 0xaf, 0x07, 0x94, 0x0d, 0x55, 0xfc, 0xd3, 0x5b, 0x2b, 0x01, 0xd6, 0x03, 0xfa, 0x12, 0x96, 0xff, - 0xd1, 0x52, 0xc6, 0xfa, 0x51, 0xa6, 0xc3, 0x4b, 0x67, 0xb7, 0x5a, 0x48, 0x4e, 0x2c, 0xeb, 0xff, - 0x10, 0xcb, 0xfe, 0x8f, 0xc4, 0x7a, 0x0e, 0xb6, 0x22, 0x3f, 0x77, 0x4b, 0xaa, 0xdf, 0xae, 0xdc, - 0x08, 0x56, 0x46, 0x7f, 0x7d, 0x02, 0x0d, 0xc4, 0xe9, 0x88, 0x6a, 0x60, 0x9f, 0x53, 0xe2, 0x8b, - 0x73, 0x55, 0xfb, 0x8e, 0xc6, 0x68, 0x09, 0x4e, 0x47, 0xb4, 0x0d, 0xa0, 0x1b, 0x24, 0x63, 0x21, - 0x73, 0x1d, 0x85, 0x5b, 0x4d, 0x62, 0xef, 0x43, 0xd5, 0xe7, 0xa4, 0x30, 0x27, 0x1a, 0x76, 0x46, - 0xc2, 0xbb, 0x9a, 0x35, 0xcc, 0xa8, 0x59, 0xcf, 0xcf, 0xb2, 0x59, 0xd7, 0x7e, 0x2e, 0xc0, 0xe2, - 0x8d, 0x2e, 0x74, 0xc7, 0x8f, 0x70, 0x44, 0xaa, 0xb9, 0x29, 0xa4, 0xca, 0xb9, 0x51, 0x78, 0x27, - 0x6e, 0xe4, 0x99, 0x29, 0xde, 0x33, 0x33, 0xd6, 0xac, 0x32, 0x63, 0xcf, 0x28, 0x33, 0xa5, 0x99, - 0x66, 0xe6, 0x39, 0x40, 0x1e, 0x32, 0xf4, 0xf9, 0x58, 0x2f, 0x2f, 0x8c, 0xff, 0x0f, 0x94, 0xbe, - 0x59, 0x94, 0x1b, 0x67, 0xb1, 0xfc, 0x6c, 0x0b, 0x20, 0xef, 0x1b, 0xf2, 0xad, 0x76, 0x70, 0xb4, - 0xb3, 0x7b, 0x72, 0xf0, 0x6d, 0x6b, 0xd9, 0x40, 0xf3, 0x50, 0x7a, 0xd9, 0x3a, 0xda, 0xd3, 0x0f, - 0x37, 0x00, 0xfb, 0xeb, 0x03, 0xac, 0x5e, 0x6d, 0x9b, 0x5f, 0x81, 0xa5, 0x5e, 0x6d, 0x68, 0x3b, - 0x9b, 0x3c, 0x9c, 0xf4, 0xe6, 0xac, 0xac, 0xdc, 0x92, 0xea, 0x96, 0xf6, 0x85, 0xd9, 0x7c, 0x7c, - 0xf9, 0x67, 0xd5, 0xb8, 0xbc, 0xaa, 0x9a, 0x6f, 0xaf, 0xaa, 0xe6, 0x1f, 0x57, 0x55, 0xf3, 0xcd, - 0x75, 0xd5, 0x78, 0x7b, 0x5d, 0x35, 0x7e, 0xbd, 0xae, 0x1a, 0xdf, 0x95, 0xd2, 0xc7, 0xf4, 0xa9, - 0xad, 0xe2, 0xb2, 0xf5, 0x77, 0x00, 0x00, 0x00, 0xff, 0xff, 0x93, 0x0e, 0xd2, 0xf3, 0x64, 0x0b, - 0x00, 0x00, + // 1025 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0x4f, 0x6f, 0xe3, 0x44, + 0x14, 0xb7, 0x9b, 0xd8, 0x89, 0x5f, 0xdb, 0x6d, 0x19, 0xb6, 0xaa, 0x9b, 0x85, 0xb8, 0x44, 0x5a, + 0x54, 0xfe, 0x6c, 0x82, 0x5a, 0x6d, 0xd1, 0x9e, 0x50, 0xd3, 0x86, 0x6d, 0xa4, 0xaa, 0xac, 0xa6, + 0x11, 0x07, 0x38, 0x84, 0x49, 0x32, 0x9b, 0x46, 0x72, 0x6c, 0xef, 0xcc, 0xa4, 0x28, 0xdf, 0x81, + 0xc3, 0xde, 0xf8, 0x1c, 0x7c, 0x05, 0x4e, 0x3d, 0xee, 0x91, 0x03, 0x32, 0xd0, 0xde, 0xfc, 0x25, + 0x40, 0x9e, 0xb1, 0xe3, 0xb4, 0x24, 0xb4, 0x0b, 0xe1, 0xe2, 0x99, 0x79, 0xef, 0xf7, 0xe6, 0xcd, + 0xbc, 0xf7, 0x7b, 0xcf, 0x03, 0x9b, 0x6c, 0xe4, 0x52, 0x5e, 0x93, 0xdf, 0xa0, 0x53, 0x63, 0x41, + 0xb7, 0x1a, 0x30, 0x5f, 0xf8, 0xc8, 0x14, 0xe7, 0xc4, 0xf3, 0x79, 0x69, 0x8b, 0x0b, 0x9f, 0xd1, + 0x9a, 0xfc, 0x06, 0x9d, 0x9a, 0x18, 0x07, 0x94, 0x2b, 0x48, 0xe9, 0x61, 0xdf, 0xef, 0xfb, 0x72, + 0x5a, 0x8b, 0x67, 0x89, 0xd4, 0xe9, 0xfb, 0x7e, 0xdf, 0xa5, 0x35, 0xb9, 0xea, 0x8c, 0x5e, 0xd6, + 0xc4, 0x60, 0x48, 0xb9, 0x20, 0xc3, 0x40, 0x01, 0x2a, 0x3f, 0xeb, 0xb0, 0x82, 0x63, 0x7f, 0x98, + 0xbe, 0x1a, 0x51, 0x2e, 0xd0, 0x13, 0xc8, 0xc7, 0xdb, 0xda, 0xfa, 0xb6, 0xbe, 0xf3, 0x60, 0x77, + 0xab, 0xaa, 0x3c, 0x57, 0xa7, 0x31, 0xd5, 0xd6, 0x38, 0xa0, 0x58, 0xc2, 0xd0, 0xb7, 0xb0, 0x15, + 0x10, 0x26, 0x06, 0xc4, 0x6d, 0x33, 0xca, 0x03, 0xdf, 0xe3, 0xb4, 0xcd, 0x05, 0x23, 0x82, 0xf6, + 0xc7, 0xf6, 0x92, 0xdc, 0xc3, 0x49, 0xf7, 0x78, 0xa1, 0x80, 0x38, 0xc1, 0x9d, 0x25, 0x30, 0xbc, + 0x19, 0xcc, 0x56, 0x54, 0x3e, 0x84, 0x7c, 0xec, 0x0a, 0x15, 0x20, 0x77, 0x70, 0x72, 0xb2, 0xae, + 0x21, 0x0b, 0x8c, 0x83, 0x93, 0x06, 0x6e, 0xad, 0xeb, 0x08, 0xc0, 0xc4, 0x8d, 0xc3, 0xaf, 0xf0, + 0xd1, 0xfa, 0x52, 0xe5, 0x3b, 0x58, 0x4d, 0xce, 0xa7, 0x36, 0x40, 0x1f, 0x81, 0xd1, 0x67, 0xfe, + 0x28, 0x90, 0xb7, 0x58, 0xde, 0x7d, 0x67, 0xfa, 0x16, 0xcf, 0x63, 0xc5, 0xb1, 0x86, 0x15, 0x02, + 0x95, 0xa0, 0xf0, 0x3d, 0x61, 0xde, 0xc0, 0xeb, 0xcb, 0xe3, 0x5a, 0xc7, 0x1a, 0x4e, 0x05, 0xf5, + 0x22, 0x98, 0x8c, 0xf2, 0x91, 0x2b, 0x2a, 0x87, 0x00, 0x13, 0x5b, 0x8e, 0x9e, 0x82, 0x29, 0x8d, + 0xb9, 0xad, 0x6f, 0xe7, 0x66, 0xee, 0x5f, 0x87, 0x28, 0x74, 0x12, 0x10, 0x4e, 0xc6, 0xca, 0xaf, + 0x79, 0xb0, 0x26, 0x08, 0xf4, 0x1e, 0xe4, 0x3d, 0x32, 0x54, 0x81, 0xb6, 0xea, 0xc5, 0x28, 0x74, + 0xe4, 0x1a, 0xcb, 0x6f, 0xac, 0x7d, 0x39, 0x70, 0xa9, 0x3a, 0x93, 0xd2, 0xc6, 0x6b, 0x2c, 0xbf, + 0xe8, 0x09, 0x18, 0x92, 0x24, 0x76, 0x4e, 0xfa, 0x5f, 0x99, 0xf6, 0x5f, 0xb7, 0xa2, 0xd0, 0x51, + 0x6a, 0xac, 0x06, 0xb4, 0x03, 0xc5, 0x81, 0x27, 0x28, 0xbb, 0x20, 0xae, 0x9d, 0xdf, 0xd6, 0x77, + 0xf4, 0xfa, 0x4a, 0x14, 0x3a, 0x13, 0x19, 0x9e, 0xcc, 0x10, 0x86, 0x47, 0xf4, 0x82, 0xb8, 0x23, + 0x22, 0x06, 0xbe, 0xd7, 0xee, 0x8d, 0x98, 0x9a, 0x70, 0xda, 0xf5, 0xbd, 0x1e, 0xb7, 0x0d, 0x69, + 0x8c, 0xa2, 0xd0, 0x79, 0x90, 0xc1, 0x5a, 0x83, 0x21, 0xc5, 0x5b, 0xd9, 0xfa, 0x28, 0xb1, 0x3a, + 0x53, 0x46, 0xa8, 0x0d, 0x6b, 0x2e, 0xe1, 0xa2, 0x9d, 0x21, 0x6c, 0x53, 0xa6, 0xa5, 0x54, 0x55, + 0xec, 0xac, 0xa6, 0xec, 0xac, 0xb6, 0x52, 0x76, 0xd6, 0x4b, 0x97, 0xa1, 0xa3, 0xc5, 0x7e, 0x62, + 0xd3, 0xc6, 0xc4, 0xf2, 0xf5, 0x6f, 0x8e, 0x8e, 0x6f, 0xc9, 0xd0, 0x0f, 0x3a, 0x7c, 0x70, 0x44, + 0x03, 0x46, 0xbb, 0x44, 0xd0, 0xde, 0x1c, 0x96, 0xd9, 0x85, 0x7b, 0x91, 0xb1, 0xfe, 0x7e, 0x14, + 0x3a, 0xf3, 0x29, 0x8d, 0xef, 0x76, 0x84, 0x2e, 0x60, 0x73, 0xde, 0x19, 0x8a, 0xf7, 0x3b, 0xc3, + 0xa3, 0x28, 0x74, 0xe6, 0x15, 0x05, 0x9e, 0xb7, 0x79, 0xc5, 0x83, 0x7c, 0x9c, 0x7f, 0xf4, 0x14, + 0x2c, 0x46, 0xbb, 0x3e, 0xeb, 0xc5, 0x9c, 0x56, 0x05, 0xb0, 0x31, 0x21, 0x48, 0xaa, 0x88, 0x91, + 0xc7, 0x1a, 0xce, 0x90, 0xe8, 0x31, 0x18, 0xc4, 0xa5, 0x4c, 0x48, 0xca, 0x2d, 0xef, 0xae, 0xa6, + 0x26, 0x07, 0xb1, 0x30, 0xae, 0x17, 0xa9, 0x9d, 0xaa, 0x89, 0x9f, 0x72, 0xb0, 0x2a, 0x95, 0x4d, + 0x8f, 0x0b, 0xe2, 0x75, 0x29, 0xda, 0x07, 0xd3, 0x25, 0x1d, 0xea, 0xf2, 0xc4, 0x2d, 0x9a, 0x5c, + 0x94, 0xf9, 0xc3, 0x13, 0xa9, 0x51, 0x85, 0xa1, 0x50, 0x38, 0x19, 0x51, 0x03, 0x96, 0x89, 0xe7, + 0xf9, 0x42, 0xa6, 0x93, 0x27, 0x07, 0x98, 0x65, 0xbc, 0x16, 0x85, 0xce, 0x34, 0x14, 0x4f, 0x2f, + 0xd0, 0x1e, 0x18, 0x5c, 0x10, 0x41, 0xed, 0x9c, 0x0c, 0x33, 0xba, 0x71, 0x83, 0xb3, 0x58, 0xa3, + 0x6a, 0x43, 0x82, 0xb0, 0x1a, 0xd0, 0x19, 0x58, 0xa4, 0x2b, 0x06, 0x17, 0xb4, 0x4d, 0x84, 0x2c, + 0x8e, 0x3b, 0x78, 0x19, 0x85, 0x0e, 0x52, 0x06, 0x07, 0xe2, 0x53, 0x7f, 0x38, 0x10, 0x74, 0x18, + 0x88, 0xb1, 0xe4, 0x65, 0x31, 0x95, 0x23, 0x07, 0x8c, 0x98, 0x9e, 0x54, 0x16, 0x8c, 0xa5, 0xbc, + 0x4a, 0x01, 0x56, 0xc3, 0x3f, 0x71, 0xc4, 0xfc, 0x3f, 0x39, 0xf2, 0x67, 0x1e, 0x0c, 0x19, 0x8e, + 0x2c, 0x58, 0xfa, 0x5b, 0x04, 0x2b, 0xed, 0x59, 0x4b, 0x33, 0x7b, 0x96, 0x03, 0xc6, 0xab, 0x11, + 0x65, 0x63, 0x19, 0xff, 0xe4, 0xd6, 0x52, 0x80, 0xd5, 0x80, 0x3e, 0x87, 0xf5, 0xbf, 0xb5, 0x94, + 0xa9, 0x7e, 0x94, 0xea, 0xf0, 0x5a, 0xef, 0x56, 0x0b, 0xc9, 0x88, 0x65, 0xfc, 0x17, 0x62, 0x99, + 0xff, 0x92, 0x58, 0xcf, 0xc0, 0x94, 0xe4, 0xe7, 0x76, 0x41, 0xf6, 0xdb, 0x8d, 0x1b, 0xc1, 0x4a, + 0xe9, 0xaf, 0x4e, 0xa0, 0x80, 0x38, 0x19, 0x51, 0x05, 0xcc, 0x73, 0x4a, 0x5c, 0x71, 0x2e, 0x6b, + 0xdf, 0x52, 0x18, 0x25, 0xc1, 0xc9, 0x88, 0xf6, 0x01, 0x54, 0x83, 0x64, 0xcc, 0x67, 0xb6, 0x25, + 0x71, 0x9b, 0x51, 0xe8, 0xbc, 0x2b, 0xfb, 0x5c, 0x2c, 0xcc, 0x88, 0x86, 0xad, 0x89, 0xf0, 0xae, + 0x66, 0x0d, 0x0b, 0x6a, 0xd6, 0xcb, 0x8b, 0x6c, 0xd6, 0x95, 0x1f, 0x73, 0xb0, 0x7a, 0xa3, 0x0b, + 0xdd, 0xf1, 0x23, 0x9c, 0x90, 0x6a, 0x69, 0x0e, 0xa9, 0x32, 0x6e, 0xe4, 0xde, 0x8a, 0x1b, 0x59, + 0x66, 0xf2, 0xf7, 0xcc, 0x8c, 0xb1, 0xa8, 0xcc, 0x98, 0x0b, 0xca, 0x4c, 0x61, 0xa1, 0x99, 0x79, + 0x06, 0x90, 0x85, 0x0c, 0x7d, 0x32, 0xd5, 0xcb, 0x73, 0xd3, 0xff, 0x03, 0xa9, 0xaf, 0xe7, 0xe3, + 0x8d, 0xd3, 0x58, 0x7e, 0xbc, 0x07, 0x90, 0xf5, 0x0d, 0xb4, 0x02, 0xc5, 0xe6, 0xe9, 0xc1, 0x61, + 0xab, 0xf9, 0x75, 0x63, 0x5d, 0x43, 0xcb, 0x50, 0x78, 0xd1, 0x38, 0x3d, 0x6a, 0x9e, 0x3e, 0x57, + 0xaf, 0xb6, 0x2f, 0x9b, 0x38, 0x9e, 0x2f, 0xed, 0x7e, 0x01, 0x86, 0x7c, 0xb5, 0xa1, 0xfd, 0x74, + 0xf2, 0x70, 0xd6, 0x6b, 0xb3, 0xb4, 0x71, 0x4b, 0xaa, 0x5a, 0xda, 0x67, 0x7a, 0xfd, 0xf1, 0xe5, + 0x1f, 0x65, 0xed, 0xf2, 0xaa, 0xac, 0xbf, 0xb9, 0x2a, 0xeb, 0xbf, 0x5f, 0x95, 0xf5, 0xd7, 0xd7, + 0x65, 0xed, 0xcd, 0x75, 0x59, 0xfb, 0xe5, 0xba, 0xac, 0x7d, 0x53, 0x48, 0x9e, 0xd1, 0x1d, 0x53, + 0xc6, 0x65, 0xef, 0xaf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x69, 0x27, 0xb9, 0x35, 0x5e, 0x0b, 0x00, + 0x00, } // Reference imports to suppress errors if they are not otherwise used. diff --git a/pkg/rules/rulespb/rpc.proto b/pkg/rules/rulespb/rpc.proto index 4a28a01441b..bdaab6f66d3 100644 --- a/pkg/rules/rulespb/rpc.proto +++ b/pkg/rules/rulespb/rpc.proto @@ -30,9 +30,12 @@ service Rules { message RulesRequest { enum Type { - ALL = 0; - ALERTING = 1; - RECORDING = 2; + ALL = 0; + /// This will make sure strings.ToLower(.String()) will match 'alert' and 'record' values for + /// Prometheus HTTP API. + /// NOTE: The implementation has to return empty rule groups as well. + ALERT = 1; + RECORD = 2; } Type type = 1; PartialResponseStrategy partial_response_strategy = 2; @@ -40,7 +43,7 @@ message RulesRequest { message RulesResponse { oneof result { - // It is up to server implementation to decide how many of those to put here. + /// group for rule groups. It is up to server implementation to decide how many of those to put here within single frame. RuleGroup group = 1; /// warning is considered an information piece in place of series for warning purposes. diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index 4b98cbf5fc7..dd530d504c7 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -515,11 +515,7 @@ Outer: } lset = append(lset, l) } - for ei < len(pbExtend) { - lset = append(lset, pbExtend[ei]) - ei++ - } - return lset + return storepb.ExtendLabels(lset, extend) } // LabelNames returns all known label names. diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index ba96426483d..2551e75908f 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -4,6 +4,8 @@ package storepb import ( + "fmt" + "sort" "strconv" "strings" "unsafe" @@ -249,18 +251,18 @@ func LabelSetsToString(lsets []LabelSet) string { func (x *PartialResponseStrategy) UnmarshalJSON(entry []byte) error { fieldStr, err := strconv.Unquote(string(entry)) if err != nil { - return errors.Wrapf(err, "partialResponseStrategy: unquote %v", string(entry)) + return errors.Wrapf(err, fmt.Sprintf("failed to unqote %v, in order to unmarshal as 'partial_response_strategy'. Possible values are %s", string(entry), strings.Join(PartialResponseStrategyValues, ","))) } if len(fieldStr) == 0 { - // Default. - *x = PartialResponseStrategy_WARN + // NOTE: For Rule default is abort as this is recommended for alerting. + *x = PartialResponseStrategy_ABORT return nil } strategy, ok := PartialResponseStrategy_value[strings.ToUpper(fieldStr)] if !ok { - return errors.Errorf("unknown partialResponseStrategy: %v", string(entry)) + return errors.Errorf(fmt.Sprintf("failed to unmarshal %v as 'partial_response_strategy'. Possible values are %s", string(entry), strings.Join(PartialResponseStrategyValues, ","))) } *x = PartialResponseStrategy(strategy) return nil @@ -269,3 +271,31 @@ func (x *PartialResponseStrategy) UnmarshalJSON(entry []byte) error { func (x *PartialResponseStrategy) MarshalJSON() ([]byte, error) { return []byte(strconv.Quote(x.String())), nil } + +// ExtendLabels extend given labels by extend in labels format. +// The type conversion is done safely, which means we don't modify extend labels underlying array. +// +// In case of existing labels already present in given label set, it will be overwritten by external one. +func ExtendLabels(lset []Label, extend labels.Labels) []Label { + overwritten := map[string]struct{}{} + for i, l := range lset { + if v := extend.Get(l.Name); v != "" { + lset[i].Value = v + overwritten[l.Name] = struct{}{} + } + } + + for _, l := range extend { + if _, ok := overwritten[l.Name]; ok { + continue + } + lset = append(lset, Label{ + Name: l.Name, + Value: l.Value, + }) + } + sort.Slice(lset, func(i, j int) bool { + return lset[i].Name < lset[j].Name + }) + return lset +} diff --git a/pkg/store/storepb/custom_test.go b/pkg/store/storepb/custom_test.go index cbeaed69500..6b8e0ab82b5 100644 --- a/pkg/store/storepb/custom_test.go +++ b/pkg/store/storepb/custom_test.go @@ -269,7 +269,17 @@ func seriesEquals(t *testing.T, expected []rawSeries, gotSS SeriesSet) { testutil.Equals(t, len(expected[i].chunks[k]), j) } } +} + +func TestExtendLabels(t *testing.T) { + testutil.Equals(t, []Label{{Name: "a", Value: "1"}, {Name: "replica", Value: "01"}, {Name: "xb", Value: "2"}}, + ExtendLabels([]Label{{Name: "xb", Value: "2"}, {Name: "a", Value: "1"}}, labels.FromStrings("replica", "01"))) + + testutil.Equals(t, []Label{{Name: "replica", Value: "01"}}, + ExtendLabels([]Label{}, labels.FromStrings("replica", "01"))) + testutil.Equals(t, []Label{{Name: "a", Value: "1"}, {Name: "replica", Value: "01"}, {Name: "xb", Value: "2"}}, + ExtendLabels([]Label{{Name: "xb", Value: "2"}, {Name: "replica", Value: "NOT01"}, {Name: "a", Value: "1"}}, labels.FromStrings("replica", "01"))) } // Test the cost of merging series sets for different number of merged sets and their size. diff --git a/pkg/testutil/e2eutil/prometheus.go b/pkg/testutil/e2eutil/prometheus.go index becbd236728..ef4af115db2 100644 --- a/pkg/testutil/e2eutil/prometheus.go +++ b/pkg/testutil/e2eutil/prometheus.go @@ -36,7 +36,7 @@ import ( ) const ( - defaultPrometheusVersion = "v2.13.0" + defaultPrometheusVersion = "v2.18.0" defaultAlertmanagerVersion = "v0.20.0" defaultMinioVersion = "RELEASE.2018-10-06T00-15-16Z" diff --git a/pkg/ui/rule.go b/pkg/ui/rule.go index c9125e3052c..6e78cbc1662 100644 --- a/pkg/ui/rule.go +++ b/pkg/ui/rule.go @@ -18,7 +18,7 @@ import ( "github.com/prometheus/prometheus/rules" "github.com/thanos-io/thanos/pkg/component" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" - "github.com/thanos-io/thanos/pkg/rules/manager" + thanosrules "github.com/thanos-io/thanos/pkg/rules" ) type Rule struct { @@ -26,12 +26,12 @@ type Rule struct { externalPrefix, prefixHeader string - ruleManager *manager.Manager + ruleManager *thanosrules.Manager queryURL string reg prometheus.Registerer } -func NewRuleUI(logger log.Logger, reg prometheus.Registerer, ruleManager *manager.Manager, queryURL string, externalPrefix, prefixHeader string) *Rule { +func NewRuleUI(logger log.Logger, reg prometheus.Registerer, ruleManager *thanosrules.Manager, queryURL, externalPrefix, prefixHeader string) *Rule { return &Rule{ BaseUI: NewBaseUI(logger, "rule_menu.html", ruleTmplFuncs(queryURL), externalPrefix, prefixHeader, component.Rule), externalPrefix: externalPrefix, @@ -118,7 +118,7 @@ func ruleTmplFuncs(queryURL string) template.FuncMap { } func (ru *Rule) alerts(w http.ResponseWriter, r *http.Request) { - var groups []manager.Group + var groups []thanosrules.Group for _, group := range ru.ruleManager.RuleGroups() { if group.HasAlertingRules() { groups = append(groups, group) @@ -178,7 +178,7 @@ func (ru *Rule) Register(r *route.Router, ins extpromhttp.InstrumentationMiddlew // AlertStatus bundles alerting rules and the mapping of alert states to row classes. type AlertStatus struct { - Groups []manager.Group + Groups []thanosrules.Group AlertStateToRowClass map[rules.AlertState]string Counts AlertByStateCount } @@ -189,7 +189,7 @@ type AlertByStateCount struct { Firing int32 } -func alertCounts(groups []manager.Group) AlertByStateCount { +func alertCounts(groups []thanosrules.Group) AlertByStateCount { result := AlertByStateCount{} for _, group := range groups { for _, alert := range group.AlertingRules() { diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index ff3424bb35a..7efcdf45e3c 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -36,7 +36,7 @@ var defaultBackoffConfig = util.BackoffConfig{ // TODO(bwplotka): Run against multiple? func DefaultPrometheusImage() string { - return "quay.io/prometheus/prometheus:v2.16.0" + return "quay.io/prometheus/prometheus:v2.18.0" } func DefaultAlertmanagerImage() string { diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 2baf62783b2..11f911c1919 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -18,7 +18,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/thanos-io/thanos/pkg/promclient" - "github.com/thanos-io/thanos/pkg/rules/rulespb" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/testutil" "github.com/thanos-io/thanos/test/e2e/e2ethanos" @@ -168,39 +167,6 @@ func TestQuery(t *testing.T) { }) } -func TestRulesFanout(t *testing.T) { - t.Parallel() - - netName := "e2e_test_rules_fanout" - - s, err := e2e.NewScenario(netName) - testutil.Ok(t, err) - defer s.Close() - - rulesSubDir := filepath.Join("rules") - testutil.Ok(t, os.MkdirAll(filepath.Join(s.SharedDir(), rulesSubDir), os.ModePerm)) - createRuleFiles(t, filepath.Join(s.SharedDir(), rulesSubDir)) - - prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar(s.SharedDir(), netName, "alone", defaultPromConfig("prom-alone", 0, "", filepath.Join(e2e.ContainerSharedDir, rulesSubDir, "*.yaml")), e2ethanos.DefaultPrometheusImage()) - testutil.Ok(t, err) - testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1)) - - q, err := e2ethanos.NewQuerier(s.SharedDir(), "1", - []string{sidecar1.GRPCNetworkEndpoint()}, - nil, - []string{sidecar1.GRPCNetworkEndpoint()}, - ) - testutil.Ok(t, err) - testutil.Ok(t, s.StartAndWaitReady(q)) - - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) - defer cancel() - - testutil.Ok(t, q.WaitSumMetrics(e2e.Equals(1), "thanos_store_nodes_grpc_connections")) - - ruleAndAssert(t, ctx, q.HTTPEndpoint(), "", 1) -} - func urlParse(t *testing.T, addr string) *url.URL { u, err := url.Parse(addr) testutil.Ok(t, err) @@ -243,37 +209,3 @@ func queryAndAssertSeries(t *testing.T, ctx context.Context, addr string, q stri testutil.Equals(t, exp, result[i].Metric) } } - -func ruleAndAssert(t *testing.T, ctx context.Context, addr string, typ string, expectedLen int) { - t.Helper() - - fmt.Println("ruleAndAssert: Waiting for", expectedLen, "results for rules type", typ) - var result []*rulespb.RuleGroup - testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { - res, err := promclient.NewDefaultClient().RulesInGRPC(ctx, urlParse(t, "http://"+addr), typ) - if err != nil { - return err - } - - if len(result) != len(res) { - fmt.Println("ruleAndAssert: New result:", res) - } - - if len(res) != expectedLen { - return errors.Errorf("unexpected result size, expected %d; result: %v", expectedLen, res) - } - result = res - return nil - })) -} - -func queryAndAssert(t *testing.T, ctx context.Context, addr string, q string, opts promclient.QueryOptions, expected model.Vector) { - t.Helper() - - sortResults(expected) - result := instantQuery(t, ctx, addr, q, opts, len(expected)) - for _, r := range result { - r.Timestamp = 0 // Does not matter for us. - } - testutil.Equals(t, expected, result) -} diff --git a/test/e2e/rules_api_test.go b/test/e2e/rules_api_test.go new file mode 100644 index 00000000000..d0fad51dec0 --- /dev/null +++ b/test/e2e/rules_api_test.go @@ -0,0 +1,103 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package e2e_test + +import ( + "context" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/cortexproject/cortex/integration/e2e" + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/promclient" + "github.com/thanos-io/thanos/pkg/rules/rulespb" + "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/testutil" + "github.com/thanos-io/thanos/test/e2e/e2ethanos" +) + +func TestRulesAPI_Fanout(t *testing.T) { + t.Parallel() + + netName := "e2e_test_rules_fanout" + + s, err := e2e.NewScenario(netName) + testutil.Ok(t, err) + defer s.Close() + + rulesSubDir := filepath.Join("rules") + testutil.Ok(t, os.MkdirAll(filepath.Join(s.SharedDir(), rulesSubDir), os.ModePerm)) + createRuleFiles(t, filepath.Join(s.SharedDir(), rulesSubDir)) + + // 2x Prometheus. + prom1, sidecar1, err := e2ethanos.NewPrometheusWithSidecar( + s.SharedDir(), + netName, + "prom1", + defaultPromConfig("prom1", 0, "", filepath.Join(e2e.ContainerSharedDir, rulesSubDir, "*.yaml")), + e2ethanos.DefaultPrometheusImage(), + ) + testutil.Ok(t, err) + prom2, sidecar2, err := e2ethanos.NewPrometheusWithSidecar( + s.SharedDir(), + netName, + "prom2", + defaultPromConfig("prom2", 0, "", filepath.Join(e2e.ContainerSharedDir, rulesSubDir, "*.yaml")), + e2ethanos.DefaultPrometheusImage(), + ) + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(prom1, sidecar1, prom2, sidecar2)) + + // 2x Rulers. + r1, err := e2ethanos.NewRuler(s.SharedDir(), "rule1", rulesSubDir, nil, nil) + testutil.Ok(t, err) + r2, err := e2ethanos.NewRuler(s.SharedDir(), "rule2", rulesSubDir, nil, nil) + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(r1, r2)) + + q, err := e2ethanos.NewQuerier(s.SharedDir(), "query", + []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint()}, + nil, + []string{sidecar1.GRPCNetworkEndpoint(), sidecar2.GRPCNetworkEndpoint(), r1.GRPCNetworkEndpoint(), r2.GRPCNetworkEndpoint()}, + ) + testutil.Ok(t, err) + testutil.Ok(t, s.StartAndWaitReady(q)) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + + testutil.Ok(t, q.WaitSumMetrics(e2e.Equals(1), "thanos_store_nodes_grpc_connections")) + + // TODO(bwplotka): Let's not be lazy and expect EXACT rules and alerts for all request types. + // TODO(bwplotka): Test dedup true and false. + + // For now expects two, as we should deduplicate both rulers and prometheus. + ruleAndAssert(t, ctx, q.HTTPEndpoint(), "", 2) +} + +func ruleAndAssert(t *testing.T, ctx context.Context, addr string, typ string, expectedLen int) { + t.Helper() + + fmt.Println("ruleAndAssert: Waiting for", expectedLen, "results for rules type", typ) + var result []*rulespb.RuleGroup + testutil.Ok(t, runutil.Retry(time.Second, ctx.Done(), func() error { + res, err := promclient.NewDefaultClient().RulesInGRPC(ctx, urlParse(t, "http://"+addr), typ) + if err != nil { + return err + } + + if len(result) != len(res) { + fmt.Println("ruleAndAssert: New result:", res) + } + + if len(res) != expectedLen { + return errors.Errorf("unexpected result size, expected %d; result: %v", expectedLen, res) + } + result = res + return nil + })) +}