Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for TSDB selector in querier #7200

Merged
merged 9 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6867](https://github.com/thanos-io/thanos/pull/6867) Query UI: Tenant input box added to the Query UI, in order to be able to specify which tenant the query should use.
- [#7175](https://github.com/thanos-io/thanos/pull/7175): Query: Add `--query.mode=distributed` which enables the new distributed mode of the Thanos query engine.
- [#7199](https://github.com/thanos-io/thanos/pull/7199): Reloader: Add support for watching and decompressing Prometheus configuration directories
- [#7200](https://github.com/thanos-io/thanos/pull/7175): Query: Add `--selector.relabel-config` and `--selector.relabel-config-file` flags which allows scoping the Querier to a subset of matched TSDBs.

### Changed

Expand Down
27 changes: 24 additions & 3 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"time"

extflag "github.com/efficientgo/tools/extkingpin"
"google.golang.org/grpc"

"github.com/go-kit/log"
Expand All @@ -32,6 +33,7 @@ import (

apiv1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/api/query/querypb"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/discovery/cache"
Expand Down Expand Up @@ -208,6 +210,14 @@ func registerQuery(app *extkingpin.App) {
Default("1s"))

storeResponseTimeout := extkingpin.ModelDuration(cmd.Flag("store.response-timeout", "If a Store doesn't send any data in this specified duration then a Store will be ignored and partial data will be returned if it's enabled. 0 disables timeout.").Default("0ms"))

storeSelectorRelabelConf := *extflag.RegisterPathOrContent(
cmd,
"selector.relabel-config",
"YAML with relabeling configuration that allows the Querier to select specific TSDBs by their external label. It follows native Prometheus relabel-config syntax. See format details: https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config ",
extflag.WithEnvSubstitution(),
)

reqLogConfig := extkingpin.RegisterRequestLoggingFlags(cmd)

alertQueryURL := cmd.Flag("alert.query-url", "The external Thanos Query URL that would be set in all alerts 'Source' field.").String()
Expand Down Expand Up @@ -274,6 +284,15 @@ func registerQuery(app *extkingpin.App) {
level.Warn(logger).Log("msg", "different values for --web.route-prefix and --web.external-prefix detected, web UI may not work without a reverse-proxy.")
}

tsdbRelabelConfig, err := storeSelectorRelabelConf.Content()
if err != nil {
return errors.Wrap(err, "error while parsing tsdb selector configuration")
}
tsdbSelector, err := block.ParseRelabelConfig(tsdbRelabelConfig, block.SelectorSupportedRelabelActions)
if err != nil {
return err
}

return runQuery(
g,
logger,
Expand Down Expand Up @@ -343,6 +362,7 @@ func registerQuery(app *extkingpin.App) {
*defaultEngine,
storeRateLimits,
*extendedFunctionsEnabled,
store.NewTSDBSelector(tsdbSelector),
queryMode(*promqlQueryMode),
*tenantHeader,
*defaultTenant,
Expand Down Expand Up @@ -424,6 +444,7 @@ func runQuery(
defaultEngine string,
storeRateLimits store.SeriesSelectLimits,
extendedFunctionsEnabled bool,
tsdbSelector *store.TSDBSelector,
queryMode queryMode,
tenantHeader string,
defaultTenant string,
Expand Down Expand Up @@ -501,9 +522,9 @@ func runQuery(
dns.ResolverType(dnsSDResolver),
)

options := []store.ProxyStoreOption{}
if debugLogging {
options = append(options, store.WithProxyStoreDebugLogging())
options := []store.ProxyStoreOption{
store.WithTSDBSelector(tsdbSelector),
store.WithProxyStoreDebugLogging(debugLogging),
}

var (
Expand Down
5 changes: 2 additions & 3 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,9 +317,8 @@ func runReceive(
return errors.Wrap(err, "setup gRPC server")
}

options := []store.ProxyStoreOption{}
if debugLogging {
options = append(options, store.WithProxyStoreDebugLogging())
options := []store.ProxyStoreOption{
store.WithProxyStoreDebugLogging(debugLogging),
}

proxy := store.NewProxyStore(
Expand Down
15 changes: 15 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,21 @@ Flags:
--selector-label=<name>="<value>" ...
Query selector labels that will be exposed in
info endpoint (repeated).
--selector.relabel-config=<content>
Alternative to 'selector.relabel-config-file'
flag (mutually exclusive). Content of YAML
with relabeling configuration that allows
the Querier to select specific TSDBs by their
external label. It follows native Prometheus
relabel-config syntax. See format details:
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
--selector.relabel-config-file=<file-path>
Path to YAML with relabeling configuration
that allows the Querier to select
specific TSDBs by their external label.
It follows native Prometheus
relabel-config syntax. See format details:
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config
--store=<store> ... Deprecation Warning - This flag is deprecated
and replaced with `endpoint`. Addresses of
statically configured store API servers
Expand Down
28 changes: 23 additions & 5 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type ProxyStore struct {
metrics *proxyStoreMetrics
retrievalStrategy RetrievalStrategy
debugLogging bool
tsdbSelector *TSDBSelector
}

type proxyStoreMetrics struct {
Expand All @@ -111,10 +112,17 @@ func RegisterStoreServer(storeSrv storepb.StoreServer, logger log.Logger) func(*
// BucketStoreOption are functions that configure BucketStore.
type ProxyStoreOption func(s *ProxyStore)

// WithProxyStoreDebugLogging enables debug logging.
func WithProxyStoreDebugLogging() ProxyStoreOption {
// WithProxyStoreDebugLogging toggles debug logging.
func WithProxyStoreDebugLogging(enable bool) ProxyStoreOption {
return func(s *ProxyStore) {
s.debugLogging = true
s.debugLogging = enable
}
}

// WithTSDBSelector sets the TSDB selector for the proxy.
func WithTSDBSelector(selector *TSDBSelector) ProxyStoreOption {
return func(s *ProxyStore) {
s.tsdbSelector = selector
}
}

Expand Down Expand Up @@ -147,6 +155,7 @@ func NewProxyStore(
responseTimeout: responseTimeout,
metrics: metrics,
retrievalStrategy: retrievalStrategy,
tsdbSelector: DefaultSelector,
}

for _, option := range options {
Expand Down Expand Up @@ -316,7 +325,10 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
ctx = metadata.AppendToOutgoingContext(ctx, tenancy.DefaultTenantHeader, tenant)
level.Debug(s.logger).Log("msg", "Tenant info in Series()", "tenant", tenant)

stores := []Client{}
var (
stores []Client
storeLabelSets []labels.Labels
)
for _, st := range s.stores() {
// We might be able to skip the store if its meta information indicates it cannot have series matching our query.
if ok, reason := storeMatches(ctx, st, s.debugLogging, originalRequest.MinTime, originalRequest.MaxTime, matchers...); !ok {
Expand All @@ -326,13 +338,19 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
continue
}

matches, extraMatchers := s.tsdbSelector.MatchLabelSets(st.LabelSets()...)
if !matches {
continue
}
storeLabelSets = append(storeLabelSets, extraMatchers...)

stores = append(stores, st)
}

if len(stores) == 0 {
level.Debug(reqLogger).Log("err", ErrorNoStoresMatched, "stores", strings.Join(storeDebugMsgs, ";"))
return nil
}
r.Matchers = append(r.Matchers, MatchersForLabelSets(storeLabelSets)...)

storeResponses := make([]respSet, 0, len(stores))

Expand Down
131 changes: 128 additions & 3 deletions pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/tsdb"
Expand All @@ -30,6 +31,7 @@ import (

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
Expand Down Expand Up @@ -67,7 +69,7 @@ func TestProxyStore_Info(t *testing.T) {
nil,
func() []Client { return nil },
component.Query,
labels.EmptyLabels(), 0*time.Second, RetrievalStrategy(EagerRetrieval),
labels.EmptyLabels(), 0*time.Second, EagerRetrieval,
)

resp, err := q.Info(ctx, &storepb.InfoRequest{})
Expand Down Expand Up @@ -120,6 +122,7 @@ func TestProxyStore_Series(t *testing.T) {
expectedSeries []rawSeries
expectedErr error
expectedWarningsLen int
relabelConfig string
}{
{
title: "no storeAPI available",
Expand Down Expand Up @@ -622,6 +625,123 @@ func TestProxyStore_Series(t *testing.T) {
},
},
},
{
title: "relabel config with flat store layout",
storeAPIs: []Client{
&storetestutil.TestClient{
MinTime: 1,
MaxTime: 300,
ExtLset: []labels.Labels{labels.FromStrings("ext", "2")},
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("zone", "2"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
},
&storetestutil.TestClient{
MinTime: 1,
MaxTime: 300,
ExtLset: []labels.Labels{labels.FromStrings("ext", "1")},
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("zone", "1"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
},
},
req: &storepb.SeriesRequest{
MinTime: 1,
MaxTime: 300,
Matchers: []storepb.LabelMatcher{
{Name: "zone", Value: ".+", Type: storepb.LabelMatcher_RE},
},
},
relabelConfig: `
- source_labels: [ext]
action: hashmod
target_label: shard
modulus: 2
- action: keep
source_labels: [shard]
regex: 1
`,
expectedSeries: []rawSeries{
{
lset: labels.FromStrings("zone", "1"),
chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}},
},
},
},
{
title: "relabel config with nested store layout",
storeAPIs: []Client{
&storetestutil.TestClient{
MinTime: 1,
MaxTime: 300,
ExtLset: []labels.Labels{labels.FromStrings("ext", "1", "ext2", "2"), labels.FromStrings("ext", "2")},
StoreClient: storepb.ServerAsClient(NewProxyStore(log.NewNopLogger(), prometheus.NewRegistry(), func() []Client {
return []Client{
&storetestutil.TestClient{
MinTime: 1,
MaxTime: 300,
ExtLset: []labels.Labels{labels.FromStrings("ext", "1", "ext2", "2")},
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("zone", "1"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
},
&storetestutil.TestClient{
MinTime: 1,
MaxTime: 300,
ExtLset: []labels.Labels{labels.FromStrings("ext", "2")},
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("zone", "2"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
},
}
}, component.Store, labels.FromStrings("role", "proxy"), 1*time.Minute, EagerRetrieval)),
},
&storetestutil.TestClient{
MinTime: 1,
MaxTime: 300,
ExtLset: []labels.Labels{labels.FromStrings("ext", "3")},
StoreClient: &mockedStoreAPI{
RespSeries: []*storepb.SeriesResponse{
storeSeriesResponse(t, labels.FromStrings("zone", "3"), []sample{{0, 0}, {2, 1}, {3, 2}}),
},
},
},
},
req: &storepb.SeriesRequest{
MinTime: 1,
MaxTime: 300,
Matchers: []storepb.LabelMatcher{
{Name: "zone", Value: ".+", Type: storepb.LabelMatcher_RE},
},
},
relabelConfig: `
- source_labels: [ext]
action: hashmod
target_label: shard
modulus: 2
- action: keep
source_labels: [shard]
regex: 1
`,
expectedSeries: []rawSeries{
{
lset: labels.FromStrings("zone", "1"),
chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}},
},
{
lset: labels.FromStrings("zone", "3"),
chunks: [][]sample{{{0, 0}, {2, 1}, {3, 2}}},
},
},
},
} {
t.Run(tc.title, func(t *testing.T) {
for _, replicaLabelSupport := range []bool{false, true} {
Expand All @@ -632,12 +752,15 @@ func TestProxyStore_Series(t *testing.T) {
}
for _, strategy := range []RetrievalStrategy{EagerRetrieval, LazyRetrieval} {
t.Run(string(strategy), func(t *testing.T) {
relabelConfig, err := block.ParseRelabelConfig([]byte(tc.relabelConfig), block.SelectorSupportedRelabelActions)
testutil.Ok(t, err)
q := NewProxyStore(nil,
nil,
func() []Client { return tc.storeAPIs },
component.Query,
tc.selectorLabels,
5*time.Second, strategy,
WithTSDBSelector(NewTSDBSelector(relabelConfig)),
)

ctx := context.Background()
Expand All @@ -646,7 +769,7 @@ func TestProxyStore_Series(t *testing.T) {
}

s := newStoreSeriesServer(ctx)
err := q.Series(tc.req, s)
err = q.Series(tc.req, s)
if tc.expectedErr != nil {
testutil.NotOk(t, err)
testutil.Equals(t, tc.expectedErr.Error(), err.Error())
Expand Down Expand Up @@ -1565,7 +1688,7 @@ type rawSeries struct {
}

func seriesEquals(t *testing.T, expected []rawSeries, got []storepb.Series) {
testutil.Equals(t, len(expected), len(got), "got unexpected number of series: \n %v", got)
testutil.Equals(t, len(expected), len(got), "got unexpected number of series: \n want: %v \n got: %v", expected, got)

ret := make([]rawSeries, len(got))
for i, s := range got {
Expand Down Expand Up @@ -1900,6 +2023,7 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
metrics: newProxyStoreMetrics(nil),
responseTimeout: 5 * time.Second,
retrievalStrategy: EagerRetrieval,
tsdbSelector: DefaultSelector,
}

var allResps []*storepb.SeriesResponse
Expand Down Expand Up @@ -2028,6 +2152,7 @@ func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) {
metrics: newProxyStoreMetrics(nil),
responseTimeout: 0,
retrievalStrategy: EagerRetrieval,
tsdbSelector: DefaultSelector,
}

t.Run("failling send", func(t *testing.T) {
Expand Down
Loading
Loading