diff --git a/CHANGELOG.md b/CHANGELOG.md index 59607a440e3..be84189e832 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ * [ENHANCEMENT] Memberlist: optimized receive path for processing ring state updates, to help reduce CPU utilization in large clusters. #4345 * [ENHANCEMENT] Memberlist: expose configuration of memberlist packet compression via `-memberlist.compression=enabled`. #4346 * [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336 +* [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335 ## 1.10.0-rc.0 / 2021-06-28 diff --git a/integration/e2e/db/db.go b/integration/e2e/db/db.go index fe6de79411b..ed397bbda82 100644 --- a/integration/e2e/db/db.go +++ b/integration/e2e/db/db.go @@ -14,18 +14,22 @@ const ( ) // NewMinio returns minio server, used as a local replacement for S3. -func NewMinio(port int, bktName string) *e2e.HTTPService { +func NewMinio(port int, bktNames ...string) *e2e.HTTPService { minioKESGithubContent := "https://raw.githubusercontent.com/minio/kes/master" commands := []string{ - "curl -sSL --tlsv1.2 -O '%s/root.key' -O '%s/root.cert'", - "mkdir -p /data/%s && minio server --address :%v --quiet /data", + fmt.Sprintf("curl -sSL --tlsv1.2 -O '%s/root.key' -O '%s/root.cert'", minioKESGithubContent, minioKESGithubContent), } + for _, bkt := range bktNames { + commands = append(commands, fmt.Sprintf("mkdir -p /data/%s", bkt)) + } + commands = append(commands, fmt.Sprintf("minio server --address :%v --quiet /data", port)) + m := e2e.NewHTTPService( fmt.Sprintf("minio-%v", port), images.Minio, // Create the "cortex" bucket before starting minio - e2e.NewCommandWithoutEntrypoint("sh", "-c", fmt.Sprintf(strings.Join(commands, " && "), minioKESGithubContent, minioKESGithubContent, bktName, port)), + e2e.NewCommandWithoutEntrypoint("sh", "-c", strings.Join(commands, " && ")), e2e.NewHTTPReadinessProbe(port, "/minio/health/ready", 200, 200), port, ) diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index 4b47849dfc9..72fed34997e 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -318,6 +318,11 @@ func (c *Client) SetRuleGroup(rulegroup rulefmt.RuleGroup, namespace string) err } defer res.Body.Close() + + if res.StatusCode != 202 { + return fmt.Errorf("unexpected status code: %d", res.StatusCode) + } + return nil } diff --git a/integration/ruler_test.go b/integration/ruler_test.go index 885f84a645a..08dec2e8a39 100644 --- a/integration/ruler_test.go +++ b/integration/ruler_test.go @@ -523,6 +523,166 @@ func TestRulerAlertmanagerTLS(t *testing.T) { require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_notifications_alertmanagers_discovered"}, e2e.WaitMissingMetrics)) } +func TestRulerMetricsForInvalidQueries(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + // Configure the ruler. + flags := mergeFlags( + BlocksStorageFlags(), + RulerFlags(false), + map[string]string{ + // Since we're not going to run any rule (our only rule is invalid), we don't need the + // store-gateway to be configured to a valid address. + "-querier.store-gateway-addresses": "localhost:12345", + // Enable the bucket index so we can skip the initial bucket scan. + "-blocks-storage.bucket-store.bucket-index.enabled": "true", + // Evaluate rules often, so that we don't need to wait for metrics to show up. + "-ruler.evaluation-interval": "2s", + "-ruler.poll-interval": "2s", + // No delay + "-ruler.evaluation-delay-duration": "0", + + "-blocks-storage.tsdb.block-ranges-period": "1h", + "-blocks-storage.bucket-store.sync-interval": "1s", + "-blocks-storage.tsdb.retention-period": "2h", + + // We run single ingester only, no replication. + "-distributor.replication-factor": "1", + + // Very low limit so that ruler hits it. + "-querier.max-fetched-chunks-per-query": "5", + // We need this to make limit work. + "-ingester.stream-chunks-when-using-blocks": "true", + }, + ) + + const namespace = "test" + const user = "user" + + distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "") + ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags, "") + ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(distributor, ingester, ruler)) + + // Wait until both the distributor and ruler have updated the ring. The querier will also watch + // the store-gateway ring if blocks sharding is enabled. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", ruler.HTTPEndpoint(), user) + require.NoError(t, err) + + // Push some series to Cortex -- enough so that we can hit some limits. + for i := 0; i < 10; i++ { + series, _ := generateSeries("metric", time.Now(), prompb.Label{Name: "foo", Value: fmt.Sprintf("%d", i)}) + + res, err := c.Push(series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + } + + totalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"}) + require.NoError(t, err) + + // Verify that user-failures don't increase cortex_ruler_queries_failed_total + for groupName, expression := range map[string]string{ + // Syntactically correct expression (passes check in ruler), but failing because of invalid regex. This fails in PromQL engine. + "invalid_group": `label_replace(metric, "foo", "$1", "service", "[")`, + + // This one fails in querier code, because of limits. + "too_many_chunks_group": `sum(metric)`, + } { + t.Run(groupName, func(t *testing.T) { + require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, "rule", expression), namespace)) + m := ruleGroupMatcher(user, namespace, groupName) + + // Wait until ruler has loaded the group. + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics)) + + // Wait until rule group has tried to evaluate the rule. + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics)) + + // Verify that evaluation of the rule failed. + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics)) + + // But these failures were not reported as "failed queries" + sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"}) + require.NoError(t, err) + require.Equal(t, float64(0), sum[0]) + + // Delete rule before checkin "cortex_ruler_queries_total", as we want to reuse value for next test. + require.NoError(t, c.DeleteRuleGroup(namespace, groupName)) + + // Wait until ruler has unloaded the group. We don't use any matcher, so there should be no groups (in fact, metric disappears). + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_group_rules"}, e2e.SkipMissingMetrics)) + + // Check that cortex_ruler_queries_total went up since last test. + newTotalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"}) + require.NoError(t, err) + require.Greater(t, newTotalQueries[0], totalQueries[0]) + + // Remember totalQueries for next test. + totalQueries = newTotalQueries + }) + } + + // Now let's upload a non-failing rule, and make sure that it works. + t.Run("real_error", func(t *testing.T) { + const groupName = "good_rule" + const expression = `sum(metric{foo=~"1|2"})` + + require.NoError(t, c.SetRuleGroup(ruleGroupWithRule(groupName, "rule", expression), namespace)) + m := ruleGroupMatcher(user, namespace, groupName) + + // Wait until ruler has loaded the group. + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics)) + + // Wait until rule group has tried to evaluate the rule, and succeeded. + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics)) + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics)) + + // Still no failures. + sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"}) + require.NoError(t, err) + require.Equal(t, float64(0), sum[0]) + + // Now let's stop ingester, and recheck metrics. This should increase cortex_ruler_queries_failed_total failures. + require.NoError(t, s.Stop(ingester)) + + // We should start getting "real" failures now. + require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_failed_total"})) + }) +} + +func ruleGroupMatcher(user, namespace, groupName string) *labels.Matcher { + return labels.MustNewMatcher(labels.MatchEqual, "rule_group", fmt.Sprintf("/rules/%s/%s;%s", user, namespace, groupName)) +} + +func ruleGroupWithRule(groupName string, ruleName string, expression string) rulefmt.RuleGroup { + // Prepare rule group with invalid rule. + var recordNode = yaml.Node{} + var exprNode = yaml.Node{} + + recordNode.SetString(ruleName) + exprNode.SetString(expression) + + return rulefmt.RuleGroup{ + Name: groupName, + Interval: 10, + Rules: []rulefmt.RuleNode{{ + Record: recordNode, + Expr: exprNode, + }}, + } +} + func createTestRuleGroup(t *testing.T) rulefmt.RuleGroup { t.Helper() diff --git a/pkg/api/handlers.go b/pkg/api/handlers.go index 79a231c16f4..9af0055db9d 100644 --- a/pkg/api/handlers.go +++ b/pkg/api/handlers.go @@ -194,7 +194,7 @@ func NewQuerierHandler( api := v1.NewAPI( engine, - querier.NewErrorTranslateQueryable(queryable), // Translate errors to errors expected by API. + querier.NewErrorTranslateSampleAndChunkQueryable(queryable), // Translate errors to errors expected by API. nil, // No remote write support. exemplarQueryable, func(context.Context) v1.TargetRetriever { return &querier.DummyTargetRetriever{} }, diff --git a/pkg/querier/error_translate_queryable.go b/pkg/querier/error_translate_queryable.go index 97c1584d41c..ce9289b18bb 100644 --- a/pkg/querier/error_translate_queryable.go +++ b/pkg/querier/error_translate_queryable.go @@ -69,72 +69,103 @@ func TranslateToPromqlAPIError(err error) error { } } -func NewErrorTranslateQueryable(q storage.SampleAndChunkQueryable) storage.SampleAndChunkQueryable { - return errorTranslateQueryable{q} +// ErrTranslateFn is used to translate or wrap error before returning it by functions in +// storage.SampleAndChunkQueryable interface. +// Input error may be nil. +type ErrTranslateFn func(err error) error + +func NewErrorTranslateQueryable(q storage.Queryable) storage.Queryable { + return NewErrorTranslateQueryableWithFn(q, TranslateToPromqlAPIError) +} + +func NewErrorTranslateQueryableWithFn(q storage.Queryable, fn ErrTranslateFn) storage.Queryable { + return errorTranslateQueryable{q: q, fn: fn} +} + +func NewErrorTranslateSampleAndChunkQueryable(q storage.SampleAndChunkQueryable) storage.SampleAndChunkQueryable { + return NewErrorTranslateSampleAndChunkQueryableWithFn(q, TranslateToPromqlAPIError) +} + +func NewErrorTranslateSampleAndChunkQueryableWithFn(q storage.SampleAndChunkQueryable, fn ErrTranslateFn) storage.SampleAndChunkQueryable { + return errorTranslateSampleAndChunkQueryable{q: q, fn: fn} } type errorTranslateQueryable struct { - q storage.SampleAndChunkQueryable + q storage.Queryable + fn ErrTranslateFn } func (e errorTranslateQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { q, err := e.q.Querier(ctx, mint, maxt) - return errorTranslateQuerier{q: q}, TranslateToPromqlAPIError(err) + return errorTranslateQuerier{q: q, fn: e.fn}, e.fn(err) +} + +type errorTranslateSampleAndChunkQueryable struct { + q storage.SampleAndChunkQueryable + fn ErrTranslateFn +} + +func (e errorTranslateSampleAndChunkQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { + q, err := e.q.Querier(ctx, mint, maxt) + return errorTranslateQuerier{q: q, fn: e.fn}, e.fn(err) } -func (e errorTranslateQueryable) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { +func (e errorTranslateSampleAndChunkQueryable) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.ChunkQuerier, error) { q, err := e.q.ChunkQuerier(ctx, mint, maxt) - return errorTranslateChunkQuerier{q: q}, TranslateToPromqlAPIError(err) + return errorTranslateChunkQuerier{q: q, fn: e.fn}, e.fn(err) } type errorTranslateQuerier struct { - q storage.Querier + q storage.Querier + fn ErrTranslateFn } func (e errorTranslateQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { values, warnings, err := e.q.LabelValues(name, matchers...) - return values, warnings, TranslateToPromqlAPIError(err) + return values, warnings, e.fn(err) } func (e errorTranslateQuerier) LabelNames() ([]string, storage.Warnings, error) { values, warnings, err := e.q.LabelNames() - return values, warnings, TranslateToPromqlAPIError(err) + return values, warnings, e.fn(err) } func (e errorTranslateQuerier) Close() error { - return TranslateToPromqlAPIError(e.q.Close()) + return e.fn(e.q.Close()) } func (e errorTranslateQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { s := e.q.Select(sortSeries, hints, matchers...) - return errorTranslateSeriesSet{s} + return errorTranslateSeriesSet{s: s, fn: e.fn} } type errorTranslateChunkQuerier struct { - q storage.ChunkQuerier + q storage.ChunkQuerier + fn ErrTranslateFn } func (e errorTranslateChunkQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { values, warnings, err := e.q.LabelValues(name, matchers...) - return values, warnings, TranslateToPromqlAPIError(err) + return values, warnings, e.fn(err) } func (e errorTranslateChunkQuerier) LabelNames() ([]string, storage.Warnings, error) { values, warnings, err := e.q.LabelNames() - return values, warnings, TranslateToPromqlAPIError(err) + return values, warnings, e.fn(err) } func (e errorTranslateChunkQuerier) Close() error { - return TranslateToPromqlAPIError(e.q.Close()) + return e.fn(e.q.Close()) } func (e errorTranslateChunkQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.ChunkSeriesSet { s := e.q.Select(sortSeries, hints, matchers...) - return errorTranslateChunkSeriesSet{s} + return errorTranslateChunkSeriesSet{s: s, fn: e.fn} } type errorTranslateSeriesSet struct { - s storage.SeriesSet + s storage.SeriesSet + fn ErrTranslateFn } func (e errorTranslateSeriesSet) Next() bool { @@ -146,7 +177,7 @@ func (e errorTranslateSeriesSet) At() storage.Series { } func (e errorTranslateSeriesSet) Err() error { - return TranslateToPromqlAPIError(e.s.Err()) + return e.fn(e.s.Err()) } func (e errorTranslateSeriesSet) Warnings() storage.Warnings { @@ -154,7 +185,8 @@ func (e errorTranslateSeriesSet) Warnings() storage.Warnings { } type errorTranslateChunkSeriesSet struct { - s storage.ChunkSeriesSet + s storage.ChunkSeriesSet + fn ErrTranslateFn } func (e errorTranslateChunkSeriesSet) Next() bool { @@ -166,7 +198,7 @@ func (e errorTranslateChunkSeriesSet) At() storage.ChunkSeries { } func (e errorTranslateChunkSeriesSet) Err() error { - return TranslateToPromqlAPIError(e.s.Err()) + return e.fn(e.s.Err()) } func (e errorTranslateChunkSeriesSet) Warnings() storage.Warnings { diff --git a/pkg/querier/error_translate_queryable_test.go b/pkg/querier/error_translate_queryable_test.go index afeddf7c78d..35e882212aa 100644 --- a/pkg/querier/error_translate_queryable_test.go +++ b/pkg/querier/error_translate_queryable_test.go @@ -113,7 +113,7 @@ func TestApiStatusCodes(t *testing.T) { "error from seriesset": errorTestQueryable{q: errorTestQuerier{s: errorTestSeriesSet{err: tc.err}}}, } { t.Run(fmt.Sprintf("%s/%d", k, ix), func(t *testing.T) { - r := createPrometheusAPI(errorTranslateQueryable{q: q}) + r := createPrometheusAPI(NewErrorTranslateSampleAndChunkQueryable(q)) rec := httptest.NewRecorder() req := httptest.NewRequest("GET", "/api/v1/query?query=up", nil) diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index 9ac5fe899b1..8ee0fe51917 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -149,16 +149,30 @@ func MetricsQueryFunc(qf rules.QueryFunc, queries, failedQueries prometheus.Coun return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) { queries.Inc() result, err := qf(ctx, qs, t) - // We rely on TranslateToPromqlApiError to do its job here... it returns nil, if err is nil. - // It returns promql.ErrStorage, if error should be reported back as 500. - // Other errors it returns are either for canceled or timed-out queriers (we're not reporting those as failures), - // or various user-errors (limits, duplicate samples, etc. ... also not failures). - // - // All errors will still be counted towards "evaluation failures" metrics and logged by Prometheus Ruler, - // but we only want internal errors here. - if _, ok := querier.TranslateToPromqlAPIError(err).(promql.ErrStorage); ok { - failedQueries.Inc() + + // We only care about errors returned by underlying Queryable. Errors returned by PromQL engine are "user-errors", + // and not interesting here. + qerr := QueryableError{} + if err != nil && errors.As(err, &qerr) { + origErr := qerr.Unwrap() + + // Not all errors returned by Queryable are interesting, only those that would result in 500 status code. + // + // We rely on TranslateToPromqlApiError to do its job here... it returns nil, if err is nil. + // It returns promql.ErrStorage, if error should be reported back as 500. + // Other errors it returns are either for canceled or timed-out queriers (we're not reporting those as failures), + // or various user-errors (limits, duplicate samples, etc. ... also not failures). + // + // All errors will still be counted towards "evaluation failures" metrics and logged by Prometheus Ruler, + // but we only want internal errors here. + if _, ok := querier.TranslateToPromqlAPIError(origErr).(promql.ErrStorage); ok { + failedQueries.Inc() + } + + // Return unwrapped error. + return result, origErr } + return result, err } } @@ -234,6 +248,11 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi }, []string{"user"}) } + // Wrap errors returned by Queryable to our wrapper, so that we can distinguish between those errors + // and errors returned by PromQL engine. Errors from Queryable can be either caused by user (limits) or internal errors. + // Errors from PromQL are always "user" errors. + q = querier.NewErrorTranslateQueryableWithFn(q, WrapQueryableErrors) + return func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) RulesManager { var queryTime prometheus.Counter = nil if rulerQuerySeconds != nil { @@ -255,3 +274,23 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi }) } } + +type QueryableError struct { + err error +} + +func (q QueryableError) Unwrap() error { + return q.err +} + +func (q QueryableError) Error() string { + return q.err.Error() +} + +func WrapQueryableErrors(err error) error { + if err == nil { + return err + } + + return QueryableError{err: err} +} diff --git a/pkg/ruler/compat_test.go b/pkg/ruler/compat_test.go index dfcb251803a..968f5cc66da 100644 --- a/pkg/ruler/compat_test.go +++ b/pkg/ruler/compat_test.go @@ -229,7 +229,7 @@ func TestMetricsQueryFuncErrors(t *testing.T) { failures := prometheus.NewCounter(prometheus.CounterOpts{}) mockFunc := func(ctx context.Context, q string, t time.Time) (promql.Vector, error) { - return promql.Vector{}, tc.returnedError + return promql.Vector{}, WrapQueryableErrors(tc.returnedError) } qf := MetricsQueryFunc(mockFunc, queries, failures)