Tenant federation implementation of exemplar query interfaces #927

Merged · 4 commits · Feb 3, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -329,6 +329,7 @@
* User config size (`-alertmanager.max-config-size-bytes`)
* Templates count in user config (`-alertmanager.max-templates-count`)
* Max template size (`-alertmanager.max-template-size-bytes`)
* [FEATURE] Querier: Added support for tenant federation to exemplar endpoints. #927
* [ENHANCEMENT] Query-frontend: added `cortex_query_frontend_workers_enqueued_requests_total` metric to track the number of requests enqueued in each query-scheduler. #384
* [ENHANCEMENT] Add a flag (`--proxy.compare-use-relative-error`) in the query-tee to compare floating point values using relative error. #208
* [ENHANCEMENT] Add a flag (`--proxy.compare-skip-recent-samples`) in the query-tee to skip comparing recent samples. By default samples not older than 1 minute are skipped. #234
5 changes: 5 additions & 0 deletions integration/e2emimir/client.go
@@ -158,6 +158,11 @@ func (c *Client) QueryRangeRaw(query string, start, end time.Time, step time.Dur
	return c.DoGetBody(addr)
}

// QueryExemplars runs an exemplar query.
func (c *Client) QueryExemplars(query string, start, end time.Time) ([]promv1.ExemplarQueryResult, error) {
	return c.querierClient.QueryExemplars(context.Background(), query, start, end)
}

// QuerierAddress returns the address of the querier
func (c *Client) QuerierAddress() string {
	return c.querierAddress
8 changes: 7 additions & 1 deletion integration/querier_tenant_federation_test.go
@@ -71,6 +71,7 @@ func runQuerierTenantFederationTest(t *testing.T, cfg querierTenantFederationCon
"-frontend.results-cache.backend": "memcached",
"-frontend.results-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
"-tenant-federation.enabled": "true",
"-ingester.max-global-exemplars-per-user": "10000",
})

// Start the query-scheduler if enabled.
@@ -149,14 +150,19 @@ func runQuerierTenantFederationTest(t *testing.T, cfg querierTenantFederationCon

	assert.Equal(t, mergeResults(tenantIDs, expectedVectors), result.(model.Vector))

	// query exemplars for all tenants
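	// Each tenant is expected to contribute one query result here: the series
	// pushed for every tenant earlier in the test carry exemplars, and exemplar
	// storage is enabled via -ingester.max-global-exemplars-per-user above.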
	exemplars, err := c.QueryExemplars("series_1", now.Add(-1*time.Hour), now.Add(1*time.Hour))
	require.NoError(t, err)
	assert.Len(t, exemplars, numUsers)

	// ensure a push to multiple tenants is failing
	series, _ := generateSeries("series_1", now)
	res, err := c.Push(series)
	require.NoError(t, err)
	require.Equal(t, 500, res.StatusCode)

	// check metric label values for total queries in the query frontend; the
	// expected count is bumped from 1 to 2 to account for the exemplar query
	// issued above
-	require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_query_frontend_queries_total"}, e2e.WithLabelMatchers(
+	require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"cortex_query_frontend_queries_total"}, e2e.WithLabelMatchers(
		labels.MustNewMatcher(labels.MatchEqual, "user", strings.Join(tenantIDs, "|")),
		labels.MustNewMatcher(labels.MatchEqual, "op", "query"))))

11 changes: 6 additions & 5 deletions pkg/mimir/modules.go
@@ -281,8 +281,9 @@ func (t *Mimir) initTenantFederation() (serv services.Service, err error) {
		// Make sure the mergeQuerier is only used for requests with more than a
		// single tenant. This allows for a less impactful enabling of tenant
		// federation.
-		const byPassForSingleQuerier = true
-		t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, byPassForSingleQuerier, util_log.Logger))
+		const bypassForSingleQuerier = true
+		t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, bypassForSingleQuerier, util_log.Logger))
+		t.ExemplarQueryable = tenantfederation.NewExemplarQueryable(t.ExemplarQueryable, bypassForSingleQuerier, util_log.Logger)
	}
	return nil, nil
}
@@ -533,12 +534,12 @@ func (t *Mimir) initRuler() (serv services.Service, err error) {
		if !t.Cfg.TenantFederation.Enabled {
			return nil, errors.New("-ruler.tenant-federation.enabled=true requires -tenant-federation.enabled=true")
		}
-		// Setting byPassForSingleQuerier=false forces `tenantfederation.NewQueryable` to add
+		// Setting bypassForSingleQuerier=false forces `tenantfederation.NewQueryable` to add
		// the `__tenant_id__` label on all metrics regardless of whether they're for a single tenant or multiple tenants.
		// This makes this label more consistent and hopefully less confusing to users.
-		const byPassForSingleQuerier = false
+		const bypassForSingleQuerier = false

-		federatedQueryable = tenantfederation.NewQueryable(queryable, byPassForSingleQuerier, util_log.Logger)
+		federatedQueryable = tenantfederation.NewQueryable(queryable, bypassForSingleQuerier, util_log.Logger)
	}

	managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, federatedQueryable, eng, t.Overrides, prometheus.DefaultRegisterer)
Expand Down
224 changes: 224 additions & 0 deletions pkg/querier/tenantfederation/merge_exemplar_queryable.go
@@ -0,0 +1,224 @@
// SPDX-License-Identifier: AGPL-3.0-only

package tenantfederation

import (
	"context"
	"fmt"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/concurrency"
	"github.com/prometheus/prometheus/model/exemplar"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/weaveworks/common/user"

	"github.com/grafana/mimir/pkg/tenant"
	"github.com/grafana/mimir/pkg/util/spanlogger"
)

// NewExemplarQueryable returns an exemplar queryable that makes requests for
// all tenant IDs that are part of the request and aggregates the results from
// each tenant's ExemplarQuerier.
//
// Results contain the series label __tenant_id__ to identify which tenant the
// exemplar is from.
//
// By setting bypassWithSingleQuerier to true, tenant federation logic gets
// bypassed if the request is only for a single tenant. The requests will also
// not contain the pseudo series label __tenant_id__ in this case.
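//
// For example, a federated request for the tenants "team-a" and "team-b"
// returns each tenant's exemplars with __tenant_id__="team-a" or
// __tenant_id__="team-b" added to their series labels.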
func NewExemplarQueryable(upstream storage.ExemplarQueryable, bypassWithSingleQuerier bool, logger log.Logger) storage.ExemplarQueryable {
	return NewMergeExemplarQueryable(defaultTenantLabel, upstream, bypassWithSingleQuerier, logger)
}

// NewMergeExemplarQueryable returns an exemplar queryable that makes requests for
// all tenant IDs that are part of the request and aggregates the results from
// each tenant's ExemplarQuerier.
//
// Results contain the series label `idLabelName` to identify which tenant the
// exemplar is from.
//
// By setting bypassWithSingleQuerier to true, tenant federation logic gets
// bypassed if the request is only for a single tenant. The requests will also
// not contain the pseudo series label `idLabelName` in this case.
func NewMergeExemplarQueryable(idLabelName string, upstream storage.ExemplarQueryable, bypassWithSingleQuerier bool, logger log.Logger) storage.ExemplarQueryable {
	return &mergeExemplarQueryable{
		logger:                  logger,
		idLabelName:             idLabelName,
		bypassWithSingleQuerier: bypassWithSingleQuerier,
		upstream:                upstream,
	}
}

type mergeExemplarQueryable struct {
	logger                  log.Logger
	idLabelName             string
	bypassWithSingleQuerier bool
	upstream                storage.ExemplarQueryable
}

// tenantsAndQueriers returns a list of tenant IDs and corresponding queriers based on the context
func (m *mergeExemplarQueryable) tenantsAndQueriers(ctx context.Context) ([]string, []storage.ExemplarQuerier, error) {
	tenantIDs, err := tenant.TenantIDs(ctx)
	if err != nil {
		return nil, nil, err
	}

	queriers := make([]storage.ExemplarQuerier, len(tenantIDs))
	for i, tenantID := range tenantIDs {
		q, err := m.upstream.ExemplarQuerier(user.InjectOrgID(ctx, tenantID))
		if err != nil {
			return nil, nil, err
		}

		queriers[i] = q
	}

	return tenantIDs, queriers, nil
}

// ExemplarQuerier returns a new querier that aggregates results from queries run
// across multiple tenants
func (m *mergeExemplarQueryable) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) {
	ids, queriers, err := m.tenantsAndQueriers(ctx)
	if err != nil {
		return nil, err
	}

	// If desired and there is only a single querier, just return it directly instead
	// of going through the federation querier. bypassWithSingleQuerier=true allows a
	// bit less overhead when it's not needed while bypassWithSingleQuerier=false will
	// consistently add a __tenant_id__ label to all results.
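	// For example, with bypassWithSingleQuerier=true a request for the single
	// tenant "team-a" is served by team-a's upstream querier directly (and the
	// results carry no __tenant_id__ label), while a request for
	// "team-a|team-b" always goes through the merge querier below.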
	if m.bypassWithSingleQuerier && len(queriers) == 1 {
		return queriers[0], nil
	}

	return &mergeExemplarQuerier{
		logger:      m.logger,
		ctx:         ctx,
		idLabelName: m.idLabelName,
		tenants:     ids,
		queriers:    queriers,
	}, nil
}

type exemplarJob struct {
	tenant   string
	matchers [][]*labels.Matcher
	querier  storage.ExemplarQuerier
}

type mergeExemplarQuerier struct {
	logger      log.Logger
	ctx         context.Context
	idLabelName string
	tenants     []string
	queriers    []storage.ExemplarQuerier
}

// Select returns the union of exemplars within the time range that match each slice of
// matchers, across multiple tenants. The query for each tenant is forwarded to an
// instance of an upstream querier.
func (m *mergeExemplarQuerier) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error) {
	spanlog, ctx := spanlogger.NewWithLogger(m.ctx, m.logger, "mergeExemplarQuerier.Select")
	defer spanlog.Finish()

	// If we have any matchers that are looking for __tenant_id__, use that to filter down the
	// original list of tenants given to this querier and then remove those matchers from the list
	// that will be passed to each querier. We do this because the label isn't an actual label on
	// the series/exemplars, it's a generated label we add when performing the query.
	filteredTenants, filteredMatchers := filterTenantsAndRewriteMatchers(m.idLabelName, m.tenants, matchers)

	// In order to run a query for each tenant in parallel and have them all write to the same
	// structure, we create a slice of jobs and results of the same size. Each job writes to its
	// corresponding slot in the results slice. This way we avoid the need for locks.
	jobs := make([]*exemplarJob, len(filteredTenants))
	results := make([][]exemplar.QueryResult, len(filteredTenants))

	jobIdx := 0
	for idIdx, tenantID := range m.tenants {
		// Upstream queriers are indexed corresponding to the original, non-filtered IDs
		// given to this querier. Iterate over the original list of tenant IDs but skip if
		// this tenant ID got filtered out. Otherwise, use the index of this tenant ID to
		// pick the corresponding querier.
		if _, matched := filteredTenants[tenantID]; !matched {
			continue
		}

		// Each job gets a copy of the matchers provided to this method so that upstream
		// queriers don't modify our slice of filtered matchers. Otherwise, each querier
		// might modify the slice and end up with an ever-growing list of matchers.
		jobMatchers := make([][]*labels.Matcher, len(filteredMatchers))
		copy(jobMatchers, filteredMatchers)

		jobs[jobIdx] = &exemplarJob{
			tenant:   tenantID,
			matchers: jobMatchers,
			querier:  m.queriers[idIdx],
		}

		jobIdx++
	}

	// Each task grabs a job object from the slice and stores its results in the corresponding
	// index in the results slice. The job handles performing a tenant-specific exemplar query
	// and adding a tenant ID label to each of the results.
	run := func(ctx context.Context, idx int) error {
		job := jobs[idx]

		res, err := job.querier.Select(start, end, job.matchers...)
		if err != nil {
			return fmt.Errorf("unable to run federated exemplar query for %s: %w", job.tenant, err)
		}

		for i, e := range res {
			e.SeriesLabels = setLabelsRetainExisting(e.SeriesLabels, labels.Label{
				Name:  m.idLabelName,
				Value: job.tenant,
			})

			res[i] = e
		}

		results[idx] = res
		return nil
	}

	err := concurrency.ForEachJob(ctx, len(jobs), maxConcurrency, run)
	if err != nil {
		return nil, err
	}

	var out []exemplar.QueryResult
	for _, exemplars := range results {
		out = append(out, exemplars...)
	}

	return out, nil
}

func filterTenantsAndRewriteMatchers(idLabelName string, ids []string, allMatchers [][]*labels.Matcher) (map[string]struct{}, [][]*labels.Matcher) {
	// If there are no potential matchers by which we could filter down the list of tenants that
	// we're getting exemplars for, just return the full set and provided matchers verbatim.
	if len(allMatchers) == 0 {
		return sliceToSet(ids), allMatchers
	}

	outIds := make(map[string]struct{})
	outMatchers := make([][]*labels.Matcher, len(allMatchers))

	// The ExemplarQuerier.Select method accepts a slice of slices of matchers. The matchers within
	// a single slice are AND'd together (intersection) and each outer slice is OR'd together (union).
	// In order to support that, we start with a set of 0 tenant IDs and add any tenant IDs that remain
	// after filtering (based on the inner slice of matchers), for each outer slice.
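	// For example, given tenants ["team-a", "team-b"] and the matcher set
	// [[__tenant_id__="team-a", job="api"]], this returns the tenant set
	// {"team-a"} and the rewritten matchers [[job="api"]].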
	for i, matchers := range allMatchers {
		filteredIds, unrelatedMatchers := filterValuesByMatchers(idLabelName, ids, matchers...)
		for k := range filteredIds {
			outIds[k] = struct{}{}
		}

		outMatchers[i] = unrelatedMatchers
	}

	return outIds, outMatchers
}
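
To illustrate the behavior end to end, here is a minimal, hypothetical sketch (not part of this PR) of exercising the new queryable: a stub upstream returns one exemplar per tenant, and the merge querier fans out across a multi-tenant org ID and tags each result with __tenant_id__. stubQueryable and stubQuerier are invented for the example, and it assumes the multi-tenant ID resolver is in effect, as it is when -tenant-federation.enabled=true.

package main

import (
	"context"
	"fmt"

	"github.com/go-kit/log"
	"github.com/prometheus/prometheus/model/exemplar"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/weaveworks/common/user"

	"github.com/grafana/mimir/pkg/querier/tenantfederation"
)

// stubQueryable hands out a querier bound to the tenant ID found in the context.
type stubQueryable struct{}

func (stubQueryable) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) {
	id, err := user.ExtractOrgID(ctx)
	if err != nil {
		return nil, err
	}
	return stubQuerier{tenant: id}, nil
}

// stubQuerier returns a single exemplar result for its tenant.
type stubQuerier struct{ tenant string }

func (s stubQuerier) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error) {
	return []exemplar.QueryResult{{
		SeriesLabels: labels.FromStrings("__name__", "series_1", "instance", s.tenant),
		Exemplars:    []exemplar.Exemplar{{Value: 1, Ts: start}},
	}}, nil
}

func main() {
	q := tenantfederation.NewExemplarQueryable(stubQueryable{}, true, log.NewNopLogger())

	// A multi-tenant org ID fans out to one upstream querier per tenant; the
	// merged results carry the __tenant_id__ label.
	ctx := user.InjectOrgID(context.Background(), "team-a|team-b")
	querier, err := q.ExemplarQuerier(ctx)
	if err != nil {
		panic(err)
	}

	results, err := querier.Select(0, 1000)
	if err != nil {
		panic(err)
	}
	for _, r := range results {
		// e.g. {__name__="series_1", __tenant_id__="team-a", instance="team-a"}
		fmt.Println(r.SeriesLabels)
	}
}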