Fixed tests.
Signed-off-by: bwplotka <[email protected]>
bwplotka committed Dec 23, 2022
1 parent 6f26748 commit 3df92ac
Showing 15 changed files with 212 additions and 157 deletions.
Binary file modified docs/img/globalsort-nonoptimized.png
Binary file modified docs/img/globalsort-optimized.png
58 changes: 35 additions & 23 deletions docs/proposals-accepted/20221129-avoid-global-sort.md
@@ -1,26 +1,26 @@
## Avoid Global Sort on Querier Select

* **Owners:**
* @bwplotka, @fpetkovski

* **Related Tickets:**
* https://github.com/thanos-io/thanos/issues/5719
* https://github.com/thanos-io/thanos/commit/043c5bfcc2464d3ae7af82a1428f6e0d6510f020
* https://github.com/thanos-io/thanos/pull/5796 also alternatives (https://github.com/thanos-io/thanos/pull/5692)

> TL;DR: We propose a solution that reduces query and query_range latency on common setups where deduplication is enabled and data is replicated. Initial benchmarks indicate ~20% latency improvement for data replicated 2 times.
>
> To make it work, we propose adding a field to the Store API Series call, "WithoutReplicaLabels []string", guarded by a "SupportsWithoutReplicaLabels" field propagated via the Info API. It tells store implementations to remove the given labels (if they are replica labels) from the result, while preserving sorting by labels after the removal.
>
> NOTE: This change will break unlikely setups that deduplicate on a non-replica label (misconfiguration or wrong setup).

## Glossary

**replica**: We use the term "replica labels" for a subset of (or all of) the "external labels": labels that indicate a unique replication group for our data, usually taken from the metadata about origin/source.

## Why

Currently, we spend a lot of storage selection CPU time on re-sorting the resulting time series, which is needed for deduplication (specifically in [`sortDedupLabels`](https://github.com/thanos-io/thanos/blob/main/pkg/query/querier.go#L400)). However, given the distributed effort and the current sorting guarantees of StoreAPI, there is potential to reduce the sorting effort and/or distribute it to leaves or multiple threads.

### Pitfalls of the current solution

@@ -31,11 +31,11 @@ Current flow can be represented as follows:
1. Querier PromQL Engine selects data. At this point we know if users asked for deduplicated data or not and [what replica labels to use](https://thanos.io/tip/components/query.md/#deduplication-replica-labels).
2. Querier selection asks the internal, in-process Store API, which is represented by the Proxy code component. The Proxy asks the relevant Store APIs for data using StoreAPI.Series.
3. Responses are pulled and k-way merged by time series. StoreAPI guarantees that the responses are sorted by series and that the external labels (including replica labels) are included in the time series.
   * There was a [bug in receiver](https://github.com/thanos-io/thanos/commit/043c5bfcc2464d3ae7af82a1428f6e0d6510f020#diff-b3f73a54121d88de203946e84955da7027e3cfce7f0cd82580bf215ac57c02f4) that caused series to be returned unsorted. Fixed in v0.29.0.
4. Querier selection waits until all responses are buffered and then deduplicates the data, given the requested replica labels. Before that, it globally sorts the data, moving the replica labels to the end of each series' label set, in `sortDedupLabels`.
5. Data is deduplicated using the `dedup` package.
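
Step 4 can be illustrated with a minimal, self-contained sketch. It is not the actual `sortDedupLabels` code: the `Label` and `Series` types, `compare`, `moveReplicaLast` and `globalSortForDedup` are hypothetical stand-ins that only show why a merged stream sorted by full label sets has to be buffered and re-sorted before replicas of the same series end up next to each other.

```go
package main

import (
	"fmt"
	"sort"
)

// Label is a simplified name/value pair (hypothetical; Thanos uses its own label types).
type Label struct{ Name, Value string }

// Series is a label set sorted by label name; chunks are omitted for brevity.
type Series []Label

// compare orders two label sets pairwise by name, then by value.
// A label set that is a prefix of another sorts first.
func compare(a, b Series) int {
	for i := 0; i < len(a) && i < len(b); i++ {
		if a[i].Name != b[i].Name {
			if a[i].Name < b[i].Name {
				return -1
			}
			return 1
		}
		if a[i].Value != b[i].Value {
			if a[i].Value < b[i].Value {
				return -1
			}
			return 1
		}
	}
	return len(a) - len(b)
}

// moveReplicaLast returns a copy of s with the replica label moved to the end.
func moveReplicaLast(s Series, replica string) Series {
	out := make(Series, 0, len(s))
	var tail Series
	for _, l := range s {
		if l.Name == replica {
			tail = append(tail, l)
			continue
		}
		out = append(out, l)
	}
	return append(out, tail...)
}

// globalSortForDedup needs the whole (buffered) set: it moves the replica
// label last in every series and then re-sorts everything.
func globalSortForDedup(set []Series, replica string) []Series {
	out := make([]Series, len(set))
	for i, s := range set {
		out[i] = moveReplicaLast(s, replica)
	}
	sort.SliceStable(out, func(i, j int) bool { return compare(out[i], out[j]) < 0 })
	return out
}

func main() {
	// Sorted by full label set, so the two replicas of z="1" are not adjacent.
	merged := []Series{
		{{"a", "1"}, {"replica", "r1"}, {"z", "1"}},
		{{"a", "1"}, {"replica", "r1"}, {"z", "2"}},
		{{"a", "1"}, {"replica", "r2"}, {"z", "1"}},
		{{"a", "1"}, {"replica", "r2"}, {"z", "2"}},
	}
	for _, s := range globalSortForDedup(merged, "replica") {
		fmt.Println(s)
	}
}
```

After the re-sort, series that differ only in the replica label are adjacent, which is the ordering the deduplication step relies on.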

The pitfall is that the global sort can in many cases be avoided completely, even when deduplication is enabled. Many StoreAPIs can drop certain replica labels without needing to re-sort, and others can k-way merge different data sets without certain replica labels without extra effort.
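
As a rough illustration of the k-way merge mentioned above, the sketch below merges already-sorted inputs with a heap, so no global sort over the buffered result is needed. It assumes each input is already sorted without the replica label. Plain strings stand in for label sets, and `stream`, `mergeHeap` and `kWayMerge` are hypothetical names, not Thanos APIs.

```go
package main

import (
	"container/heap"
	"fmt"
)

// stream is one already-sorted response; keys stand in for label sets
// that no longer contain the replica label (an assumption of this sketch).
type stream struct {
	keys []string
	pos  int
}

// mergeHeap is a min-heap of streams ordered by their current key.
type mergeHeap []*stream

func (h mergeHeap) Len() int            { return len(h) }
func (h mergeHeap) Less(i, j int) bool  { return h[i].keys[h[i].pos] < h[j].keys[h[j].pos] }
func (h mergeHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *mergeHeap) Push(x interface{}) { *h = append(*h, x.(*stream)) }
func (h *mergeHeap) Pop() interface{} {
	old := *h
	n := len(old)
	s := old[n-1]
	*h = old[:n-1]
	return s
}

// kWayMerge merges sorted inputs into one sorted output.
// Equal keys from different inputs end up next to each other.
func kWayMerge(inputs ...[]string) []string {
	h := &mergeHeap{}
	for _, in := range inputs {
		if len(in) > 0 {
			*h = append(*h, &stream{keys: in})
		}
	}
	heap.Init(h)

	var out []string
	for h.Len() > 0 {
		s := (*h)[0] // stream with the smallest current key
		out = append(out, s.keys[s.pos])
		s.pos++
		if s.pos == len(s.keys) {
			heap.Pop(h) // stream exhausted
		} else {
			heap.Fix(h, 0) // restore heap order after advancing
		}
	}
	return out
}

func main() {
	replicaA := []string{`{a="1",z="1"}`, `{a="1",z="2"}`}
	replicaB := []string{`{a="1",z="1"}`, `{a="1",z="3"}`}
	for _, k := range kWayMerge(replicaA, replicaB) {
		fmt.Println(k)
	}
}
```

The merge only ever looks at the head of each input, which is why it can run while responses are still being pulled, unlike a global sort.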

## Goals

@@ -44,12 +44,12 @@ Goals and use cases for the solution as proposed in [How](#how):
* Avoid expensive global sort of all series before passing them to PromQL engine in Querier.
* Allow a StoreAPI implementation to announce whether it supports the sorting feature or not. The rationale is that we want to make it possible to create simpler StoreAPI servers, if the operator is willing to trade latency for simplicity.
* Clarify the behaviour in tricky cases when there is an overlap of replica labels between what's in TSDB and what's attached as external labels.
* Ensure this change can be rolled out in a compatible way.

## Non-Goals

* Allow consuming series in a streamed way in the PromQL engine.
  * While this pitfall (global sort) blocks the above idea, it's currently still more beneficial to pull all series upfront (eager approach) as soon as possible. This is due to the current PromQL architecture, which requires info upfront for query planners and execution. We don't plan to change it yet, so there is no need to push explicitly for that.

## How

@@ -59,12 +59,12 @@ To understand proposal, let's go through important, yet perhaps not trivial, fac

* For a StoreAPI, or generally for data that belongs to one replica, excluding a certain replica label during sort does not impact the sorting order of the returned series. This means that any feature that requires a different sort order for replicated series is generally a no-op for sidecars, rules, single-tenant receivers or within a single block (or one stream of blocks).
* You can't stream sorting of unsorted data. Furthermore, it's not possible to detect that data is unsorted unless we fetch and buffer all series.
* In v0.29 and below, you can deduplicate on any labels, including non-replica labels. This is assumed to be semantically wrong, yet someone might depend on it.
* Thanos never handled overlap of chunks within one set of Store API responses.

### Solution

To avoid the global sort, we propose removing the required replica labels and sorting at the Store API level.

For the first step (which is required for compatibility purposes anyway), we propose logic in the proxy Store API implementation that, when deduplication is requested with given replica labels, will:

@@ -79,7 +79,7 @@ As the second step we propose adding `without_replica_labels` field to `SeriesRe
```protobuf
message SeriesRequest {
// ...
// without_replica_labels are replica labels which have to be excluded from series set results.
// The sorting requirement has to be preserved, so series should be sorted without those labels.
// If the requested label is NOT a replica label (labels that identify replication group) it should be not affected by
@@ -107,10 +107,9 @@ message StoreInfo {
}
```

Thanks to that, implementations can optionally support this feature. We can make all Thanos StoreAPIs support it, which will allow faster deduplication queries on all types of setups.
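
The optional support could be used on the client side roughly as sketched below. This is an illustration under assumptions, not the Thanos implementation: `storeClient`, `fakeStore`, `seriesRequest` and `buildRequest` are hypothetical, and only the `SupportsWithoutReplicaLabels` capability and the `WithoutReplicaLabels` field come from the proposal. The fallback for stores without support (the proxy strips the labels and re-sorts that store's response itself) is likewise an assumption of this sketch.

```go
package main

import "fmt"

// storeClient is a hypothetical, trimmed-down view of a Store API client.
type storeClient interface {
	// SupportsWithoutReplicaLabels reports the capability announced via the Info API.
	SupportsWithoutReplicaLabels() bool
}

// fakeStore is a test double used only in this sketch.
type fakeStore struct{ supports bool }

func (f fakeStore) SupportsWithoutReplicaLabels() bool { return f.supports }

// seriesRequest mirrors only the proposed field of the Series request.
type seriesRequest struct {
	WithoutReplicaLabels []string
}

// buildRequest returns the request for one store and whether the caller has
// to remove the replica labels and re-sort that store's response itself
// (assumed fallback for stores that do not announce support).
func buildRequest(c storeClient, replicaLabels []string, dedup bool) (seriesRequest, bool) {
	if !dedup || len(replicaLabels) == 0 {
		// No deduplication requested: nothing to remove, nothing to re-sort.
		return seriesRequest{}, false
	}
	if c.SupportsWithoutReplicaLabels() {
		// The store removes the labels and keeps its response sorted.
		return seriesRequest{WithoutReplicaLabels: replicaLabels}, false
	}
	// Older or simpler store: ask for full label sets and fix up locally.
	return seriesRequest{}, true
}

func main() {
	replica := []string{"replica"}
	for _, c := range []fakeStore{{supports: true}, {supports: false}} {
		req, resort := buildRequest(c, replica, true)
		fmt.Printf("supports=%v without=%v proxyResort=%v\n", c.supports, req.WithoutReplicaLabels, resort)
	}
}
```

Guarding the new field behind an announced capability keeps older StoreAPI servers working unchanged, at the cost of the extra local sort for their responses.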

In the initial tests we see ~2x improvements on my test data (8M series block, requests for ~200k series) with querier and store gateway.
In the initial tests we see 60% improvements on my test data (8M series block, requests for ~200k series) with querier and store gateway.

Without this change:

@@ -120,10 +119,9 @@ After implementing this proposal:

![2](../img/globalsort-optimized.png)


## Alternatives

1. Version StoreAPI.

As a best practice gRPC services should be versioned. This should allow easier iterations for everybody implementing or using it. However, having multiple versions (vs extra feature enablement field) might make client side more complex, so we propose to postpone it.

@@ -133,7 +131,7 @@ Extra slice in all Series might feel redundant, given all series are always grou

3. Instead of removing some replica labels, just sort without them and leave at the end.

For debugging purposes we could keep the replica labels we want to dedup on at the end of the label set.

This might, however, be a less clean way of providing better debuggability, which is not yet required.

@@ -161,11 +159,25 @@ Cons:
* Extra code and protobuf complexity
* Semantics of replica labels are hard to maintain when partial deduplication is configured (we only dedup by part of replica labels, not by all of them). This dynamic policy makes it hard to have clean response with separation of replica labels (i.e. should included replica labels be in "labels" or "replica labels")?

This might not be needed for now. We can add more awareness of replication later on.

## Action Plan

The tasks to do in order to migrate to the new idea.

* [ ] Merge the PR with the proposal (it also includes the implementation).
* [ ] Add support for `without_replica_labels` to other Store API servers.
* [ ] Move from deduplicating over series to deduplicating over chunks. See [TODO in querier.go:405](../../pkg/query/querier.go)

```go
	// TODO(bwplotka): Move to deduplication on chunk level inside promSeriesSet, similar to what we have in dedup.NewDedupChunkMerger().
	// This however require big refactor, caring about correct AggrChunk to iterator conversion, pushdown logic and counter reset apply.
	// For now we apply simple logic that splits potential overlapping chunks into separate replica series, so we can split the work.
	set := &promSeriesSet{
		mint:  q.mint,
		maxt:  q.maxt,
		set:   dedup.NewOverlapSplit(newStoreSeriesSet(resp.seriesSet)),
		aggrs: aggrs,
		warns: warns,
	}
```
1 change: 1 addition & 0 deletions internal/cortex/querier/queryrange/queryrange.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 22 additions & 3 deletions pkg/api/query/v1_test.go
@@ -56,6 +56,7 @@ import (
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil"
"github.com/thanos-io/thanos/pkg/testutil/custom"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
"github.com/thanos-io/thanos/pkg/testutil/testpromcompatibility"
@@ -192,7 +193,7 @@ func TestQueryEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout),
queryEngine: qe,
lookbackDeltaCreate: func(m int64) time.Duration { return time.Duration(0) },
gate: gate.New(nil, 4),
@@ -642,6 +643,24 @@ func TestQueryEndpoints(t *testing.T) {
}
}

func newProxyStoreWithTSDBStore(db store.TSDBReader) *store.ProxyStore {
	c := &storetestutil.TestClient{
		Name:        "1",
		StoreClient: storepb.ServerAsClient(store.NewTSDBStore(nil, db, component.Query, nil), 0),
		MinTime:     math.MinInt64, MaxTime: math.MaxInt64,
	}

	return store.NewProxyStore(
		nil,
		nil,
		func() []store.Client { return []store.Client{c} },
		component.Query,
		nil,
		0,
		store.EagerRetrieval,
	)
}

func TestMetadataEndpoints(t *testing.T) {
var old = []labels.Labels{
{
@@ -733,7 +752,7 @@ func TestMetadataEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout),
queryEngine: qe,
lookbackDeltaCreate: func(m int64) time.Duration { return time.Duration(0) },
gate: gate.New(nil, 4),
@@ -746,7 +765,7 @@ func TestMetadataEndpoints(t *testing.T) {
baseAPI: &baseAPI.BaseAPI{
Now: func() time.Time { return now },
},
queryableCreate: query.NewQueryableCreator(nil, nil, store.NewTSDBStore(nil, db, component.Query, nil), 2, timeout),
queryableCreate: query.NewQueryableCreator(nil, nil, newProxyStoreWithTSDBStore(db), 2, timeout),
queryEngine: qe,
lookbackDeltaCreate: func(m int64) time.Duration { return time.Duration(0) },
gate: gate.New(nil, 4),
2 changes: 1 addition & 1 deletion pkg/query/internal/test-storeset-pre-v0.8.0/storeset.go
@@ -208,7 +208,7 @@ func (s *storeRef) SupportsSharding() bool {
return false
}

func (s *storeRef) SendsSortedSeries() bool {
func (s *storeRef) SupportsWithoutReplicaLabels() bool {
return false
}

4 changes: 2 additions & 2 deletions pkg/query/querier.go
@@ -131,7 +131,7 @@ type querier struct {
mint, maxt int64
replicaLabels []string
storeDebugMatchers [][]*labels.Matcher
proxy storepb.StoreServer
proxy *store.ProxyStore
deduplicate bool
maxResolutionMillis int64
partialResponseStrategy storepb.PartialResponseStrategy
@@ -152,7 +152,7 @@ func newQuerier(
maxt int64,
replicaLabels []string,
storeDebugMatchers [][]*labels.Matcher,
proxy storepb.StoreServer,
proxy *store.ProxyStore,
deduplicate bool,
maxResolutionMillis int64,
partialResponse,
16 changes: 8 additions & 8 deletions pkg/query/querier_test.go
@@ -33,7 +33,7 @@ import (
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/testutil/teststore"
storetestutil "github.com/thanos-io/thanos/pkg/store/storepb/testutil"
)

type sample struct {
@@ -338,7 +338,7 @@ func TestQuerier_Select_AfterPromQL(t *testing.T) {

for _, tcase := range []struct {
name string
storeAPI storepb.StoreServer
storeAPI *store.ProxyStore
replicaLabels []string // Replica label groups chunks by the label value and strips it from the final result.
hints *storage.SelectHints
equivalentQuery string
@@ -350,11 +350,11 @@ func TestQuerier_Select_AfterPromQL(t *testing.T) {
{
// Regression test 1 against https://github.com/thanos-io/thanos/issues/2890.
name: "when switching replicas don't miss samples when set with a big enough lookback delta",
storeAPI: func() storepb.StoreServer {
storeAPI: newProxyStore(func() storepb.StoreServer {
s, err := store.NewLocalStoreFromJSONMmappableFile(logger, component.Debug, nil, "./testdata/issue2890-seriesresponses.json", store.ScanGRPCCurlProtoStreamMessages)
testutil.Ok(t, err)
return s
}(),
}()),
equivalentQuery: `cluster_version{}`,
replicaLabels: []string{"replica"},
hints: &storage.SelectHints{
@@ -786,7 +786,7 @@ func TestQuerier_Select(t *testing.T) {
g,
timeout,
nil,
func(i storepb.SeriesStatsCounter) {},
NoopSeriesStatsReporter,
)
t.Cleanup(func() { testutil.Ok(t, q.Close()) })

@@ -837,7 +837,7 @@ func newProxyStore(storeAPIs ...storepb.StoreServer) *store.ProxyStore {
if srv, ok := s.(*testStoreServer); ok {
withoutReplicaLabelsEnabled = len(srv.respsWithoutReplicaLabels) > 0
}
cls[i] = &teststore.TestClient{
cls[i] = &storetestutil.TestClient{
Name: fmt.Sprintf("%v", i),
StoreClient: storepb.ServerAsClient(s, 0),
MinTime: math.MinInt64, MaxTime: math.MaxInt64,
@@ -1078,7 +1078,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {

timeout := 100 * time.Second
g := gate.New(2)
q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, false, 0, true, false, false, g, timeout, nil, NoopSeriesStatsReporter)
q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, newProxyStore(s), false, 0, true, false, false, g, timeout, nil, NoopSeriesStatsReporter)
t.Cleanup(func() {
testutil.Ok(t, q.Close())
})
@@ -1148,7 +1148,7 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {

timeout := 5 * time.Second
g := gate.New(2)
q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, s, true, 0, true, false, false, g, timeout, nil, NoopSeriesStatsReporter)
q := newQuerier(context.Background(), logger, realSeriesWithStaleMarkerMint, realSeriesWithStaleMarkerMaxt, []string{"replica"}, nil, newProxyStore(s), true, 0, true, false, false, g, timeout, nil, NoopSeriesStatsReporter)
t.Cleanup(func() {
testutil.Ok(t, q.Close())
})