-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for label matchers in LabelNames and LabelValues call to store. #4107
Changes from all commits
6d23fcf
3e63ee7
0304e3c
b5e3488
615e412
8f6ff97
db1a332
84c4e5a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -282,7 +282,8 @@ type BucketStore struct { | |
|
||
// chunksLimiterFactory creates a new limiter used to limit the number of chunks fetched by each Series() call. | ||
chunksLimiterFactory ChunksLimiterFactory | ||
// seriesLimiterFactory creates a new limiter used to limit the number of touched series by each Series() call. | ||
// seriesLimiterFactory creates a new limiter used to limit the number of touched series by each Series() call, | ||
// or LabelName and LabelValues calls when used with matchers. | ||
seriesLimiterFactory SeriesLimiterFactory | ||
partitioner Partitioner | ||
|
||
|
@@ -746,14 +747,17 @@ func (s *bucketSeriesSet) Err() error { | |
return s.err | ||
} | ||
|
||
// blockSeries returns series matching given matchers, that have some data in given time range. | ||
func blockSeries( | ||
extLset labels.Labels, | ||
indexr *bucketIndexReader, | ||
chunkr *bucketChunkReader, | ||
matchers []*labels.Matcher, | ||
req *storepb.SeriesRequest, | ||
chunksLimiter ChunksLimiter, | ||
seriesLimiter SeriesLimiter, | ||
extLset labels.Labels, // External labels added to the returned series labels. | ||
indexr *bucketIndexReader, // Index reader for block. | ||
chunkr *bucketChunkReader, // Chunk reader for block. | ||
matchers []*labels.Matcher, // Series matchers. | ||
chunksLimiter ChunksLimiter, // Rate limiter for loading chunks. | ||
seriesLimiter SeriesLimiter, // Rate limiter for loading series. | ||
skipChunks bool, // If true, chunks are not loaded. | ||
minTime, maxTime int64, // Series must have data in this time range to be returned. | ||
loadAggregates []storepb.Aggr, // List of aggregates to load when loading chunks. | ||
) (storepb.SeriesSet, *queryStats, error) { | ||
ps, err := indexr.ExpandedPostings(matchers) | ||
if err != nil { | ||
|
@@ -785,7 +789,7 @@ func blockSeries( | |
chks []chunks.Meta | ||
) | ||
for _, id := range ps { | ||
ok, err := indexr.LoadSeriesForTime(id, &symbolizedLset, &chks, req.SkipChunks, req.MinTime, req.MaxTime) | ||
ok, err := indexr.LoadSeriesForTime(id, &symbolizedLset, &chks, skipChunks, minTime, maxTime) | ||
if err != nil { | ||
return nil, nil, errors.Wrap(err, "read series") | ||
} | ||
|
@@ -795,7 +799,7 @@ func blockSeries( | |
} | ||
|
||
s := seriesEntry{} | ||
if !req.SkipChunks { | ||
if !skipChunks { | ||
// Schedule loading chunks. | ||
s.refs = make([]uint64, 0, len(chks)) | ||
s.chks = make([]storepb.AggrChunk, 0, len(chks)) | ||
|
@@ -825,11 +829,11 @@ func blockSeries( | |
res = append(res, s) | ||
} | ||
|
||
if req.SkipChunks { | ||
if skipChunks { | ||
return newBucketSeriesSet(res), indexr.stats, nil | ||
} | ||
|
||
if err := chunkr.load(res, req.Aggregates); err != nil { | ||
if err := chunkr.load(res, loadAggregates); err != nil { | ||
return nil, nil, errors.Wrap(err, "load chunks") | ||
} | ||
|
||
|
@@ -1026,9 +1030,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_Serie | |
indexr, | ||
chunkr, | ||
blockMatchers, | ||
req, | ||
chunksLimiter, | ||
seriesLimiter, | ||
req.SkipChunks, | ||
req.MinTime, req.MaxTime, | ||
req.Aggregates, | ||
) | ||
if err != nil { | ||
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) | ||
|
@@ -1154,16 +1160,14 @@ func chunksSize(chks []storepb.AggrChunk) (size int) { | |
|
||
// LabelNames implements the storepb.StoreServer interface. | ||
func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { | ||
resHints := &hintspb.LabelNamesResponseHints{} | ||
|
||
g, gctx := errgroup.WithContext(ctx) | ||
reqSeriesMatchers, err := storepb.MatchersToPromMatchers(req.Matchers...) | ||
if err != nil { | ||
return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request labels matchers").Error()) | ||
} | ||
|
||
s.mtx.RLock() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that |
||
resHints := &hintspb.LabelNamesResponseHints{} | ||
|
||
var mtx sync.Mutex | ||
var sets [][]string | ||
var reqBlockMatchers []*labels.Matcher | ||
|
||
if req.Hints != nil { | ||
reqHints := &hintspb.LabelNamesRequestHints{} | ||
err := types.UnmarshalAny(req.Hints, reqHints) | ||
|
@@ -1177,7 +1181,16 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq | |
} | ||
} | ||
|
||
g, gctx := errgroup.WithContext(ctx) | ||
|
||
s.mtx.RLock() | ||
|
||
var mtx sync.Mutex | ||
var sets [][]string | ||
var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) | ||
|
||
for _, b := range s.blocks { | ||
b := b | ||
if !b.overlapsClosedInterval(req.Start, req.End) { | ||
continue | ||
} | ||
|
@@ -1188,32 +1201,60 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq | |
resHints.AddQueriedBlock(b.meta.ULID) | ||
|
||
indexr := b.indexReader(gctx) | ||
extLabels := b.meta.Thanos.Labels | ||
|
||
g.Go(func() error { | ||
defer runutil.CloseWithLogOnErr(s.logger, indexr, "label names") | ||
|
||
// Do it via index reader to have pending reader registered correctly. | ||
res, err := indexr.block.indexHeaderReader.LabelNames() | ||
if err != nil { | ||
return errors.Wrap(err, "label names") | ||
} | ||
var result []string | ||
if len(reqSeriesMatchers) == 0 { | ||
// Do it via index reader to have pending reader registered correctly. | ||
// LabelNames are already sorted. | ||
res, err := indexr.block.indexHeaderReader.LabelNames() | ||
if err != nil { | ||
return errors.Wrapf(err, "label names for block %s", b.meta.ULID) | ||
} | ||
|
||
// Add a set for the external labels as well. | ||
// We're not adding them directly to res because there could be duplicates. | ||
extRes := make([]string, 0, len(extLabels)) | ||
for lName := range extLabels { | ||
extRes = append(extRes, lName) | ||
} | ||
// Add a set for the external labels as well. | ||
// We're not adding them directly to res because there could be duplicates. | ||
// b.extLset is already sorted by label name, no need to sort it again. | ||
extRes := make([]string, 0, len(b.extLset)) | ||
for _, l := range b.extLset { | ||
extRes = append(extRes, l.Name) | ||
} | ||
|
||
result = strutil.MergeSlices(res, extRes) | ||
} else { | ||
seriesSet, _, err := blockSeries(b.extLset, indexr, nil, reqSeriesMatchers, nil, seriesLimiter, true, req.Start, req.End, nil) | ||
if err != nil { | ||
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) | ||
} | ||
|
||
sort.Strings(res) | ||
sort.Strings(extRes) | ||
// Extract label names from all series. Many label names will be the same, so we need to deduplicate them. | ||
// Note that label names will already include external labels (passed to blockSeries), so we don't need | ||
// to add them again. | ||
pstibrany marked this conversation as resolved.
Show resolved
Hide resolved
|
||
labelNames := map[string]struct{}{} | ||
for seriesSet.Next() { | ||
ls, _ := seriesSet.At() | ||
for _, l := range ls { | ||
labelNames[l.Name] = struct{}{} | ||
} | ||
} | ||
if seriesSet.Err() != nil { | ||
return errors.Wrapf(seriesSet.Err(), "iterate series for block %s", b.meta.ULID) | ||
} | ||
|
||
res = strutil.MergeSlices(res, extRes) | ||
result = make([]string, 0, len(labelNames)) | ||
for n := range labelNames { | ||
result = append(result, n) | ||
} | ||
sort.Strings(result) | ||
} | ||
|
||
mtx.Lock() | ||
sets = append(sets, res) | ||
mtx.Unlock() | ||
if len(result) > 0 { | ||
mtx.Lock() | ||
sets = append(sets, result) | ||
mtx.Unlock() | ||
} | ||
|
||
return nil | ||
}) | ||
|
@@ -1238,16 +1279,16 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq | |
|
||
// LabelValues implements the storepb.StoreServer interface. | ||
func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { | ||
reqSeriesMatchers, err := storepb.MatchersToPromMatchers(req.Matchers...) | ||
if err != nil { | ||
return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request labels matchers").Error()) | ||
} | ||
|
||
resHints := &hintspb.LabelValuesResponseHints{} | ||
|
||
g, gctx := errgroup.WithContext(ctx) | ||
|
||
s.mtx.RLock() | ||
|
||
var mtx sync.Mutex | ||
var sets [][]string | ||
var reqBlockMatchers []*labels.Matcher | ||
|
||
if req.Hints != nil { | ||
reqHints := &hintspb.LabelValuesRequestHints{} | ||
err := types.UnmarshalAny(req.Hints, reqHints) | ||
|
@@ -1261,7 +1302,25 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR | |
} | ||
} | ||
|
||
// If we have series matchers, add <labelName> != "" matcher, to only select series that have given label name. | ||
if len(reqSeriesMatchers) > 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. End result will be correct, but there is an optimization we can do (preferably in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make sense. Thank you. |
||
m, err := labels.NewMatcher(labels.MatchNotEqual, req.Label, "") | ||
yeya24 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if err != nil { | ||
return nil, status.Error(codes.InvalidArgument, err.Error()) | ||
} | ||
|
||
reqSeriesMatchers = append(reqSeriesMatchers, m) | ||
} | ||
|
||
s.mtx.RLock() | ||
|
||
var mtx sync.Mutex | ||
var sets [][]string | ||
var seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series")) | ||
|
||
for _, b := range s.blocks { | ||
b := b | ||
|
||
if !b.overlapsClosedInterval(req.Start, req.End) { | ||
continue | ||
} | ||
|
@@ -1272,25 +1331,55 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR | |
resHints.AddQueriedBlock(b.meta.ULID) | ||
|
||
indexr := b.indexReader(gctx) | ||
extLabels := b.meta.Thanos.Labels | ||
|
||
g.Go(func() error { | ||
defer runutil.CloseWithLogOnErr(s.logger, indexr, "label values") | ||
|
||
// Do it via index reader to have pending reader registered correctly. | ||
res, err := indexr.block.indexHeaderReader.LabelValues(req.Label) | ||
if err != nil { | ||
return errors.Wrap(err, "index header label values") | ||
} | ||
var result []string | ||
if len(reqSeriesMatchers) == 0 { | ||
// Do it via index reader to have pending reader registered correctly. | ||
res, err := indexr.block.indexHeaderReader.LabelValues(req.Label) | ||
if err != nil { | ||
return errors.Wrapf(err, "index header label values for block %s", b.meta.ULID) | ||
} | ||
|
||
// Add the external label value as well. | ||
if extLabelValue, ok := extLabels[req.Label]; ok { | ||
res = strutil.MergeSlices(res, []string{extLabelValue}) | ||
// Add the external label value as well. | ||
if extLabelValue := b.extLset.Get(req.Label); extLabelValue != "" { | ||
res = strutil.MergeSlices(res, []string{extLabelValue}) | ||
} | ||
result = res | ||
} else { | ||
seriesSet, _, err := blockSeries(b.extLset, indexr, nil, reqSeriesMatchers, nil, seriesLimiter, true, req.Start, req.End, nil) | ||
yeya24 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if err != nil { | ||
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID) | ||
} | ||
|
||
// Extract given label's value from all series and deduplicate them. | ||
// We don't need to deal with external labels, since they are already added by blockSeries. | ||
values := map[string]struct{}{} | ||
for seriesSet.Next() { | ||
ls, _ := seriesSet.At() | ||
val := ls.Get(req.Label) | ||
if val != "" { // Should never be empty since we added labelName!="" matcher to the list of matchers. | ||
values[val] = struct{}{} | ||
} | ||
} | ||
if seriesSet.Err() != nil { | ||
return errors.Wrapf(seriesSet.Err(), "iterate series for block %s", b.meta.ULID) | ||
} | ||
|
||
result = make([]string, 0, len(values)) | ||
for n := range values { | ||
result = append(result, n) | ||
} | ||
sort.Strings(result) | ||
} | ||
|
||
mtx.Lock() | ||
sets = append(sets, res) | ||
mtx.Unlock() | ||
if len(result) > 0 { | ||
mtx.Lock() | ||
sets = append(sets, result) | ||
mtx.Unlock() | ||
} | ||
|
||
return nil | ||
}) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of
req
we now pass all used parameters explicitly.