Skip to content

Commit

Permalink
Add support for TSDB selector in querier
Browse files Browse the repository at this point in the history
This PR allows using the distributed query mode against a set of multi-tenant receivers
as described in https://github.com/thanos-io/thanos/blob/main/docs/proposals-done/202301-distributed-query-execution.md#distributed-execution-against-receive-components.

The feature is enabled by a selector.relabel-config flag in the Query component
which allows it to select a subset of TSDBs to query based on their external labels.

Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski committed Mar 9, 2024
1 parent 5910ed6 commit 78e9c86
Show file tree
Hide file tree
Showing 8 changed files with 326 additions and 14 deletions.
22 changes: 19 additions & 3 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

apiv1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/api/query/querypb"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/discovery/cache"
Expand Down Expand Up @@ -208,6 +209,8 @@ func registerQuery(app *extkingpin.App) {
Default("1s"))

storeResponseTimeout := extkingpin.ModelDuration(cmd.Flag("store.response-timeout", "If a Store doesn't send any data in this specified duration then a Store will be ignored and partial data will be returned if it's enabled. 0 disables timeout.").Default("0ms"))
storeSelectorRelabelConf := *extkingpin.RegisterSelectorRelabelFlags(cmd)

reqLogConfig := extkingpin.RegisterRequestLoggingFlags(cmd)

alertQueryURL := cmd.Flag("alert.query-url", "The external Thanos Query URL that would be set in all alerts 'Source' field.").String()
Expand Down Expand Up @@ -274,6 +277,15 @@ func registerQuery(app *extkingpin.App) {
level.Warn(logger).Log("msg", "different values for --web.route-prefix and --web.external-prefix detected, web UI may not work without a reverse-proxy.")
}

tsdbRelabelConfig, err := storeSelectorRelabelConf.Content()
if err != nil {
return errors.Wrap(err, "error while parsing tsdb selector configuration")
}
tsdbSelector, err := block.ParseRelabelConfig(tsdbRelabelConfig, block.SelectorSupportedRelabelActions)
if err != nil {
return err
}

return runQuery(
g,
logger,
Expand Down Expand Up @@ -343,6 +355,7 @@ func registerQuery(app *extkingpin.App) {
*defaultEngine,
storeRateLimits,
*extendedFunctionsEnabled,
store.NewTSDBSelector(tsdbSelector),
queryMode(*promqlQueryMode),
*tenantHeader,
*defaultTenant,
Expand Down Expand Up @@ -424,6 +437,7 @@ func runQuery(
defaultEngine string,
storeRateLimits store.SeriesSelectLimits,
extendedFunctionsEnabled bool,
tsdbSelector *store.TSDBSelector,
queryMode queryMode,
tenantHeader string,
defaultTenant string,
Expand Down Expand Up @@ -501,9 +515,9 @@ func runQuery(
dns.ResolverType(dnsSDResolver),
)

options := []store.ProxyStoreOption{}
if debugLogging {
options = append(options, store.WithProxyStoreDebugLogging())
options := []store.ProxyStoreOption{
store.WithTSDBSelector(tsdbSelector),
store.WithProxyStoreDebugLogging(debugLogging),
}

var (
Expand All @@ -524,6 +538,7 @@ func runQuery(
strictEndpoints,
endpointGroupAddrs,
strictEndpointGroups,
tsdbSelector,
dialOpts,
unhealthyStoreTimeout,
endpointInfoTimeout,
Expand Down Expand Up @@ -839,6 +854,7 @@ func prepareEndpointSet(
strictEndpoints []string,
endpointGroupAddrs []string,
strictEndpointGroups []string,
tsdbSelector *store.TSDBSelector,

Check failure on line 857 in cmd/thanos/query.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

`prepareEndpointSet` - `tsdbSelector` is unused (unparam)
dialOpts []grpc.DialOption,
unhealthyStoreTimeout time.Duration,
endpointInfoTimeout time.Duration,
Expand Down
5 changes: 2 additions & 3 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,8 @@ func runReceive(
return errors.Wrap(err, "setup gRPC server")
}

options := []store.ProxyStoreOption{}
if debugLogging {
options = append(options, store.WithProxyStoreDebugLogging())
options := []store.ProxyStoreOption{
store.WithProxyStoreDebugLogging(debugLogging),
}

proxy := store.NewProxyStore(
Expand Down
1 change: 1 addition & 0 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ func runRule(
nil,
nil,
nil,
store.DefaultSelector,
dialOpts,
5*time.Minute,
5*time.Second,
Expand Down
28 changes: 23 additions & 5 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type ProxyStore struct {
metrics *proxyStoreMetrics
retrievalStrategy RetrievalStrategy
debugLogging bool
tsdbSelector *TSDBSelector
}

type proxyStoreMetrics struct {
Expand All @@ -111,10 +112,17 @@ func RegisterStoreServer(storeSrv storepb.StoreServer, logger log.Logger) func(*
// ProxyStoreOption are functions that configure ProxyStore.
type ProxyStoreOption func(s *ProxyStore)

// WithProxyStoreDebugLogging enables debug logging.
func WithProxyStoreDebugLogging() ProxyStoreOption {
// WithProxyStoreDebugLogging toggles debug logging.
func WithProxyStoreDebugLogging(enable bool) ProxyStoreOption {
return func(s *ProxyStore) {
s.debugLogging = true
s.debugLogging = enable
}
}

// WithTSDBSelector sets the TSDB selector for the proxy.
//
// A nil selector is ignored and the selector already configured on the proxy
// is kept (NewProxyStore initializes it to DefaultSelector). Series calls
// methods on the selector unconditionally, so a nil pointer is never stored.
func WithTSDBSelector(selector *TSDBSelector) ProxyStoreOption {
	return func(s *ProxyStore) {
		if selector == nil {
			// Keep whatever selector is already set rather than replacing it with nil.
			return
		}
		s.tsdbSelector = selector
	}
}

Expand Down Expand Up @@ -147,6 +155,7 @@ func NewProxyStore(
responseTimeout: responseTimeout,
metrics: metrics,
retrievalStrategy: retrievalStrategy,
tsdbSelector: DefaultSelector,
}

for _, option := range options {
Expand Down Expand Up @@ -316,7 +325,10 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
ctx = metadata.AppendToOutgoingContext(ctx, tenancy.DefaultTenantHeader, tenant)
level.Debug(s.logger).Log("msg", "Tenant info in Series()", "tenant", tenant)

stores := []Client{}
var (
stores []Client
storeLabelSets []labels.Labels
)
for _, st := range s.stores() {
// We might be able to skip the store if its meta information indicates it cannot have series matching our query.
if ok, reason := storeMatches(ctx, st, s.debugLogging, originalRequest.MinTime, originalRequest.MaxTime, matchers...); !ok {
Expand All @@ -326,13 +338,19 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
continue
}

keep, matchedLabelSets := s.tsdbSelector.MatchStore(st.LabelSets()...)
if !keep {
continue
}

storeLabelSets = append(storeLabelSets, matchedLabelSets...)
stores = append(stores, st)
}

if len(stores) == 0 {
level.Debug(reqLogger).Log("err", ErrorNoStoresMatched, "stores", strings.Join(storeDebugMsgs, ";"))
return nil
}
r.Matchers = append(r.Matchers, s.tsdbSelector.buildTSDBMatchers(storeLabelSets)...)

storeResponses := make([]respSet, 0, len(stores))

Expand Down
127 changes: 125 additions & 2 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/tsdb"
Expand All @@ -30,6 +31,7 @@ import (

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
Expand Down Expand Up @@ -67,7 +69,7 @@ func TestProxyStore_Info(t *testing.T) {
nil,
func() []Client { return nil },
component.Query,
labels.EmptyLabels(), 0*time.Second, RetrievalStrategy(EagerRetrieval),
labels.EmptyLabels(), 0*time.Second, EagerRetrieval,
)

resp, err := q.Info(ctx, &storepb.InfoRequest{})
Expand Down Expand Up @@ -120,6 +122,7 @@ func TestProxyStore_Series(t *testing.T) {
expectedSeries []rawSeries
expectedErr error
expectedWarningsLen int
relabelConfig string
}{
{
title: "no storeAPI available",
Expand Down Expand Up @@ -622,6 +625,123 @@ func TestProxyStore_Series(t *testing.T) {
},
},
},
{
title: "relabel config with flat store layout",
storeAPIs: []Client{
&storetestutil.TestClient{
MinTime: 1,
MaxTime: 300,
ExtLset: []labels.Labels{labels.FromStrings("ext", "2")},
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("zone", "2"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
},
&storetestutil.TestClient{
MinTime: 1,
MaxTime: 300,
ExtLset: []labels.Labels{labels.FromStrings("ext", "1")},
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("zone", "1"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
},
},
req: &storepb.SeriesRequest{
MinTime: 1,
MaxTime: 300,
Matchers: []storepb.LabelMatcher{
{Name: "zone", Value: ".+", Type: storepb.LabelMatcher_RE},
},
},
expectedSeries: []rawSeries{
{
lset: labels.FromStrings("zone", "1"),
chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}},
},
},
relabelConfig: `
- source_labels: [ext]
action: hashmod
target_label: shard
modulus: 2
- action: keep
source_labels: [shard]
regex: 1
`,
},
{
title: "relabel config with nested store layout",
storeAPIs: []Client{
&storetestutil.TestClient{
MinTime: 1,
MaxTime: 300,
ExtLset: []labels.Labels{labels.FromStrings("ext", "1", "ext2", "2"), labels.FromStrings("ext", "2")},
StoreClient: storepb.ServerAsClient(NewProxyStore(log.NewNopLogger(), prometheus.NewRegistry(), func() []Client {
return []Client{
&storetestutil.TestClient{
MinTime: 1,
MaxTime: 300,
ExtLset: []labels.Labels{labels.FromStrings("ext", "1", "ext2", "2")},
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("zone", "1"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
},
&storetestutil.TestClient{
MinTime: 1,
MaxTime: 300,
ExtLset: []labels.Labels{labels.FromStrings("ext", "2")},
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("zone", "2"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
},
}
}, component.Store, labels.FromStrings("role", "proxy"), 1*time.Minute, EagerRetrieval), 1),
},
&storetestutil.TestClient{
MinTime: 1,
MaxTime: 300,
ExtLset: []labels.Labels{labels.FromStrings("ext", "3")},
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("zone", "3"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
},
},
req: &storepb.SeriesRequest{
MinTime: 1,
MaxTime: 300,
Matchers: []storepb.LabelMatcher{
{Name: "zone", Value: ".+", Type: storepb.LabelMatcher_RE},
},
},
expectedSeries: []rawSeries{
{
lset: labels.FromStrings("zone", "1"),
chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}},
},
{
lset: labels.FromStrings("zone", "3"),
chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}},
},
},
relabelConfig: `
- source_labels: [ext]
action: hashmod
target_label: shard
modulus: 2
- action: keep
source_labels: [shard]
regex: 1
`,
},
} {
t.Run(tc.title, func(t *testing.T) {
for _, replicaLabelSupport := range []bool{false, true} {
Expand All @@ -632,12 +752,15 @@ func TestProxyStore_Series(t *testing.T) {
}
for _, strategy := range []RetrievalStrategy{EagerRetrieval, LazyRetrieval} {
t.Run(string(strategy), func(t *testing.T) {
relabelConfig, err := block.ParseRelabelConfig([]byte(tc.relabelConfig), block.SelectorSupportedRelabelActions)
testutil.Ok(t, err)
q := NewProxyStore(nil,
nil,
func() []Client { return tc.storeAPIs },
component.Query,
tc.selectorLabels,
5*time.Second, strategy,
WithTSDBSelector(NewTSDBSelector(relabelConfig)),
)

ctx := context.Background()
Expand All @@ -646,7 +769,7 @@ func TestProxyStore_Series(t *testing.T) {
}

s := newStoreSeriesServer(ctx)
err := q.Series(tc.req, s)
err = q.Series(tc.req, s)
if tc.expectedErr != nil {
testutil.NotOk(t, err)
testutil.Equals(t, tc.expectedErr.Error(), err.Error())
Expand Down
Loading

0 comments on commit 78e9c86

Please sign in to comment.