Skip to content

Commit

Permalink
Sort series set fully in stores
Browse files Browse the repository at this point in the history
Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski committed Oct 19, 2022
1 parent b5bc292 commit 64650f4
Show file tree
Hide file tree
Showing 15 changed files with 196 additions and 328 deletions.
16 changes: 14 additions & 2 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ import (
"github.com/prometheus/prometheus/discovery/targetgroup"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"google.golang.org/grpc"

v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/thanos-community/promql-engine/engine"
"google.golang.org/grpc"

apiv1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
Expand Down Expand Up @@ -733,11 +734,22 @@ func runQuery(
info.WithStoreInfoFunc(func() *infopb.StoreInfo {
if httpProbe.IsReady() {
mint, maxt := proxy.TimeRange()

clients := endpoints.GetStoreClients()

sortedSeries := true
for _, cl := range clients {
if !cl.SendsSortedSeries() {
sortedSeries = false
break
}
}

return &infopb.StoreInfo{
MinTime: mint,
MaxTime: maxt,
SupportsSharding: true,
SendsSortedSeries: true,
SendsSortedSeries: sortedSeries,
}
}
return nil
Expand Down
31 changes: 0 additions & 31 deletions pkg/query/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package query

import (
"context"
"sort"
"strings"
"sync"
"time"
Expand All @@ -24,7 +23,6 @@ import (
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
)
Expand Down Expand Up @@ -416,35 +414,6 @@ func (q *querier) selectFn(ctx context.Context, hints *storage.SelectHints, ms .
return dedup.NewSeriesSet(set, q.replicaLabelSet, hints.Func, q.enableQueryPushdown), resp.seriesSetStats, nil
}

// sortDedupLabels re-sorts the set so that the same series with different replica
// labels are coming right after each other.
//
// The work happens in place, in two passes:
//  1. Inside every series the labels are reordered: regular labels first
//     (by name), then the dedup pushdown marker, then the replica labels
//     (by name).
//  2. The series themselves are sorted by their reordered label sets.
//
// replicaLabels holds the names of the labels that are treated as replica labels.
func sortDedupLabels(set []storepb.Series, replicaLabels map[string]struct{}) {
	for idx := range set {
		lbls := set[idx].Labels
		sort.Slice(lbls, func(i, j int) bool {
			_, iReplica := replicaLabels[lbls[i].Name]
			_, jReplica := replicaLabels[lbls[j].Name]
			if iReplica != jReplica {
				// Exactly one side is a replica label: the other one goes first.
				return jReplica
			}
			if !iReplica {
				// Neither side is a replica label. Ensure that the dedup marker
				// goes just right before the replica labels.
				iMarker := lbls[i].Name == dedup.PushdownMarker.Name
				jMarker := lbls[j].Name == dedup.PushdownMarker.Name
				if iMarker != jMarker {
					return jMarker
				}
			}
			// Same class on both sides (including two replica labels): order by
			// name. sort.Slice is not stable, so replica labels must not compare
			// as equal or their relative order would be unspecified.
			return lbls[i].Name < lbls[j].Name
		})
	}
	// With the re-ordered label sets, re-sorting all series aligns the same series
	// from different replicas sequentially.
	sort.Slice(set, func(i, j int) bool {
		return labels.Compare(labelpb.ZLabelsToPromLabels(set[i].Labels), labelpb.ZLabelsToPromLabels(set[j].Labels)) < 0
	})
}

// LabelValues returns all potential values for a label name.
func (q *querier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
span, ctx := tracing.StartSpan(q.ctx, "querier_label_values")
Expand Down
66 changes: 0 additions & 66 deletions pkg/query/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/prometheus/prometheus/util/gate"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/dedup"
"github.com/thanos-io/thanos/pkg/receive"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/labelpb"
Expand Down Expand Up @@ -1016,71 +1015,6 @@ func TestQuerierWithDedupUnderstoodByPromQL_Rate(t *testing.T) {
})
}

// TestSortReplicaLabel checks that sortDedupLabels moves replica labels (and the
// pushdown marker, when present) to the end of every label set and then orders
// the series by the rearranged labels. Every case runs as a named subtest so a
// failure points at the scenario instead of an anonymous "#NN" entry.
func TestSortReplicaLabel(t *testing.T) {
	tests := []struct {
		name        string
		input       []storepb.Series
		exp         []storepb.Series
		dedupLabels map[string]struct{}
	}{
		{
			name: "single deduplication label",
			input: []storepb.Series{
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "b", Value: "replica-1"}, {Name: "c", Value: "3"}}},
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "b", Value: "replica-1"}, {Name: "c", Value: "3"}, {Name: "d", Value: "4"}}},
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "b", Value: "replica-1"}, {Name: "c", Value: "4"}}},
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "b", Value: "replica-2"}, {Name: "c", Value: "3"}}},
			},
			exp: []storepb.Series{
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "b", Value: "replica-1"}}},
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "b", Value: "replica-2"}}},
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "d", Value: "4"}, {Name: "b", Value: "replica-1"}}},
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "c", Value: "4"}, {Name: "b", Value: "replica-1"}}},
			},
			dedupLabels: map[string]struct{}{"b": {}},
		},
		{
			name: "multi deduplication labels",
			input: []storepb.Series{
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "b", Value: "replica-1"}, {Name: "b1", Value: "replica-1"}, {Name: "c", Value: "3"}}},
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "b", Value: "replica-1"}, {Name: "b1", Value: "replica-1"}, {Name: "c", Value: "3"}, {Name: "d", Value: "4"}}},
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "b", Value: "replica-1"}, {Name: "b1", Value: "replica-1"}, {Name: "c", Value: "4"}}},
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "b", Value: "replica-2"}, {Name: "b1", Value: "replica-2"}, {Name: "c", Value: "3"}}},
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "b", Value: "replica-2"}, {Name: "c", Value: "3"}}},
			},
			exp: []storepb.Series{
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "b", Value: "replica-1"}, {Name: "b1", Value: "replica-1"}}},
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "b", Value: "replica-2"}}},
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "b", Value: "replica-2"}, {Name: "b1", Value: "replica-2"}}},
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "d", Value: "4"}, {Name: "b", Value: "replica-1"}, {Name: "b1", Value: "replica-1"}}},
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "c", Value: "4"}, {Name: "b", Value: "replica-1"}, {Name: "b1", Value: "replica-1"}}},
			},
			dedupLabels: map[string]struct{}{"b": {}, "b1": {}},
		},
		{
			name: "pushdown label at the end",
			input: []storepb.Series{
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "b", Value: "replica-1"}, {Name: "c", Value: "3"}}},
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "b", Value: "replica-1"}, {Name: "c", Value: "3"}, {Name: "d", Value: "4"}}},
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "b", Value: "replica-1"}, {Name: "c", Value: "4"}, {Name: dedup.PushdownMarker.Name, Value: dedup.PushdownMarker.Value}}},
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "b", Value: "replica-2"}, {Name: "c", Value: "3"}}},
			},
			exp: []storepb.Series{
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "b", Value: "replica-1"}}},
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "b", Value: "replica-2"}}},
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "c", Value: "3"}, {Name: "d", Value: "4"}, {Name: "b", Value: "replica-1"}}},
				{Labels: []labelpb.ZLabel{{Name: "a", Value: "1"}, {Name: "c", Value: "4"}, {Name: dedup.PushdownMarker.Name, Value: dedup.PushdownMarker.Value}, {Name: "b", Value: "replica-1"}}},
			},
			dedupLabels: map[string]struct{}{"b": {}},
		},
	}
	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			// sortDedupLabels sorts in place, so the input slice carries the result.
			sortDedupLabels(test.input, test.dedupLabels)
			testutil.Equals(t, test.exp, test.input)
		})
	}
}

// hackyStaleMarker is a fixed float64 sentinel value (-99999999).
// NOTE(review): presumably it stands in for a stale-sample marker in test
// expectations — confirm against the places that use it.
const hackyStaleMarker = float64(-99999999)

func expandSeries(t testing.TB, it chunkenc.Iterator) (res []sample) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/query/query_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ func testSelect(t testutil.TB, q *querier, expectedSeries []labels.Labels, match
}
testutil.Ok(t, iter.Err())
}

fmt.Println(gotSeries)
testutil.Equals(t, expectedSeries, gotSeries)
}
testutil.Ok(t, ss.Err())
Expand Down
2 changes: 1 addition & 1 deletion pkg/receive/multitsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (l *localClient) SupportsSharding() bool {
}

// SendsSortedSeries returns a constant. As written it always yields true,
// because the first return statement ends the function.
// NOTE(review): the second return is unreachable; this looks like the old and
// the new line of a diff kept side by side — confirm which value is intended.
func (l *localClient) SendsSortedSeries() bool {
return true
return false
}

type tenant struct {
Expand Down
69 changes: 21 additions & 48 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,6 @@ func blockSeries(
skipChunks bool, // If true, chunks are not loaded.
minTime, maxTime int64, // Series must have data in this time range to be returned.
loadAggregates []storepb.Aggr, // List of aggregates to load when loading chunks.
replicaLabels map[string]struct{},
shardMatcher *storepb.ShardMatcher,
emptyPostingsCount prometheus.Counter,
) (storepb.SeriesSet, *queryStats, error) {
Expand Down Expand Up @@ -847,7 +846,6 @@ func blockSeries(
if !shardMatcher.MatchesLabels(completeLabelset) {
continue
}
sortLabelsForDedup(completeLabelset, replicaLabels)

s := seriesEntry{}
s.lset = completeLabelset
Expand Down Expand Up @@ -877,11 +875,6 @@ func blockSeries(

res = append(res, s)
}
// With re-sort all series in order to align the same series
// from different replicas sequentially.
sort.Slice(res, func(i, j int) bool {
return labels.Compare(res[i].lset, res[j].lset) < 0
})

if skipChunks {
return newBucketSeriesSet(res), indexr.stats, nil
Expand All @@ -894,23 +887,6 @@ func blockSeries(
return newBucketSeriesSet(res), indexr.stats.merge(chunkr.stats), nil
}

// sortLabelsForDedup reorders labelSet in place so that every label whose name
// is in replicaLabels is moved behind all other labels. Both groups are ordered
// by label name. When replicaLabels is empty the label set is left untouched.
func sortLabelsForDedup(labelSet labels.Labels, replicaLabels map[string]struct{}) {
	if len(replicaLabels) == 0 {
		return
	}

	sort.Slice(labelSet, func(i, j int) bool {
		_, iReplica := replicaLabels[labelSet[i].Name]
		_, jReplica := replicaLabels[labelSet[j].Name]
		if iReplica != jReplica {
			// Exactly one side is a replica label: the other one goes first.
			return jReplica
		}

		// Same class on both sides (including two replica labels): order by
		// name. sort.Slice is not stable, so replica labels must not compare
		// as equal or their relative order would be unspecified.
		return labelSet[i].Name < labelSet[j].Name
	})
}

func populateChunk(out *storepb.AggrChunk, in chunkenc.Chunk, aggrs []storepb.Aggr, save func([]byte) ([]byte, error)) error {
if in.Encoding() == chunkenc.EncXOR {
b, err := save(in.Bytes())
Expand Down Expand Up @@ -1050,6 +1026,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series"))
)

sortedSeriesSrv := newSortedSeriesServer(srv, req.ReplicaLabelSet(), false)
if req.Hints != nil {
reqHints := &hintspb.SeriesRequestHints{}
if err := types.UnmarshalAny(req.Hints, reqHints); err != nil {
Expand All @@ -1062,8 +1039,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
}
}

resHints.SeriesSorted = true
replicaLabelSet := req.ReplicaLabelSet()
s.mtx.RLock()
for _, bs := range s.blockSets {
blockMatchers, ok := bs.labelMatchers(matchers...)
Expand Down Expand Up @@ -1119,7 +1094,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
req.SkipChunks,
req.MinTime, req.MaxTime,
req.Aggregates,
replicaLabelSet,
shardMatcher,
s.metrics.emptyPostingCount,
)
Expand Down Expand Up @@ -1188,23 +1162,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
s.metrics.seriesGetAllDuration.Observe(stats.GetAllDuration.Seconds())
s.metrics.seriesBlocksQueried.Observe(float64(stats.blocksQueried))
}

// Send response hints at the beginning of the stream. This helps the querier know very early in the
// streaming process whether a global sort on the entire data set is required.
if s.enableSeriesResponseHints {
var anyHints *types.Any

if anyHints, err = types.MarshalAny(resHints); err != nil {
err = status.Error(codes.Unknown, errors.Wrap(err, "marshal series response hints").Error())
return
}

if err = srv.Send(storepb.NewHintsSeriesResponse(anyHints)); err != nil {
err = status.Error(codes.Unknown, errors.Wrap(err, "send series response hints").Error())
return
}
}

// Merge the sub-results from each selected block.
tracing.DoInSpan(ctx, "bucket_store_merge_all", func(ctx context.Context) {
begin := time.Now()
Expand All @@ -1227,7 +1184,7 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
s.metrics.chunkSizeBytes.Observe(float64(chunksSize(series.Chunks)))
}
series.Labels = labelpb.ZLabelsFromPromLabels(lset)
if err = srv.Send(storepb.NewSeriesResponse(&series)); err != nil {
if err = sortedSeriesSrv.Send(storepb.NewSeriesResponse(&series)); err != nil {
err = status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
return
}
Expand All @@ -1242,7 +1199,25 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie
err = nil
})

return err
if s.enableSeriesResponseHints {
var anyHints *types.Any

if anyHints, err = types.MarshalAny(resHints); err != nil {
err = status.Error(codes.Unknown, errors.Wrap(err, "marshal series response hints").Error())
return
}

if err = sortedSeriesSrv.Send(storepb.NewHintsSeriesResponse(anyHints)); err != nil {
err = status.Error(codes.Unknown, errors.Wrap(err, "send series response hints").Error())
return
}
}

if err != nil {
return err
}

return sortedSeriesSrv.Flush()
}

func chunksSize(chks []storepb.AggrChunk) (size int) {
Expand Down Expand Up @@ -1345,7 +1320,6 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
req.End,
nil,
nil,
nil,
s.metrics.emptyPostingCount,
)
if err != nil {
Expand Down Expand Up @@ -1514,7 +1488,6 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
req.End,
nil,
nil,
nil,
s.metrics.emptyPostingCount,
)
if err != nil {
Expand Down
5 changes: 1 addition & 4 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1624,7 +1624,6 @@ func TestSeries_RequestAndResponseHints(t *testing.T) {
QueriedBlocks: []hintspb.Block{
{Id: block1.String()},
},
SeriesSorted: true,
},
},
}, {
Expand All @@ -1643,7 +1642,6 @@ func TestSeries_RequestAndResponseHints(t *testing.T) {
{Id: block1.String()},
{Id: block2.String()},
},
SeriesSorted: true,
},
},
}, {
Expand All @@ -1666,7 +1664,6 @@ func TestSeries_RequestAndResponseHints(t *testing.T) {
QueriedBlocks: []hintspb.Block{
{Id: block1.String()},
},
SeriesSorted: true,
},
},
},
Expand Down Expand Up @@ -2505,7 +2502,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
indexReader := blk.indexReader()
chunkReader := blk.chunkReader()

seriesSet, _, err := blockSeries(ctx, nil, indexReader, chunkReader, matchers, chunksLimiter, seriesLimiter, req.SkipChunks, req.MinTime, req.MaxTime, req.Aggregates, nil, nil, dummyCounter)
seriesSet, _, err := blockSeries(ctx, nil, indexReader, chunkReader, matchers, chunksLimiter, seriesLimiter, req.SkipChunks, req.MinTime, req.MaxTime, req.Aggregates, nil, dummyCounter)
testutil.Ok(b, err)

// Ensure at least 1 series has been returned (as expected).
Expand Down
Loading

0 comments on commit 64650f4

Please sign in to comment.