From 3c68464a3cc52bdeda2d2ad9c85da85c42b5e304 Mon Sep 17 00:00:00 2001 From: khyatisoneji Date: Fri, 17 Jan 2020 21:19:24 +0530 Subject: [PATCH] store: add consistency delay to fetch blocks Signed-off-by: khyatisoneji --- cmd/thanos/compact.go | 37 ++------ cmd/thanos/store.go | 12 ++- docs/components/store.md | 2 + pkg/block/fetcher.go | 37 ++++++++ pkg/compact/compact.go | 66 +------------ pkg/compact/compact_e2e_test.go | 10 +- .../garbagecollector/garbage_collector.go | 95 +++++++++++++++++++ pkg/testutil/prometheus.go | 26 ++++- test/e2e/store_gateway_test.go | 10 +- 9 files changed, 195 insertions(+), 100 deletions(-) create mode 100644 pkg/compact/garbagecollector/garbage_collector.go diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 5832625158d..2a2d826ef49 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -13,7 +13,6 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/run" - "github.com/oklog/ulid" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -23,6 +22,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/compact/downsample" + "github.com/thanos-io/thanos/pkg/compact/garbagecollector" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/extflag" "github.com/thanos-io/thanos/pkg/extprom" @@ -181,17 +181,11 @@ func runCompact( Name: "thanos_compactor_iterations_total", Help: "Total number of iterations that were executed successfully.", }) - consistencyDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ - Name: "thanos_consistency_delay_seconds", - Help: "Configured consistency delay in seconds.", - }, func() float64 { - return consistencyDelay.Seconds() - }) partialUploadDeleteAttempts := prometheus.NewCounter(prometheus.CounterOpts{ Name: 
"thanos_compactor_aborted_partial_uploads_deletion_attempts_total", Help: "Total number of started deletions of blocks that are assumed aborted and only partially uploaded.", }) - reg.MustRegister(halted, retried, iterations, consistencyDelayMetric, partialUploadDeleteAttempts) + reg.MustRegister(halted, retried, iterations, partialUploadDeleteAttempts) downsampleMetrics := newDownsampleMetrics(reg) @@ -240,15 +234,17 @@ func runCompact( } }() - metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), + prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg) + metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", prometheusRegisterer, block.NewLabelShardedMetaFilter(relabelConfig).Filter, - (&consistencyDelayMetaFilter{logger: logger, consistencyDelay: consistencyDelay}).Filter, + block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter, ) if err != nil { return errors.Wrap(err, "create meta fetcher") } - sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, blockSyncConcurrency, acceptMalformedIndex, false) + garbageBlocksFinder := garbagecollector.NewGarbageBlocksFinder() + sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, garbageBlocksFinder, blockSyncConcurrency, acceptMalformedIndex, false) if err != nil { return errors.Wrap(err, "create syncer") } @@ -385,25 +381,6 @@ func runCompact( return nil } -type consistencyDelayMetaFilter struct { - logger log.Logger - consistencyDelay time.Duration -} - -func (f *consistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced block.GaugeLabeled, _ bool) { - for id, meta := range metas { - if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) && - meta.Thanos.Source != metadata.BucketRepairSource && - meta.Thanos.Source != metadata.CompactorSource && - meta.Thanos.Source != metadata.CompactorRepairSource { - - level.Debug(f.logger).Log("msg", "block is too 
fresh for now", "block", id) - synced.WithLabelValues(block.TooFreshMeta).Inc() - delete(metas, id) - } - } -} - // genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage. func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, fetcher block.MetadataFetcher, dir string) error { genIndex := prometheus.NewCounter(prometheus.CounterOpts{ diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index 5e19037be18..49ce8c9c1a2 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -12,6 +12,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/relabel" "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/compact/garbagecollector" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/extflag" "github.com/thanos-io/thanos/pkg/extprom" @@ -78,6 +79,9 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { enableIndexHeader := cmd.Flag("experimental.enable-index-header", "If true, Store Gateway will recreate index-header instead of index-cache.json for each block. This will replace index-cache.json permanently once it will be out of experimental stage."). Hidden().Default("false").Bool() + consistencyDelay := modelDuration(cmd.Flag("consistency-delay", "Minimum age of fresh (non-compacted) blocks before they are being processed."). 
+ Default("30m")) + m[component.Store.String()] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, debugLogging bool) error { if minTime.PrometheusTimestamp() > maxTime.PrometheusTimestamp() { return errors.Errorf("invalid argument: --min-time '%s' can't be greater than --max-time '%s'", @@ -113,6 +117,7 @@ func registerStore(m map[string]setupFunc, app *kingpin.Application) { selectorRelabelConf, *advertiseCompatibilityLabel, *enableIndexHeader, + time.Duration(*consistencyDelay), ) } } @@ -145,6 +150,7 @@ func runStore( selectorRelabelConf *extflag.PathOrContent, advertiseCompatibilityLabel bool, enableIndexHeader bool, + consistencyDelay time.Duration, ) error { // Initiate HTTP listener providing metrics endpoint and readiness/liveness probes. statusProber := prober.New(component, logger, prometheus.WrapRegistererWithPrefix("thanos_", reg)) @@ -211,9 +217,13 @@ func runStore( return errors.Wrap(err, "create index cache") } - metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, extprom.WrapRegistererWithPrefix("thanos_", reg), + garbageBlocksFinder := garbagecollector.NewGarbageBlocksFinder() + prometheusRegisterer := extprom.WrapRegistererWithPrefix("thanos_", reg) + metaFetcher, err := block.NewMetaFetcher(logger, fetcherConcurrency, bkt, dataDir, prometheusRegisterer, block.NewTimePartitionMetaFilter(filterConf.MinTime, filterConf.MaxTime).Filter, block.NewLabelShardedMetaFilter(relabelConfig).Filter, + block.NewConsistencyDelayMetaFilter(logger, consistencyDelay, prometheusRegisterer).Filter, + garbagecollector.NewGarbageBlocksFilter(garbageBlocksFinder).Filter, ) if err != nil { return errors.Wrap(err, "meta fetcher") diff --git a/docs/components/store.md b/docs/components/store.md index 2cc130d9e97..99b32f3addb 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -135,6 +135,8 @@ Flags: Prometheus relabel-config syntax. 
See format details: https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config + --consistency-delay=30m Minimum age of fresh (non-compacted) blocks + before they are being processed. ``` diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 3fc658e3ba5..7ffba097764 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -407,3 +407,40 @@ func (f *LabelShardedMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, sync delete(metas, id) } } + +// ConsistencyDelayMetaFilter is a MetaFetcher filter that filters out blocks that are younger than the specified consistency delay. +type ConsistencyDelayMetaFilter struct { + logger log.Logger + consistencyDelay time.Duration +} + +// NewConsistencyDelayMetaFilter creates ConsistencyDelayMetaFilter. +func NewConsistencyDelayMetaFilter(logger log.Logger, consistencyDelay time.Duration, reg prometheus.Registerer) *ConsistencyDelayMetaFilter { + consistencyDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "consistency_delay_seconds", + Help: "Configured consistency delay in seconds.", + }, func() float64 { + return consistencyDelay.Seconds() + }) + reg.MustRegister(consistencyDelayMetric) + + return &ConsistencyDelayMetaFilter{ + logger: logger, + consistencyDelay: consistencyDelay, + } +} + +// Filter filters out blocks that were created more recently than the specified consistency delay.
+func (f *ConsistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced GaugeLabeled, _ bool) { + for id, meta := range metas { + if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) && + meta.Thanos.Source != metadata.BucketRepairSource && + meta.Thanos.Source != metadata.CompactorSource && + meta.Thanos.Source != metadata.CompactorRepairSource { + + level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id) + synced.WithLabelValues(TooFreshMeta).Inc() + delete(metas, id) + } + } +} diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index b9f5332fd6d..02c4a2610f7 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -22,6 +22,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/indexheader" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact/downsample" + "github.com/thanos-io/thanos/pkg/compact/garbagecollector" "github.com/thanos-io/thanos/pkg/objstore" ) @@ -46,6 +47,7 @@ type Syncer struct { metrics *syncerMetrics acceptMalformedIndex bool enableVerticalCompaction bool + garbageBlocksFinder *garbagecollector.GarbageBlocksFinder } type syncerMetrics struct { @@ -120,7 +122,7 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { // NewMetaSyncer returns a new Syncer for the given Bucket and directory. // Blocks must be at least as old as the sync delay for being considered. 
-func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool) (*Syncer, error) { +func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, garbageBlocksFinder *garbagecollector.GarbageBlocksFinder, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool) (*Syncer, error) { if logger == nil { logger = log.NewNopLogger() } @@ -131,6 +133,7 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket fetcher: fetcher, blocks: map[ulid.ULID]*metadata.Meta{}, metrics: newSyncerMetrics(reg), + garbageBlocksFinder: garbageBlocksFinder, blockSyncConcurrency: blockSyncConcurrency, acceptMalformedIndex: acceptMalformedIndex, // The syncer offers an option to enable vertical compaction, even if it's @@ -245,67 +248,8 @@ func (s *Syncer) GarbageCollect(ctx context.Context) error { return nil } -func (s *Syncer) GarbageBlocks(resolution int64) (ids []ulid.ULID, err error) { - // Map each block to its highest priority parent. Initial blocks have themselves - // in their source section, i.e. are their own parent. - parents := map[ulid.ULID]ulid.ULID{} - - for id, meta := range s.blocks { - // Skip any block that has a different resolution. - if meta.Thanos.Downsample.Resolution != resolution { - continue - } - - // For each source block we contain, check whether we are the highest priority parent block. - for _, sid := range meta.Compaction.Sources { - pid, ok := parents[sid] - // No parents for the source block so far. - if !ok { - parents[sid] = id - continue - } - pmeta, ok := s.blocks[pid] - if !ok { - return nil, errors.Errorf("previous parent block %s not found", pid) - } - // The current block is the higher priority parent for the source if its - // compaction level is higher than that of the previously set parent. 
- // If compaction levels are equal, the more recent ULID wins. - // - // The ULID recency alone is not sufficient since races, e.g. induced - // by downtime of garbage collection, may re-compact blocks that are - // were already compacted into higher-level blocks multiple times. - level, plevel := meta.Compaction.Level, pmeta.Compaction.Level - - if level > plevel || (level == plevel && id.Compare(pid) > 0) { - parents[sid] = id - } - } - } - - // A block can safely be deleted if they are not the highest priority parent for - // any source block. - topParents := map[ulid.ULID]struct{}{} - for _, pid := range parents { - topParents[pid] = struct{}{} - } - - for id, meta := range s.blocks { - // Skip any block that has a different resolution. - if meta.Thanos.Downsample.Resolution != resolution { - continue - } - if _, ok := topParents[id]; ok { - continue - } - - ids = append(ids, id) - } - return ids, nil -} - func (s *Syncer) garbageCollect(ctx context.Context, resolution int64) error { - garbageIds, err := s.GarbageBlocks(resolution) + garbageIds, err := s.garbageBlocksFinder.GarbageBlocks(resolution, s.blocks) if err != nil { return err } diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 926175dc689..077f48a85e1 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -29,6 +29,10 @@ import ( "github.com/thanos-io/thanos/pkg/testutil" ) +const ( + BlockDelay = 30 * time.Minute +) + func TestSyncer_GarbageCollect_e2e(t *testing.T) { objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) @@ -90,7 +94,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil) testutil.Ok(t, err) - sy, err := NewSyncer(nil, nil, bkt, metaFetcher, 1, false, false) + sy, err := NewSyncer(nil, nil, bkt, metaFetcher, nil, 1, false, false) testutil.Ok(t, err) // Do one initial 
synchronization with the bucket. @@ -163,7 +167,7 @@ func TestGroup_Compact_e2e(t *testing.T) { metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil) testutil.Ok(t, err) - sy, err := NewSyncer(nil, nil, bkt, metaFetcher, 5, false, false) + sy, err := NewSyncer(nil, nil, bkt, metaFetcher, nil, 5, false, false) testutil.Ok(t, err) comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil) @@ -380,7 +384,7 @@ func createAndUpload(t testing.TB, bkt objstore.Bucket, blocks []blockgenSpec) ( if b.numSamples == 0 { id, err = createEmptyBlock(prepareDir, b.mint, b.maxt, b.extLset, b.res) } else { - id, err = testutil.CreateBlock(ctx, prepareDir, b.series, b.numSamples, b.mint, b.maxt, b.extLset, b.res) + id, err = testutil.CreateBlockWithBlockDelay(ctx, prepareDir, b.series, b.numSamples, b.mint, b.maxt, BlockDelay, b.extLset, b.res) } testutil.Ok(t, err) diff --git a/pkg/compact/garbagecollector/garbage_collector.go b/pkg/compact/garbagecollector/garbage_collector.go new file mode 100644 index 00000000000..a00de892bea --- /dev/null +++ b/pkg/compact/garbagecollector/garbage_collector.go @@ -0,0 +1,95 @@ +package garbagecollector + +import ( + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/compact/downsample" +) + +type GarbageBlocksFinder struct{} + +func NewGarbageBlocksFinder() *GarbageBlocksFinder { + return &GarbageBlocksFinder{} +} + +func (s *GarbageBlocksFinder) GarbageBlocks(resolution int64, blocks map[ulid.ULID]*metadata.Meta) (ids []ulid.ULID, err error) { + // Map each block to its highest priority parent. Initial blocks have themselves + // in their source section, i.e. are their own parent. + parents := map[ulid.ULID]ulid.ULID{} + + for id, meta := range blocks { + // Skip any block that has a different resolution. 
+ if meta.Thanos.Downsample.Resolution != resolution { + continue + } + + // For each source block we contain, check whether we are the highest priority parent block. + for _, sid := range meta.Compaction.Sources { + pid, ok := parents[sid] + // No parents for the source block so far. + if !ok { + parents[sid] = id + continue + } + pmeta, ok := blocks[pid] + if !ok { + return nil, errors.Errorf("previous parent block %s not found", pid) + } + // The current block is the higher priority parent for the source if its + // compaction level is higher than that of the previously set parent. + // If compaction levels are equal, the more recent ULID wins. + // + // The ULID recency alone is not sufficient since races, e.g. induced + // by downtime of garbage collection, may re-compact blocks that + // were already compacted into higher-level blocks multiple times. + level, plevel := meta.Compaction.Level, pmeta.Compaction.Level + + if level > plevel || (level == plevel && id.Compare(pid) > 0) { + parents[sid] = id + } + } + } + + // A block can safely be deleted if it is not the highest priority parent for + // any source block. + topParents := map[ulid.ULID]struct{}{} + for _, pid := range parents { + topParents[pid] = struct{}{} + } + + for id, meta := range blocks { + // Skip any block that has a different resolution.
+ if meta.Thanos.Downsample.Resolution != resolution { + continue + } + if _, ok := topParents[id]; ok { + continue + } + + ids = append(ids, id) + } + return ids, nil +} + +type GarbageBlocksFilter struct { + garbageBlocksFinder *GarbageBlocksFinder +} + +func NewGarbageBlocksFilter(garbageBlocksFinder *GarbageBlocksFinder) *GarbageBlocksFilter { + return &GarbageBlocksFilter{ + garbageBlocksFinder: garbageBlocksFinder, + } +} + +func (f *GarbageBlocksFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced block.GaugeLabeled, _ bool) { + for _, res := range []int64{ + downsample.ResLevel0, downsample.ResLevel1, downsample.ResLevel2, + } { + garbageIds, _ := f.garbageBlocksFinder.GarbageBlocks(res, metas) + for _, id := range garbageIds { + delete(metas, id) + } + } +} diff --git a/pkg/testutil/prometheus.go b/pkg/testutil/prometheus.go index 505320fcaae..a98a290f32d 100644 --- a/pkg/testutil/prometheus.go +++ b/pkg/testutil/prometheus.go @@ -282,7 +282,23 @@ func CreateBlock( extLset labels.Labels, resolution int64, ) (id ulid.ULID, err error) { - return createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, false) + return createBlock(ctx, dir, series, numSamples, mint, maxt, 0, extLset, resolution, false) +} + +// CreateBlockWithBlockDelay writes a block with the given series and numSamples samples each. +// Samples will be in the time range [mint, maxt) +// Block ID will be created with a delay of time duration blockDelay. +func CreateBlockWithBlockDelay( + ctx context.Context, + dir string, + series []labels.Labels, + numSamples int, + mint, maxt int64, + blockDelay time.Duration, + extLset labels.Labels, + resolution int64, +) (id ulid.ULID, err error) { + return createBlock(ctx, dir, series, numSamples, mint, maxt, blockDelay, extLset, resolution, false) } // CreateBlockWithTombstone is same as CreateBlock but leaves tombstones which mimics the Prometheus local block. 
@@ -295,7 +311,7 @@ func CreateBlockWithTombstone( extLset labels.Labels, resolution int64, ) (id ulid.ULID, err error) { - return createBlock(ctx, dir, series, numSamples, mint, maxt, extLset, resolution, true) + return createBlock(ctx, dir, series, numSamples, mint, maxt, 0, extLset, resolution, true) } func createBlock( @@ -304,10 +320,16 @@ func createBlock( series []labels.Labels, numSamples int, mint, maxt int64, + blockDelay time.Duration, extLset labels.Labels, resolution int64, tombstones bool, ) (id ulid.ULID, err error) { + id, err = ulid.New(uint64(time.Now().Add(blockDelay).Unix()*1000), nil) + if err != nil { + return id, errors.Wrap(err, "create block id") + } + h, err := tsdb.NewHead(nil, nil, nil, 10000000000) if err != nil { return id, errors.Wrap(err, "create head block") diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index b3df6d1f8fe..b036c4c1ac7 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -22,6 +22,10 @@ import ( yaml "gopkg.in/yaml.v2" ) +const ( + BlockDelay = 30 * time.Minute +) + func TestStoreGateway(t *testing.T) { a := newLocalAddresser() minioAddr := a.New() @@ -76,13 +80,13 @@ func TestStoreGateway(t *testing.T) { extLset3 := labels.FromStrings("ext1", "value2", "replica", "3") now := time.Now() - id1, err := testutil.CreateBlock(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), extLset, 0) + id1, err := testutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), BlockDelay, extLset, 0) testutil.Ok(t, err) - id2, err := testutil.CreateBlock(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), extLset2, 0) + id2, err := testutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), BlockDelay, extLset2, 0) testutil.Ok(t, err) - id3, err := testutil.CreateBlock(ctx, dir, 
series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), extLset3, 0) + id3, err := testutil.CreateBlockWithBlockDelay(ctx, dir, series, 10, timestamp.FromTime(now), timestamp.FromTime(now.Add(2*time.Hour)), BlockDelay, extLset3, 0) testutil.Ok(t, err) l := log.NewLogfmtLogger(os.Stdout)