From 1228e113c62f977df19a3edba90a6b83c4ef9a25 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?=
Date: Sun, 19 Nov 2023 05:17:32 +0200
Subject: [PATCH] compact: hook nodownsamplemarkfilter into filters chain
 (#6893)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

We have a NoDownsampleMarkFilter that the compactor was not using. Hook
it into the filters chain when downsampling is enabled and trim the
matching ULIDs out of the downsampling process.

Add a test to cover this scenario.

Fixes https://github.com/thanos-io/thanos/issues/6179.

Signed-off-by: Giedrius Statkevičius
---
 cmd/thanos/compact.go                | 61 ++++++++++++++++++++++------
 cmd/thanos/downsample.go             |  2 +-
 pkg/compact/downsample/downsample.go |  4 +-
 test/e2e/compact_test.go             | 46 +++++++++++++++++++++
 4 files changed, 97 insertions(+), 16 deletions(-)

diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go
index 711e38daec..8155202c76 100644
--- a/cmd/thanos/compact.go
+++ b/cmd/thanos/compact.go
@@ -234,6 +234,7 @@ func runCompact(
 	ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, deleteDelay/2, conf.blockMetaFetchConcurrency)
 	duplicateBlocksFilter := block.NewDeduplicateFilter(conf.blockMetaFetchConcurrency)
 	noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(logger, insBkt, conf.blockMetaFetchConcurrency)
+	noDownsampleMarkerFilter := downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, conf.blockMetaFetchConcurrency)
 	labelShardedMetaFilter := block.NewLabelShardedMetaFilter(relabelConfig)
 	consistencyDelayMetaFilter := block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg))
 	timePartitionMetaFilter := block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime)
@@ -260,18 +261,21 @@ func runCompact(
 		sy *compact.Syncer
 	)
 	{
+		filters := []block.MetadataFilter{
+			timePartitionMetaFilter,
+			labelShardedMetaFilter,
+			consistencyDelayMetaFilter,
+			ignoreDeletionMarkFilter,
+			block.NewReplicaLabelRemover(logger, conf.dedupReplicaLabels),
+			duplicateBlocksFilter,
+			noCompactMarkerFilter,
+		}
+		if !conf.disableDownsampling {
+			filters = append(filters, noDownsampleMarkerFilter)
+		}
 		// Make sure all compactor meta syncs are done through Syncer.SyncMeta for readability.
 		cf := baseMetaFetcher.NewMetaFetcher(
-			extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
-				timePartitionMetaFilter,
-				labelShardedMetaFilter,
-				consistencyDelayMetaFilter,
-				ignoreDeletionMarkFilter,
-				block.NewReplicaLabelRemover(logger, conf.dedupReplicaLabels),
-				duplicateBlocksFilter,
-				noCompactMarkerFilter,
-			},
-		)
+			extprom.WrapRegistererWithPrefix("thanos_", reg), filters)
 		cf.UpdateOnChange(func(blocks []metadata.Meta, err error) {
 			api.SetLoaded(blocks, err)
 		})
@@ -436,12 +440,30 @@ func runCompact(
 			return errors.Wrap(err, "sync before first pass of downsampling")
 		}
 
-		for _, meta := range sy.Metas() {
+		filteredMetas := sy.Metas()
+		noDownsampleBlocks := noDownsampleMarkerFilter.NoDownsampleMarkedBlocks()
+		for ul := range noDownsampleBlocks {
+			delete(filteredMetas, ul)
+		}
+
+		for _, meta := range filteredMetas {
 			groupKey := meta.Thanos.GroupKey()
 			downsampleMetrics.downsamples.WithLabelValues(groupKey)
 			downsampleMetrics.downsampleFailures.WithLabelValues(groupKey)
 		}
-		if err := downsampleBucket(ctx, logger, downsampleMetrics, insBkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, conf.blockFilesConcurrency, metadata.HashFunc(conf.hashFunc), conf.acceptMalformedIndex); err != nil {
+
+		if err := downsampleBucket(
+			ctx,
+			logger,
+			downsampleMetrics,
+			insBkt,
+			filteredMetas,
+			downsamplingDir,
+			conf.downsampleConcurrency,
+			conf.blockFilesConcurrency,
+			metadata.HashFunc(conf.hashFunc),
+			conf.acceptMalformedIndex,
+		); err != nil {
 			return errors.Wrap(err, "first pass of downsampling failed")
 		}
 
@@ -449,9 +471,22 @@ func runCompact(
 		if err := sy.SyncMetas(ctx); err != nil {
 			return errors.Wrap(err, "sync before second pass of downsampling")
 		}
-		if err := downsampleBucket(ctx, logger, downsampleMetrics, insBkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, conf.blockFilesConcurrency, metadata.HashFunc(conf.hashFunc), conf.acceptMalformedIndex); err != nil {
+
+		if err := downsampleBucket(
+			ctx,
+			logger,
+			downsampleMetrics,
+			insBkt,
+			filteredMetas,
+			downsamplingDir,
+			conf.downsampleConcurrency,
+			conf.blockFilesConcurrency,
+			metadata.HashFunc(conf.hashFunc),
+			conf.acceptMalformedIndex,
+		); err != nil {
 			return errors.Wrap(err, "second pass of downsampling failed")
 		}
+
+		level.Info(logger).Log("msg", "downsampling iterations done")
 	} else {
 		level.Info(logger).Log("msg", "downsampling was explicitly disabled")
diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go
index af89b79bc8..28076b3270 100644
--- a/cmd/thanos/downsample.go
+++ b/cmd/thanos/downsample.go
@@ -92,7 +92,7 @@ func RunDownsample(
 	// While fetching blocks, filter out blocks that were marked for no downsample.
 	metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
 		block.NewDeduplicateFilter(block.FetcherConcurrency),
-		downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt),
+		downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, block.FetcherConcurrency),
 	})
 	if err != nil {
 		return errors.Wrap(err, "create meta fetcher")
diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go
index 78b615f904..72febd1e73 100644
--- a/pkg/compact/downsample/downsample.go
+++ b/pkg/compact/downsample/downsample.go
@@ -798,11 +798,11 @@ type GatherNoDownsampleMarkFilter struct {
 }
 
 // NewGatherNoDownsampleMarkFilter creates GatherNoDownsampleMarkFilter.
-func NewGatherNoDownsampleMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader) *GatherNoDownsampleMarkFilter {
+func NewGatherNoDownsampleMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader, concurrency int) *GatherNoDownsampleMarkFilter {
 	return &GatherNoDownsampleMarkFilter{
 		logger:      logger,
 		bkt:         bkt,
-		concurrency: 1,
+		concurrency: concurrency,
 	}
 }
 
diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go
index 63b6830a90..63c3330cbc 100644
--- a/test/e2e/compact_test.go
+++ b/test/e2e/compact_test.go
@@ -864,3 +864,49 @@ func ensureGETStatusCode(t testing.TB, code int, url string) {
 	testutil.Ok(t, err)
 	testutil.Equals(t, code, r.StatusCode)
 }
+
+func TestCompactorDownsampleIgnoresMarked(t *testing.T) {
+	now, err := time.Parse(time.RFC3339, "2020-03-24T08:00:00Z")
+	testutil.Ok(t, err)
+
+	logger := log.NewLogfmtLogger(os.Stderr)
+	e, err := e2e.NewDockerEnvironment("downsample-mrkd")
+	testutil.Ok(t, err)
+	t.Cleanup(e2ethanos.CleanScenario(t, e))
+
+	dir := filepath.Join(e.SharedDir(), "tmp")
+	testutil.Ok(t, os.MkdirAll(dir, os.ModePerm))
+
+	const bucket = "compact-test"
+	m := e2edb.NewMinio(e, "minio", bucket, e2edb.WithMinioTLS())
+	testutil.Ok(t, e2e.StartAndWaitReady(m))
+
+	bktCfg := e2ethanos.NewS3Config(bucket, m.Endpoint("http"), m.Dir())
+	bkt, err := s3.NewBucketWithConfig(logger, bktCfg, "test")
+	testutil.Ok(t, err)
+
+	downsampledBase := blockDesc{
+		series: []labels.Labels{
+			labels.FromStrings("z", "1", "b", "2"),
+			labels.FromStrings("z", "1", "b", "5"),
+		},
+		extLset: labels.FromStrings("case", "block-about-to-be-downsampled"),
+		mint:    timestamp.FromTime(now),
+		maxt:    timestamp.FromTime(now.Add(10 * 24 * time.Hour)),
+	}
+	// New block that will be downsampled.
+	justAfterConsistencyDelay := 30 * time.Minute
+
+	downsampledRawID, err := downsampledBase.Create(context.Background(), dir, justAfterConsistencyDelay, metadata.NoneFunc, 1200)
+	testutil.Ok(t, err)
+	testutil.Ok(t, objstore.UploadDir(context.Background(), logger, bkt, path.Join(dir, downsampledRawID.String()), downsampledRawID.String()))
+	testutil.Ok(t, block.MarkForNoDownsample(context.Background(), logger, bkt, downsampledRawID, metadata.ManualNoDownsampleReason, "why not", promauto.With(nil).NewCounter(prometheus.CounterOpts{})))
+
+	c := e2ethanos.NewCompactorBuilder(e, "working").Init(client.BucketConfig{
+		Type:   client.S3,
+		Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.Dir()),
+	}, nil)
+	testutil.Ok(t, e2e.StartAndWaitReady(c))
+	testutil.NotOk(t, c.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{"thanos_compact_downsample_total"}, e2emon.WaitMissingMetrics()))
+
+}
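
For readers who want the shape of the change without the surrounding Thanos plumbing, here is a minimal, self-contained Go sketch. It is not Thanos code: blockMeta, metadataFilter and the "ULID-*" strings are simplified stand-ins for metadata.Meta, block.MetadataFilter and real block ULIDs. It only mirrors the two moves the patch makes: append the no-downsample filter to the filters chain when downsampling is enabled, and delete the marked ULIDs from the synced metas before downsampling.

package main

import "fmt"

// blockMeta is a simplified stand-in for Thanos' metadata.Meta; the real
// type carries far more (ULID, time range, Thanos section, ...).
type blockMeta struct {
	ulid string
}

// metadataFilter is a simplified stand-in for block.MetadataFilter: each
// filter inspects (and may prune) the map of synced block metadata.
type metadataFilter interface {
	Filter(metas map[string]*blockMeta) error
}

// noDownsampleMarkFilter only gathers ULIDs of blocks that carry a
// no-downsample mark; a real implementation would read marker objects
// from the bucket.
type noDownsampleMarkFilter struct {
	marked map[string]struct{}
}

// Filter gathers marked ULIDs and leaves the metas map untouched,
// mirroring the gather-style filter used in the patch.
func (f *noDownsampleMarkFilter) Filter(metas map[string]*blockMeta) error {
	return nil
}

// NoDownsampleMarkedBlocks exposes the gathered ULIDs, as the filter in
// the patch does.
func (f *noDownsampleMarkFilter) NoDownsampleMarkedBlocks() map[string]struct{} {
	return f.marked
}

func main() {
	disableDownsampling := false

	// Hook the no-downsample filter into the filters chain only when
	// downsampling is enabled, as the patch does in runCompact.
	noDownsample := &noDownsampleMarkFilter{marked: map[string]struct{}{"ULID-MARKED": {}}}
	filters := []metadataFilter{ /* dedup, no-compact, ... */ }
	if !disableDownsampling {
		filters = append(filters, noDownsample)
	}

	// Metas as a sync would return them.
	metas := map[string]*blockMeta{
		"ULID-MARKED": {ulid: "ULID-MARKED"},
		"ULID-PLAIN":  {ulid: "ULID-PLAIN"},
	}
	for _, f := range filters {
		if err := f.Filter(metas); err != nil {
			panic(err)
		}
	}

	// Trim the marked ULIDs before handing the metas to downsampling,
	// the second half of the change.
	for ulid := range noDownsample.NoDownsampleMarkedBlocks() {
		delete(metas, ulid)
	}

	for ulid := range metas {
		fmt.Println("would downsample:", ulid) // only ULID-PLAIN remains
	}
}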