Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement dedup-ready label sorting in stores #5796

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/thanos-community/promql-engine/engine"

apiv1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
Expand Down Expand Up @@ -734,10 +735,11 @@ func runQuery(
if httpProbe.IsReady() {
mint, maxt := proxy.TimeRange()
return &infopb.StoreInfo{
MinTime: mint,
MaxTime: maxt,
SupportsSharding: true,
SendsSortedSeries: true,
MinTime: mint,
MaxTime: maxt,
SupportsSharding: true,
SendsSortedSeries: true,
SendsSortedSeriesWithoutLabels: true,
}
}
return nil
Expand Down
9 changes: 5 additions & 4 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,11 @@ func runReceive(
if httpProbe.IsReady() {
minTime, maxTime := mts.TimeRange()
return &infopb.StoreInfo{
MinTime: minTime,
MaxTime: maxTime,
SupportsSharding: true,
SendsSortedSeries: true,
MinTime: minTime,
MaxTime: maxTime,
SupportsSharding: true,
SendsSortedSeries: true,
SendsSortedSeriesWithoutLabels: true,
}
}
return nil
Expand Down
9 changes: 5 additions & 4 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,10 +607,11 @@ func runRule(
if httpProbe.IsReady() {
mint, maxt := tsdbStore.TimeRange()
return &infopb.StoreInfo{
MinTime: mint,
MaxTime: maxt,
SupportsSharding: true,
SendsSortedSeries: true,
MinTime: mint,
MaxTime: maxt,
SupportsSharding: true,
SendsSortedSeries: true,
SendsSortedSeriesWithoutLabels: true,
}
}
return nil
Expand Down
9 changes: 5 additions & 4 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,11 @@ func runSidecar(
if httpProbe.IsReady() {
mint, maxt := promStore.Timestamps()
return &infopb.StoreInfo{
MinTime: mint,
MaxTime: maxt,
SupportsSharding: true,
SendsSortedSeries: true,
MinTime: mint,
MaxTime: maxt,
SupportsSharding: true,
SendsSortedSeries: true,
SendsSortedSeriesWithoutLabels: true,
}
}
return nil
Expand Down
9 changes: 5 additions & 4 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,10 +397,11 @@ func runStore(
if httpProbe.IsReady() {
mint, maxt := bs.TimeRange()
return &infopb.StoreInfo{
MinTime: mint,
MaxTime: maxt,
SupportsSharding: true,
SendsSortedSeries: true,
MinTime: mint,
MaxTime: maxt,
SupportsSharding: true,
SendsSortedSeries: true,
SendsSortedSeriesWithoutLabels: true,
}
}
return nil
Expand Down
114 changes: 76 additions & 38 deletions pkg/info/infopb/rpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/info/infopb/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,13 @@ message StoreInfo {
int64 min_time = 1;
int64 max_time = 2;
bool supports_sharding = 3;
// TODO(fpetkovski): Remove in v1.0
bool sends_sorted_series = 4;
// TODO(fpetkovski): Remove in v1.0
bool sends_sorted_series_without_labels = 5;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a new field here because sends_sorted_series does not mean the same as sending series sorted without certain labels.

}


// RulesInfo holds the metadata related to Rules API exposed by the component.
message RulesInfo {
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,17 @@ func (er *endpointRef) SendsSortedSeries() bool {
return er.metadata.Store.SendsSortedSeries
}

func (er *endpointRef) SendsSeriesSortedForDedup() bool {
er.mtx.RLock()
defer er.mtx.RUnlock()

if er.metadata == nil || er.metadata.Store == nil {
return false
}

return er.metadata.Store.SendsSortedSeriesWithoutLabels
}

func (er *endpointRef) String() string {
mint, maxt := er.TimeRange()
return fmt.Sprintf(
Expand Down
4 changes: 4 additions & 0 deletions pkg/query/internal/test-storeset-pre-v0.8.0/storeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ func (s *storeRef) SendsSortedSeries() bool {
return false
}

func (s *storeRef) SendsSeriesSortedForDedup() bool {
return false
}

func (s *storeRef) String() string {
mint, maxt := s.TimeRange()
return fmt.Sprintf(
Expand Down
48 changes: 11 additions & 37 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package query

import (
"context"
"sort"
"strings"
"sync"
"time"
Expand All @@ -24,7 +23,6 @@ import (
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
)
Expand Down Expand Up @@ -131,7 +129,8 @@ type querier struct {
logger log.Logger
cancel func()
mint, maxt int64
replicaLabels map[string]struct{}
replicaLabels []string
replicaLabelSet map[string]struct{}
storeDebugMatchers [][]*labels.Matcher
proxy storepb.StoreServer
deduplicate bool
Expand Down Expand Up @@ -183,7 +182,8 @@ func newQuerier(

mint: mint,
maxt: maxt,
replicaLabels: rl,
replicaLabels: replicaLabels,
replicaLabelSet: rl,
storeDebugMatchers: storeDebugMatchers,
proxy: proxy,
deduplicate: deduplicate,
Expand All @@ -197,7 +197,7 @@ func newQuerier(
}

func (q *querier) isDedupEnabled() bool {
return q.deduplicate && len(q.replicaLabels) > 0
return q.deduplicate && len(q.replicaLabelSet) > 0
}

type seriesServer struct {
Expand Down Expand Up @@ -348,6 +348,10 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
queryHints = storeHintsFromPromHints(hints)
}

replicaLabels := q.replicaLabels
if !q.isDedupEnabled() {
replicaLabels = nil
}
if err := q.proxy.Series(&storepb.SeriesRequest{
MinTime: hints.Start,
MaxTime: hints.End,
Expand All @@ -360,6 +364,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
SkipChunks: q.skipChunks,
Step: hints.Step,
Range: hints.Range,
SortWithoutLabels: replicaLabels,
}, resp); err != nil {
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
return nil, storepb.SeriesStatsCounter{}, errors.Wrap(err, "proxy Series()")
}
Expand Down Expand Up @@ -396,8 +401,6 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
}, resp.seriesSetStats, nil
}

// TODO(fabxc): this could potentially pushed further down into the store API to make true streaming possible.
sortDedupLabels(resp.seriesSet, q.replicaLabels)
set := &promSeriesSet{
mint: q.mint,
maxt: q.maxt,
Expand All @@ -408,36 +411,7 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .

// The merged series set assembles all potentially-overlapping time ranges of the same series into a single one.
// TODO(bwplotka): We could potentially dedup on chunk level, use chunk iterator for that when available.
return dedup.NewSeriesSet(set, q.replicaLabels, hints.Func, q.enableQueryPushdown), resp.seriesSetStats, nil
}

// sortDedupLabels re-sorts the set so that the same series with different replica
// labels are coming right after each other.
func sortDedupLabels(set []storepb.Series, replicaLabels map[string]struct{}) {
for _, s := range set {
// Move the replica labels to the very end.
sort.Slice(s.Labels, func(i, j int) bool {
if _, ok := replicaLabels[s.Labels[i].Name]; ok {
return false
}
if _, ok := replicaLabels[s.Labels[j].Name]; ok {
return true
}
// Ensure that dedup marker goes just right before the replica labels.
if s.Labels[i].Name == dedup.PushdownMarker.Name {
return false
}
if s.Labels[j].Name == dedup.PushdownMarker.Name {
return true
}
return s.Labels[i].Name < s.Labels[j].Name
})
}
// With the re-ordered label sets, re-sorting all series aligns the same series
// from different replicas sequentially.
sort.Slice(set, func(i, j int) bool {
return labels.Compare(labelpb.ZLabelsToPromLabels(set[i].Labels), labelpb.ZLabelsToPromLabels(set[j].Labels)) < 0
})
return dedup.NewSeriesSet(set, q.replicaLabelSet, hints.Func, q.enableQueryPushdown), resp.seriesSetStats, nil
}

// LabelValues returns all potential values for a label name.
Expand Down
Loading