From 68bef3fce0812b01e7970fd1d90c36dbb42f562b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?=
Date: Sun, 19 Nov 2023 05:17:32 +0200
Subject: [PATCH] compact: hook nodownsamplemarkfilter into filters chain
 (#6893)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

We have a NoDownsampleMarkFilter that we were not using before in the
compactor for some reason. Hook it into the filters chain if downsampling
is enabled and then trim matching ULIDs from the downsampling process.

Add a test to cover this scenario.

Fixes https://github.com/thanos-io/thanos/issues/6179.

Signed-off-by: Giedrius Statkevičius
---
 cmd/thanos/compact.go                | 61 ++++++++++++++++++++++------
 cmd/thanos/downsample.go             |  2 +-
 pkg/compact/downsample/downsample.go |  4 +-
 test/e2e/compact_test.go             | 46 +++++++++++++++++++++
 4 files changed, 97 insertions(+), 16 deletions(-)

diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go
index 7ab7e05639..b813a0d464 100644
--- a/cmd/thanos/compact.go
+++ b/cmd/thanos/compact.go
@@ -234,6 +234,7 @@ func runCompact(
     ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, deleteDelay/2, conf.blockMetaFetchConcurrency)
     duplicateBlocksFilter := block.NewDeduplicateFilter(conf.blockMetaFetchConcurrency)
     noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(logger, insBkt, conf.blockMetaFetchConcurrency)
+    noDownsampleMarkerFilter := downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, conf.blockMetaFetchConcurrency)
     labelShardedMetaFilter := block.NewLabelShardedMetaFilter(relabelConfig)
     consistencyDelayMetaFilter := block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg))
     timePartitionMetaFilter := block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime)
@@ -260,18 +261,21 @@ func runCompact(
         sy *compact.Syncer
     )
     {
+        filters := []block.MetadataFilter{
+            timePartitionMetaFilter,
+            labelShardedMetaFilter,
+            consistencyDelayMetaFilter,
+            ignoreDeletionMarkFilter,
+            block.NewReplicaLabelRemover(logger, conf.dedupReplicaLabels),
+            duplicateBlocksFilter,
+            noCompactMarkerFilter,
+        }
+        if !conf.disableDownsampling {
+            filters = append(filters, noDownsampleMarkerFilter)
+        }
         // Make sure all compactor meta syncs are done through Syncer.SyncMeta for readability.
         cf := baseMetaFetcher.NewMetaFetcher(
-            extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
-                timePartitionMetaFilter,
-                labelShardedMetaFilter,
-                consistencyDelayMetaFilter,
-                ignoreDeletionMarkFilter,
-                block.NewReplicaLabelRemover(logger, conf.dedupReplicaLabels),
-                duplicateBlocksFilter,
-                noCompactMarkerFilter,
-            },
-        )
+            extprom.WrapRegistererWithPrefix("thanos_", reg), filters)
         cf.UpdateOnChange(func(blocks []metadata.Meta, err error) {
             api.SetLoaded(blocks, err)
         })
@@ -436,12 +440,30 @@ func runCompact(
             if err := sy.SyncMetas(ctx); err != nil {
                 return errors.Wrap(err, "sync before first pass of downsampling")
             }
-            for _, meta := range sy.Metas() {
+
+            filteredMetas := sy.Metas()
+            noDownsampleBlocks := noDownsampleMarkerFilter.NoDownsampleMarkedBlocks()
+            for ul := range noDownsampleBlocks {
+                delete(filteredMetas, ul)
+            }
+
+            for _, meta := range filteredMetas {
                 groupKey := meta.Thanos.GroupKey()
                 downsampleMetrics.downsamples.WithLabelValues(groupKey)
                 downsampleMetrics.downsampleFailures.WithLabelValues(groupKey)
             }
-            if err := downsampleBucket(ctx, logger, downsampleMetrics, insBkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, conf.blockFilesConcurrency, metadata.HashFunc(conf.hashFunc), conf.acceptMalformedIndex); err != nil {
+
+            if err := downsampleBucket(
+                ctx,
+                logger,
+                downsampleMetrics,
+                insBkt,
+                filteredMetas,
+                downsamplingDir,
+                conf.downsampleConcurrency,
+                conf.blockFilesConcurrency,
+                metadata.HashFunc(conf.hashFunc),
+                conf.acceptMalformedIndex,
+            ); err != nil {
                 return errors.Wrap(err, "first pass of downsampling failed")
             }
 
@@ -449,9 +471,22 @@ func runCompact(
             if err := sy.SyncMetas(ctx); err != nil {
                 return errors.Wrap(err, "sync before second pass of downsampling")
             }
-            if err := downsampleBucket(ctx, logger, downsampleMetrics, insBkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, conf.blockFilesConcurrency, metadata.HashFunc(conf.hashFunc), conf.acceptMalformedIndex); err != nil {
+
+            if err := downsampleBucket(
+                ctx,
+                logger,
+                downsampleMetrics,
+                insBkt,
+                filteredMetas,
+                downsamplingDir,
+                conf.downsampleConcurrency,
+                conf.blockFilesConcurrency,
+                metadata.HashFunc(conf.hashFunc),
+                conf.acceptMalformedIndex,
+            ); err != nil {
                 return errors.Wrap(err, "second pass of downsampling failed")
             }
+            level.Info(logger).Log("msg", "downsampling iterations done")
         } else {
             level.Info(logger).Log("msg", "downsampling was explicitly disabled")
         }
diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go
index 99132705d8..9a3418c171 100644
--- a/cmd/thanos/downsample.go
+++ b/cmd/thanos/downsample.go
@@ -92,7 +92,7 @@ func RunDownsample(
     // While fetching blocks, filter out blocks that were marked for no downsample.
     metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
         block.NewDeduplicateFilter(block.FetcherConcurrency),
-        downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt),
+        downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, block.FetcherConcurrency),
     })
     if err != nil {
         return errors.Wrap(err, "create meta fetcher")
diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go
index 829cc8d456..14963602e6 100644
--- a/pkg/compact/downsample/downsample.go
+++ b/pkg/compact/downsample/downsample.go
@@ -808,11 +808,11 @@ type GatherNoDownsampleMarkFilter struct {
 }
 
 // NewGatherNoDownsampleMarkFilter creates GatherNoDownsampleMarkFilter.
-func NewGatherNoDownsampleMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader) *GatherNoDownsampleMarkFilter {
+func NewGatherNoDownsampleMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader, concurrency int) *GatherNoDownsampleMarkFilter {
     return &GatherNoDownsampleMarkFilter{
         logger:      logger,
         bkt:         bkt,
-        concurrency: 1,
+        concurrency: concurrency,
     }
 }
 
diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go
index eddeebdf68..e7fc6a0333 100644
--- a/test/e2e/compact_test.go
+++ b/test/e2e/compact_test.go
@@ -878,3 +878,49 @@ func ensureGETStatusCode(t testing.TB, code int, url string) {
     testutil.Ok(t, err)
     testutil.Equals(t, code, r.StatusCode)
 }
+
+func TestCompactorDownsampleIgnoresMarked(t *testing.T) {
+    now, err := time.Parse(time.RFC3339, "2020-03-24T08:00:00Z")
+    testutil.Ok(t, err)
+
+    logger := log.NewLogfmtLogger(os.Stderr)
+    e, err := e2e.NewDockerEnvironment("downsample-mrkd")
+    testutil.Ok(t, err)
+    t.Cleanup(e2ethanos.CleanScenario(t, e))
+
+    dir := filepath.Join(e.SharedDir(), "tmp")
+    testutil.Ok(t, os.MkdirAll(dir, os.ModePerm))
+
+    const bucket = "compact-test"
+    m := e2edb.NewMinio(e, "minio", bucket, e2edb.WithMinioTLS())
+    testutil.Ok(t, e2e.StartAndWaitReady(m))
+
+    bktCfg := e2ethanos.NewS3Config(bucket, m.Endpoint("http"), m.Dir())
+    bkt, err := s3.NewBucketWithConfig(logger, bktCfg, "test")
+    testutil.Ok(t, err)
+
+    downsampledBase := blockDesc{
+        series: []labels.Labels{
+            labels.FromStrings("z", "1", "b", "2"),
+            labels.FromStrings("z", "1", "b", "5"),
+        },
+        extLset: labels.FromStrings("case", "block-about-to-be-downsampled"),
+        mint:    timestamp.FromTime(now),
+        maxt:    timestamp.FromTime(now.Add(10 * 24 * time.Hour)),
+    }
+    // New block that will be downsampled.
+    justAfterConsistencyDelay := 30 * time.Minute
+
+    downsampledRawID, err := downsampledBase.Create(context.Background(), dir, justAfterConsistencyDelay, metadata.NoneFunc, 1200)
+    testutil.Ok(t, err)
+    testutil.Ok(t, objstore.UploadDir(context.Background(), logger, bkt, path.Join(dir, downsampledRawID.String()), downsampledRawID.String()))
+    testutil.Ok(t, block.MarkForNoDownsample(context.Background(), logger, bkt, downsampledRawID, metadata.ManualNoDownsampleReason, "why not", promauto.With(nil).NewCounter(prometheus.CounterOpts{})))
+
+    c := e2ethanos.NewCompactorBuilder(e, "working").Init(client.BucketConfig{
+        Type:   client.S3,
+        Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.Dir()),
+    }, nil)
+    testutil.Ok(t, e2e.StartAndWaitReady(c))
+    testutil.NotOk(t, c.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{"thanos_compact_downsample_total"}, e2emon.WaitMissingMetrics()))
+
+}
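Note for reviewers (illustration only, not part of the patch): the sketch below restates the two moves above with simplified stand-in types — a toy metaFilter interface and plain string block IDs instead of Thanos' block.MetadataFilter and ulid.ULID — to show conditionally appending the no-downsample filter to the filter chain and trimming marked blocks from the metas map before downsampleBucket would run.

// Minimal, self-contained sketch; all types here are stand-ins, not Thanos APIs.
package main

import "fmt"

// metaFilter stands in for Thanos' block.MetadataFilter interface.
type metaFilter interface {
	name() string
}

type namedFilter struct{ n string }

func (f namedFilter) name() string { return f.n }

// noDownsampleFilter stands in for downsample.GatherNoDownsampleMarkFilter:
// after a sync it knows which block IDs carry a no-downsample mark.
type noDownsampleFilter struct {
	marked map[string]struct{}
}

func (f noDownsampleFilter) name() string { return "no-downsample-mark" }

// noDownsampleMarkedBlocks mirrors NoDownsampleMarkedBlocks() in the patch.
func (f noDownsampleFilter) noDownsampleMarkedBlocks() map[string]struct{} {
	return f.marked
}

func main() {
	disableDownsampling := false
	noDownsample := noDownsampleFilter{marked: map[string]struct{}{"block-marked": {}}}

	// 1. Build the base filter chain and hook the no-downsample filter in
	//    only when downsampling is enabled, as runCompact now does.
	filters := []metaFilter{
		namedFilter{"time-partition"},
		namedFilter{"label-sharded"},
		namedFilter{"consistency-delay"},
	}
	if !disableDownsampling {
		filters = append(filters, noDownsample)
	}
	fmt.Println("filters in chain:", len(filters))

	// 2. Trim marked block IDs from the metas map before downsampling,
	//    mirroring the delete(filteredMetas, ul) loop in the patch.
	filteredMetas := map[string]string{
		"block-marked": "has no-downsample-mark.json",
		"block-plain":  "regular block",
	}
	for id := range noDownsample.noDownsampleMarkedBlocks() {
		delete(filteredMetas, id)
	}
	for id := range filteredMetas {
		fmt.Println("would downsample:", id)
	}
}

Running the sketch leaves only the unmarked block in the map; in the e2e test above the single uploaded block is marked, so thanos_compact_downsample_total is expected to stay at zero.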