Skip to content

Commit

Permalink
compact: hook NoDownsampleMarkFilter into filters chain (thanos-io#6893)
Browse files Browse the repository at this point in the history
We have a NoDownsampleMarkFilter that was never wired into the
compactor's filters chain. Hook it into the filters chain if
downsampling is enabled and then trim matching ULIDs from the
downsampling process. Add a test to cover this scenario.

Fixes thanos-io#6179.

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS committed Nov 23, 2023
1 parent af907c1 commit 1228e11
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 16 deletions.
61 changes: 48 additions & 13 deletions cmd/thanos/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func runCompact(
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, insBkt, deleteDelay/2, conf.blockMetaFetchConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter(conf.blockMetaFetchConcurrency)
noCompactMarkerFilter := compact.NewGatherNoCompactionMarkFilter(logger, insBkt, conf.blockMetaFetchConcurrency)
noDownsampleMarkerFilter := downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, conf.blockMetaFetchConcurrency)
labelShardedMetaFilter := block.NewLabelShardedMetaFilter(relabelConfig)
consistencyDelayMetaFilter := block.NewConsistencyDelayMetaFilter(logger, conf.consistencyDelay, extprom.WrapRegistererWithPrefix("thanos_", reg))
timePartitionMetaFilter := block.NewTimePartitionMetaFilter(conf.filterConf.MinTime, conf.filterConf.MaxTime)
Expand All @@ -260,18 +261,21 @@ func runCompact(
sy *compact.Syncer
)
{
filters := []block.MetadataFilter{
timePartitionMetaFilter,
labelShardedMetaFilter,
consistencyDelayMetaFilter,
ignoreDeletionMarkFilter,
block.NewReplicaLabelRemover(logger, conf.dedupReplicaLabels),
duplicateBlocksFilter,
noCompactMarkerFilter,
}
if !conf.disableDownsampling {
filters = append(filters, noDownsampleMarkerFilter)
}
// Make sure all compactor meta syncs are done through Syncer.SyncMeta for readability.
cf := baseMetaFetcher.NewMetaFetcher(
extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
timePartitionMetaFilter,
labelShardedMetaFilter,
consistencyDelayMetaFilter,
ignoreDeletionMarkFilter,
block.NewReplicaLabelRemover(logger, conf.dedupReplicaLabels),
duplicateBlocksFilter,
noCompactMarkerFilter,
},
)
extprom.WrapRegistererWithPrefix("thanos_", reg), filters)
cf.UpdateOnChange(func(blocks []metadata.Meta, err error) {
api.SetLoaded(blocks, err)
})
Expand Down Expand Up @@ -436,22 +440,53 @@ func runCompact(
return errors.Wrap(err, "sync before first pass of downsampling")
}

for _, meta := range sy.Metas() {
filteredMetas := sy.Metas()
noDownsampleBlocks := noDownsampleMarkerFilter.NoDownsampleMarkedBlocks()
for ul := range noDownsampleBlocks {
delete(filteredMetas, ul)
}

for _, meta := range filteredMetas {
groupKey := meta.Thanos.GroupKey()
downsampleMetrics.downsamples.WithLabelValues(groupKey)
downsampleMetrics.downsampleFailures.WithLabelValues(groupKey)
}
if err := downsampleBucket(ctx, logger, downsampleMetrics, insBkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, conf.blockFilesConcurrency, metadata.HashFunc(conf.hashFunc), conf.acceptMalformedIndex); err != nil {

if err := downsampleBucket(
ctx,
logger,
downsampleMetrics,
insBkt,
filteredMetas,
downsamplingDir,
conf.downsampleConcurrency,
conf.blockFilesConcurrency,
metadata.HashFunc(conf.hashFunc),
conf.acceptMalformedIndex,
); err != nil {
return errors.Wrap(err, "first pass of downsampling failed")
}

level.Info(logger).Log("msg", "start second pass of downsampling")
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
if err := downsampleBucket(ctx, logger, downsampleMetrics, insBkt, sy.Metas(), downsamplingDir, conf.downsampleConcurrency, conf.blockFilesConcurrency, metadata.HashFunc(conf.hashFunc), conf.acceptMalformedIndex); err != nil {

if err := downsampleBucket(
ctx,
logger,
downsampleMetrics,
insBkt,
filteredMetas,
downsamplingDir,
conf.downsampleConcurrency,
conf.blockFilesConcurrency,
metadata.HashFunc(conf.hashFunc),
conf.acceptMalformedIndex,
); err != nil {
return errors.Wrap(err, "second pass of downsampling failed")
}

level.Info(logger).Log("msg", "downsampling iterations done")
} else {
level.Info(logger).Log("msg", "downsampling was explicitly disabled")
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func RunDownsample(
// While fetching blocks, filter out blocks that were marked for no downsample.
metaFetcher, err := block.NewMetaFetcher(logger, block.FetcherConcurrency, insBkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), []block.MetadataFilter{
block.NewDeduplicateFilter(block.FetcherConcurrency),
downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt),
downsample.NewGatherNoDownsampleMarkFilter(logger, insBkt, block.FetcherConcurrency),
})
if err != nil {
return errors.Wrap(err, "create meta fetcher")
Expand Down
4 changes: 2 additions & 2 deletions pkg/compact/downsample/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -798,11 +798,11 @@ type GatherNoDownsampleMarkFilter struct {
}

// NewGatherNoDownsampleMarkFilter creates GatherNoDownsampleMarkFilter.
func NewGatherNoDownsampleMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader) *GatherNoDownsampleMarkFilter {
func NewGatherNoDownsampleMarkFilter(logger log.Logger, bkt objstore.InstrumentedBucketReader, concurrency int) *GatherNoDownsampleMarkFilter {
return &GatherNoDownsampleMarkFilter{
logger: logger,
bkt: bkt,
concurrency: 1,
concurrency: concurrency,
}
}

Expand Down
46 changes: 46 additions & 0 deletions test/e2e/compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -864,3 +864,49 @@ func ensureGETStatusCode(t testing.TB, code int, url string) {
testutil.Ok(t, err)
testutil.Equals(t, code, r.StatusCode)
}

// TestCompactorDownsampleIgnoresMarked verifies that a block carrying a
// no-downsample mark is skipped by the compactor's downsampling pass: after
// uploading a raw block and marking it with MarkForNoDownsample, the
// compactor's thanos_compact_downsample_total metric must never rise above 0.
func TestCompactorDownsampleIgnoresMarked(t *testing.T) {
	// Fixed reference time so the generated block's time range is deterministic.
	now, err := time.Parse(time.RFC3339, "2020-03-24T08:00:00Z")
	testutil.Ok(t, err)

	logger := log.NewLogfmtLogger(os.Stderr)
	e, err := e2e.NewDockerEnvironment("downsample-mrkd")
	testutil.Ok(t, err)
	t.Cleanup(e2ethanos.CleanScenario(t, e))

	// Scratch directory, shared with the containers, for generating the block.
	dir := filepath.Join(e.SharedDir(), "tmp")
	testutil.Ok(t, os.MkdirAll(dir, os.ModePerm))

	// Object storage backend the compactor will sync block metadata from.
	const bucket = "compact-test"
	m := e2edb.NewMinio(e, "minio", bucket, e2edb.WithMinioTLS())
	testutil.Ok(t, e2e.StartAndWaitReady(m))

	bktCfg := e2ethanos.NewS3Config(bucket, m.Endpoint("http"), m.Dir())
	bkt, err := s3.NewBucketWithConfig(logger, bktCfg, "test")
	testutil.Ok(t, err)

	// Describe a raw block that would otherwise be eligible for downsampling
	// (10 days of data — presumably wide enough to qualify; confirm against
	// the compactor's downsampling thresholds).
	downsampledBase := blockDesc{
		series: []labels.Labels{
			labels.FromStrings("z", "1", "b", "2"),
			labels.FromStrings("z", "1", "b", "5"),
		},
		extLset: labels.FromStrings("case", "block-about-to-be-downsampled"),
		mint:    timestamp.FromTime(now),
		maxt:    timestamp.FromTime(now.Add(10 * 24 * time.Hour)),
	}
	// New block that will be downsampled.
	justAfterConsistencyDelay := 30 * time.Minute

	// Create the block on disk, upload it, then attach the no-downsample mark
	// so the downsampling pass must skip it.
	downsampledRawID, err := downsampledBase.Create(context.Background(), dir, justAfterConsistencyDelay, metadata.NoneFunc, 1200)
	testutil.Ok(t, err)
	testutil.Ok(t, objstore.UploadDir(context.Background(), logger, bkt, path.Join(dir, downsampledRawID.String()), downsampledRawID.String()))
	testutil.Ok(t, block.MarkForNoDownsample(context.Background(), logger, bkt, downsampledRawID, metadata.ManualNoDownsampleReason, "why not", promauto.With(nil).NewCounter(prometheus.CounterOpts{})))

	c := e2ethanos.NewCompactorBuilder(e, "working").Init(client.BucketConfig{
		Type:   client.S3,
		Config: e2ethanos.NewS3Config(bucket, m.InternalEndpoint("http"), m.Dir()),
	}, nil)
	testutil.Ok(t, e2e.StartAndWaitReady(c))
	// NotOk: waiting for the downsample counter to exceed 0 must fail (time
	// out), proving the marked block was never downsampled.
	testutil.NotOk(t, c.WaitSumMetricsWithOptions(e2emon.Greater(0), []string{"thanos_compact_downsample_total"}, e2emon.WaitMissingMetrics()))

}

0 comments on commit 1228e11

Please sign in to comment.