Skip to content

Commit

Permalink
Upgrade Prometheus to LabelNames with matchers (cortexproject#4380)
Browse files Browse the repository at this point in the history
* Upgrade Prometheus to LabelNames with matchers

Since prometheus/prometheus#9083 prometheus now
provides matchers to the LabelNames method on the LabelQuerier
interface, so in order to upgrade Prometheus we need to support that.

This partially solves
cortexproject#3658 as now the block
store queryable uses the LabelNames method with matchers.

However, we're still using the ingesters' MetricsForLabelMatchers method
to perform the LabelNames w/matchers call on the distributor. That
change will be tackled separately as it breaks ingester's contracts and
needs to be released with a feature flag to perform a backwards
compatible release.

Signed-off-by: Oleg Zaytsev <[email protected]>

* Updated CHANGELOG.md

Signed-off-by: Oleg Zaytsev <[email protected]>
Signed-off-by: Alvin Lin <[email protected]>
  • Loading branch information
colega authored and alvinlin123 committed Jan 14, 2022
1 parent 175a57c commit 6fe1b01
Show file tree
Hide file tree
Showing 31 changed files with 413 additions and 109 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
* `-store-gateway.sharding-ring.heartbeat-period`
* [ENHANCEMENT] Memberlist: optimized receive path for processing ring state updates, to help reduce CPU utilization in large clusters. #4345
* [ENHANCEMENT] Memberlist: expose configuration of memberlist packet compression via `-memberlist.compression=enabled`. #4346
* [ENHANCEMENT] Updated Prometheus to include changes from prometheus/prometheus#9083. Now whenever `/labels` API calls include matchers, blocks store is queried for `LabelNames` with matchers instead of `Series` calls which was inefficient. #4380
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336
* [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335
* [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have been already closed. #4304
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ require (
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.29.0
github.com/prometheus/prometheus v1.8.2-0.20210720084720-59d02b5ef003
github.com/prometheus/prometheus v1.8.2-0.20210720123808-b1ed4a0a663d
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e
github.com/sony/gobreaker v0.4.1
github.com/spf13/afero v1.2.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1529,8 +1529,8 @@ github.com/prometheus/prometheus v1.8.2-0.20210215121130-6f488061dfb4/go.mod h1:
github.com/prometheus/prometheus v1.8.2-0.20210315220929-1cba1741828b/go.mod h1:MS/bpdil77lPbfQeKk6OqVQ9OLnpN3Rszd0hka0EOWE=
github.com/prometheus/prometheus v1.8.2-0.20210324152458-c7a62b95cea0/go.mod h1:sf7j/iAbhZahjeC0s3wwMmp5dksrJ/Za1UKdR+j6Hmw=
github.com/prometheus/prometheus v1.8.2-0.20210421143221-52df5ef7a3be/go.mod h1:WbIKsp4vWCoPHis5qQfd0QimLOR7qe79roXN5O8U8bs=
github.com/prometheus/prometheus v1.8.2-0.20210720084720-59d02b5ef003 h1:MYbsDV+OIFLkqwea5oC3jIUp/HxFyXQrvXpw/hooMRQ=
github.com/prometheus/prometheus v1.8.2-0.20210720084720-59d02b5ef003/go.mod h1:o6V+A4iPEWjLG0rSEKeev3OzfBZwP+ay+4iS4dkfLI4=
github.com/prometheus/prometheus v1.8.2-0.20210720123808-b1ed4a0a663d h1:UnqZFF2qXa+ctCfbss/J4yn9rTVoTiuawjrokqwt4Hg=
github.com/prometheus/prometheus v1.8.2-0.20210720123808-b1ed4a0a663d/go.mod h1:o6V+A4iPEWjLG0rSEKeev3OzfBZwP+ay+4iS4dkfLI4=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1/go.mod h1:JaY6n2sDr+z2WTsXkOmNRUfDy6FN0L6Nk7x06ndm4tY=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
Expand Down
6 changes: 4 additions & 2 deletions pkg/configs/legacy_promql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,10 @@ func (q *errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) stor
func (q *errQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, q.err
}
func (q *errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, q.err }
func (q *errQuerier) Close() error { return q.err }
func (q *errQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, q.err
}
func (q *errQuerier) Close() error { return q.err }

func TestQueryError(t *testing.T) {
engine := NewEngine(nil, nil, 10, 10*time.Second)
Expand Down
31 changes: 21 additions & 10 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,15 @@ type BlocksStoreQueryable struct {
subservicesWatcher *services.FailureWatcher
}

func NewBlocksStoreQueryable(stores BlocksStoreSet, finder BlocksFinder, consistency *BlocksConsistencyChecker, limits BlocksStoreLimits, queryStoreAfter time.Duration, logger log.Logger, reg prometheus.Registerer) (*BlocksStoreQueryable, error) {
func NewBlocksStoreQueryable(
stores BlocksStoreSet,
finder BlocksFinder,
consistency *BlocksConsistencyChecker,
limits BlocksStoreLimits,
queryStoreAfter time.Duration,
logger log.Logger,
reg prometheus.Registerer,
) (*BlocksStoreQueryable, error) {
manager, err := services.NewManager(stores, finder)
if err != nil {
return nil, errors.Wrap(err, "register blocks storage queryable subservices")
Expand Down Expand Up @@ -317,20 +325,21 @@ func (q *blocksStoreQuerier) Select(_ bool, sp *storage.SelectHints, matchers ..
return q.selectSorted(sp, matchers...)
}

func (q *blocksStoreQuerier) LabelNames() ([]string, storage.Warnings, error) {
func (q *blocksStoreQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
spanLog, spanCtx := spanlogger.New(q.ctx, "blocksStoreQuerier.LabelNames")
defer spanLog.Span.Finish()

minT, maxT := q.minT, q.maxT

var (
resMtx sync.Mutex
resNameSets = [][]string{}
resWarnings = storage.Warnings(nil)
resMtx sync.Mutex
resNameSets = [][]string{}
resWarnings = storage.Warnings(nil)
convertedMatchers = convertMatchersToLabelMatcher(matchers)
)

queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) {
nameSets, warnings, queriedBlocks, err := q.fetchLabelNamesFromStore(spanCtx, clients, minT, maxT)
nameSets, warnings, queriedBlocks, err := q.fetchLabelNamesFromStore(spanCtx, clients, minT, maxT, convertedMatchers)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -693,6 +702,7 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore(
clients map[BlocksStoreClient][]ulid.ULID,
minT int64,
maxT int64,
matchers []storepb.LabelMatcher,
) ([][]string, storage.Warnings, []ulid.ULID, error) {
var (
reqCtx = grpc_metadata.AppendToOutgoingContext(ctx, cortex_tsdb.TenantIDExternalLabel, q.userID)
Expand All @@ -711,7 +721,7 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore(
blockIDs := blockIDs

g.Go(func() error {
req, err := createLabelNamesRequest(minT, maxT, blockIDs)
req, err := createLabelNamesRequest(minT, maxT, blockIDs, matchers)
if err != nil {
return errors.Wrapf(err, "failed to create label names request")
}
Expand Down Expand Up @@ -870,10 +880,11 @@ func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, skip
}, nil
}

func createLabelNamesRequest(minT, maxT int64, blockIDs []ulid.ULID) (*storepb.LabelNamesRequest, error) {
func createLabelNamesRequest(minT, maxT int64, blockIDs []ulid.ULID, matchers []storepb.LabelMatcher) (*storepb.LabelNamesRequest, error) {
req := &storepb.LabelNamesRequest{
Start: minT,
End: maxT,
Start: minT,
End: maxT,
Matchers: matchers,
}

// Selectively query only specific blocks.
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/chunk_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (q *chunkStoreQuerier) LabelValues(name string, labels ...*labels.Matcher)
return nil, nil, nil
}

func (q *chunkStoreQuerier) LabelNames() ([]string, storage.Warnings, error) {
func (q *chunkStoreQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, nil
}

Expand Down
37 changes: 35 additions & 2 deletions pkg/querier/distributor_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,44 @@ func (q *distributorQuerier) LabelValues(name string, matchers ...*labels.Matche
return lvs, nil, err
}

func (q *distributorQuerier) LabelNames() ([]string, storage.Warnings, error) {
ln, err := q.distributor.LabelNames(q.ctx, model.Time(q.mint), model.Time(q.maxt))
func (q *distributorQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
if len(matchers) > 0 {
return q.labelNamesWithMatchers(matchers...)
}

log, ctx := spanlogger.New(q.ctx, "distributorQuerier.LabelNames")
defer log.Span.Finish()

ln, err := q.distributor.LabelNames(ctx, model.Time(q.mint), model.Time(q.maxt))
return ln, nil, err
}

// labelNamesWithMatchers performs the LabelNames call by calling ingester's MetricsForLabelMatchers method
func (q *distributorQuerier) labelNamesWithMatchers(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
log, ctx := spanlogger.New(q.ctx, "distributorQuerier.labelNamesWithMatchers")
defer log.Span.Finish()

ms, err := q.distributor.MetricsForLabelMatchers(ctx, model.Time(q.mint), model.Time(q.maxt), matchers...)
if err != nil {
return nil, nil, err
}
namesMap := make(map[string]struct{})

for _, m := range ms {
for name := range m.Metric {
namesMap[string(name)] = struct{}{}
}
}

names := make([]string, 0, len(namesMap))
for name := range namesMap {
names = append(names, name)
}
sort.Strings(names)

return names, nil, nil
}

func (q *distributorQuerier) Close() error {
return nil
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/querier/distributor_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,30 @@ func verifySeries(t *testing.T, series storage.Series, l labels.Labels, samples
require.False(t, it.Next())
require.Nil(t, it.Err())
}
func TestDistributorQuerier_LabelNames(t *testing.T) {
someMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}
labelNames := []string{"foo", "job"}

t.Run("with matchers", func(t *testing.T) {
metrics := []metric.Metric{
{Metric: model.Metric{"foo": "bar"}},
{Metric: model.Metric{"job": "baz"}},
{Metric: model.Metric{"job": "baz", "foo": "boom"}},
}
d := &mockDistributor{}
d.On("MetricsForLabelMatchers", mock.Anything, model.Time(mint), model.Time(maxt), someMatchers).
Return(metrics, nil)

queryable := newDistributorQueryable(d, false, nil, 0)
querier, err := queryable.Querier(context.Background(), mint, maxt)
require.NoError(t, err)

names, warnings, err := querier.LabelNames(someMatchers...)
require.NoError(t, err)
assert.Empty(t, warnings)
assert.Equal(t, labelNames, names)
})
}

func convertToChunks(t *testing.T, samples []cortexpb.Sample) []client.Chunk {
// We need to make sure that there is atleast one chunk present,
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/duplicates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (m testQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]st
return nil, nil, nil
}

func (m testQuerier) LabelNames() ([]string, storage.Warnings, error) {
func (m testQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/querier/error_translate_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ func (e errorTranslateQuerier) LabelValues(name string, matchers ...*labels.Matc
return values, warnings, e.fn(err)
}

func (e errorTranslateQuerier) LabelNames() ([]string, storage.Warnings, error) {
values, warnings, err := e.q.LabelNames()
func (e errorTranslateQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
values, warnings, err := e.q.LabelNames(matchers...)
return values, warnings, e.fn(err)
}

Expand All @@ -149,8 +149,8 @@ func (e errorTranslateChunkQuerier) LabelValues(name string, matchers ...*labels
return values, warnings, e.fn(err)
}

func (e errorTranslateChunkQuerier) LabelNames() ([]string, storage.Warnings, error) {
values, warnings, err := e.q.LabelNames()
func (e errorTranslateChunkQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
values, warnings, err := e.q.LabelNames(matchers...)
return values, warnings, e.fn(err)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/error_translate_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (t errorTestQuerier) LabelValues(name string, matchers ...*labels.Matcher)
return nil, nil, t.err
}

func (t errorTestQuerier) LabelNames() ([]string, storage.Warnings, error) {
func (t errorTestQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, t.err
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/lazyquery/lazyquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func (l LazyQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]st
}

// LabelNames implements Storage.Querier
func (l LazyQuerier) LabelNames() ([]string, storage.Warnings, error) {
return l.next.LabelNames()
func (l LazyQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return l.next.LabelNames(matchers...)
}

// Close implements Storage.Querier
Expand Down
8 changes: 4 additions & 4 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,13 +431,13 @@ func (q querier) LabelValues(name string, matchers ...*labels.Matcher) ([]string
return strutil.MergeSlices(sets...), warnings, nil
}

func (q querier) LabelNames() ([]string, storage.Warnings, error) {
func (q querier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
if !q.queryStoreForLabels {
return q.metadataQuerier.LabelNames()
return q.metadataQuerier.LabelNames(matchers...)
}

if len(q.queriers) == 1 {
return q.queriers[0].LabelNames()
return q.queriers[0].LabelNames(matchers...)
}

var (
Expand All @@ -453,7 +453,7 @@ func (q querier) LabelNames() ([]string, storage.Warnings, error) {
querier := querier
g.Go(func() error {
// NB: Names are sorted in Cortex already.
myNames, myWarnings, err := querier.LabelNames()
myNames, myWarnings, err := querier.LabelNames(matchers...)
if err != nil {
return err
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,35 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
}
})

t.Run("label names with matchers", func(t *testing.T) {
matchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchNotEqual, "route", "get_user"),
}
distributor := &mockDistributor{}
distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, matchers).Return([]metric.Metric{}, nil)

queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger())
q, err := queryable.Querier(ctx, util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime))
require.NoError(t, err)

_, _, err = q.LabelNames(matchers...)
require.NoError(t, err)

if !testData.expectedSkipped {
// Assert on the time range of the actual executed query (5s delta).
delta := float64(5000)
require.Len(t, distributor.Calls, 1)
assert.Equal(t, "MetricsForLabelMatchers", distributor.Calls[0].Method)
args := distributor.Calls[0].Arguments
assert.InDelta(t, util.TimeToMillis(testData.expectedMetadataStartTime), int64(args.Get(1).(model.Time)), delta)
assert.InDelta(t, util.TimeToMillis(testData.expectedMetadataEndTime), int64(args.Get(2).(model.Time)), delta)
assert.Equal(t, matchers, args.Get(3).([]*labels.Matcher))
} else {
// Ensure no query has been executed executed (because skipped).
assert.Len(t, distributor.Calls, 0)
}
})

t.Run("label values", func(t *testing.T) {
distributor := &mockDistributor{}
distributor.On("LabelValuesForLabelName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil)
Expand Down
6 changes: 4 additions & 2 deletions pkg/querier/queryrange/promql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,8 +614,10 @@ func (m *testMatrix) Select(_ bool, selectParams *storage.SelectHints, matchers
func (m *testMatrix) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, nil
}
func (m *testMatrix) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil }
func (m *testMatrix) Close() error { return nil }
func (m *testMatrix) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, nil
}
func (m *testMatrix) Close() error { return nil }

func newSeries(metric labels.Labels, generator func(float64) float64) *promql.StorageSeries {
sort.Sort(metric)
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/queryrange/queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (q *ShardedQuerier) LabelValues(name string, matchers ...*labels.Matcher) (
}

// LabelNames returns all the unique label names present in the block in sorted order.
func (q *ShardedQuerier) LabelNames() ([]string, storage.Warnings, error) {
func (q *ShardedQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, errors.Errorf("unimplemented")
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/queryrange/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (q *MockShardedQueryable) LabelValues(name string, matchers ...*labels.Matc
}

// LabelNames returns all the unique label names present in the block in sorted order.
func (q *MockShardedQueryable) LabelNames() ([]string, storage.Warnings, error) {
func (q *MockShardedQueryable) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, errors.Errorf("unimplemented")
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/remote_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (m mockQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]st
return nil, nil, nil
}

func (m mockQuerier) LabelNames() ([]string, storage.Warnings, error) {
func (m mockQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, nil
}

Expand Down
Loading

0 comments on commit 6fe1b01

Please sign in to comment.