diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 8d9c4e92227..0dc8c4d58b7 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -13,6 +13,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/oklog/run" + "github.com/oklog/ulid" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -23,6 +24,7 @@ import ( "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/extflag" + "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/prober" @@ -31,6 +33,11 @@ import ( "gopkg.in/alecthomas/kingpin.v2" ) +const ( + metricIndexGenerateName = "thanos_compact_generated_index_total" + metricIndexGenerateHelp = "Total number of generated indexes." +) + var ( compactions = compactionSet{ 1 * time.Hour, @@ -85,7 +92,7 @@ func registerCompact(m map[string]setupFunc, app *kingpin.Application) { objStoreConfig := regCommonObjStoreFlags(cmd, "", true) - consistencyDelay := modelDuration(cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %s will be removed.", compact.MinimumAgeForRemoval)). + consistencyDelay := modelDuration(cmd.Flag("consistency-delay", fmt.Sprintf("Minimum age of fresh (non-compacted) blocks before they are being processed. Malformed blocks older than the maximum of consistency-delay and %v will be removed.", compact.PartialUploadThresholdAge)). Default("30m")) retentionRaw := modelDuration(cmd.Flag("retention.resolution-raw", "How long to retain raw samples in bucket. 
0d - disables this retention").Default("0d")) @@ -162,21 +169,28 @@ func runCompact( ) error { halted := prometheus.NewGauge(prometheus.GaugeOpts{ Name: "thanos_compactor_halted", - Help: "Set to 1 if the compactor halted due to an unexpected error", + Help: "Set to 1 if the compactor halted due to an unexpected error.", }) + halted.Set(0) retried := prometheus.NewCounter(prometheus.CounterOpts{ Name: "thanos_compactor_retries_total", - Help: "Total number of retries after retriable compactor error", + Help: "Total number of retries after retriable compactor error.", }) iterations := prometheus.NewCounter(prometheus.CounterOpts{ Name: "thanos_compactor_iterations_total", - Help: "Total number of iterations that were executed successfully", + Help: "Total number of iterations that were executed successfully.", }) - halted.Set(0) - - reg.MustRegister(halted) - reg.MustRegister(retried) - reg.MustRegister(iterations) + consistencyDelayMetric := prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "thanos_consistency_delay_seconds", + Help: "Configured consistency delay in seconds.", + }, func() float64 { + return consistencyDelay.Seconds() + }) + partialUploadDeleteAttempts := prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_compactor_aborted_partial_uploads_deletion_attempts_total", + Help: "Total number of started deletions of blocks that are assumed aborted and only partially uploaded.", + }) + reg.MustRegister(halted, retried, iterations, consistencyDelayMetric, partialUploadDeleteAttempts) downsampleMetrics := newDownsampleMetrics(reg) @@ -225,8 +239,15 @@ func runCompact( } }() - sy, err := compact.NewSyncer(logger, reg, bkt, consistencyDelay, - blockSyncConcurrency, acceptMalformedIndex, false, relabelConfig) + metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg), + block.NewLabelShardedMetaFilter(relabelConfig).Filter, + (&consistencyDelayMetaFilter{logger: logger, consistencyDelay: consistencyDelay}).Filter, + ) + if err != nil { + return errors.Wrap(err, "create meta fetcher") + } + + sy, err := compact.NewSyncer(logger, reg, bkt, metaFetcher, blockSyncConcurrency, acceptMalformedIndex, false) if err != nil { return errors.Wrap(err, "create syncer") } @@ -276,26 +297,24 @@ func runCompact( level.Info(logger).Log("msg", "retention policy of 1 hour aggregated samples is enabled", "duration", retentionByResolution[compact.ResolutionLevel1h]) } - f := func() error { + compactMainFn := func() error { if err := compactor.Compact(ctx); err != nil { return errors.Wrap(err, "compaction failed") } - level.Info(logger).Log("msg", "compaction iterations done") - // TODO(bplotka): Remove "disableDownsampling" once https://github.com/thanos-io/thanos/issues/297 is fixed. if !disableDownsampling { // After all compactions are done, work down the downsampling backlog. // We run two passes of this to ensure that the 1h downsampling is generated // for 5m downsamplings created in the first run. 
level.Info(logger).Log("msg", "start first pass of downsampling") - if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, downsamplingDir); err != nil { + if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, metaFetcher, downsamplingDir); err != nil { return errors.Wrap(err, "first pass of downsampling failed") } level.Info(logger).Log("msg", "start second pass of downsampling") - if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, downsamplingDir); err != nil { + if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, metaFetcher, downsamplingDir); err != nil { return errors.Wrap(err, "second pass of downsampling failed") } level.Info(logger).Log("msg", "downsampling iterations done") @@ -303,9 +322,11 @@ func runCompact( level.Warn(logger).Log("msg", "downsampling was explicitly disabled") } - if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, retentionByResolution); err != nil { + if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, metaFetcher, retentionByResolution); err != nil { return errors.Wrap(err, fmt.Sprintf("retention failed")) } + + compact.BestEffortCleanAbortedPartialUploads(ctx, logger, metaFetcher, bkt, partialUploadDeleteAttempts) return nil } @@ -314,18 +335,18 @@ func runCompact( // Generate index file. if generateMissingIndexCacheFiles { - if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, indexCacheDir); err != nil { + if err := genMissingIndexCacheFiles(ctx, logger, reg, bkt, metaFetcher, indexCacheDir); err != nil { return err } } if !wait { - return f() + return compactMainFn() } // --wait=true is specified. return runutil.Repeat(5*time.Minute, ctx.Done(), func() error { - err := f() + err := compactMainFn() if err == nil { iterations.Inc() return nil @@ -363,13 +384,27 @@ func runCompact( return nil } -const ( - metricIndexGenerateName = "thanos_compact_generated_index_total" - metricIndexGenerateHelp = "Total number of generated indexes." -) +type consistencyDelayMetaFilter struct { + logger log.Logger + consistencyDelay time.Duration +} + +func (f *consistencyDelayMetaFilter) Filter(metas map[ulid.ULID]*metadata.Meta, synced block.GaugeLabeled, _ bool) { + for id, meta := range metas { + if ulid.Now()-id.Time() < uint64(f.consistencyDelay/time.Millisecond) && + meta.Thanos.Source != metadata.BucketRepairSource && + meta.Thanos.Source != metadata.CompactorSource && + meta.Thanos.Source != metadata.CompactorRepairSource { + + level.Debug(f.logger).Log("msg", "block is too fresh for now", "block", id) + synced.WithLabelValues(block.TooFreshMeta).Inc() + delete(metas, id) + } + } +} // genMissingIndexCacheFiles scans over all blocks, generates missing index cache files and uploads them to object storage. 
-func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, dir string) error { +func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prometheus.Registry, bkt objstore.Bucket, fetcher block.MetadataFetcher, dir string) error { genIndex := prometheus.NewCounter(prometheus.CounterOpts{ Name: metricIndexGenerateName, Help: metricIndexGenerateHelp, @@ -391,38 +426,18 @@ func genMissingIndexCacheFiles(ctx context.Context, logger log.Logger, reg *prom level.Info(logger).Log("msg", "start index cache processing") - var metas []*metadata.Meta - - if err := bkt.Iter(ctx, "", func(name string) error { - id, ok := block.IsBlockDir(name) - if !ok { - return nil - } - - meta, err := block.DownloadMeta(ctx, logger, bkt, id) - if err != nil { - // Probably not finished block, skip it. - if bkt.IsObjNotFoundErr(errors.Cause(err)) { - level.Warn(logger).Log("msg", "meta file wasn't found", "block", id.String()) - return nil - } - return errors.Wrap(err, "download metadata") - } + metas, _, err := fetcher.Fetch(ctx) + if err != nil { + return errors.Wrap(err, "fetch metas") + } + for _, meta := range metas { // New version of compactor pushes index cache along with data block. // Skip uncompacted blocks. if meta.Compaction.Level == 1 { - return nil + continue } - metas = append(metas, &meta) - - return nil - }); err != nil { - return errors.Wrap(err, "retrieve bucket block metas") - } - - for _, meta := range metas { if err := generateIndexCacheFile(ctx, bkt, logger, dir, meta); err != nil { return err } diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index 5c053d7e6e2..00de9e57cbb 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -21,6 +21,7 @@ import ( "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/extflag" + "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/prober" @@ -88,6 +89,11 @@ func runDownsample( return err } + metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", extprom.WrapRegistererWithPrefix("thanos_", reg)) + if err != nil { + return errors.Wrap(err, "create meta fetcher") + } + // Ensure we close up everything properly. 
defer func() { if err != nil { @@ -107,13 +113,13 @@ func runDownsample( level.Info(logger).Log("msg", "start first pass of downsampling") - if err := downsampleBucket(ctx, logger, metrics, bkt, dataDir); err != nil { + if err := downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dataDir); err != nil { return errors.Wrap(err, "downsampling failed") } level.Info(logger).Log("msg", "start second pass of downsampling") - if err := downsampleBucket(ctx, logger, metrics, bkt, dataDir); err != nil { + if err := downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dataDir); err != nil { return errors.Wrap(err, "downsampling failed") } @@ -148,6 +154,7 @@ func downsampleBucket( logger log.Logger, metrics *DownsampleMetrics, bkt objstore.Bucket, + fetcher block.MetadataFetcher, dir string, ) error { if err := os.RemoveAll(dir); err != nil { @@ -163,25 +170,9 @@ func downsampleBucket( } }() - var metas []*metadata.Meta - - err := bkt.Iter(ctx, "", func(name string) error { - id, ok := block.IsBlockDir(name) - if !ok { - return nil - } - - m, err := block.DownloadMeta(ctx, logger, bkt, id) - if err != nil { - return errors.Wrap(err, "download metadata") - } - - metas = append(metas, &m) - - return nil - }) + metas, _, err := fetcher.Fetch(ctx) if err != nil { - return errors.Wrap(err, "retrieve bucket block metas") + return errors.Wrap(err, "downsampling meta fetch") } // mapping from a hash over all source IDs to blocks. We don't need to downsample a block diff --git a/cmd/thanos/main_test.go b/cmd/thanos/main_test.go index ea4f7063e14..f6aedb48eb3 100644 --- a/cmd/thanos/main_test.go +++ b/cmd/thanos/main_test.go @@ -75,7 +75,10 @@ func TestCleanupIndexCacheFolder(t *testing.T) { }) expReg.MustRegister(genIndexExp) - testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, dir)) + metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil) + testutil.Ok(t, err) + + testutil.Ok(t, genMissingIndexCacheFiles(ctx, logger, reg, bkt, metaFetcher, dir)) genIndexExp.Inc() testutil.GatherAndCompare(t, expReg, reg, metricIndexGenerateName) @@ -112,7 +115,10 @@ func TestCleanupDownsampleCacheFolder(t *testing.T) { metrics := newDownsampleMetrics(prometheus.NewRegistry()) testutil.Equals(t, 0.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos)))) - testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, dir)) + metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil) + testutil.Ok(t, err) + + testutil.Ok(t, downsampleBucket(ctx, logger, metrics, bkt, metaFetcher, dir)) testutil.Equals(t, 1.0, promtest.ToFloat64(metrics.downsamples.WithLabelValues(compact.GroupKey(meta.Thanos)))) _, err = os.Stat(dir) diff --git a/pkg/block/fetcher.go b/pkg/block/fetcher.go index 0e110a905a3..66af7e0af5d 100644 --- a/pkg/block/fetcher.go +++ b/pkg/block/fetcher.go @@ -223,7 +223,6 @@ func (s *MetaFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met level.Warn(s.logger).Log("msg", "best effort save of the meta.json to local dir failed; ignoring", "dir", cachedBlockDir, "err", err) } } - return m, nil } diff --git a/pkg/compact/clean.go b/pkg/compact/clean.go new file mode 100644 index 00000000000..7fa085eebc8 --- /dev/null +++ b/pkg/compact/clean.go @@ -0,0 +1,48 @@ +package compact + +import ( + "context" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/objstore" +) + 
+const ( + // PartialUploadThresholdAge is the age after which a partial block is assumed aborted and ready to be cleaned. + // Keep it long, as it is based on the block creation time, not the upload start time. + PartialUploadThresholdAge = 2 * 24 * time.Hour +) + +func BestEffortCleanAbortedPartialUploads(ctx context.Context, logger log.Logger, fetcher block.MetadataFetcher, bkt objstore.Bucket, deleteAttempts prometheus.Counter) { + level.Info(logger).Log("msg", "started cleaning of aborted partial uploads") + _, partial, err := fetcher.Fetch(ctx) + if err != nil { + level.Warn(logger).Log("msg", "failed to fetch metadata for cleaning of aborted partial uploads; skipping", "err", err) + } + + // Delete partial blocks that are older than PartialUploadThresholdAge. + // TODO(bwplotka): This can cause data loss if blocks are: + // * being uploaded for longer than PartialUploadThresholdAge, or + // * being uploaded, but the upload started only after the block was already older than + // PartialUploadThresholdAge, so an abort is wrongly assumed. Keep PartialUploadThresholdAge long for now. + // Mitigate this by adding ModifiedTime to bkt and checking that instead of the ULID (block creation time). + for id := range partial { + if ulid.Now()-id.Time() <= uint64(PartialUploadThresholdAge/time.Millisecond) { + // Minimum delay has not expired, ignore for now. + continue + } + + deleteAttempts.Inc() + if err := block.Delete(ctx, logger, bkt, id); err != nil { + level.Warn(logger).Log("msg", "failed to delete aborted partial upload; skipping", "block", id, "thresholdAge", PartialUploadThresholdAge, "err", err) + return + } + level.Info(logger).Log("msg", "deleted aborted partial upload", "block", id, "thresholdAge", PartialUploadThresholdAge) + } + level.Info(logger).Log("msg", "cleaning of aborted partial uploads done") +} diff --git a/pkg/compact/clean_test.go b/pkg/compact/clean_test.go new file mode 100644 index 00000000000..5332da76373 --- /dev/null +++ b/pkg/compact/clean_test.go @@ -0,0 +1,72 @@ +package compact + +import ( + "bytes" + "context" + "encoding/json" + "path" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + promtest "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/objstore/inmem" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestBestEffortCleanAbortedPartialUploads(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + bkt := inmem.NewBucket() + logger := log.NewNopLogger() + + metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil) + testutil.Ok(t, err) + + // 1. No meta, old block, should be removed. + shouldDeleteID, err := ulid.New(uint64(time.Now().Add(-PartialUploadThresholdAge-1*time.Hour).Unix()*1000), nil) + testutil.Ok(t, err) + + var fakeChunk bytes.Buffer + fakeChunk.Write([]byte{0, 1, 2, 3}) + testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldDeleteID.String(), "chunks", "000001"), &fakeChunk)) + + // 2. Old block with meta, so should be kept.
+ shouldIgnoreID1, err := ulid.New(uint64(time.Now().Add(-PartialUploadThresholdAge-2*time.Hour).Unix()*1000), nil) + testutil.Ok(t, err) + var meta metadata.Meta + meta.Version = 1 + meta.ULID = shouldIgnoreID1 + + var buf bytes.Buffer + testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldIgnoreID1.String(), metadata.MetaFilename), &buf)) + testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldIgnoreID1.String(), "chunks", "000001"), &fakeChunk)) + + // 3. No meta, newer block that should be kept. + shouldIgnoreID2, err := ulid.New(uint64(time.Now().Add(-2*time.Hour).Unix()*1000), nil) + testutil.Ok(t, err) + + testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldIgnoreID2.String(), "chunks", "000001"), &fakeChunk)) + + deleteAttempts := prometheus.NewCounter(prometheus.CounterOpts{}) + BestEffortCleanAbortedPartialUploads(ctx, logger, metaFetcher, bkt, deleteAttempts) + testutil.Equals(t, 1.0, promtest.ToFloat64(deleteAttempts)) + + exists, err := bkt.Exists(ctx, path.Join(shouldDeleteID.String(), "chunks", "000001")) + testutil.Ok(t, err) + testutil.Equals(t, false, exists) + + exists, err = bkt.Exists(ctx, path.Join(shouldIgnoreID1.String(), "chunks", "000001")) + testutil.Ok(t, err) + testutil.Equals(t, true, exists) + + exists, err = bkt.Exists(ctx, path.Join(shouldIgnoreID2.String(), "chunks", "000001")) + testutil.Ok(t, err) + testutil.Equals(t, true, exists) +} diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index b853ff5be1f..5b40bca70da 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -5,7 +5,6 @@ import ( "fmt" "io/ioutil" "os" - "path" "path/filepath" "sort" "sync" @@ -17,7 +16,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/tsdb" terrors "github.com/prometheus/prometheus/tsdb/errors" "github.com/thanos-io/thanos/pkg/block" @@ -32,33 +30,24 @@ const ( ResolutionLevelRaw = ResolutionLevel(downsample.ResLevel0) ResolutionLevel5m = ResolutionLevel(downsample.ResLevel1) ResolutionLevel1h = ResolutionLevel(downsample.ResLevel2) - - MinimumAgeForRemoval = time.Duration(30 * time.Minute) ) -var blockTooFreshSentinelError = errors.New("Block too fresh") - -// Syncer syncronizes block metas from a bucket into a local directory. +// Syncer synchronizes block metas from a bucket into a local directory. // It sorts them into compaction groups based on equal label sets. 
type Syncer struct { logger log.Logger reg prometheus.Registerer bkt objstore.Bucket - consistencyDelay time.Duration + fetcher block.MetadataFetcher mtx sync.Mutex blocks map[ulid.ULID]*metadata.Meta - blocksMtx sync.Mutex blockSyncConcurrency int metrics *syncerMetrics acceptMalformedIndex bool enableVerticalCompaction bool - relabelConfig []*relabel.Config } type syncerMetrics struct { - syncMetas prometheus.Counter - syncMetaFailures prometheus.Counter - syncMetaDuration prometheus.Histogram garbageCollectedBlocks prometheus.Counter garbageCollections prometheus.Counter garbageCollectionFailures prometheus.Counter @@ -73,20 +62,6 @@ type syncerMetrics struct { func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { var m syncerMetrics - m.syncMetas = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "thanos_compact_sync_meta_total", - Help: "Total number of sync meta operations.", - }) - m.syncMetaFailures = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "thanos_compact_sync_meta_failures_total", - Help: "Total number of failed sync meta operations.", - }) - m.syncMetaDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "thanos_compact_sync_meta_duration_seconds", - Help: "Time it took to sync meta files.", - Buckets: []float64{0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120, 240, 360, 720}, - }) - m.garbageCollectedBlocks = prometheus.NewCounter(prometheus.CounterOpts{ Name: "thanos_compact_garbage_collected_blocks_total", Help: "Total number of deleted blocks by compactor.", @@ -128,9 +103,6 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { if reg != nil { reg.MustRegister( - m.syncMetas, - m.syncMetaFailures, - m.syncMetaDuration, m.garbageCollectedBlocks, m.garbageCollections, m.garbageCollectionFailures, @@ -145,22 +117,21 @@ func newSyncerMetrics(reg prometheus.Registerer) *syncerMetrics { return &m } -// NewSyncer returns a new Syncer for the given Bucket and directory. +// NewSyncer returns a new Syncer for the given Bucket and metadata fetcher. // Blocks must be at least as old as the sync delay for being considered. -func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, consistencyDelay time.Duration, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool, relabelConfig []*relabel.Config) (*Syncer, error) { +func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, blockSyncConcurrency int, acceptMalformedIndex bool, enableVerticalCompaction bool) (*Syncer, error) { if logger == nil { logger = log.NewNopLogger() } return &Syncer{ logger: logger, reg: reg, - consistencyDelay: consistencyDelay, - blocks: map[ulid.ULID]*metadata.Meta{}, bkt: bkt, + fetcher: fetcher, + blocks: map[ulid.ULID]*metadata.Meta{}, metrics: newSyncerMetrics(reg), blockSyncConcurrency: blockSyncConcurrency, acceptMalformedIndex: acceptMalformedIndex, - relabelConfig: relabelConfig, // The syncer offers an option to enable vertical compaction, even if it's // not currently used by Thanos, because the compactor is also used by Cortex // which needs vertical compaction. @@ -168,24 +139,6 @@ func NewSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket }, nil } -// SyncMetas synchronizes all meta files from blocks in the bucket into -// the memory. It removes any partial blocks older than the max of -// consistencyDelay and MinimumAgeForRemoval from the bucket.
-func (c *Syncer) SyncMetas(ctx context.Context) error { - c.mtx.Lock() - defer c.mtx.Unlock() - - begin := time.Now() - - err := c.syncMetas(ctx) - if err != nil { - c.metrics.syncMetaFailures.Inc() - } - c.metrics.syncMetas.Inc() - c.metrics.syncMetaDuration.Observe(time.Since(begin).Seconds()) - return err -} - // UntilNextDownsampling calculates how long it will take until the next downsampling operation. // Returns an error if there will be no downsampling. func UntilNextDownsampling(m *metadata.Meta) (time.Duration, error) { @@ -202,155 +155,19 @@ func UntilNextDownsampling(m *metadata.Meta) (time.Duration, error) { } } -func (c *Syncer) syncMetas(ctx context.Context) error { - var wg sync.WaitGroup - defer wg.Wait() - - metaIDsChan := make(chan ulid.ULID) - errChan := make(chan error, c.blockSyncConcurrency) - - workCtx, cancel := context.WithCancel(ctx) - defer cancel() - for i := 0; i < c.blockSyncConcurrency; i++ { - wg.Add(1) - go func() { - defer wg.Done() - - for id := range metaIDsChan { - // Check if we already have this block cached locally. - c.blocksMtx.Lock() - _, seen := c.blocks[id] - c.blocksMtx.Unlock() - if seen { - continue - } - - meta, err := c.downloadMeta(workCtx, id) - if err == blockTooFreshSentinelError { - continue - } - - if err != nil { - if removedOrIgnored := c.removeIfMetaMalformed(workCtx, id); removedOrIgnored { - continue - } - errChan <- err - return - } - - // Check for block labels by relabeling. - // If output is empty, the block will be dropped. - lset := labels.FromMap(meta.Thanos.Labels) - processedLabels := relabel.Process(lset, c.relabelConfig...) - if processedLabels == nil { - level.Debug(c.logger).Log("msg", "dropping block(drop in relabeling)", "block", id) - continue - } - - c.blocksMtx.Lock() - c.blocks[id] = meta - c.blocksMtx.Unlock() - } - }() - } - - // Read back all block metas so we can detect deleted blocks. - remote := map[ulid.ULID]struct{}{} - - err := c.bkt.Iter(ctx, "", func(name string) error { - id, ok := block.IsBlockDir(name) - if !ok { - return nil - } - - remote[id] = struct{}{} +func (s *Syncer) SyncMetas(ctx context.Context) error { + s.mtx.Lock() + defer s.mtx.Unlock() - select { - case <-ctx.Done(): - case metaIDsChan <- id: - } - - return nil - }) - close(metaIDsChan) + metas, _, err := s.fetcher.Fetch(ctx) if err != nil { - return retry(errors.Wrap(err, "retrieve bucket block metas")) - } - - wg.Wait() - close(errChan) - - if err := <-errChan; err != nil { return retry(err) } - - // Delete all local block dirs that no longer exist in the bucket. - for id := range c.blocks { - if _, ok := remote[id]; !ok { - delete(c.blocks, id) - } - } + s.blocks = metas return nil } -func (c *Syncer) downloadMeta(ctx context.Context, id ulid.ULID) (*metadata.Meta, error) { - level.Debug(c.logger).Log("msg", "download meta", "block", id) - - meta, err := block.DownloadMeta(ctx, c.logger, c.bkt, id) - if err != nil { - if ulid.Now()-id.Time() < uint64(c.consistencyDelay/time.Millisecond) { - level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id) - return nil, blockTooFreshSentinelError - } - return nil, errors.Wrapf(err, "downloading meta.json for %s", id) - } - - // ULIDs contain a millisecond timestamp. We do not consider blocks that have been created too recently to - // avoid races when a block is only partially uploaded. 
This relates to all blocks, excluding: - // - repair created blocks - // - compactor created blocks - // NOTE: It is not safe to miss "old" block (even that it is newly created) in sync step. Compactor needs to aware of ALL old blocks. - // TODO(bplotka): https://github.com/thanos-io/thanos/issues/377. - if ulid.Now()-id.Time() < uint64(c.consistencyDelay/time.Millisecond) && - meta.Thanos.Source != metadata.BucketRepairSource && - meta.Thanos.Source != metadata.CompactorSource && - meta.Thanos.Source != metadata.CompactorRepairSource { - - level.Debug(c.logger).Log("msg", "block is too fresh for now", "block", id) - return nil, blockTooFreshSentinelError - } - - return &meta, nil -} - -// removeIfMalformed removes a block from the bucket if that block does not have a meta file. It ignores blocks that -// are younger than MinimumAgeForRemoval. -func (c *Syncer) removeIfMetaMalformed(ctx context.Context, id ulid.ULID) (removedOrIgnored bool) { - metaExists, err := c.bkt.Exists(ctx, path.Join(id.String(), block.MetaFilename)) - if err != nil { - level.Warn(c.logger).Log("msg", "failed to check meta exists for block", "block", id, "err", err) - return false - } - if metaExists { - // Meta exists, block is not malformed. - return false - } - - if ulid.Now()-id.Time() <= uint64(MinimumAgeForRemoval/time.Millisecond) { - // Minimum delay has not expired, ignore for now. - return true - } - - if err := block.Delete(ctx, c.logger, c.bkt, id); err != nil { - level.Warn(c.logger).Log("msg", "failed to delete malformed block", "block", id, "err", err) - return false - } - level.Info(c.logger).Log("msg", "deleted malformed block", "block", id) - - return true -} - // GroupKey returns a unique identifier for the group the block belongs to. It considers // the downsampling resolution and the block's labels. func GroupKey(meta metadata.Thanos) string { @@ -363,27 +180,27 @@ func groupKey(res int64, lbls labels.Labels) string { // Groups returns the compaction groups for all blocks currently known to the syncer. // It creates all groups from the scratch on every call. 
-func (c *Syncer) Groups() (res []*Group, err error) { - c.mtx.Lock() - defer c.mtx.Unlock() +func (s *Syncer) Groups() (res []*Group, err error) { + s.mtx.Lock() + defer s.mtx.Unlock() groups := map[string]*Group{} - for _, m := range c.blocks { + for _, m := range s.blocks { g, ok := groups[GroupKey(m.Thanos)] if !ok { g, err = newGroup( - log.With(c.logger, "compactionGroup", GroupKey(m.Thanos)), - c.bkt, + log.With(s.logger, "compactionGroup", GroupKey(m.Thanos)), + s.bkt, labels.FromMap(m.Thanos.Labels), m.Thanos.Downsample.Resolution, - c.acceptMalformedIndex, - c.enableVerticalCompaction, - c.metrics.compactions.WithLabelValues(GroupKey(m.Thanos)), - c.metrics.compactionRunsStarted.WithLabelValues(GroupKey(m.Thanos)), - c.metrics.compactionRunsCompleted.WithLabelValues(GroupKey(m.Thanos)), - c.metrics.compactionFailures.WithLabelValues(GroupKey(m.Thanos)), - c.metrics.verticalCompactions.WithLabelValues(GroupKey(m.Thanos)), - c.metrics.garbageCollectedBlocks, + s.acceptMalformedIndex, + s.enableVerticalCompaction, + s.metrics.compactions.WithLabelValues(GroupKey(m.Thanos)), + s.metrics.compactionRunsStarted.WithLabelValues(GroupKey(m.Thanos)), + s.metrics.compactionRunsCompleted.WithLabelValues(GroupKey(m.Thanos)), + s.metrics.compactionFailures.WithLabelValues(GroupKey(m.Thanos)), + s.metrics.verticalCompactions.WithLabelValues(GroupKey(m.Thanos)), + s.metrics.garbageCollectedBlocks, ) if err != nil { return nil, errors.Wrap(err, "create compaction group") @@ -403,9 +220,9 @@ func (c *Syncer) Groups() (res []*Group, err error) { // GarbageCollect deletes blocks from the bucket if their data is available as part of a // block with a higher compaction level. -func (c *Syncer) GarbageCollect(ctx context.Context) error { - c.mtx.Lock() - defer c.mtx.Unlock() +func (s *Syncer) GarbageCollect(ctx context.Context) error { + s.mtx.Lock() + defer s.mtx.Unlock() begin := time.Now() @@ -413,12 +230,12 @@ func (c *Syncer) GarbageCollect(ctx context.Context) error { for _, res := range []int64{ downsample.ResLevel0, downsample.ResLevel1, downsample.ResLevel2, } { - err := c.garbageCollect(ctx, res) + err := s.garbageCollect(ctx, res) if err != nil { - c.metrics.garbageCollectionFailures.Inc() + s.metrics.garbageCollectionFailures.Inc() } - c.metrics.garbageCollections.Inc() - c.metrics.garbageCollectionDuration.Observe(time.Since(begin).Seconds()) + s.metrics.garbageCollections.Inc() + s.metrics.garbageCollectionDuration.Observe(time.Since(begin).Seconds()) if err != nil { return errors.Wrapf(err, "garbage collect resolution %d", res) @@ -427,13 +244,12 @@ func (c *Syncer) GarbageCollect(ctx context.Context) error { return nil } -func (c *Syncer) GarbageBlocks(resolution int64) (ids []ulid.ULID, err error) { +func (s *Syncer) GarbageBlocks(resolution int64) (ids []ulid.ULID, err error) { // Map each block to its highest priority parent. Initial blocks have themselves // in their source section, i.e. are their own parent. parents := map[ulid.ULID]ulid.ULID{} - for id, meta := range c.blocks { - + for id, meta := range s.blocks { // Skip any block that has a different resolution. 
if meta.Thanos.Downsample.Resolution != resolution { continue @@ -447,7 +263,7 @@ func (c *Syncer) GarbageBlocks(resolution int64) (ids []ulid.ULID, err error) { parents[sid] = id continue } - pmeta, ok := c.blocks[pid] + pmeta, ok := s.blocks[pid] if !ok { return nil, errors.Errorf("previous parent block %s not found", pid) } @@ -473,7 +289,7 @@ func (c *Syncer) GarbageBlocks(resolution int64) (ids []ulid.ULID, err error) { topParents[pid] = struct{}{} } - for id, meta := range c.blocks { + for id, meta := range s.blocks { // Skip any block that has a different resolution. if meta.Thanos.Downsample.Resolution != resolution { continue @@ -487,8 +303,8 @@ func (c *Syncer) GarbageBlocks(resolution int64) (ids []ulid.ULID, err error) { return ids, nil } -func (c *Syncer) garbageCollect(ctx context.Context, resolution int64) error { - garbageIds, err := c.GarbageBlocks(resolution) +func (s *Syncer) garbageCollect(ctx context.Context, resolution int64) error { + garbageIds, err := s.GarbageBlocks(resolution) if err != nil { return err } @@ -501,9 +317,9 @@ func (c *Syncer) garbageCollect(ctx context.Context, resolution int64) error { // Spawn a new context so we always delete a block in full on shutdown. delCtx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) - level.Info(c.logger).Log("msg", "deleting outdated block", "block", id) + level.Info(s.logger).Log("msg", "deleting outdated block", "block", id) - err := block.Delete(delCtx, c.logger, c.bkt, id) + err := block.Delete(delCtx, s.logger, s.bkt, id) cancel() if err != nil { return retry(errors.Wrapf(err, "delete block %s from bucket", id)) @@ -511,8 +327,8 @@ func (c *Syncer) garbageCollect(ctx context.Context, resolution int64) error { // Immediately update our in-memory state so no further call to SyncMetas is needed // after running garbage collection. - delete(c.blocks, id) - c.metrics.garbageCollectedBlocks.Inc() + delete(s.blocks, id) + s.metrics.garbageCollectedBlocks.Inc() } return nil } @@ -1161,5 +977,6 @@ func (c *BucketCompactor) Compact(ctx context.Context) error { break } } + level.Info(c.logger).Log("msg", "compaction iterations done") return nil } diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 9f35f24bff5..9a7411b2e95 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -20,7 +20,6 @@ import ( "github.com/prometheus/client_golang/prometheus" promtest "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/pkg/labels" - "github.com/prometheus/prometheus/pkg/relabel" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/index" "github.com/thanos-io/thanos/pkg/block" @@ -28,57 +27,8 @@ import ( "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/objstore/objtesting" "github.com/thanos-io/thanos/pkg/testutil" - "gopkg.in/yaml.v2" ) -func TestSyncer_SyncMetas_e2e(t *testing.T) { - objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { - ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) - defer cancel() - - relabelConfig := make([]*relabel.Config, 0) - sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, false, relabelConfig) - testutil.Ok(t, err) - - // Generate 15 blocks. Initially the first 10 are synced into memory and only the last - // 10 are in the bucket. - // After the first synchronization the first 5 should be dropped and the - // last 5 be loaded from the bucket. 
- var ids []ulid.ULID - var metas []*metadata.Meta - - for i := 0; i < 15; i++ { - id, err := ulid.New(uint64(i), nil) - testutil.Ok(t, err) - - var meta metadata.Meta - meta.Version = 1 - meta.ULID = id - - if i < 10 { - sy.blocks[id] = &meta - } - ids = append(ids, id) - metas = append(metas, &meta) - } - for _, m := range metas[5:] { - var buf bytes.Buffer - testutil.Ok(t, json.NewEncoder(&buf).Encode(&m)) - testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf)) - } - - groups, err := sy.Groups() - testutil.Ok(t, err) - testutil.Equals(t, ids[:10], groups[0].IDs()) - - testutil.Ok(t, sy.SyncMetas(ctx)) - - groups, err = sy.Groups() - testutil.Ok(t, err) - testutil.Equals(t, ids[5:], groups[0].IDs()) - }) -} - func TestSyncer_GarbageCollect_e2e(t *testing.T) { objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) @@ -89,8 +39,6 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { var metas []*metadata.Meta var ids []ulid.ULID - relabelConfig := make([]*relabel.Config, 0) - for i := 0; i < 10; i++ { var m metadata.Meta @@ -125,7 +73,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { m3.Thanos.Downsample.Resolution = 0 var m4 metadata.Meta - m4.Version = 14 + m4.Version = 1 m4.ULID = ulid.MustNew(400, nil) m4.Compaction.Level = 2 m4.Compaction.Sources = ids[9:] // covers the last block but is a different resolution. Must not trigger deletion. @@ -139,11 +87,14 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf)) } - // Do one initial synchronization with the bucket. - sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, false, relabelConfig) + metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil) testutil.Ok(t, err) - testutil.Ok(t, sy.SyncMetas(ctx)) + sy, err := NewSyncer(nil, nil, bkt, metaFetcher, 1, false, false) + testutil.Ok(t, err) + + // Do one initial synchronization with the bucket. + testutil.Ok(t, sy.SyncMetas(ctx)) testutil.Ok(t, sy.GarbageCollect(ctx)) var rem []ulid.ULID @@ -209,7 +160,10 @@ func TestGroup_Compact_e2e(t *testing.T) { reg := prometheus.NewRegistry() - sy, err := NewSyncer(logger, reg, bkt, 0*time.Second, 5, false, false, nil) + metaFetcher, err := block.NewMetaFetcher(nil, 32, bkt, "", nil) + testutil.Ok(t, err) + + sy, err := NewSyncer(nil, nil, bkt, metaFetcher, 5, false, false) testutil.Ok(t, err) comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil) @@ -220,8 +174,6 @@ func TestGroup_Compact_e2e(t *testing.T) { // Compaction on empty should not fail. 
testutil.Ok(t, bComp.Compact(ctx)) - testutil.Equals(t, 1.0, promtest.ToFloat64(sy.metrics.syncMetas)) - testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.syncMetaFailures)) testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectedBlocks)) testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures)) testutil.Equals(t, 0, MetricCount(sy.metrics.compactions)) @@ -310,8 +262,6 @@ func TestGroup_Compact_e2e(t *testing.T) { }) testutil.Ok(t, bComp.Compact(ctx)) - testutil.Equals(t, 3.0, promtest.ToFloat64(sy.metrics.syncMetas)) - testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.syncMetaFailures)) testutil.Equals(t, 5.0, promtest.ToFloat64(sy.metrics.garbageCollectedBlocks)) testutil.Equals(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures)) testutil.Equals(t, 4, MetricCount(sy.metrics.compactions)) @@ -495,86 +445,3 @@ func createEmptyBlock(dir string, mint int64, maxt int64, extLset labels.Labels, return uid, nil } - -func TestSyncer_SyncMetasFilter_e2e(t *testing.T) { - var err error - - relabelContentYaml := ` - - action: drop - regex: "A" - source_labels: - - cluster - ` - var relabelConfig []*relabel.Config - err = yaml.Unmarshal([]byte(relabelContentYaml), &relabelConfig) - testutil.Ok(t, err) - - extLsets := []labels.Labels{{{Name: "cluster", Value: "A"}}, {{Name: "cluster", Value: "B"}}} - - objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) { - ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) - defer cancel() - - sy, err := NewSyncer(nil, nil, bkt, 0, 1, false, false, relabelConfig) - testutil.Ok(t, err) - - var ids []ulid.ULID - var metas []*metadata.Meta - - for i := 0; i < 16; i++ { - id, err := ulid.New(uint64(i), nil) - testutil.Ok(t, err) - - var meta metadata.Meta - meta.Version = 1 - meta.ULID = id - meta.Thanos = metadata.Thanos{ - Labels: extLsets[i%2].Map(), - } - - ids = append(ids, id) - metas = append(metas, &meta) - } - for _, m := range metas[:10] { - var buf bytes.Buffer - testutil.Ok(t, json.NewEncoder(&buf).Encode(&m)) - testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf)) - } - - testutil.Ok(t, sy.SyncMetas(ctx)) - - groups, err := sy.Groups() - testutil.Ok(t, err) - var evenIds []ulid.ULID - for i := 0; i < 10; i++ { - if i%2 != 0 { - evenIds = append(evenIds, ids[i]) - } - } - testutil.Equals(t, evenIds, groups[0].IDs()) - - // Upload last 6 blocks. - for _, m := range metas[10:] { - var buf bytes.Buffer - testutil.Ok(t, json.NewEncoder(&buf).Encode(&m)) - testutil.Ok(t, bkt.Upload(ctx, path.Join(m.ULID.String(), metadata.MetaFilename), &buf)) - } - - // Delete first 4 blocks. 
- for _, m := range metas[:4] { - testutil.Ok(t, block.Delete(ctx, log.NewNopLogger(), bkt, m.ULID)) - } - - testutil.Ok(t, sy.SyncMetas(ctx)) - - groups, err = sy.Groups() - testutil.Ok(t, err) - evenIds = make([]ulid.ULID, 0) - for i := 4; i < 16; i++ { - if i%2 != 0 { - evenIds = append(evenIds, ids[i]) - } - } - testutil.Equals(t, evenIds, groups[0].IDs()) - }) -} diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go index 3624d27fda6..f7364f1147b 100644 --- a/pkg/compact/compact_test.go +++ b/pkg/compact/compact_test.go @@ -1,19 +1,12 @@ package compact import ( - "bytes" - "context" - "path" "testing" - "time" "github.com/thanos-io/thanos/pkg/block/metadata" - "github.com/oklog/ulid" "github.com/pkg/errors" - "github.com/prometheus/prometheus/pkg/relabel" terrors "github.com/prometheus/prometheus/tsdb/errors" - "github.com/thanos-io/thanos/pkg/objstore/inmem" "github.com/thanos-io/thanos/pkg/testutil" ) @@ -73,41 +66,6 @@ func TestRetryError(t *testing.T) { testutil.Assert(t, IsHaltError(err), "not a halt error. Retry should not hide halt error") } -func TestSyncer_SyncMetas_HandlesMalformedBlocks(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - bkt := inmem.NewBucket() - relabelConfig := make([]*relabel.Config, 0) - sy, err := NewSyncer(nil, nil, bkt, 10*time.Second, 1, false, false, relabelConfig) - testutil.Ok(t, err) - - // Generate 1 block which is older than MinimumAgeForRemoval which has chunk data but no meta. Compactor should delete it. - shouldDeleteId, err := ulid.New(uint64(time.Now().Add(-time.Hour).Unix()*1000), nil) - testutil.Ok(t, err) - - var fakeChunk bytes.Buffer - fakeChunk.Write([]byte{0, 1, 2, 3}) - testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldDeleteId.String(), "chunks", "000001"), &fakeChunk)) - - // Generate 1 block which is older than consistencyDelay but younger than MinimumAgeForRemoval, and which has chunk - // data but no meta. Compactor should ignore it. - shouldIgnoreId, err := ulid.New(uint64(time.Now().Unix()*1000), nil) - testutil.Ok(t, err) - - testutil.Ok(t, bkt.Upload(ctx, path.Join(shouldIgnoreId.String(), "chunks", "000001"), &fakeChunk)) - - testutil.Ok(t, sy.SyncMetas(ctx)) - - exists, err := bkt.Exists(ctx, path.Join(shouldDeleteId.String(), "chunks", "000001")) - testutil.Ok(t, err) - testutil.Equals(t, false, exists) - - exists, err = bkt.Exists(ctx, path.Join(shouldIgnoreId.String(), "chunks", "000001")) - testutil.Ok(t, err) - testutil.Equals(t, true, exists) -} - func TestGroupKey(t *testing.T) { for _, tcase := range []struct { input metadata.Thanos diff --git a/pkg/compact/downsample/streamed_block_writer.go b/pkg/compact/downsample/streamed_block_writer.go index 0bc84e11c4f..531ee6efa90 100644 --- a/pkg/compact/downsample/streamed_block_writer.go +++ b/pkg/compact/downsample/streamed_block_writer.go @@ -128,7 +128,7 @@ func NewStreamedBlockWriter( }, nil } -// WriteSeries writes chunks data to the chunkWriter, writes lset and chunks Metas to indexWrites and adds label sets to +// WriteSeries writes chunks data to the chunkWriter, writes lset and chunks MetasFetcher to indexWrites and adds label sets to // labelsValues sets and memPostings to be written on the finalize state in the end of downsampling process. 
func (w *streamedBlockWriter) WriteSeries(lset labels.Labels, chunks []chunks.Meta) error { if w.finalized || w.ignoreFinalize { diff --git a/pkg/compact/retention.go b/pkg/compact/retention.go index 9021677f2bd..aba46963ecc 100644 --- a/pkg/compact/retention.go +++ b/pkg/compact/retention.go @@ -13,21 +13,17 @@ import ( // ApplyRetentionPolicyByResolution removes blocks depending on the specified retentionByResolution based on blocks MaxTime. // A value of 0 disables the retention for its resolution. -func ApplyRetentionPolicyByResolution(ctx context.Context, logger log.Logger, bkt objstore.Bucket, retentionByResolution map[ResolutionLevel]time.Duration) error { +func ApplyRetentionPolicyByResolution(ctx context.Context, logger log.Logger, bkt objstore.Bucket, fetcher block.MetadataFetcher, retentionByResolution map[ResolutionLevel]time.Duration) error { level.Info(logger).Log("msg", "start optional retention") - if err := bkt.Iter(ctx, "", func(name string) error { - id, ok := block.IsBlockDir(name) - if !ok { - return nil - } - m, err := block.DownloadMeta(ctx, logger, bkt, id) - if err != nil { - return errors.Wrap(err, "download metadata") - } + metas, _, err := fetcher.Fetch(ctx) + if err != nil { + return errors.Wrap(err, "fetch metas") + } + for id, m := range metas { retentionDuration := retentionByResolution[ResolutionLevel(m.Thanos.Downsample.Resolution)] if retentionDuration.Seconds() == 0 { - return nil + continue } maxTime := time.Unix(m.MaxTime/1000, 0) @@ -37,10 +33,6 @@ func ApplyRetentionPolicyByResolution(ctx context.Context, logger log.Logger, bk return errors.Wrap(err, "delete block") } } - - return nil - }); err != nil { - return errors.Wrap(err, "retention") } level.Info(logger).Log("msg", "optional retention apply done") diff --git a/pkg/compact/retention_test.go b/pkg/compact/retention_test.go index c35fde30643..b34847faf3b 100644 --- a/pkg/compact/retention_test.go +++ b/pkg/compact/retention_test.go @@ -11,6 +11,7 @@ import ( "github.com/go-kit/kit/log" "github.com/oklog/ulid" "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/compact" "github.com/thanos-io/thanos/pkg/objstore" @@ -236,7 +237,11 @@ func TestApplyRetentionPolicyByResolution(t *testing.T) { for _, b := range tt.blocks { uploadMockBlock(t, bkt, b.id, b.minTime, b.maxTime, int64(b.resolution)) } - if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, tt.retentionByResolution); (err != nil) != tt.wantErr { + + metaFetcher, err := block.NewMetaFetcher(logger, 32, bkt, "", nil) + testutil.Ok(t, err) + + if err := compact.ApplyRetentionPolicyByResolution(ctx, logger, bkt, metaFetcher, tt.retentionByResolution); (err != nil) != tt.wantErr { t.Errorf("ApplyRetentionPolicyByResolution() error = %v, wantErr %v", err, tt.wantErr) }