Skip to content

Commit

Permalink
Filter external labels from matchers on LabelValues/LabelNames (thano…
Browse files Browse the repository at this point in the history
…s-io#5596)

* Filter external labels from matchers on LabelValues/LabelNames

Signed-off-by: Oron Sharabi <[email protected]>

* Fix linting

Signed-off-by: Oron Sharabi <[email protected]>

* Fix linter

Signed-off-by: Oron Sharabi <[email protected]>

* Fix comments to be full Sentences

Signed-off-by: Oron Sharabi <[email protected]>

Signed-off-by: Oron Sharabi <[email protected]>
  • Loading branch information
oronsh authored and GiedriusS committed Aug 25, 2022
1 parent 29d4b2e commit 9632157
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 14 deletions.
56 changes: 42 additions & 14 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1268,6 +1268,11 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
if len(reqBlockMatchers) > 0 && !b.matchRelabelLabels(reqBlockMatchers) {
continue
}
// Filter external labels from matchers.
reqSeriesMatchersNoExtLabels, ok := b.FilterExtLabelsMatchers(reqSeriesMatchers)
if !ok {
continue
}

resHints.AddQueriedBlock(b.meta.ULID)

Expand All @@ -1284,7 +1289,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
defer runutil.CloseWithLogOnErr(s.logger, indexr, "label names")

var result []string
if len(reqSeriesMatchers) == 0 {
if len(reqSeriesMatchersNoExtLabels) == 0 {
// Do it via index reader to have pending reader registered correctly.
// LabelNames are already sorted.
res, err := indexr.block.indexHeaderReader.LabelNames()
Expand All @@ -1302,7 +1307,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq

result = strutil.MergeSlices(res, extRes)
} else {
seriesSet, _, err := blockSeries(newCtx, b.extLset, indexr, nil, reqSeriesMatchers, nil, seriesLimiter, true, req.Start, req.End, nil, nil)
seriesSet, _, err := blockSeries(newCtx, b.extLset, indexr, nil, reqSeriesMatchersNoExtLabels, nil, seriesLimiter, true, req.Start, req.End, nil, nil)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
}
Expand Down Expand Up @@ -1355,6 +1360,24 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
}, nil
}

func (b *bucketBlock) FilterExtLabelsMatchers(matchers []*labels.Matcher) ([]*labels.Matcher, bool) {
// We filter external labels from matchers so we won't try to match series on them.
var result []*labels.Matcher
for _, m := range matchers {
// Get value of external label from block.
v := b.extLset.Get(m.Name)
// If value is empty string the matcher is a valid one since it's not part of external labels.
if v == "" {
result = append(result, m)
} else if v != "" && v != m.Value {
// If matcher is external label but value is different we don't want to look in block anyway.
return []*labels.Matcher{}, false
}
}

return result, true
}

// LabelValues implements the storepb.StoreServer interface.
func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
reqSeriesMatchers, err := storepb.MatchersToPromMatchers(req.Matchers...)
Expand All @@ -1380,16 +1403,6 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
}
}

// If we have series matchers, add <labelName> != "" matcher, to only select series that have given label name.
if len(reqSeriesMatchers) > 0 {
m, err := labels.NewMatcher(labels.MatchNotEqual, req.Label, "")
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

reqSeriesMatchers = append(reqSeriesMatchers, m)
}

s.mtx.RLock()

var mtx sync.Mutex
Expand All @@ -1405,6 +1418,21 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
if len(reqBlockMatchers) > 0 && !b.matchRelabelLabels(reqBlockMatchers) {
continue
}
// Filter external labels from matchers.
reqSeriesMatchersNoExtLabels, ok := b.FilterExtLabelsMatchers(reqSeriesMatchers)
if !ok {
continue
}

// If we have series matchers, add <labelName> != "" matcher, to only select series that have given label name.
if len(reqSeriesMatchersNoExtLabels) > 0 {
m, err := labels.NewMatcher(labels.MatchNotEqual, req.Label, "")
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}

reqSeriesMatchersNoExtLabels = append(reqSeriesMatchersNoExtLabels, m)
}

resHints.AddQueriedBlock(b.meta.ULID)

Expand All @@ -1420,7 +1448,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
defer runutil.CloseWithLogOnErr(s.logger, indexr, "label values")

var result []string
if len(reqSeriesMatchers) == 0 {
if len(reqSeriesMatchersNoExtLabels) == 0 {
// Do it via index reader to have pending reader registered correctly.
res, err := indexr.block.indexHeaderReader.LabelValues(req.Label)
if err != nil {
Expand All @@ -1433,7 +1461,7 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
}
result = res
} else {
seriesSet, _, err := blockSeries(newCtx, b.extLset, indexr, nil, reqSeriesMatchers, nil, seriesLimiter, true, req.Start, req.End, nil, nil)
seriesSet, _, err := blockSeries(newCtx, b.extLset, indexr, nil, reqSeriesMatchersNoExtLabels, nil, seriesLimiter, true, req.Start, req.End, nil, nil)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
}
Expand Down
46 changes: 46 additions & 0 deletions pkg/store/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,52 @@ func TestBucketBlock_Property(t *testing.T) {
properties.TestingRun(t)
}

func TestBucketFilterExtLabelsMatchers(t *testing.T) {
defer testutil.TolerantVerifyLeak(t)

dir := t.TempDir()
bkt, err := filesystem.NewBucket(dir)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, bkt.Close()) }()

blockID := ulid.MustNew(1, nil)
meta := &metadata.Meta{
BlockMeta: tsdb.BlockMeta{ULID: blockID},
Thanos: metadata.Thanos{
Labels: map[string]string{
"a": "b",
"c": "d",
},
},
}
b, _ := newBucketBlock(context.Background(), log.NewNopLogger(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil)
ms := []*labels.Matcher{
{Type: labels.MatchNotEqual, Name: "a", Value: "b"},
}
res, _ := b.FilterExtLabelsMatchers(ms)
testutil.Equals(t, len(res), 0)

ms = []*labels.Matcher{
{Type: labels.MatchNotEqual, Name: "a", Value: "a"},
}
_, ok := b.FilterExtLabelsMatchers(ms)
testutil.Equals(t, ok, false)

ms = []*labels.Matcher{
{Type: labels.MatchNotEqual, Name: "a", Value: "a"},
{Type: labels.MatchNotEqual, Name: "c", Value: "d"},
}
res, _ = b.FilterExtLabelsMatchers(ms)
testutil.Equals(t, len(res), 0)

ms = []*labels.Matcher{
{Type: labels.MatchNotEqual, Name: "a2", Value: "a"},
}
res, _ = b.FilterExtLabelsMatchers(ms)
testutil.Equals(t, len(res), 1)
testutil.Equals(t, res, ms)
}

func TestBucketBlock_matchLabels(t *testing.T) {
defer testutil.TolerantVerifyLeak(t)

Expand Down

0 comments on commit 9632157

Please sign in to comment.