From e9ecbd693a61d064b6cd81d878d7268f5657e3fe Mon Sep 17 00:00:00 2001 From: khyatisoneji Date: Fri, 17 Jan 2020 21:19:24 +0530 Subject: [PATCH] store: add consistency delay to fetch blocks Signed-off-by: khyatisoneji --- cmd/thanos/compact.go | 26 +---- cmd/thanos/store.go | 19 ++++ docs/components/store.md | 4 + pkg/block/fetcher.go | 29 ++++++ pkg/compact/compact.go | 66 +------------ pkg/compact/compact_e2e_test.go | 4 +- .../garbagecollector/garbage_collector.go | 95 +++++++++++++++++++ 7 files changed, 158 insertions(+), 85 deletions(-) create mode 100644 pkg/compact/garbagecollector/garbage_collector.go diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 4ff22f0020b..81217157cc6 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -13,7 +13,6 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/run" - "github.com/oklog/ulid" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -23,6 +22,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/compact/downsample" + "github.com/thanos-io/thanos/pkg/compact/garbagecollector" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/extflag" "github.com/thanos-io/thanos/pkg/extprom" @@ -242,13 +242,14 @@ func runCompact( metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), block.NewLabelShardedMetaFilter(relabelConfig).Filter, - (&consistencyDelayMetaFilter{logger: logger, consistencyDelay: consistencyDelay}).Filter, + block.NewConsistencyDelayMetaFilter(logger, consistencyDelay).Filter, ) if err != nil { return errors.Wrap(err, "create meta fetcher") } - sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, blockSyncConcurrency, acceptMalformedIndex, false) + garbageBlocksFinder := garbagecollector.NewGarbageBlocksFinder() + sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, garbageBlocksFinder, blockSyncConcurrency, acceptMalformedIndex, false) if err != nil { return errors.Wrap(err, "create syncer") } @@ -385,25 +386,6 @@ func runCompact( return nil } -type consistencyDelayMetaFilter struct { - logger log.Logger - consistencyDelay time.Duration -} - -func (f *consistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced block.GaugeLabeled, _ bool) { - for id, meta := range metas { - if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) && - meta.Thanos.Source != metadata.BucketRepairSource && - meta.Thanos.Source != metadata.CompactorSource && - meta.Thanos.Source != metadata.CompactorRepairSource { - - level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id) - synced.WithLabelValues(block.TooFreshMeta).Inc() - delete(metas, id) - } - } -} - // genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage. func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, fetcher block.MetadataFetcher, dir string) error { genIndex := prometheus.NewCounter(prometheus.CounterOpts{ diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 6937b9f4dfa..306304eafbd 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -2,6 +2,7 @@ package main import ( "context" + "fmt" "time" "github.com/go-kit/kit/log" @@ -12,6 +13,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/relabel" "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/compact" + "github.com/thanos-io/thanos/pkg/compact/garbagecollector" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/extflag" "github.com/thanos-io/thanos/pkg/extprom" @@ -75,6 +78,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { selectorRelabelConf := regSelectorRelabelFlags(cmd) + consistencyDelay := modelDuration(cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %v will be removed.", compact.PartialUploadThresholdAge)). + Default("30m")) + m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error { if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() { return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'", @@ -109,6 +115,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { }, selectorRelabelConf, *advertiseCompatibilityLabel, + time.Duration(*consistencyDelay), ) } } @@ -140,7 +147,16 @@ func runStore( filterConf *store.FilterConfig, selectorRelabelConf *extflag.PathOrContent, advertiseCompatibilityLabel bool, + consistencyDelay time.Duration, ) error { + consistencyDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "thanos_consistency_delay_seconds", + Help: "Configured consistency delay in seconds.", + }, func() float64 { + return consistencyDelay.Seconds() + }) + reg.MustRegister(consistencyDelayMetric) + // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. statusProber := prober.New(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) srv := httpserver.New(logger, reg, component, statusProber, @@ -206,9 +222,12 @@ func runStore( return errors.Wrap(err, "create index cache") } + garbageBlocksFinder := garbagecollector.NewGarbageBlocksFinder() metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg), block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime).Filter, block.NewLabelShardedMetaFilter(relabelConfig).Filter, + block.NewConsistencyDelayMetaFilter(logger, consistencyDelay).Filter, + garbagecollector.NewGarbageBlocksFilter(garbageBlocksFinder).Filter, ) if err != nil { return errors.Wrap(err, "meta fetcher") diff --git a/docs/components/store.md b/docs/components/store.md index f071e9475ec..cc216e2aa93 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -135,6 +135,10 @@ Flags: Prometheus relabel-config syntax. See format details: https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config + --consistency-delay=30m Minimum age of fresh (non-compacted) blocks + before they are being processed. Malformed + blocks older than the maximum of + consistency-delay and 48h0m0s will be removed. ``` diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 3fc658e3ba5..70433e1fde5 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -407,3 +407,32 @@ func (f *LabelShardedMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, sync delete(metas, id) } } + +// ConsistencyDelayMetaFilter is a MetaFetcher filter that filters out blocks that are created before a specified consistency delay. +type ConsistencyDelayMetaFilter struct { + logger log.Logger + consistencyDelay time.Duration +} + +// NewConsistencyDelayMetaFilter creates ConsistencyDelayMetaFilter. +func NewConsistencyDelayMetaFilter(logger log.Logger, consistencyDelay time.Duration) *ConsistencyDelayMetaFilter { + return &ConsistencyDelayMetaFilter{ + logger: logger, + consistencyDelay: consistencyDelay, + } +} + +// Filter filters out blocks that filters blocks that have are created before a specified consistency delay. +func (f *ConsistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) { + for id, meta := range metas { + if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) && + meta.Thanos.Source != metadata.BucketRepairSource && + meta.Thanos.Source != metadata.CompactorSource && + meta.Thanos.Source != metadata.CompactorRepairSource { + + level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id) + synced.WithLabelValues(TooFreshMeta).Inc() + delete(metas, id) + } + } +} diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index b9f5332fd6d..02c4a2610f7 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -22,6 +22,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/indexheader" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact/downsample" + "github.com/thanos-io/thanos/pkg/compact/garbagecollector" "github.com/thanos-io/thanos/pkg/objstore" ) @@ -46,6 +47,7 @@ type Syncer struct { metrics *syncerMetrics acceptMalformedIndex bool enableVerticalCompaction bool + garbageBlocksFinder *garbagecollector.GarbageBlocksFinder } type syncerMetrics struct { @@ -120,7 +122,7 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { // NewMetaSyncer returns a new Syncer for the given Bucket and directory. // Blocks must be at least as old as the sync delay for being considered. -func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool) (*Syncer, error) { +func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, garbageBlocksFinder *garbagecollector.GarbageBlocksFinder, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool) (*Syncer, error) { if logger == nil { logger = log.NewNopLogger() } @@ -131,6 +133,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket fetcher: fetcher, blocks: map[ulid.ULID]*metadata.Meta{}, metrics: newSyncerMetrics(reg), + garbageBlocksFinder: garbageBlocksFinder, blockSyncConcurrency: blockSyncConcurrency, acceptMalformedIndex: acceptMalformedIndex, // The syncer offers an option to enable vertical compaction, even if it's @@ -245,67 +248,8 @@ func (s *Syncer) GarbageCollect(ctx context.Context) error { return nil } -func (s *Syncer) GarbageBlocks(resolution int64) (ids []ulid.ULID, err error) { - // Map each block to its highest priority parent. Initial blocks have themselves - // in their source section, i.e. are their own parent. - parents := map[ulid.ULID]ulid.ULID{} - - for id, meta := range s.blocks { - // Skip any block that has a different resolution. - if meta.Thanos.Downsample.Resolution != resolution { - continue - } - - // For each source block we contain, check whether we are the highest priority parent block. - for _, sid := range meta.Compaction.Sources { - pid, ok := parents[sid] - // No parents for the source block so far. - if !ok { - parents[sid] = id - continue - } - pmeta, ok := s.blocks[pid] - if !ok { - return nil, errors.Errorf("previous parent block %s not found", pid) - } - // The current block is the higher priority parent for the source if its - // compaction level is higher than that of the previously set parent. - // If compaction levels are equal, the more recent ULID wins. - // - // The ULID recency alone is not sufficient since races, e.g. induced - // by downtime of garbage collection, may re-compact blocks that are - // were already compacted into higher-level blocks multiple times. - level, plevel := meta.Compaction.Level, pmeta.Compaction.Level - - if level > plevel || (level == plevel && id.Compare(pid) > 0) { - parents[sid] = id - } - } - } - - // A block can safely be deleted if they are not the highest priority parent for - // any source block. - topParents := map[ulid.ULID]struct{}{} - for _, pid := range parents { - topParents[pid] = struct{}{} - } - - for id, meta := range s.blocks { - // Skip any block that has a different resolution. - if meta.Thanos.Downsample.Resolution != resolution { - continue - } - if _, ok := topParents[id]; ok { - continue - } - - ids = append(ids, id) - } - return ids, nil -} - func (s *Syncer) garbageCollect(ctx context.Context, resolution int64) error { - garbageIds, err := s.GarbageBlocks(resolution) + garbageIds, err := s.garbageBlocksFinder.GarbageBlocks(resolution, s.blocks) if err != nil { return err } diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 926175dc689..5309cad384b 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -90,7 +90,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil) testutil.Ok(t, err) - sy, err := NewSyncer(nil, nil, bkt, metaFetcher, 1, false, false) + sy, err := NewSyncer(nil, nil, bkt, metaFetcher, nil, 1, false, false) testutil.Ok(t, err) // Do one initial synchronization with the bucket. @@ -163,7 +163,7 @@ func TestGroup_Compact_e2e(t *testing.T) { metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil) testutil.Ok(t, err) - sy, err := NewSyncer(nil, nil, bkt, metaFetcher, 5, false, false) + sy, err := NewSyncer(nil, nil, bkt, metaFetcher, nil, 5, false, false) testutil.Ok(t, err) comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil) diff --git a/pkg/compact/garbagecollector/garbage_collector.go b/pkg/compact/garbagecollector/garbage_collector.go new file mode 100644 index 00000000000..a00de892bea --- /dev/null +++ b/pkg/compact/garbagecollector/garbage_collector.go @@ -0,0 +1,95 @@ +package garbagecollector + +import ( + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/compact/downsample" +) + +type GarbageBlocksFinder struct{} + +func NewGarbageBlocksFinder() *GarbageBlocksFinder { + return &GarbageBlocksFinder{} +} + +func (s *GarbageBlocksFinder) GarbageBlocks(resolution int64, blocks map[ulid.ULID]*metadata.Meta) (ids []ulid.ULID, err error) { + // Map each block to its highest priority parent. Initial blocks have themselves + // in their source section, i.e. are their own parent. + parents := map[ulid.ULID]ulid.ULID{} + + for id, meta := range blocks { + // Skip any block that has a different resolution. + if meta.Thanos.Downsample.Resolution != resolution { + continue + } + + // For each source block we contain, check whether we are the highest priority parent block. + for _, sid := range meta.Compaction.Sources { + pid, ok := parents[sid] + // No parents for the source block so far. + if !ok { + parents[sid] = id + continue + } + pmeta, ok := blocks[pid] + if !ok { + return nil, errors.Errorf("previous parent block %s not found", pid) + } + // The current block is the higher priority parent for the source if its + // compaction level is higher than that of the previously set parent. + // If compaction levels are equal, the more recent ULID wins. + // + // The ULID recency alone is not sufficient since races, e.g. induced + // by downtime of garbage collection, may re-compact blocks that are + // were already compacted into higher-level blocks multiple times. + level, plevel := meta.Compaction.Level, pmeta.Compaction.Level + + if level > plevel || (level == plevel && id.Compare(pid) > 0) { + parents[sid] = id + } + } + } + + // A block can safely be deleted if they are not the highest priority parent for + // any source block. + topParents := map[ulid.ULID]struct{}{} + for _, pid := range parents { + topParents[pid] = struct{}{} + } + + for id, meta := range blocks { + // Skip any block that has a different resolution. + if meta.Thanos.Downsample.Resolution != resolution { + continue + } + if _, ok := topParents[id]; ok { + continue + } + + ids = append(ids, id) + } + return ids, nil +} + +type GarbageBlocksFilter struct { + garbageBlocksFinder *GarbageBlocksFinder +} + +func NewGarbageBlocksFilter(garbageBlocksFinder *GarbageBlocksFinder) *GarbageBlocksFilter { + return &GarbageBlocksFilter{ + garbageBlocksFinder: garbageBlocksFinder, + } +} + +func (f *GarbageBlocksFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced block.GaugeLabeled, _ bool) { + for _, res := range []int64{ + downsample.ResLevel0, downsample.ResLevel1, downsample.ResLevel2, + } { + garbageIds, _ := f.garbageBlocksFinder.GarbageBlocks(res, metas) + for _, id := range garbageIds { + delete(metas, id) + } + } +}