diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 711e38daec..8155202c76 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -234,6 +234,7 @@ func runCompact( ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, deleteDelay/2, conf.blockMetaFetchConcurrency) duplicateBlocksFilter := block.NewDeduplicateFilter(conf.blockMetaFetchConcurrency) noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(logger, insBkt, conf.blockMetaFetchConcurrency) + noDownsampleMarkerFilter := downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, conf.blockMetaFetchConcurrency) labelShardedMetaFilter := block.NewLabelShardedMetaFilter(relabelConfig) consistencyDelayMetaFilter := block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg)) timePartitionMetaFilter := block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime) @@ -260,18 +261,21 @@ func runCompact( sy *compact.Syncer ) { + filters := []block.MetadataFilter{ + timePartitionMetaFilter, + labelShardedMetaFilter, + consistencyDelayMetaFilter, + ignoreDeletionMarkFilter, + block.NewReplicaLabelRemover(logger, conf.dedupReplicaLabels), + duplicateBlocksFilter, + noCompactMarkerFilter, + } + if !conf.disableDownsampling { + filters = append(filters, noDownsampleMarkerFilter) + } // Make sure all compactor meta syncs are done through Syncer.SyncMeta for readability. cf := baseMetaFetcher.NewMetaFetcher( - extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{ - timePartitionMetaFilter, - labelShardedMetaFilter, - consistencyDelayMetaFilter, - ignoreDeletionMarkFilter, - block.NewReplicaLabelRemover(logger, conf.dedupReplicaLabels), - duplicateBlocksFilter, - noCompactMarkerFilter, - }, - ) + extprom.WrapRegistererWithPrefix("thanos_", reg), filters) cf.UpdateOnChange(func(blocks []metadata.Meta, err error) { api.SetLoaded(blocks, err) }) @@ -436,12 +440,30 @@ func runCompact( return errors.Wrap(err, "sync before first pass of downsampling") } - for _, meta := range sy.Metas() { + filteredMetas := sy.Metas() + noDownsampleBlocks := noDownsampleMarkerFilter.NoDownsampleMarkedBlocks() + for ul := range noDownsampleBlocks { + delete(filteredMetas, ul) + } + + for _, meta := range filteredMetas { groupKey := meta.Thanos.GroupKey() downsampleMetrics.downsamples.WithLabelValues(groupKey) downsampleMetrics.downsampleFailures.WithLabelValues(groupKey) } - if err := downsampleBucket(ctx, logger, downsampleMetrics, insBkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, conf.blockFilesConcurrency, metadata.HashFunc(conf.hashFunc), conf.acceptMalformedIndex); err != nil { + + if err := downsampleBucket( + ctx, + logger, + downsampleMetrics, + insBkt, + filteredMetas, + downsamplingDir, + conf.downsampleConcurrency, + conf.blockFilesConcurrency, + metadata.HashFunc(conf.hashFunc), + conf.acceptMalformedIndex, + ); err != nil { return errors.Wrap(err, "first pass of downsampling failed") } @@ -449,9 +471,22 @@ func runCompact( if err := sy.SyncMetas(ctx); err != nil { return errors.Wrap(err, "sync before second pass of downsampling") } - if err := downsampleBucket(ctx, logger, downsampleMetrics, insBkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, conf.blockFilesConcurrency, metadata.HashFunc(conf.hashFunc), conf.acceptMalformedIndex); err != nil { + + if err := downsampleBucket( + ctx, + logger, + downsampleMetrics, + insBkt, + filteredMetas, + downsamplingDir, + conf.downsampleConcurrency, + conf.blockFilesConcurrency, + metadata.HashFunc(conf.hashFunc), + conf.acceptMalformedIndex, + ); err != nil { return errors.Wrap(err, "second pass of downsampling failed") } + level.Info(logger).Log("msg", "downsampling iterations done") } else { level.Info(logger).Log("msg", "downsampling was explicitly disabled") diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index af89b79bc8..28076b3270 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -92,7 +92,7 @@ func RunDownsample( // While fetching blocks, filter out blocks that were marked for no downsample. metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{ block.NewDeduplicateFilter(block.FetcherConcurrency), - downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt), + downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, block.FetcherConcurrency), }) if err != nil { return errors.Wrap(err, "create meta fetcher") diff --git a/pkg/compact/downsample/downsample.go b/pkg/compact/downsample/downsample.go index 78b615f904..72febd1e73 100644 --- a/pkg/compact/downsample/downsample.go +++ b/pkg/compact/downsample/downsample.go @@ -798,11 +798,11 @@ type GatherNoDownsampleMarkFilter struct { } // NewGatherNoDownsampleMarkFilter creates GatherNoDownsampleMarkFilter. -func NewGatherNoDownsampleMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader) *GatherNoDownsampleMarkFilter { +func NewGatherNoDownsampleMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader, concurrency int) *GatherNoDownsampleMarkFilter { return &GatherNoDownsampleMarkFilter{ logger: logger, bkt: bkt, - concurrency: 1, + concurrency: concurrency, } } diff --git a/test/e2e/compact_test.go b/test/e2e/compact_test.go index 63b6830a90..63c3330cbc 100644 --- a/test/e2e/compact_test.go +++ b/test/e2e/compact_test.go @@ -864,3 +864,49 @@ func ensureGETStatusCode(t testing.TB, code int, url string) { testutil.Ok(t, err) testutil.Equals(t, code, r.StatusCode) } + +func TestCompactorDownsampleIgnoresMarked(t *testing.T) { + now, err := time.Parse(time.RFC3339, "2020-03-24T08:00:00Z") + testutil.Ok(t, err) + + logger := log.NewLogfmtLogger(os.Stderr) + e, err := e2e.NewDockerEnvironment("downsample-mrkd") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + dir := filepath.Join(e.SharedDir(), "tmp") + testutil.Ok(t, os.MkdirAll(dir, os.ModePerm)) + + const bucket = "compact-test" + m := e2edb.NewMinio(e, "minio", bucket, e2edb.WithMinioTLS()) + testutil.Ok(t, e2e.StartAndWaitReady(m)) + + bktCfg := e2ethanos.NewS3Config(bucket, m.Endpoint("http"), m.Dir()) + bkt, err := s3.NewBucketWithConfig(logger, bktCfg, "test") + testutil.Ok(t, err) + + downsampledBase := blockDesc{ + series: []labels.Labels{ + labels.FromStrings("z", "1", "b", "2"), + labels.FromStrings("z", "1", "b", "5"), + }, + extLset: labels.FromStrings("case", "block-about-to-be-downsampled"), + mint: timestamp.FromTime(now), + maxt: timestamp.FromTime(now.Add(10 * 24 * time.Hour)), + } + // New block that will be downsampled. + justAfterConsistencyDelay := 30 * time.Minute + + downsampledRawID, err := downsampledBase.Create(context.Background(), dir, justAfterConsistencyDelay, metadata.NoneFunc, 1200) + testutil.Ok(t, err) + testutil.Ok(t, objstore.UploadDir(context.Background(), logger, bkt, path.Join(dir, downsampledRawID.String()), downsampledRawID.String())) + testutil.Ok(t, block.MarkForNoDownsample(context.Background(), logger, bkt, downsampledRawID, metadata.ManualNoDownsampleReason, "why not", promauto.With(nil).NewCounter(prometheus.CounterOpts{}))) + + c := e2ethanos.NewCompactorBuilder(e, "working").Init(client.BucketConfig{ + Type: client.S3, + Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.Dir()), + }, nil) + testutil.Ok(t, e2e.StartAndWaitReady(c)) + testutil.NotOk(t, c.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{"thanos_compact_downsample_total"}, e2emon.WaitMissingMetrics())) + +}