Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: add initial symbol tables support #5906

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
17 changes: 16 additions & 1 deletion cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ func registerQuery(app *extkingpin.App) {
maxConcurrentSelects := cmd.Flag("query.max-concurrent-select", "Maximum number of select requests made concurrently per a query.").
Default("4").Int()

maxConcurrentDecompressWorkers := cmd.Flag("query.max-concurrent-decompress-workers", "Maximum number of workers spawned to decompress a set of compressed storepb.Series. Setting this to higher than zero enables label compression during querying - CPU usage will be slightly higher whilst network usage will be significantly lower.").
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This flag is a tiny bit confusing to me - it sounds like it configures the number of decompress workers, but in fact this also turns on / off the compression itself. It either sounds like two distinct flags would be better or somehow these configs should be combined, if it can be, in a reasonable way so that the flag name reflects it (but I struggle to find one).

Also I wonder if do not want to put this behind hidden flag straightway, a bit more documentation would be nice (but can be done in a separate PR).

Default("0").Int()

queryConnMetricLabels := cmd.Flag("query.conn-metric.label", "Optional selection of query connection metric labels to be collected from endpoint set").
Default(string(query.ExternalLabels), string(query.StoreType)).
Enums(string(query.ExternalLabels), string(query.StoreType))
Expand Down Expand Up @@ -321,6 +324,7 @@ func registerQuery(app *extkingpin.App) {
*queryTelemetrySamplesQuantiles,
*queryTelemetrySeriesQuantiles,
promqlEngineType(*promqlEngine),
*maxConcurrentDecompressWorkers,
)
})
}
Expand Down Expand Up @@ -397,6 +401,7 @@ func runQuery(
queryTelemetrySamplesQuantiles []int64,
queryTelemetrySeriesQuantiles []int64,
promqlEngine promqlEngineType,
maxConcurrentDecompressWorkers int,
) error {
if alertQueryURL == "" {
lastColon := strings.LastIndex(httpBindAddr, ":")
Expand Down Expand Up @@ -507,7 +512,16 @@ func runQuery(
endpointInfoTimeout,
queryConnMetricLabels...,
)
proxy = store.NewProxyStore(logger, reg, endpoints.GetStoreClients, component.Query, selectorLset, storeResponseTimeout, store.RetrievalStrategy(grpcProxyStrategy))
proxy = store.NewProxyStore(
logger,
reg,
endpoints.GetStoreClients,
component.Query,
selectorLset,
storeResponseTimeout,
store.RetrievalStrategy(grpcProxyStrategy),
maxConcurrentDecompressWorkers > 0,
)
rulesProxy = rules.NewProxy(logger, endpoints.GetRulesClients)
targetsProxy = targets.NewProxy(logger, endpoints.GetTargetsClients)
metadataProxy = metadata.NewProxy(logger, endpoints.GetMetricMetadataClients)
Expand All @@ -518,6 +532,7 @@ func runQuery(
proxy,
maxConcurrentSelects,
queryTimeout,
maxConcurrentDecompressWorkers,
)
)

Expand Down
1 change: 1 addition & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ func runReceive(
labels.Labels{},
0,
store.LazyRetrieval,
false,
)
rw := store.ReadWriteTSDBStore{
StoreServer: mts,
Expand Down
7 changes: 7 additions & 0 deletions docs/components/query.md
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,13 @@ Flags:
If unset it will use the promql default of 5m.
--query.max-concurrent=20 Maximum number of queries processed
concurrently by query node.
--query.max-concurrent-decompress-workers=0
Maximum number of workers spawned to
decompress a set of compressed storepb.Series.
Setting this to higher than zero enables label
compression during querying - CPU usage will
be slightly higher whilst network usage will be
significantly lower.
--query.max-concurrent-select=4
Maximum number of select requests made
concurrently per a query.
Expand Down
6 changes: 3 additions & 3 deletions pkg/api/query/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func TestQueryEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout, 0),
queryEngine: qe,
lookbackDeltaCreate: func(m int64) time.Duration { return time.Duration(0) },
gate: gate.New(nil, 4),
Expand Down Expand Up @@ -733,7 +733,7 @@ func TestMetadataEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout, 0),
queryEngine: qe,
lookbackDeltaCreate: func(m int64) time.Duration { return time.Duration(0) },
gate: gate.New(nil, 4),
Expand All @@ -746,7 +746,7 @@ func TestMetadataEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout, 0),
queryEngine: qe,
lookbackDeltaCreate: func(m int64) time.Duration { return time.Duration(0) },
gate: gate.New(nil, 4),
Expand Down
Loading