From b91b8ecf70d85348e25395109c4e43619617cdc7 Mon Sep 17 00:00:00 2001 From: Saswata Mukherjee Date: Mon, 16 Oct 2023 19:07:40 +0530 Subject: [PATCH] Proxy hints from StoreGW to Querier Signed-off-by: Saswata Mukherjee Signed-off-by: bazooka3000 --- cmd/thanos/store.go | 2 +- examples/interactive/interactive_test.go | 2 +- pkg/api/query/v1.go | 5 --- pkg/query/querier.go | 46 +++++++++++++++--------- pkg/store/proxy.go | 8 +---- 5 files changed, 32 insertions(+), 31 deletions(-) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 73e4b838ddc..8f41c36e8bc 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -404,7 +404,7 @@ func runStore( conf.blockSyncConcurrency, conf.advertiseCompatibilityLabel, conf.postingOffsetsInMemSampling, - false, + true, conf.lazyIndexReaderEnabled, conf.lazyIndexReaderIdleTimeout, options..., diff --git a/examples/interactive/interactive_test.go b/examples/interactive/interactive_test.go index b5c7cb63463..9ea00b1bf5f 100644 --- a/examples/interactive/interactive_test.go +++ b/examples/interactive/interactive_test.go @@ -106,7 +106,7 @@ func createData() (perr error) { // TestReadOnlyThanosSetup runs read only Thanos setup that has data from `maxTimeFresh - 2w` to `maxTimeOld`, with extra monitoring and tracing for full playground experience. // Run with test args `-timeout 9999m`. func TestReadOnlyThanosSetup(t *testing.T) { - t.Skip("This is interactive test - it will run until you will kill it or curl 'finish' endpoint. Comment and run as normal test to use it!") + // t.Skip("This is interactive test - it will run until you will kill it or curl 'finish' endpoint. Comment and run as normal test to use it!") // Create series of TSDB blocks. Cache them to 'data' dir so we don't need to re-create on every run. _, err := os.Stat(data) diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index 0db6f62657b..fc01b714a09 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -295,12 +295,9 @@ type queryData struct { Stats stats.QueryStats `json:"stats,omitempty"` // Additional Thanos Response field. QueryAnalysis queryTelemetry `json:"analysis,omitempty"` - QueryMetadata query.QueryMetadata `json:"metadata,omitempty"` Warnings []error `json:"warnings,omitempty"` } - - type queryTelemetry struct { // TODO(saswatamcode): Replace with engine.TrackedTelemetry once it has exported fields. // TODO(saswatamcode): Add aggregate fields to enrich data. @@ -555,7 +552,6 @@ func (qapi *QueryAPI) queryExplain(r *http.Request) (interface{}, []error, *api. ctx = context.WithValue(ctx, tenancy.TenantKey, tenant) var seriesStats []storepb.SeriesStatsCounter - var QueryMetadata QueryMetadata qry, err := engine.NewInstantQuery( ctx, qapi.queryableCreate( @@ -568,7 +564,6 @@ func (qapi *QueryAPI) queryExplain(r *http.Request) (interface{}, []error, *api. false, shardInfo, query.NewAggregateStatsReporter(&seriesStats), - QueryMetadata, ), promql.NewPrometheusQueryOpts(false, lookbackDelta), r.FormValue("query"), diff --git a/pkg/query/querier.go b/pkg/query/querier.go index a70e6d2c798..f1f94b29ab7 100644 --- a/pkg/query/querier.go +++ b/pkg/query/querier.go @@ -5,11 +5,13 @@ package query import ( "context" + "fmt" "strings" "sync" "time" "github.com/go-kit/log" + "github.com/gogo/protobuf/types" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" @@ -23,6 +25,7 @@ import ( "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/gate" "github.com/thanos-io/thanos/pkg/store" + "github.com/thanos-io/thanos/pkg/store/hintspb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tracing" @@ -41,13 +44,6 @@ func NewAggregateStatsReporter(stats *[]storepb.SeriesStatsCounter) seriesStatsR } } -type QueryMetadata struct { - StoreResults map[string]store.StoreAPIResult - //minTime time.Time - //maxTime time.Time - //cpuUsage -} - // QueryableCreator returns implementation of promql.Queryable that fetches data from the proxy store API endpoints. // If deduplication is enabled, all data retrieved from it will be deduplicated along all replicaLabels by default. // When the replicaLabels argument is not empty it overwrites the global replicaLabels flag. This allows specifying @@ -64,7 +60,6 @@ type QueryableCreator func( skipChunks bool, shardInfo *storepb.ShardInfo, seriesStatsReporter seriesStatsReporter, - QueryMetadata QueryMetadata, ) storage.Queryable // NewQueryableCreator creates QueryableCreator. @@ -88,7 +83,6 @@ func NewQueryableCreator( skipChunks bool, shardInfo *storepb.ShardInfo, seriesStatsReporter seriesStatsReporter, - nil, ) storage.Queryable { return &queryable{ logger: logger, @@ -107,7 +101,6 @@ func NewQueryableCreator( enableQueryPushdown: enableQueryPushdown, shardInfo: shardInfo, seriesStatsReporter: seriesStatsReporter, - QueryMetadata: QueryMetadata{}, } } } @@ -127,7 +120,6 @@ type queryable struct { enableQueryPushdown bool shardInfo *storepb.ShardInfo seriesStatsReporter seriesStatsReporter - QueryMetadata QueryMetadata } // Querier returns a new storage querier against the underlying proxy store API. @@ -212,6 +204,7 @@ type seriesServer struct { storepb.Store_SeriesServer ctx context.Context + hints []hintspb.QueryStats seriesSet []storepb.Series seriesSetStats storepb.SeriesStatsCounter warnings annotations.Annotations @@ -229,6 +222,16 @@ func (s *seriesServer) Send(r *storepb.SeriesResponse) error { return nil } + if r.GetHints() != nil { + tmp := hintspb.SeriesResponseHints{} + if err := types.UnmarshalAny(r.GetHints(), &tmp); err != nil { + return err + } + fmt.Println(tmp) + + s.hints = append(s.hints, *tmp.QueryStats) + } + // Unsupported field, skip. return nil } @@ -316,7 +319,7 @@ func (q *querier) Select(ctx context.Context, _ bool, hints *storage.SelectHints span, ctx := tracing.StartSpan(ctx, "querier_select_select_fn") defer span.Finish() //add an mapping aggr function to collect metadata - set, stats, err := q.selectFn(ctx, hints, ms...) + set, stats, _, err := q.selectFn(ctx, hints, ms...) if err != nil { promise <- storage.ErrSeriesSet(err) return @@ -339,10 +342,10 @@ func (q *querier) Select(ctx context.Context, _ bool, hints *storage.SelectHints }} } -func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storepb.SeriesStatsCounter, error) { +func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storepb.SeriesStatsCounter, []hintspb.QueryStats, error) { sms, err := storepb.PromMatchersToMatchers(ms...) if err != nil { - return nil, storepb.SeriesStatsCounter{}, errors.Wrap(err, "convert matchers") + return nil, storepb.SeriesStatsCounter{}, nil, errors.Wrap(err, "convert matchers") } aggrs := aggrsFromFunc(hints.Func) @@ -372,8 +375,17 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . req.WithoutReplicaLabels = q.replicaLabels } + reqHints := hintspb.SeriesRequestHints{ + EnableQueryStats: true, + } + anyHints, err := types.MarshalAny(&reqHints) + if err != nil { + return nil, storepb.SeriesStatsCounter{}, nil, errors.Wrap(err, "marshaling hints") + } + req.Hints = anyHints + if err := q.proxy.Series(&req, resp); err != nil { - return nil, storepb.SeriesStatsCounter{}, errors.Wrap(err, "proxy Series()") + return nil, storepb.SeriesStatsCounter{}, nil, errors.Wrap(err, "proxy Series()") } warns := annotations.New().Merge(resp.warnings) @@ -400,7 +412,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . set: newStoreSeriesSet(resp.seriesSet), aggrs: aggrs, warns: warns, - }, resp.seriesSetStats, nil + }, resp.seriesSetStats, resp.hints, nil } // TODO(bwplotka): Move to deduplication on chunk level inside promSeriesSet, similar to what we have in dedup.NewDedupChunkMerger(). @@ -414,7 +426,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms . warns: warns, } - return dedup.NewSeriesSet(set, hints.Func, q.enableQueryPushdown), resp.seriesSetStats, nil + return dedup.NewSeriesSet(set, hints.Func, q.enableQueryPushdown), resp.seriesSetStats, resp.hints, nil } // LabelValues returns all potential values for a label name. diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 6e364da809f..dde676189df 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -73,13 +73,6 @@ type Client interface { Addr() (addr string, isLocalClient bool) } -type StoreAPIResult struct { - Series int - Chunks int - Labels int - Duration time.Duration -} - // ProxyStore implements the store API that proxies request to all given underlying stores. type ProxyStore struct { logger log.Logger @@ -308,6 +301,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. PartialResponseStrategy: originalRequest.PartialResponseStrategy, ShardInfo: originalRequest.ShardInfo, WithoutReplicaLabels: originalRequest.WithoutReplicaLabels, + Hints: originalRequest.Hints, } // We may arrive here either via the promql engine