Skip to content

Commit

Permalink
Include external labels in series sharding
Browse files Browse the repository at this point in the history
Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski committed Jul 22, 2022
1 parent ffdb615 commit 6d0489b
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 6 deletions.
5 changes: 3 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,12 +835,13 @@ func blockSeries(
return nil, nil, errors.Wrap(err, "Lookup labels symbols")
}

if !shardMatcher.MatchesLabels(lset) {
completeLabelset := labelpb.ExtendSortedLabels(lset, extLset)
if !shardMatcher.MatchesLabels(completeLabelset) {
continue
}

s := seriesEntry{}
s.lset = labelpb.ExtendSortedLabels(lset, extLset)
s.lset = completeLabelset

if !skipChunks {
// Schedule loading chunks.
Expand Down
5 changes: 3 additions & 2 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,8 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(

framesNum++
for _, series := range res.ChunkedSeries {
if !shardMatcher.MatchesZLabels(series.Labels) {
completeLabelset := labelpb.ExtendSortedLabels(labelpb.ZLabelsToPromLabels(series.Labels), extLset)
if !shardMatcher.MatchesLabels(completeLabelset) {
continue
}

Expand All @@ -423,7 +424,7 @@ func (p *PrometheusStore) handleStreamedPrometheusResponse(

r := storepb.NewSeriesResponse(&storepb.Series{
Labels: labelpb.ZLabelsFromPromLabels(
labelpb.ExtendSortedLabels(labelpb.ZLabelsToPromLabels(series.Labels), extLset),
completeLabelset,
),
Chunks: thanosChks,
})
Expand Down
9 changes: 9 additions & 0 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,15 @@ func startStreamSeriesSet(
bytesProcessed := 0

defer func() {
if shardInfo != nil {
level.Info(logger).Log("msg", "Done fetching series",
"series", seriesStats.Series,
"chunks", seriesStats.Chunks,
"samples", seriesStats.Samples,
"bytes", bytesProcessed,
)
}

span.SetTag("processed.series", seriesStats.Series)
span.SetTag("processed.chunks", seriesStats.Chunks)
span.SetTag("processed.samples", seriesStats.Samples)
Expand Down
5 changes: 3 additions & 2 deletions pkg/store/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,12 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSer
// Stream at most one series per frame; series may be split over multiple frames according to maxBytesInFrame.
for set.Next() {
series := set.At()
if !shardMatcher.MatchesLabels(series.Labels()) {
completeLabelset := labelpb.ExtendSortedLabels(series.Labels(), s.extLset)
if !shardMatcher.MatchesLabels(completeLabelset) {
continue
}

storeSeries := storepb.Series{Labels: labelpb.ZLabelsFromPromLabels(labelpb.ExtendSortedLabels(series.Labels(), s.extLset))}
storeSeries := storepb.Series{Labels: labelpb.ZLabelsFromPromLabels(completeLabelset)}
if r.SkipChunks {
if err := srv.Send(storepb.NewSeriesResponse(&storeSeries)); err != nil {
return status.Error(codes.Aborted, err.Error())
Expand Down

0 comments on commit 6d0489b

Please sign in to comment.