Skip to content

Commit

Permalink
Proxy hints from StoreGW to Querier
Browse files Browse the repository at this point in the history
Signed-off-by: Saswata Mukherjee <[email protected]>
Signed-off-by: bazooka3000 <[email protected]>
  • Loading branch information
saswatamcode authored and bazooka3000 committed Nov 19, 2023
1 parent 7bee2d0 commit b91b8ec
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 31 deletions.
2 changes: 1 addition & 1 deletion cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func runStore(
conf.blockSyncConcurrency,
conf.advertiseCompatibilityLabel,
conf.postingOffsetsInMemSampling,
false,
true,
conf.lazyIndexReaderEnabled,
conf.lazyIndexReaderIdleTimeout,
options...,
Expand Down
2 changes: 1 addition & 1 deletion examples/interactive/interactive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func createData() (perr error) {
// TestReadOnlyThanosSetup runs read only Thanos setup that has data from `maxTimeFresh - 2w` to `maxTimeOld`, with extra monitoring and tracing for full playground experience.
// Run with test args `-timeout 9999m`.
func TestReadOnlyThanosSetup(t *testing.T) {
t.Skip("This is interactive test - it will run until you will kill it or curl 'finish' endpoint. Comment and run as normal test to use it!")
// t.Skip("This is interactive test - it will run until you will kill it or curl 'finish' endpoint. Comment and run as normal test to use it!")

// Create series of TSDB blocks. Cache them to 'data' dir so we don't need to re-create on every run.
_, err := os.Stat(data)
Expand Down
5 changes: 0 additions & 5 deletions pkg/api/query/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,12 +295,9 @@ type queryData struct {
Stats stats.QueryStats `json:"stats,omitempty"`
// Additional Thanos Response field.
QueryAnalysis queryTelemetry `json:"analysis,omitempty"`
QueryMetadata query.QueryMetadata `json:"metadata,omitempty"`
Warnings []error `json:"warnings,omitempty"`
}



type queryTelemetry struct {
// TODO(saswatamcode): Replace with engine.TrackedTelemetry once it has exported fields.
// TODO(saswatamcode): Add aggregate fields to enrich data.
Expand Down Expand Up @@ -555,7 +552,6 @@ func (qapi *QueryAPI) queryExplain(r *http.Request) (interface{}, []error, *api.
ctx = context.WithValue(ctx, tenancy.TenantKey, tenant)

var seriesStats []storepb.SeriesStatsCounter
var QueryMetadata QueryMetadata
qry, err := engine.NewInstantQuery(
ctx,
qapi.queryableCreate(
Expand All @@ -568,7 +564,6 @@ func (qapi *QueryAPI) queryExplain(r *http.Request) (interface{}, []error, *api.
false,
shardInfo,
query.NewAggregateStatsReporter(&seriesStats),
QueryMetadata,
),
promql.NewPrometheusQueryOpts(false, lookbackDelta),
r.FormValue("query"),
Expand Down
46 changes: 29 additions & 17 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ package query

import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/gogo/protobuf/types"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"

Expand All @@ -23,6 +25,7 @@ import (
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/hintspb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tracing"
Expand All @@ -41,13 +44,6 @@ func NewAggregateStatsReporter(stats *[]storepb.SeriesStatsCounter) seriesStatsR
}
}

type QueryMetadata struct {
StoreResults map[string]store.StoreAPIResult
//minTime time.Time
//maxTime time.Time
//cpuUsage
}

// QueryableCreator returns implementation of promql.Queryable that fetches data from the proxy store API endpoints.
// If deduplication is enabled, all data retrieved from it will be deduplicated along all replicaLabels by default.
// When the replicaLabels argument is not empty it overwrites the global replicaLabels flag. This allows specifying
Expand All @@ -64,7 +60,6 @@ type QueryableCreator func(
skipChunks bool,
shardInfo *storepb.ShardInfo,
seriesStatsReporter seriesStatsReporter,
QueryMetadata QueryMetadata,
) storage.Queryable

// NewQueryableCreator creates QueryableCreator.
Expand All @@ -88,7 +83,6 @@ func NewQueryableCreator(
skipChunks bool,
shardInfo *storepb.ShardInfo,
seriesStatsReporter seriesStatsReporter,
nil,
) storage.Queryable {
return &queryable{
logger: logger,
Expand All @@ -107,7 +101,6 @@ func NewQueryableCreator(
enableQueryPushdown: enableQueryPushdown,
shardInfo: shardInfo,
seriesStatsReporter: seriesStatsReporter,
QueryMetadata: QueryMetadata{},
}
}
}
Expand All @@ -127,7 +120,6 @@ type queryable struct {
enableQueryPushdown bool
shardInfo *storepb.ShardInfo
seriesStatsReporter seriesStatsReporter
QueryMetadata QueryMetadata
}

// Querier returns a new storage querier against the underlying proxy store API.
Expand Down Expand Up @@ -212,6 +204,7 @@ type seriesServer struct {
storepb.Store_SeriesServer
ctx context.Context

hints []hintspb.QueryStats
seriesSet []storepb.Series
seriesSetStats storepb.SeriesStatsCounter
warnings annotations.Annotations
Expand All @@ -229,6 +222,16 @@ func (s *seriesServer) Send(r *storepb.SeriesResponse) error {
return nil
}

if r.GetHints() != nil {
tmp := hintspb.SeriesResponseHints{}
if err := types.UnmarshalAny(r.GetHints(), &tmp); err != nil {
return err
}
fmt.Println(tmp)

s.hints = append(s.hints, *tmp.QueryStats)
}

// Unsupported field, skip.
return nil
}
Expand Down Expand Up @@ -316,7 +319,7 @@ func (q *querier) Select(ctx context.Context, _ bool, hints *storage.SelectHints
span, ctx := tracing.StartSpan(ctx, "querier_select_select_fn")
defer span.Finish()
//add an mapping aggr function to collect metadata
set, stats, err := q.selectFn(ctx, hints, ms...)
set, stats, _, err := q.selectFn(ctx, hints, ms...)
if err != nil {
promise <- storage.ErrSeriesSet(err)
return
Expand All @@ -339,10 +342,10 @@ func (q *querier) Select(ctx context.Context, _ bool, hints *storage.SelectHints
}}
}

func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storepb.SeriesStatsCounter, error) {
func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms ...*labels.Matcher) (storage.SeriesSet, storepb.SeriesStatsCounter, []hintspb.QueryStats, error) {
sms, err := storepb.PromMatchersToMatchers(ms...)
if err != nil {
return nil, storepb.SeriesStatsCounter{}, errors.Wrap(err, "convert matchers")
return nil, storepb.SeriesStatsCounter{}, nil, errors.Wrap(err, "convert matchers")
}

aggrs := aggrsFromFunc(hints.Func)
Expand Down Expand Up @@ -372,8 +375,17 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
req.WithoutReplicaLabels = q.replicaLabels
}

reqHints := hintspb.SeriesRequestHints{
EnableQueryStats: true,
}
anyHints, err := types.MarshalAny(&reqHints)
if err != nil {
return nil, storepb.SeriesStatsCounter{}, nil, errors.Wrap(err, "marshaling hints")
}
req.Hints = anyHints

if err := q.proxy.Series(&req, resp); err != nil {
return nil, storepb.SeriesStatsCounter{}, errors.Wrap(err, "proxy Series()")
return nil, storepb.SeriesStatsCounter{}, nil, errors.Wrap(err, "proxy Series()")
}
warns := annotations.New().Merge(resp.warnings)

Expand All @@ -400,7 +412,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
set: newStoreSeriesSet(resp.seriesSet),
aggrs: aggrs,
warns: warns,
}, resp.seriesSetStats, nil
}, resp.seriesSetStats, resp.hints, nil
}

// TODO(bwplotka): Move to deduplication on chunk level inside promSeriesSet, similar to what we have in dedup.NewDedupChunkMerger().
Expand All @@ -414,7 +426,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
warns: warns,
}

return dedup.NewSeriesSet(set, hints.Func, q.enableQueryPushdown), resp.seriesSetStats, nil
return dedup.NewSeriesSet(set, hints.Func, q.enableQueryPushdown), resp.seriesSetStats, resp.hints, nil
}

// LabelValues returns all potential values for a label name.
Expand Down
8 changes: 1 addition & 7 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,6 @@ type Client interface {
Addr() (addr string, isLocalClient bool)
}

type StoreAPIResult struct {
Series int
Chunks int
Labels int
Duration time.Duration
}

// ProxyStore implements the store API that proxies request to all given underlying stores.
type ProxyStore struct {
logger log.Logger
Expand Down Expand Up @@ -308,6 +301,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
PartialResponseStrategy: originalRequest.PartialResponseStrategy,
ShardInfo: originalRequest.ShardInfo,
WithoutReplicaLabels: originalRequest.WithoutReplicaLabels,
Hints: originalRequest.Hints,
}

// We may arrive here either via the promql engine
Expand Down

0 comments on commit b91b8ec

Please sign in to comment.