Skip to content

Commit

Permalink
Export metadata fetcher metrics (thanos-io#3660)
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Pracucci <[email protected]>
  • Loading branch information
pracucci authored Feb 26, 2021
1 parent 2027fb3 commit 0d86697
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 70 deletions.
87 changes: 45 additions & 42 deletions pkg/block/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,32 +37,35 @@ import (

const FetcherConcurrency = 32

type fetcherMetrics struct {
syncs prometheus.Counter
syncFailures prometheus.Counter
syncDuration prometheus.Histogram
// FetcherMetrics holds metrics tracked by the metadata fetcher. This struct and its fields are exported
// to allow depending projects (e.g. Cortex) to implement their own custom metadata fetcher while tracking
// compatible metrics.
type FetcherMetrics struct {
Syncs prometheus.Counter
SyncFailures prometheus.Counter
SyncDuration prometheus.Histogram

synced *extprom.TxGaugeVec
modified *extprom.TxGaugeVec
Synced *extprom.TxGaugeVec
Modified *extprom.TxGaugeVec
}

func (s *fetcherMetrics) submit() {
s.synced.Submit()
s.modified.Submit()
func (s *FetcherMetrics) submit() {
s.Synced.Submit()
s.Modified.Submit()
}

func (s *fetcherMetrics) resetTx() {
s.synced.ResetTx()
s.modified.ResetTx()
func (s *FetcherMetrics) resetTx() {
s.Synced.ResetTx()
s.Modified.ResetTx()
}

const (
fetcherSubSys = "blocks_meta"

corruptedMeta = "corrupted-meta-json"
noMeta = "no-meta-json"
loadedMeta = "loaded"
failedMeta = "failed"
CorruptedMeta = "corrupted-meta-json"
NoMeta = "no-meta-json"
LoadedMeta = "loaded"
FailedMeta = "failed"

// Synced label values.
labelExcludedMeta = "label-excluded"
Expand All @@ -71,7 +74,7 @@ const (
duplicateMeta = "duplicate"
// Blocks that are marked for deletion can be loaded as well. This is done to make sure that we load blocks that are meant to be deleted,
// but don't have a replacement block yet.
markedForDeletionMeta = "marked-for-deletion"
MarkedForDeletionMeta = "marked-for-deletion"

// MarkedForNoCompactionMeta is the label for blocks which are loaded but also marked for no compaction. This label is also counted in the `loaded` label metric.
MarkedForNoCompactionMeta = "marked-for-no-compact"
Expand All @@ -80,45 +83,45 @@ const (
replicaRemovedMeta = "replica-label-removed"
)

func newFetcherMetrics(reg prometheus.Registerer) *fetcherMetrics {
var m fetcherMetrics
func NewFetcherMetrics(reg prometheus.Registerer) *FetcherMetrics {
var m FetcherMetrics

m.syncs = promauto.With(reg).NewCounter(prometheus.CounterOpts{
m.Syncs = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Subsystem: fetcherSubSys,
Name: "syncs_total",
Help: "Total blocks metadata synchronization attempts",
})
m.syncFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{
m.SyncFailures = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Subsystem: fetcherSubSys,
Name: "sync_failures_total",
Help: "Total blocks metadata synchronization failures",
})
m.syncDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
m.SyncDuration = promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Subsystem: fetcherSubSys,
Name: "sync_duration_seconds",
Help: "Duration of the blocks metadata synchronization in seconds",
Buckets: []float64{0.01, 1, 10, 100, 1000},
})
m.synced = extprom.NewTxGaugeVec(
m.Synced = extprom.NewTxGaugeVec(
reg,
prometheus.GaugeOpts{
Subsystem: fetcherSubSys,
Name: "synced",
Help: "Number of block metadata synced",
},
[]string{"state"},
[]string{corruptedMeta},
[]string{noMeta},
[]string{loadedMeta},
[]string{CorruptedMeta},
[]string{NoMeta},
[]string{LoadedMeta},
[]string{tooFreshMeta},
[]string{failedMeta},
[]string{FailedMeta},
[]string{labelExcludedMeta},
[]string{timeExcludedMeta},
[]string{duplicateMeta},
[]string{markedForDeletionMeta},
[]string{MarkedForDeletionMeta},
[]string{MarkedForNoCompactionMeta},
)
m.modified = extprom.NewTxGaugeVec(
m.Modified = extprom.NewTxGaugeVec(
reg,
prometheus.GaugeOpts{
Subsystem: fetcherSubSys,
Expand Down Expand Up @@ -197,7 +200,7 @@ func NewMetaFetcher(logger log.Logger, concurrency int, bkt objstore.Instrumente

// NewMetaFetcher transforms BaseFetcher into actually usable *MetaFetcher.
func (f *BaseFetcher) NewMetaFetcher(reg prometheus.Registerer, filters []MetadataFilter, modifiers []MetadataModifier, logTags ...interface{}) *MetaFetcher {
return &MetaFetcher{metrics: newFetcherMetrics(reg), wrapped: f, filters: filters, modifiers: modifiers, logger: log.With(f.logger, logTags...)}
return &MetaFetcher{metrics: NewFetcherMetrics(reg), wrapped: f, filters: filters, modifiers: modifiers, logger: log.With(f.logger, logTags...)}
}

var (
Expand Down Expand Up @@ -405,15 +408,15 @@ func (f *BaseFetcher) fetchMetadata(ctx context.Context) (interface{}, error) {
return resp, nil
}

func (f *BaseFetcher) fetch(ctx context.Context, metrics *fetcherMetrics, filters []MetadataFilter, modifiers []MetadataModifier) (_ map[ulid.ULID]*metadata.Meta, _ map[ulid.ULID]error, err error) {
func (f *BaseFetcher) fetch(ctx context.Context, metrics *FetcherMetrics, filters []MetadataFilter, modifiers []MetadataModifier) (_ map[ulid.ULID]*metadata.Meta, _ map[ulid.ULID]error, err error) {
start := time.Now()
defer func() {
metrics.syncDuration.Observe(time.Since(start).Seconds())
metrics.SyncDuration.Observe(time.Since(start).Seconds())
if err != nil {
metrics.syncFailures.Inc()
metrics.SyncFailures.Inc()
}
}()
metrics.syncs.Inc()
metrics.Syncs.Inc()
metrics.resetTx()

// Run this in thread safe run group.
Expand All @@ -433,25 +436,25 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *fetcherMetrics, filter
metas[id] = m
}

metrics.synced.WithLabelValues(failedMeta).Set(float64(len(resp.metaErrs)))
metrics.synced.WithLabelValues(noMeta).Set(resp.noMetas)
metrics.synced.WithLabelValues(corruptedMeta).Set(resp.corruptedMetas)
metrics.Synced.WithLabelValues(FailedMeta).Set(float64(len(resp.metaErrs)))
metrics.Synced.WithLabelValues(NoMeta).Set(resp.noMetas)
metrics.Synced.WithLabelValues(CorruptedMeta).Set(resp.corruptedMetas)

for _, filter := range filters {
// NOTE: filter can update the synced metric according to the reason for the exclusion.
if err := filter.Filter(ctx, metas, metrics.synced); err != nil {
if err := filter.Filter(ctx, metas, metrics.Synced); err != nil {
return nil, nil, errors.Wrap(err, "filter metas")
}
}

for _, m := range modifiers {
// NOTE: modifier can update the modified metric according to the reason for the modification.
if err := m.Modify(ctx, metas, metrics.modified); err != nil {
if err := m.Modify(ctx, metas, metrics.Modified); err != nil {
return nil, nil, errors.Wrap(err, "modify metas")
}
}

metrics.synced.WithLabelValues(loadedMeta).Set(float64(len(metas)))
metrics.Synced.WithLabelValues(LoadedMeta).Set(float64(len(metas)))
metrics.submit()

if len(resp.metaErrs) > 0 {
Expand All @@ -464,7 +467,7 @@ func (f *BaseFetcher) fetch(ctx context.Context, metrics *fetcherMetrics, filter

type MetaFetcher struct {
wrapped *BaseFetcher
metrics *fetcherMetrics
metrics *FetcherMetrics

filters []MetadataFilter
modifiers []MetadataModifier
Expand Down Expand Up @@ -823,7 +826,7 @@ func (f *IgnoreDeletionMarkFilter) Filter(ctx context.Context, metas map[ulid.UL
mtx.Lock()
f.deletionMarkMap[id] = m
if time.Since(time.Unix(m.DeletionTime, 0)).Seconds() > f.delay.Seconds() {
synced.WithLabelValues(markedForDeletionMeta).Inc()
synced.WithLabelValues(MarkedForDeletionMeta).Inc()
delete(metas, id)
}
mtx.Unlock()
Expand Down
56 changes: 28 additions & 28 deletions pkg/block/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ import (
"github.com/thanos-io/thanos/pkg/testutil"
)

func newTestFetcherMetrics() *fetcherMetrics {
return &fetcherMetrics{
synced: extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"}),
modified: extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"modified"}),
func newTestFetcherMetrics() *FetcherMetrics {
return &FetcherMetrics{
Synced: extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"}),
Modified: extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"modified"}),
}
}

Expand Down Expand Up @@ -280,14 +280,14 @@ func TestMetaFetcher_Fetch(t *testing.T) {
expectedFailures = 1
}
testutil.Equals(t, float64(i+1), promtest.ToFloat64(baseFetcher.syncs))
testutil.Equals(t, float64(i+1), promtest.ToFloat64(fetcher.metrics.syncs))
testutil.Equals(t, float64(len(tcase.expectedMetas)), promtest.ToFloat64(fetcher.metrics.synced.WithLabelValues(loadedMeta)))
testutil.Equals(t, float64(len(tcase.expectedNoMeta)), promtest.ToFloat64(fetcher.metrics.synced.WithLabelValues(noMeta)))
testutil.Equals(t, float64(tcase.expectedFiltered), promtest.ToFloat64(fetcher.metrics.synced.WithLabelValues("filtered")))
testutil.Equals(t, 0.0, promtest.ToFloat64(fetcher.metrics.synced.WithLabelValues(labelExcludedMeta)))
testutil.Equals(t, 0.0, promtest.ToFloat64(fetcher.metrics.synced.WithLabelValues(timeExcludedMeta)))
testutil.Equals(t, float64(expectedFailures), promtest.ToFloat64(fetcher.metrics.synced.WithLabelValues(failedMeta)))
testutil.Equals(t, 0.0, promtest.ToFloat64(fetcher.metrics.synced.WithLabelValues(tooFreshMeta)))
testutil.Equals(t, float64(i+1), promtest.ToFloat64(fetcher.metrics.Syncs))
testutil.Equals(t, float64(len(tcase.expectedMetas)), promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(LoadedMeta)))
testutil.Equals(t, float64(len(tcase.expectedNoMeta)), promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(NoMeta)))
testutil.Equals(t, float64(tcase.expectedFiltered), promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues("filtered")))
testutil.Equals(t, 0.0, promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(labelExcludedMeta)))
testutil.Equals(t, 0.0, promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(timeExcludedMeta)))
testutil.Equals(t, float64(expectedFailures), promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(FailedMeta)))
testutil.Equals(t, 0.0, promtest.ToFloat64(fetcher.metrics.Synced.WithLabelValues(tooFreshMeta)))
}); !ok {
return
}
Expand Down Expand Up @@ -353,9 +353,9 @@ func TestLabelShardedMetaFilter_Filter_Basic(t *testing.T) {
}

m := newTestFetcherMetrics()
testutil.Ok(t, f.Filter(ctx, input, m.synced))
testutil.Ok(t, f.Filter(ctx, input, m.Synced))

testutil.Equals(t, 3.0, promtest.ToFloat64(m.synced.WithLabelValues(labelExcludedMeta)))
testutil.Equals(t, 3.0, promtest.ToFloat64(m.Synced.WithLabelValues(labelExcludedMeta)))
testutil.Equals(t, expected, input)

}
Expand Down Expand Up @@ -451,10 +451,10 @@ func TestLabelShardedMetaFilter_Filter_Hashmod(t *testing.T) {
deleted := len(input) - len(expected)

m := newTestFetcherMetrics()
testutil.Ok(t, f.Filter(ctx, input, m.synced))
testutil.Ok(t, f.Filter(ctx, input, m.Synced))

testutil.Equals(t, expected, input)
testutil.Equals(t, float64(deleted), promtest.ToFloat64(m.synced.WithLabelValues(labelExcludedMeta)))
testutil.Equals(t, float64(deleted), promtest.ToFloat64(m.Synced.WithLabelValues(labelExcludedMeta)))

})

Expand Down Expand Up @@ -515,9 +515,9 @@ func TestTimePartitionMetaFilter_Filter(t *testing.T) {
}

m := newTestFetcherMetrics()
testutil.Ok(t, f.Filter(ctx, input, m.synced))
testutil.Ok(t, f.Filter(ctx, input, m.Synced))

testutil.Equals(t, 2.0, promtest.ToFloat64(m.synced.WithLabelValues(timeExcludedMeta)))
testutil.Equals(t, 2.0, promtest.ToFloat64(m.Synced.WithLabelValues(timeExcludedMeta)))
testutil.Equals(t, expected, input)

}
Expand Down Expand Up @@ -866,9 +866,9 @@ func TestDeduplicateFilter_Filter(t *testing.T) {
},
}
}
testutil.Ok(t, f.Filter(ctx, metas, m.synced))
testutil.Ok(t, f.Filter(ctx, metas, m.Synced))
compareSliceWithMapKeys(t, metas, tcase.expected)
testutil.Equals(t, float64(inputLen-len(tcase.expected)), promtest.ToFloat64(m.synced.WithLabelValues(duplicateMeta)))
testutil.Equals(t, float64(inputLen-len(tcase.expected)), promtest.ToFloat64(m.Synced.WithLabelValues(duplicateMeta)))
}); !ok {
return
}
Expand Down Expand Up @@ -935,9 +935,9 @@ func TestReplicaLabelRemover_Modify(t *testing.T) {
},
} {
m := newTestFetcherMetrics()
testutil.Ok(t, tcase.replicaLabelRemover.Modify(ctx, tcase.input, m.modified))
testutil.Ok(t, tcase.replicaLabelRemover.Modify(ctx, tcase.input, m.Modified))

testutil.Equals(t, tcase.modified, promtest.ToFloat64(m.modified.WithLabelValues(replicaRemovedMeta)))
testutil.Equals(t, tcase.modified, promtest.ToFloat64(m.Modified.WithLabelValues(replicaRemovedMeta)))
testutil.Equals(t, tcase.expected, tcase.input)
}
}
Expand Down Expand Up @@ -1041,8 +1041,8 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) {
f := NewConsistencyDelayMetaFilter(nil, 0*time.Second, reg)
testutil.Equals(t, map[string]float64{"consistency_delay_seconds{}": 0.0}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds"))

testutil.Ok(t, f.Filter(ctx, input, m.synced))
testutil.Equals(t, 0.0, promtest.ToFloat64(m.synced.WithLabelValues(tooFreshMeta)))
testutil.Ok(t, f.Filter(ctx, input, m.Synced))
testutil.Equals(t, 0.0, promtest.ToFloat64(m.Synced.WithLabelValues(tooFreshMeta)))
testutil.Equals(t, expected, input)
})

Expand All @@ -1066,8 +1066,8 @@ func TestConsistencyDelayMetaFilter_Filter_0(t *testing.T) {
f := NewConsistencyDelayMetaFilter(nil, 30*time.Minute, reg)
testutil.Equals(t, map[string]float64{"consistency_delay_seconds{}": (30 * time.Minute).Seconds()}, extprom.CurrentGaugeValuesFor(t, reg, "consistency_delay_seconds"))

testutil.Ok(t, f.Filter(ctx, input, m.synced))
testutil.Equals(t, float64(len(u.created)-len(expected)), promtest.ToFloat64(m.synced.WithLabelValues(tooFreshMeta)))
testutil.Ok(t, f.Filter(ctx, input, m.Synced))
testutil.Equals(t, float64(len(u.created)-len(expected)), promtest.ToFloat64(m.Synced.WithLabelValues(tooFreshMeta)))
testutil.Equals(t, expected, input)
})
}
Expand Down Expand Up @@ -1115,8 +1115,8 @@ func TestIgnoreDeletionMarkFilter_Filter(t *testing.T) {
}

m := newTestFetcherMetrics()
testutil.Ok(t, f.Filter(ctx, input, m.synced))
testutil.Equals(t, 1.0, promtest.ToFloat64(m.synced.WithLabelValues(markedForDeletionMeta)))
testutil.Ok(t, f.Filter(ctx, input, m.Synced))
testutil.Equals(t, 1.0, promtest.ToFloat64(m.Synced.WithLabelValues(MarkedForDeletionMeta)))
testutil.Equals(t, expected, input)
})
}
Expand Down

0 comments on commit 0d86697

Please sign in to comment.