From f7e77d7fb27fdd4505ca9f6eda6f43bc67e203db Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Mon, 21 Dec 2020 14:45:58 +0100 Subject: [PATCH] Export metadata fetcher metrics Signed-off-by: Marco Pracucci --- pkg/block/fetcher.go | 87 ++++++++++++++++++++------------------- pkg/block/fetcher_test.go | 56 ++++++++++++------------- 2 files changed, 73 insertions(+), 70 deletions(-) diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index c0e04dbe8e..2373356da1 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -37,32 +37,35 @@ import ( const FetcherConcurrency = 32 -type fetcherMetrics struct { - syncs prometheus.Counter - syncFailures prometheus.Counter - syncDuration prometheus.Histogram +// FetcherMetrics holds metrics tracked by the metadata fetcher. This struct and its fields are exported +// to allow depending projects (eg. Cortex) to implement their own custom metadata fetcher while tracking +// compatible metrics. +type FetcherMetrics struct { + Syncs prometheus.Counter + SyncFailures prometheus.Counter + SyncDuration prometheus.Histogram - synced *extprom.TxGaugeVec - modified *extprom.TxGaugeVec + Synced *extprom.TxGaugeVec + Modified *extprom.TxGaugeVec } -func (s *fetcherMetrics) submit() { - s.synced.Submit() - s.modified.Submit() +func (s *FetcherMetrics) submit() { + s.Synced.Submit() + s.Modified.Submit() } -func (s *fetcherMetrics) resetTx() { - s.synced.ResetTx() - s.modified.ResetTx() +func (s *FetcherMetrics) resetTx() { + s.Synced.ResetTx() + s.Modified.ResetTx() } const ( fetcherSubSys = "blocks_meta" - corruptedMeta = "corrupted-meta-json" - noMeta = "no-meta-json" - loadedMeta = "loaded" - failedMeta = "failed" + CorruptedMeta = "corrupted-meta-json" + NoMeta = "no-meta-json" + LoadedMeta = "loaded" + FailedMeta = "failed" // Synced label values. labelExcludedMeta = "label-excluded" @@ -71,7 +74,7 @@ const ( duplicateMeta = "duplicate" // Blocks that are marked for deletion can be loaded as well. This is done to make sure that we load blocks that are meant to be deleted, // but don't have a replacement block yet. - markedForDeletionMeta = "marked-for-deletion" + MarkedForDeletionMeta = "marked-for-deletion" // MarkedForNoCompactionMeta is label for blocks which are loaded but also marked for no compaction. This label is also counted in `loaded` label metric. MarkedForNoCompactionMeta = "marked-for-no-compact" @@ -80,26 +83,26 @@ const ( replicaRemovedMeta = "replica-label-removed" ) -func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics { - var m fetcherMetrics +func NewFetcherMetrics(reg prometheus.Registerer) *FetcherMetrics { + var m FetcherMetrics - m.syncs = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + m.Syncs = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Subsystem: fetcherSubSys, Name: "syncs_total", Help: "Total blocks metadata synchronization attempts", }) - m.syncFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{ + m.SyncFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Subsystem: fetcherSubSys, Name: "sync_failures_total", Help: "Total blocks metadata synchronization failures", }) - m.syncDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + m.SyncDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Subsystem: fetcherSubSys, Name: "sync_duration_seconds", Help: "Duration of the blocks metadata synchronization in seconds", Buckets: []float64{0.01, 1, 10, 100, 1000}, }) - m.synced = extprom.NewTxGaugeVec( + m.Synced = extprom.NewTxGaugeVec( reg, prometheus.GaugeOpts{ Subsystem: fetcherSubSys, @@ -107,18 +110,18 @@ func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics { Help: "Number of block metadata synced", }, []string{"state"}, - []string{corruptedMeta}, - []string{noMeta}, - []string{loadedMeta}, + []string{CorruptedMeta}, + []string{NoMeta}, + []string{LoadedMeta}, []string{tooFreshMeta}, - []string{failedMeta}, + []string{FailedMeta}, []string{labelExcludedMeta}, []string{timeExcludedMeta}, []string{duplicateMeta}, - []string{markedForDeletionMeta}, + []string{MarkedForDeletionMeta}, []string{MarkedForNoCompactionMeta}, ) - m.modified = extprom.NewTxGaugeVec( + m.Modified = extprom.NewTxGaugeVec( reg, prometheus.GaugeOpts{ Subsystem: fetcherSubSys, @@ -197,7 +200,7 @@ func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.Instrumente // NewMetaFetcher transforms BaseFetcher into actually usable *MetaFetcher. func (f *BaseFetcher) NewMetaFetcher(reg prometheus.Registerer, filters []MetadataFilter, modifiers []MetadataModifier, logTags ...interface{}) *MetaFetcher { - return &MetaFetcher{metrics: newFetcherMetrics(reg), wrapped: f, filters: filters, modifiers: modifiers, logger: log.With(f.logger, logTags...)} + return &MetaFetcher{metrics: NewFetcherMetrics(reg), wrapped: f, filters: filters, modifiers: modifiers, logger: log.With(f.logger, logTags...)} } var ( @@ -405,15 +408,15 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) { return resp, nil } -func (f *BaseFetcher) fetch(ctx context.Context, metrics *fetcherMetrics, filters []MetadataFilter, modifiers []MetadataModifier) (_ map[ulid.ULID]*metadata.Meta, _ map[ulid.ULID]error, err error) { +func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filters []MetadataFilter, modifiers []MetadataModifier) (_ map[ulid.ULID]*metadata.Meta, _ map[ulid.ULID]error, err error) { start := time.Now() defer func() { - metrics.syncDuration.Observe(time.Since(start).Seconds()) + metrics.SyncDuration.Observe(time.Since(start).Seconds()) if err != nil { - metrics.syncFailures.Inc() + metrics.SyncFailures.Inc() } }() - metrics.syncs.Inc() + metrics.Syncs.Inc() metrics.resetTx() // Run this in thread safe run group. @@ -433,25 +436,25 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *fetcherMetrics, filter metas[id] = m } - metrics.synced.WithLabelValues(failedMeta).Set(float64(len(resp.metaErrs))) - metrics.synced.WithLabelValues(noMeta).Set(resp.noMetas) - metrics.synced.WithLabelValues(corruptedMeta).Set(resp.corruptedMetas) + metrics.Synced.WithLabelValues(FailedMeta).Set(float64(len(resp.metaErrs))) + metrics.Synced.WithLabelValues(NoMeta).Set(resp.noMetas) + metrics.Synced.WithLabelValues(CorruptedMeta).Set(resp.corruptedMetas) for _, filter := range filters { // NOTE: filter can update synced metric accordingly to the reason of the exclude. - if err := filter.Filter(ctx, metas, metrics.synced); err != nil { + if err := filter.Filter(ctx, metas, metrics.Synced); err != nil { return nil, nil, errors.Wrap(err, "filter metas") } } for _, m := range modifiers { // NOTE: modifier can update modified metric accordingly to the reason of the modification. - if err := m.Modify(ctx, metas, metrics.modified); err != nil { + if err := m.Modify(ctx, metas, metrics.Modified); err != nil { return nil, nil, errors.Wrap(err, "modify metas") } } - metrics.synced.WithLabelValues(loadedMeta).Set(float64(len(metas))) + metrics.Synced.WithLabelValues(LoadedMeta).Set(float64(len(metas))) metrics.submit() if len(resp.metaErrs) > 0 { @@ -464,7 +467,7 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *fetcherMetrics, filter type MetaFetcher struct { wrapped *BaseFetcher - metrics *fetcherMetrics + metrics *FetcherMetrics filters []MetadataFilter modifiers []MetadataModifier @@ -823,7 +826,7 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL mtx.Lock() f.deletionMarkMap[id] = m if time.Since(time.Unix(m.DeletionTime, 0)).Seconds() > f.delay.Seconds() { - synced.WithLabelValues(markedForDeletionMeta).Inc() + synced.WithLabelValues(MarkedForDeletionMeta).Inc() delete(metas, id) } mtx.Unlock() diff --git a/pkg/block/fetcher_test.go b/pkg/block/fetcher_test.go index 71949f8603..6201cf27be 100644 --- a/pkg/block/fetcher_test.go +++ b/pkg/block/fetcher_test.go @@ -32,10 +32,10 @@ import ( "github.com/thanos-io/thanos/pkg/testutil" ) -func newTestFetcherMetrics() *fetcherMetrics { - return &fetcherMetrics{ - synced: extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"}), - modified: extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"modified"}), +func newTestFetcherMetrics() *FetcherMetrics { + return &FetcherMetrics{ + Synced: extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"}), + Modified: extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"modified"}), } } @@ -280,14 +280,14 @@ func TestMetaFetcher_Fetch(t *testing.T) { expectedFailures = 1 } testutil.Equals(t, float64(i+1), promtest.ToFloat64(baseFetcher.syncs)) - testutil.Equals(t, float64(i+1), promtest.ToFloat64(fetcher.metrics.syncs)) - testutil.Equals(t, float64(len(tcase.expectedMetas)), promtest.ToFloat64(fetcher.metrics.synced.WithLabelValues(loadedMeta))) - testutil.Equals(t, float64(len(tcase.expectedNoMeta)), promtest.ToFloat64(fetcher.metrics.synced.WithLabelValues(noMeta))) - testutil.Equals(t, float64(tcase.expectedFiltered), promtest.ToFloat64(fetcher.metrics.synced.WithLabelValues("filtered"))) - testutil.Equals(t, 0.0, promtest.ToFloat64(fetcher.metrics.synced.WithLabelValues(labelExcludedMeta))) - testutil.Equals(t, 0.0, promtest.ToFloat64(fetcher.metrics.synced.WithLabelValues(timeExcludedMeta))) - testutil.Equals(t, float64(expectedFailures), promtest.ToFloat64(fetcher.metrics.synced.WithLabelValues(failedMeta))) - testutil.Equals(t, 0.0, promtest.ToFloat64(fetcher.metrics.synced.WithLabelValues(tooFreshMeta))) + testutil.Equals(t, float64(i+1), promtest.ToFloat64(fetcher.metrics.Syncs)) + testutil.Equals(t, float64(len(tcase.expectedMetas)), promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(LoadedMeta))) + testutil.Equals(t, float64(len(tcase.expectedNoMeta)), promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(NoMeta))) + testutil.Equals(t, float64(tcase.expectedFiltered), promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues("filtered"))) + testutil.Equals(t, 0.0, promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(labelExcludedMeta))) + testutil.Equals(t, 0.0, promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(timeExcludedMeta))) + testutil.Equals(t, float64(expectedFailures), promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(FailedMeta))) + testutil.Equals(t, 0.0, promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(tooFreshMeta))) }); !ok { return } @@ -353,9 +353,9 @@ func TestLabelShardedMetaFilter_Filter_Basic(t *testing.T) { } m := newTestFetcherMetrics() - testutil.Ok(t, f.Filter(ctx, input, m.synced)) + testutil.Ok(t, f.Filter(ctx, input, m.Synced)) - testutil.Equals(t, 3.0, promtest.ToFloat64(m.synced.WithLabelValues(labelExcludedMeta))) + testutil.Equals(t, 3.0, promtest.ToFloat64(m.Synced.WithLabelValues(labelExcludedMeta))) testutil.Equals(t, expected, input) } @@ -451,10 +451,10 @@ func TestLabelShardedMetaFilter_Filter_Hashmod(t *testing.T) { deleted := len(input) - len(expected) m := newTestFetcherMetrics() - testutil.Ok(t, f.Filter(ctx, input, m.synced)) + testutil.Ok(t, f.Filter(ctx, input, m.Synced)) testutil.Equals(t, expected, input) - testutil.Equals(t, float64(deleted), promtest.ToFloat64(m.synced.WithLabelValues(labelExcludedMeta))) + testutil.Equals(t, float64(deleted), promtest.ToFloat64(m.Synced.WithLabelValues(labelExcludedMeta))) }) @@ -515,9 +515,9 @@ func TestTimePartitionMetaFilter_Filter(t *testing.T) { } m := newTestFetcherMetrics() - testutil.Ok(t, f.Filter(ctx, input, m.synced)) + testutil.Ok(t, f.Filter(ctx, input, m.Synced)) - testutil.Equals(t, 2.0, promtest.ToFloat64(m.synced.WithLabelValues(timeExcludedMeta))) + testutil.Equals(t, 2.0, promtest.ToFloat64(m.Synced.WithLabelValues(timeExcludedMeta))) testutil.Equals(t, expected, input) } @@ -866,9 +866,9 @@ func TestDeduplicateFilter_Filter(t *testing.T) { }, } } - testutil.Ok(t, f.Filter(ctx, metas, m.synced)) + testutil.Ok(t, f.Filter(ctx, metas, m.Synced)) compareSliceWithMapKeys(t, metas, tcase.expected) - testutil.Equals(t, float64(inputLen-len(tcase.expected)), promtest.ToFloat64(m.synced.WithLabelValues(duplicateMeta))) + testutil.Equals(t, float64(inputLen-len(tcase.expected)), promtest.ToFloat64(m.Synced.WithLabelValues(duplicateMeta))) }); !ok { return } @@ -935,9 +935,9 @@ func TestReplicaLabelRemover_Modify(t *testing.T) { }, } { m := newTestFetcherMetrics() - testutil.Ok(t, tcase.replicaLabelRemover.Modify(ctx, tcase.input, m.modified)) + testutil.Ok(t, tcase.replicaLabelRemover.Modify(ctx, tcase.input, m.Modified)) - testutil.Equals(t, tcase.modified, promtest.ToFloat64(m.modified.WithLabelValues(replicaRemovedMeta))) + testutil.Equals(t, tcase.modified, promtest.ToFloat64(m.Modified.WithLabelValues(replicaRemovedMeta))) testutil.Equals(t, tcase.expected, tcase.input) } } @@ -1041,8 +1041,8 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) { f := NewConsistencyDelayMetaFilter(nil, 0*time.Second, reg) testutil.Equals(t, map[string]float64{"consistency_delay_seconds{}": 0.0}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds")) - testutil.Ok(t, f.Filter(ctx, input, m.synced)) - testutil.Equals(t, 0.0, promtest.ToFloat64(m.synced.WithLabelValues(tooFreshMeta))) + testutil.Ok(t, f.Filter(ctx, input, m.Synced)) + testutil.Equals(t, 0.0, promtest.ToFloat64(m.Synced.WithLabelValues(tooFreshMeta))) testutil.Equals(t, expected, input) }) @@ -1066,8 +1066,8 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) { f := NewConsistencyDelayMetaFilter(nil, 30*time.Minute, reg) testutil.Equals(t, map[string]float64{"consistency_delay_seconds{}": (30 * time.Minute).Seconds()}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds")) - testutil.Ok(t, f.Filter(ctx, input, m.synced)) - testutil.Equals(t, float64(len(u.created)-len(expected)), promtest.ToFloat64(m.synced.WithLabelValues(tooFreshMeta))) + testutil.Ok(t, f.Filter(ctx, input, m.Synced)) + testutil.Equals(t, float64(len(u.created)-len(expected)), promtest.ToFloat64(m.Synced.WithLabelValues(tooFreshMeta))) testutil.Equals(t, expected, input) }) } @@ -1115,8 +1115,8 @@ func TestIgnoreDeletionMarkFilter_Filter(t *testing.T) { } m := newTestFetcherMetrics() - testutil.Ok(t, f.Filter(ctx, input, m.synced)) - testutil.Equals(t, 1.0, promtest.ToFloat64(m.synced.WithLabelValues(markedForDeletionMeta))) + testutil.Ok(t, f.Filter(ctx, input, m.Synced)) + testutil.Equals(t, 1.0, promtest.ToFloat64(m.Synced.WithLabelValues(MarkedForDeletionMeta))) testutil.Equals(t, expected, input) }) }