From 78e9c862bfc2736e22355e230b43377e68fe46ea Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Sat, 9 Mar 2024 15:58:54 +0100 Subject: [PATCH] Add support for TSDB selector in querier This PR allows using the query distributed mode against a set of multi-tenant receivers as described in https://github.com/thanos-io/thanos/blob/main/docs/proposals-done/202301-distributed-query-execution.md#distributed-execution-against-receive-components. The feature is enabled by a selector.relabel-config flag in the Query component which allows it to select a subset of TSDBs to query based on their external labels. Signed-off-by: Filip Petkovski --- cmd/thanos/query.go | 22 +++++- cmd/thanos/receive.go | 5 +- cmd/thanos/rule.go | 1 + pkg/store/proxy.go | 28 ++++++-- pkg/store/proxy_test.go | 127 ++++++++++++++++++++++++++++++++- pkg/store/tsdb_selector.go | 83 +++++++++++++++++++++ test/e2e/e2ethanos/services.go | 11 ++- test/e2e/query_test.go | 63 ++++++++++++++++ 8 files changed, 326 insertions(+), 14 deletions(-) create mode 100644 pkg/store/tsdb_selector.go diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 811d9d2f740..57d043abe48 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -32,6 +32,7 @@ import ( apiv1 "github.com/thanos-io/thanos/pkg/api/query" "github.com/thanos-io/thanos/pkg/api/query/querypb" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/discovery/cache" @@ -208,6 +209,8 @@ func registerQuery(app *extkingpin.App) { Default("1s")) storeResponseTimeout := extkingpin.ModelDuration(cmd.Flag("store.response-timeout", "If a Store doesn't send any data in this specified duration then a Store will be ignored and partial data will be returned if it's enabled. 0 disables timeout.").Default("0ms")) + storeSelectorRelabelConf := *extkingpin.RegisterSelectorRelabelFlags(cmd) + reqLogConfig := extkingpin.RegisterRequestLoggingFlags(cmd) alertQueryURL := cmd.Flag("alert.query-url", "The external Thanos Query URL that would be set in all alerts 'Source' field.").String() @@ -274,6 +277,15 @@ func registerQuery(app *extkingpin.App) { level.Warn(logger).Log("msg", "different values for --web.route-prefix and --web.external-prefix detected, web UI may not work without a reverse-proxy.") } + tsdbRelabelConfig, err := storeSelectorRelabelConf.Content() + if err != nil { + return errors.Wrap(err, "error while parsing tsdb selector configuration") + } + tsdbSelector, err := block.ParseRelabelConfig(tsdbRelabelConfig, block.SelectorSupportedRelabelActions) + if err != nil { + return err + } + return runQuery( g, logger, @@ -343,6 +355,7 @@ func registerQuery(app *extkingpin.App) { *defaultEngine, storeRateLimits, *extendedFunctionsEnabled, + store.NewTSDBSelector(tsdbSelector), queryMode(*promqlQueryMode), *tenantHeader, *defaultTenant, @@ -424,6 +437,7 @@ func runQuery( defaultEngine string, storeRateLimits store.SeriesSelectLimits, extendedFunctionsEnabled bool, + tsdbSelector *store.TSDBSelector, queryMode queryMode, tenantHeader string, defaultTenant string, @@ -501,9 +515,9 @@ func runQuery( dns.ResolverType(dnsSDResolver), ) - options := []store.ProxyStoreOption{} - if debugLogging { - options = append(options, store.WithProxyStoreDebugLogging()) + options := []store.ProxyStoreOption{ + store.WithTSDBSelector(tsdbSelector), + store.WithProxyStoreDebugLogging(debugLogging), } var ( @@ -524,6 +538,7 @@ func runQuery( strictEndpoints, endpointGroupAddrs, strictEndpointGroups, + tsdbSelector, dialOpts, unhealthyStoreTimeout, endpointInfoTimeout, @@ -839,6 +854,7 @@ func prepareEndpointSet( strictEndpoints []string, endpointGroupAddrs []string, strictEndpointGroups []string, + tsdbSelector *store.TSDBSelector, dialOpts []grpc.DialOption, unhealthyStoreTimeout time.Duration, endpointInfoTimeout time.Duration, diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 17066d131fa..031b82abc62 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -317,9 +317,8 @@ func runReceive( return errors.Wrap(err, "setup gRPC server") } - options := []store.ProxyStoreOption{} - if debugLogging { - options = append(options, store.WithProxyStoreDebugLogging()) + options := []store.ProxyStoreOption{ + store.WithProxyStoreDebugLogging(debugLogging), } proxy := store.NewProxyStore( diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 1be8a8b406a..fa4322efcf0 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -436,6 +436,7 @@ func runRule( nil, nil, nil, + store.DefaultSelector, dialOpts, 5*time.Minute, 5*time.Second, diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 96a67f9b752..f57ec8a0d74 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -85,6 +85,7 @@ type ProxyStore struct { metrics *proxyStoreMetrics retrievalStrategy RetrievalStrategy debugLogging bool + tsdbSelector *TSDBSelector } type proxyStoreMetrics struct { @@ -111,10 +112,17 @@ func RegisterStoreServer(storeSrv storepb.StoreServer, logger log.Logger) func(* // BucketStoreOption are functions that configure BucketStore. type ProxyStoreOption func(s *ProxyStore) -// WithProxyStoreDebugLogging enables debug logging. -func WithProxyStoreDebugLogging() ProxyStoreOption { +// WithProxyStoreDebugLogging toggles debug logging. +func WithProxyStoreDebugLogging(enable bool) ProxyStoreOption { return func(s *ProxyStore) { - s.debugLogging = true + s.debugLogging = enable + } +} + +// WithTSDBSelector sets the TSDB selector for the proxy. +func WithTSDBSelector(selector *TSDBSelector) ProxyStoreOption { + return func(s *ProxyStore) { + s.tsdbSelector = selector } } @@ -147,6 +155,7 @@ func NewProxyStore( responseTimeout: responseTimeout, metrics: metrics, retrievalStrategy: retrievalStrategy, + tsdbSelector: DefaultSelector, } for _, option := range options { @@ -316,7 +325,10 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. ctx = metadata.AppendToOutgoingContext(ctx, tenancy.DefaultTenantHeader, tenant) level.Debug(s.logger).Log("msg", "Tenant info in Series()", "tenant", tenant) - stores := []Client{} + var ( + stores []Client + storeLabelSets []labels.Labels + ) for _, st := range s.stores() { // We might be able to skip the store if its meta information indicates it cannot have series matching our query. if ok, reason := storeMatches(ctx, st, s.debugLogging, originalRequest.MinTime, originalRequest.MaxTime, matchers...); !ok { @@ -326,13 +338,19 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. continue } + keep, matchedLabelSets := s.tsdbSelector.MatchStore(st.LabelSets()...) + if !keep { + continue + } + + storeLabelSets = append(storeLabelSets, matchedLabelSets...) stores = append(stores, st) } - if len(stores) == 0 { level.Debug(reqLogger).Log("err", ErrorNoStoresMatched, "stores", strings.Join(storeDebugMsgs, ";")) return nil } + r.Matchers = append(r.Matchers, s.tsdbSelector.buildTSDBMatchers(storeLabelSets)...) storeResponses := make([]respSet, 0, len(stores)) diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index ad1f65f9888..c108ffeb919 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -20,6 +20,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/tsdb" @@ -30,6 +31,7 @@ import ( "github.com/efficientgo/core/testutil" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/store/labelpb" @@ -67,7 +69,7 @@ func TestProxyStore_Info(t *testing.T) { nil, func() []Client { return nil }, component.Query, - labels.EmptyLabels(), 0*time.Second, RetrievalStrategy(EagerRetrieval), + labels.EmptyLabels(), 0*time.Second, EagerRetrieval, ) resp, err := q.Info(ctx, &storepb.InfoRequest{}) @@ -120,6 +122,7 @@ func TestProxyStore_Series(t *testing.T) { expectedSeries []rawSeries expectedErr error expectedWarningsLen int + relabelConfig string }{ { title: "no storeAPI available", @@ -622,6 +625,123 @@ func TestProxyStore_Series(t *testing.T) { }, }, }, + { + title: "relabel config with flat store layout", + storeAPIs: []Client{ + &storetestutil.TestClient{ + MinTime: 1, + MaxTime: 300, + ExtLset: []labels.Labels{labels.FromStrings("ext", "2")}, + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("zone", "2"), []sample{{0, 0}, {2, 1}, {3, 2}}), + }, + }, + }, + &storetestutil.TestClient{ + MinTime: 1, + MaxTime: 300, + ExtLset: []labels.Labels{labels.FromStrings("ext", "1")}, + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("zone", "1"), []sample{{0, 0}, {2, 1}, {3, 2}}), + }, + }, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{ + {Name: "zone", Value: ".+", Type: storepb.LabelMatcher_RE}, + }, + }, + expectedSeries: []rawSeries{ + { + lset: labels.FromStrings("zone", "1"), + chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}}, + }, + }, + relabelConfig: ` + - source_labels: [ext] + action: hashmod + target_label: shard + modulus: 2 + - action: keep + source_labels: [shard] + regex: 1 + `, + }, + { + title: "relabel config with nested store layout", + storeAPIs: []Client{ + &storetestutil.TestClient{ + MinTime: 1, + MaxTime: 300, + ExtLset: []labels.Labels{labels.FromStrings("ext", "1", "ext2", "2"), labels.FromStrings("ext", "2")}, + StoreClient: storepb.ServerAsClient(NewProxyStore(log.NewNopLogger(), prometheus.NewRegistry(), func() []Client { + return []Client{ + &storetestutil.TestClient{ + MinTime: 1, + MaxTime: 300, + ExtLset: []labels.Labels{labels.FromStrings("ext", "1", "ext2", "2")}, + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("zone", "1"), []sample{{0, 0}, {2, 1}, {3, 2}}), + }, + }, + }, + &storetestutil.TestClient{ + MinTime: 1, + MaxTime: 300, + ExtLset: []labels.Labels{labels.FromStrings("ext", "2")}, + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("zone", "2"), []sample{{0, 0}, {2, 1}, {3, 2}}), + }, + }, + }, + } + }, component.Store, labels.FromStrings("role", "proxy"), 1*time.Minute, EagerRetrieval), 1), + }, + &storetestutil.TestClient{ + MinTime: 1, + MaxTime: 300, + ExtLset: []labels.Labels{labels.FromStrings("ext", "3")}, + StoreClient: &mockedStoreAPI{ + RespSeries: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("zone", "3"), []sample{{0, 0}, {2, 1}, {3, 2}}), + }, + }, + }, + }, + req: &storepb.SeriesRequest{ + MinTime: 1, + MaxTime: 300, + Matchers: []storepb.LabelMatcher{ + {Name: "zone", Value: ".+", Type: storepb.LabelMatcher_RE}, + }, + }, + expectedSeries: []rawSeries{ + { + lset: labels.FromStrings("zone", "1"), + chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}}, + }, + { + lset: labels.FromStrings("zone", "3"), + chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}}, + }, + }, + relabelConfig: ` + - source_labels: [ext] + action: hashmod + target_label: shard + modulus: 2 + - action: keep + source_labels: [shard] + regex: 1 + `, + }, } { t.Run(tc.title, func(t *testing.T) { for _, replicaLabelSupport := range []bool{false, true} { @@ -632,12 +752,15 @@ func TestProxyStore_Series(t *testing.T) { } for _, strategy := range []RetrievalStrategy{EagerRetrieval, LazyRetrieval} { t.Run(string(strategy), func(t *testing.T) { + relabelConfig, err := block.ParseRelabelConfig([]byte(tc.relabelConfig), block.SelectorSupportedRelabelActions) + testutil.Ok(t, err) q := NewProxyStore(nil, nil, func() []Client { return tc.storeAPIs }, component.Query, tc.selectorLabels, 5*time.Second, strategy, + WithTSDBSelector(NewTSDBSelector(relabelConfig)), ) ctx := context.Background() @@ -646,7 +769,7 @@ func TestProxyStore_Series(t *testing.T) { } s := newStoreSeriesServer(ctx) - err := q.Series(tc.req, s) + err = q.Series(tc.req, s) if tc.expectedErr != nil { testutil.NotOk(t, err) testutil.Equals(t, tc.expectedErr.Error(), err.Error()) diff --git a/pkg/store/tsdb_selector.go b/pkg/store/tsdb_selector.go new file mode 100644 index 00000000000..72364054834 --- /dev/null +++ b/pkg/store/tsdb_selector.go @@ -0,0 +1,83 @@ +package store + +import ( + "strings" + + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/model/relabel" + + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +const ( + reMatchAny = "^$" +) + +var DefaultSelector = noopSelector() + +type TSDBSelector struct { + relabelConfig []*relabel.Config +} + +func noopSelector() *TSDBSelector { + return NewTSDBSelector(nil) +} + +func NewTSDBSelector(relabelConfig []*relabel.Config) *TSDBSelector { + return &TSDBSelector{ + relabelConfig: relabelConfig, + } +} + +func (sr *TSDBSelector) MatchStore(labelSets ...labels.Labels) (bool, []labels.Labels) { + if sr.relabelConfig == nil || len(labelSets) == 0 { + return true, labelSets + } + matchedLabelSets := sr.runRelabelRules(labelSets) + return len(matchedLabelSets) > 0, matchedLabelSets +} + +func (sr *TSDBSelector) runRelabelRules(labelSets []labels.Labels) []labels.Labels { + result := make([]labels.Labels, 0) + for _, labelSet := range labelSets { + if _, keep := relabel.Process(labelSet, sr.relabelConfig...); !keep { + continue + } + + result = append(result, labelSet) + } + + return result +} + +func (sr *TSDBSelector) buildTSDBMatchers(labelSets []labels.Labels) []storepb.LabelMatcher { + labelCounts := make(map[string]int) + matcherSet := make(map[string]map[string]struct{}) + for _, labelSet := range labelSets { + for _, lbl := range labelSet { + if _, ok := matcherSet[lbl.Name]; !ok { + matcherSet[lbl.Name] = make(map[string]struct{}) + } + labelCounts[lbl.Name]++ + matcherSet[lbl.Name][lbl.Value] = struct{}{} + } + } + + for k := range matcherSet { + if labelCounts[k] < len(labelSets) { + matcherSet[k][reMatchAny] = struct{}{} + } + } + + matchers := make([]storepb.LabelMatcher, 0, len(matcherSet)) + for k, v := range matcherSet { + var matchedValues []string + for val := range v { + matchedValues = append(matchedValues, val) + } + matcher := storepb.LabelMatcher{Type: storepb.LabelMatcher_RE, Name: k, Value: strings.Join(matchedValues, "|")} + matchers = append(matchers, matcher) + } + + return matchers +} diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 07141a4ac7d..56c2e52f040 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -268,7 +268,8 @@ type QuerierBuilder struct { enforceTenancy bool e2e.Linkable - f e2e.FutureRunnable + f e2e.FutureRunnable + relabelConfig string } func NewQuerierBuilder(e e2e.Environment, name string, storeAddresses ...string) *QuerierBuilder { @@ -398,6 +399,11 @@ func (q *QuerierBuilder) WithTenancy(enforceTenancy bool) *QuerierBuilder { return q } +func (q *QuerierBuilder) WithSelectorRelabelConfig(relabelConfig string) *QuerierBuilder { + q.relabelConfig = relabelConfig + return q +} + func (q *QuerierBuilder) Init() *e2eobs.Observable { args, err := q.collectArgs() if err != nil { @@ -507,6 +513,9 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) { if q.engine != "" { args = append(args, "--query.promql-engine="+string(q.engine)) } + if q.relabelConfig != "" { + args = append(args, "--selector.relabel-config="+q.relabelConfig) + } return args, nil } diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 3fa09041720..78b92c3eacd 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -2427,3 +2427,66 @@ func TestQueryTenancyEnforcement(t *testing.T) { return reflect.DeepEqual(res, expected) }) } + +func TestQuerySelectWithRelabel(t *testing.T) { + t.Parallel() + + timeNow := time.Now().UnixNano() + + ts := []struct { + samples []fakeMetricSample + query string + result model.Vector + relabelConfig string + }{ + { + query: `my_fake_metric`, + samples: []fakeMetricSample{{"i1", 1, timeNow}}, + result: []*model.Sample{ + { + Metric: map[model.LabelName]model.LabelValue{"__name__": "my_fake_metric", "instance": "i1", "prometheus": "p1", "replica": "0"}, + Value: 1, + }, + }, + relabelConfig: ` + - source_labels: [prometheus] + regex: p1 + action: keep + `, + }, + } + + for _, tc := range ts { + t.Run(tc.query, func(t *testing.T) { + e, err := e2e.NewDockerEnvironment("pushdown-dedup") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + prom1, sidecar1 := e2ethanos.NewPrometheusWithSidecar(e, "p1", e2ethanos.DefaultPromConfig("p1", 0, "", ""), "", e2ethanos.DefaultPrometheusImage(), "", "remote-write-receiver") + testutil.Ok(t, e2e.StartAndWaitReady(prom1, sidecar1)) + + prom2, sidecar2 := e2ethanos.NewPrometheusWithSidecar(e, "p2", e2ethanos.DefaultPromConfig("p2", 0, "", ""), "", e2ethanos.DefaultPrometheusImage(), "", "remote-write-receiver") + testutil.Ok(t, e2e.StartAndWaitReady(prom2, sidecar2)) + + endpoints := []string{ + sidecar1.InternalEndpoint("grpc"), + sidecar2.InternalEndpoint("grpc"), + } + q := e2ethanos. + NewQuerierBuilder(e, "1", endpoints...). + WithSelectorRelabelConfig(tc.relabelConfig). + Init() + testutil.Ok(t, err) + testutil.Ok(t, e2e.StartAndWaitReady(q)) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + t.Cleanup(cancel) + + testutil.Ok(t, synthesizeFakeMetricSamples(ctx, prom1, tc.samples)) + testutil.Ok(t, synthesizeFakeMetricSamples(ctx, prom2, tc.samples)) + + testQuery := func() string { return tc.query } + queryAndAssert(t, ctx, q.Endpoint("http"), testQuery, time.Now, promclient.QueryOptions{}, tc.result) + }) + } +}