From cb727abeed90ab5da3990557fefe0da9dd889a1a Mon Sep 17 00:00:00 2001 From: Bartlomiej Plotka Date: Wed, 4 Nov 2020 16:00:29 +0100 Subject: [PATCH] compact: Refactored compaction Planner for ability to extend planners. Added compatibility tests. (#3402) Signed-off-by: Bartlomiej Plotka --- cmd/thanos/compact.go | 2 +- pkg/block/block_test.go | 5 +- pkg/block/metadata/meta.go | 9 +- pkg/compact/compact.go | 179 +++++++------- pkg/compact/compact_e2e_test.go | 4 +- pkg/compact/compact_test.go | 19 +- pkg/compact/planner.go | 163 +++++++++++++ pkg/compact/planner_test.go | 410 ++++++++++++++++++++++++++++++++ pkg/shipper/shipper_e2e_test.go | 2 - 9 files changed, 673 insertions(+), 120 deletions(-) create mode 100644 pkg/compact/planner.go create mode 100644 pkg/compact/planner_test.go diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index 4dcc41db92..ba09d20b46 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -280,7 +280,7 @@ func runCompact( grouper := compact.NewDefaultGrouper(logger, bkt, conf.acceptMalformedIndex, enableVerticalCompaction, reg, blocksMarkedForDeletion, garbageCollectedBlocks) blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, deleteDelay, blocksCleaned, blockCleanupFailures) - compactor, err := compact.NewBucketCompactor(logger, sy, grouper, comp, compactDir, bkt, conf.compactionConcurrency) + compactor, err := compact.NewBucketCompactor(logger, sy, grouper, compact.NewTSDBBasedPlanner(logger, levels), comp, compactDir, bkt, conf.compactionConcurrency) if err != nil { cancel() return errors.Wrap(err, "create bucket compactor") diff --git a/pkg/block/block_test.go b/pkg/block/block_test.go index 8c55989ce1..fb584bc758 100644 --- a/pkg/block/block_test.go +++ b/pkg/block/block_test.go @@ -141,7 +141,7 @@ func TestUpload(t *testing.T) { testutil.Equals(t, 4, len(bkt.Objects())) testutil.Equals(t, 3751, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")])) testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)])) - testutil.Equals(t, 562, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)])) + testutil.Equals(t, 546, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)])) // File stats are gathered. testutil.Equals(t, fmt.Sprintf(`{ @@ -161,7 +161,6 @@ func TestUpload(t *testing.T) { }, "version": 1, "thanos": { - "version": 1, "labels": { "ext1": "val1" }, @@ -192,7 +191,7 @@ func TestUpload(t *testing.T) { testutil.Equals(t, 4, len(bkt.Objects())) testutil.Equals(t, 3751, len(bkt.Objects()[path.Join(b1.String(), ChunksDirname, "000001")])) testutil.Equals(t, 401, len(bkt.Objects()[path.Join(b1.String(), IndexFilename)])) - testutil.Equals(t, 562, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)])) + testutil.Equals(t, 546, len(bkt.Objects()[path.Join(b1.String(), MetaFilename)])) } { // Upload with no external labels should be blocked. diff --git a/pkg/block/metadata/meta.go b/pkg/block/metadata/meta.go index a98d8ea0f1..db9e3792aa 100644 --- a/pkg/block/metadata/meta.go +++ b/pkg/block/metadata/meta.go @@ -168,13 +168,14 @@ func Read(dir string) (*Meta, error) { if m.Version != TSDBVersion1 { return nil, errors.Errorf("unexpected meta file version %d", m.Version) } - if m.Thanos.Version == 0 { + + version := m.Thanos.Version + if version == 0 { // For compatibility. 
- m.Thanos.Version = ThanosVersion1 - return &m, nil + version = ThanosVersion1 } - if m.Thanos.Version != ThanosVersion1 { + if version != ThanosVersion1 { return nil, errors.Errorf("unexpected meta file Thanos section version %d", m.Version) } return &m, nil diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 61590cc4b8..b3966c6196 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -323,7 +323,7 @@ type Group struct { labels labels.Labels resolution int64 mtx sync.Mutex - blocks map[ulid.ULID]*metadata.Meta + metasByMinTime []*metadata.Meta acceptMalformedIndex bool enableVerticalCompaction bool compactions prometheus.Counter @@ -361,7 +361,6 @@ func NewGroup( key: key, labels: lset, resolution: resolution, - blocks: map[ulid.ULID]*metadata.Meta{}, acceptMalformedIndex: acceptMalformedIndex, enableVerticalCompaction: enableVerticalCompaction, compactions: compactions, @@ -391,7 +390,11 @@ func (cg *Group) Add(meta *metadata.Meta) error { if cg.resolution != meta.Thanos.Downsample.Resolution { return errors.New("block and group resolution do not match") } - cg.blocks[meta.ULID] = meta + + cg.metasByMinTime = append(cg.metasByMinTime, meta) + sort.Slice(cg.metasByMinTime, func(i, j int) bool { + return cg.metasByMinTime[i].MinTime < cg.metasByMinTime[j].MinTime + }) return nil } @@ -400,8 +403,8 @@ func (cg *Group) IDs() (ids []ulid.ULID) { cg.mtx.Lock() defer cg.mtx.Unlock() - for id := range cg.blocks { - ids = append(ids, id) + for _, m := range cg.metasByMinTime { + ids = append(ids, m.ULID) } sort.Slice(ids, func(i, j int) bool { return ids[i].Compare(ids[j]) < 0 @@ -414,13 +417,10 @@ func (cg *Group) MinTime() int64 { cg.mtx.Lock() defer cg.mtx.Unlock() - min := int64(math.MaxInt64) - for _, b := range cg.blocks { - if b.MinTime < min { - min = b.MinTime - } + if len(cg.metasByMinTime) > 0 { + return cg.metasByMinTime[0].MinTime } - return min + return math.MaxInt64 } // MaxTime returns the max time across all group's blocks. @@ -429,9 +429,9 @@ func (cg *Group) MaxTime() int64 { defer cg.mtx.Unlock() max := int64(math.MinInt64) - for _, b := range cg.blocks { - if b.MaxTime > max { - max = b.MaxTime + for _, m := range cg.metasByMinTime { + if m.MaxTime > max { + max = m.MaxTime } } return max @@ -447,9 +447,35 @@ func (cg *Group) Resolution() int64 { return cg.resolution } +// Planner returns blocks to compact. +type Planner interface { + // Plan returns a block directories of blocks that should be compacted into single one. + // The blocks can be overlapping. The provided metadata has to be ordered by minTime. + Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) +} + +// Compactor provides compaction against an underlying storage of time series data. +// This is similar to tsdb.Compactor just without Plan method. +// TODO(bwplotka): Split the Planner from Compactor on upstream as well, so we can import it. +type Compactor interface { + // Write persists a Block into a directory. + // No Block is written when resulting Block has 0 samples, and returns empty ulid.ULID{}. + Write(dest string, b tsdb.BlockReader, mint, maxt int64, parent *tsdb.BlockMeta) (ulid.ULID, error) + + // Compact runs compaction against the provided directories. Must + // only be called concurrently with results of Plan(). + // Can optionally pass a list of already open blocks, + // to avoid having to reopen them. + // When resulting Block has 0 samples + // * No block is written. + // * The source dirs are marked Deletable. 
+ // * Returns empty ulid.ULID{}. + Compact(dest string, dirs []string, open []*tsdb.Block) (ulid.ULID, error) +} + // Compact plans and runs a single compaction against the group. The compacted result // is uploaded into the bucket the blocks were retrieved from. -func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) (shouldRerun bool, compID ulid.ULID, rerr error) { +func (cg *Group) Compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, compID ulid.ULID, rerr error) { cg.compactionRunsStarted.Inc() subDir := filepath.Join(dir, cg.Key()) @@ -470,7 +496,7 @@ func (cg *Group) Compact(ctx context.Context, dir string, comp tsdb.Compactor) ( return false, ulid.ULID{}, errors.Wrap(err, "create compaction group dir") } - shouldRerun, compID, err := cg.compact(ctx, subDir, comp) + shouldRerun, compID, err := cg.compact(ctx, subDir, planner, comp) if err != nil { cg.compactionFailures.Inc() return false, ulid.ULID{}, err @@ -562,22 +588,18 @@ func IsRetryError(err error) bool { return ok } -func (cg *Group) areBlocksOverlapping(include *metadata.Meta, excludeDirs ...string) error { +func (cg *Group) areBlocksOverlapping(include *metadata.Meta, exclude ...*metadata.Meta) error { var ( - metas []tsdb.BlockMeta - exclude = map[ulid.ULID]struct{}{} + metas []tsdb.BlockMeta + excludeMap = map[ulid.ULID]struct{}{} ) - for _, e := range excludeDirs { - id, err := ulid.Parse(filepath.Base(e)) - if err != nil { - return errors.Wrapf(err, "overlaps find dir %s", e) - } - exclude[id] = struct{}{} + for _, meta := range exclude { + excludeMap[meta.ULID] = struct{}{} } - for _, m := range cg.blocks { - if _, ok := exclude[m.ULID]; ok { + for _, m := range cg.metasByMinTime { + if _, ok := excludeMap[m.ULID]; ok { continue } metas = append(metas, m.BlockMeta) @@ -654,7 +676,7 @@ func RepairIssue347(ctx context.Context, logger log.Logger, bkt objstore.Bucket, return nil } -func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) (shouldRerun bool, compID ulid.ULID, err error) { +func (cg *Group) compact(ctx context.Context, dir string, planner Planner, comp Compactor) (shouldRerun bool, compID ulid.ULID, err error) { cg.mtx.Lock() defer cg.mtx.Unlock() @@ -670,29 +692,16 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( overlappingBlocks = true } - // Planning a compaction works purely based on the meta.json files in our future group's dir. - // So we first dump all our memory block metas into the directory. - for _, meta := range cg.blocks { - bdir := filepath.Join(dir, meta.ULID.String()) - if err := os.MkdirAll(bdir, 0777); err != nil { - return false, ulid.ULID{}, errors.Wrap(err, "create planning block dir") - } - if err := meta.WriteToDir(cg.logger, bdir); err != nil { - return false, ulid.ULID{}, errors.Wrap(err, "write planning meta file") - } - } - - // Plan against the written meta.json files. - plan, err := comp.Plan(dir) + toCompact, err := planner.Plan(ctx, cg.metasByMinTime) if err != nil { return false, ulid.ULID{}, errors.Wrap(err, "plan compaction") } - if len(plan) == 0 { + if len(toCompact) == 0 { // Nothing to do. 
return false, ulid.ULID{}, nil } - level.Info(cg.logger).Log("msg", "compaction available and planned; downloading blocks", "plan", fmt.Sprintf("%v", plan)) + level.Info(cg.logger).Log("msg", "compaction available and planned; downloading blocks", "plan", fmt.Sprintf("%v", toCompact)) // Due to #183 we verify that none of the blocks in the plan have overlapping sources. // This is one potential source of how we could end up with duplicated chunks. @@ -701,71 +710,54 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( // Once we have a plan we need to download the actual data. begin := time.Now() - for _, pdir := range plan { - meta, err := metadata.Read(pdir) - if err != nil { - return false, ulid.ULID{}, errors.Wrapf(err, "read meta from %s", pdir) - } - + toCompactDirs := make([]string, 0, len(toCompact)) + for _, meta := range toCompact { + bdir := filepath.Join(dir, meta.ULID.String()) for _, s := range meta.Compaction.Sources { if _, ok := uniqueSources[s]; ok { - return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", plan)) + return false, ulid.ULID{}, halt(errors.Errorf("overlapping sources detected for plan %v", toCompact)) } uniqueSources[s] = struct{}{} } - id, err := ulid.Parse(filepath.Base(pdir)) - if err != nil { - return false, ulid.ULID{}, errors.Wrapf(err, "plan dir %s", pdir) - } - - if meta.ULID.Compare(id) != 0 { - return false, ulid.ULID{}, errors.Errorf("mismatch between meta %s and dir %s", meta.ULID, id) - } - - if err := block.Download(ctx, cg.logger, cg.bkt, id, pdir); err != nil { - return false, ulid.ULID{}, retry(errors.Wrapf(err, "download block %s", id)) + if err := block.Download(ctx, cg.logger, cg.bkt, meta.ULID, bdir); err != nil { + return false, ulid.ULID{}, retry(errors.Wrapf(err, "download block %s", meta.ULID)) } // Ensure all input blocks are valid. 
- stats, err := block.GatherIndexIssueStats(cg.logger, filepath.Join(pdir, block.IndexFilename), meta.MinTime, meta.MaxTime) + stats, err := block.GatherIndexIssueStats(cg.logger, filepath.Join(bdir, block.IndexFilename), meta.MinTime, meta.MaxTime) if err != nil { - return false, ulid.ULID{}, errors.Wrapf(err, "gather index issues for block %s", pdir) + return false, ulid.ULID{}, errors.Wrapf(err, "gather index issues for block %s", bdir) } if err := stats.CriticalErr(); err != nil { - return false, ulid.ULID{}, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", pdir, meta.Compaction.Level, meta.Thanos.Labels)) + return false, ulid.ULID{}, halt(errors.Wrapf(err, "block with not healthy index found %s; Compaction level %v; Labels: %v", bdir, meta.Compaction.Level, meta.Thanos.Labels)) } if err := stats.Issue347OutsideChunksErr(); err != nil { - return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", pdir), meta.ULID) + return false, ulid.ULID{}, issue347Error(errors.Wrapf(err, "invalid, but reparable block %s", bdir), meta.ULID) } if err := stats.PrometheusIssue5372Err(); !cg.acceptMalformedIndex && err != nil { return false, ulid.ULID{}, errors.Wrapf(err, - "block id %s, try running with --debug.accept-malformed-index", id) + "block id %s, try running with --debug.accept-malformed-index", meta.ULID) } + toCompactDirs = append(toCompactDirs, bdir) } - level.Info(cg.logger).Log("msg", "downloaded and verified blocks; compacting blocks", "plan", fmt.Sprintf("%v", plan), "duration", time.Since(begin)) + level.Info(cg.logger).Log("msg", "downloaded and verified blocks; compacting blocks", "plan", fmt.Sprintf("%v", toCompactDirs), "duration", time.Since(begin)) begin = time.Now() - - compID, err = comp.Compact(dir, plan, nil) + compID, err = comp.Compact(dir, toCompactDirs, nil) if err != nil { - return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact blocks %v", plan)) + return false, ulid.ULID{}, halt(errors.Wrapf(err, "compact blocks %v", toCompactDirs)) } if compID == (ulid.ULID{}) { // Prometheus compactor found that the compacted block would have no samples. 
- level.Info(cg.logger).Log("msg", "compacted block would have no samples, deleting source blocks", "blocks", fmt.Sprintf("%v", plan)) - for _, block := range plan { - meta, err := metadata.Read(block) - if err != nil { - level.Warn(cg.logger).Log("msg", "failed to read meta for block", "block", block) - continue - } + level.Info(cg.logger).Log("msg", "compacted block would have no samples, deleting source blocks", "blocks", fmt.Sprintf("%v", toCompactDirs)) + for _, meta := range toCompact { if meta.Stats.NumSamples == 0 { - if err := cg.deleteBlock(block); err != nil { - level.Warn(cg.logger).Log("msg", "failed to mark for deletion an empty block found during compaction", "block", block) + if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())); err != nil { + level.Warn(cg.logger).Log("msg", "failed to mark for deletion an empty block found during compaction", "block", meta.ULID) } } } @@ -777,7 +769,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( cg.verticalCompactions.Inc() } level.Info(cg.logger).Log("msg", "compacted blocks", "new", compID, - "blocks", fmt.Sprintf("%v", plan), "duration", time.Since(begin), "overlapping_blocks", overlappingBlocks) + "blocks", fmt.Sprintf("%v", toCompactDirs), "duration", time.Since(begin), "overlapping_blocks", overlappingBlocks) bdir := filepath.Join(dir, compID.String()) index := filepath.Join(bdir, block.IndexFilename) @@ -804,7 +796,7 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( // Ensure the output block is not overlapping with anything else, // unless vertical compaction is enabled. if !cg.enableVerticalCompaction { - if err := cg.areBlocksOverlapping(newMeta, plan...); err != nil { + if err := cg.areBlocksOverlapping(newMeta, toCompact...); err != nil { return false, ulid.ULID{}, halt(errors.Wrapf(err, "resulted compacted block %s overlaps with something", bdir)) } } @@ -819,23 +811,17 @@ func (cg *Group) compact(ctx context.Context, dir string, comp tsdb.Compactor) ( // Mark for deletion the blocks we just compacted from the group and bucket so they do not get included // into the next planning cycle. // Eventually the block we just uploaded should get synced into the group again (including sync-delay). 
- for _, b := range plan { - if err := cg.deleteBlock(b); err != nil { + for _, meta := range toCompact { + if err := cg.deleteBlock(meta.ULID, filepath.Join(dir, meta.ULID.String())); err != nil { return false, ulid.ULID{}, retry(errors.Wrapf(err, "mark old block for deletion from bucket")) } cg.groupGarbageCollectedBlocks.Inc() } - return true, compID, nil } -func (cg *Group) deleteBlock(b string) error { - id, err := ulid.Parse(filepath.Base(b)) - if err != nil { - return errors.Wrapf(err, "plan dir %s", b) - } - - if err := os.RemoveAll(b); err != nil { +func (cg *Group) deleteBlock(id ulid.ULID, bdir string) error { + if err := os.RemoveAll(bdir); err != nil { return errors.Wrapf(err, "remove old block dir %s", id) } @@ -854,7 +840,8 @@ type BucketCompactor struct { logger log.Logger sy *Syncer grouper Grouper - comp tsdb.Compactor + comp Compactor + planner Planner compactDir string bkt objstore.Bucket concurrency int @@ -865,7 +852,8 @@ func NewBucketCompactor( logger log.Logger, sy *Syncer, grouper Grouper, - comp tsdb.Compactor, + planner Planner, + comp Compactor, compactDir string, bkt objstore.Bucket, concurrency int, @@ -877,6 +865,7 @@ func NewBucketCompactor( logger: logger, sy: sy, grouper: grouper, + planner: planner, comp: comp, compactDir: compactDir, bkt: bkt, @@ -914,7 +903,7 @@ func (c *BucketCompactor) Compact(ctx context.Context) (rerr error) { go func() { defer wg.Done() for g := range groupChan { - shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.comp) + shouldRerunGroup, _, err := g.Compact(workCtx, c.compactDir, c.planner, c.comp) if err == nil { if shouldRerunGroup { mtx.Lock() diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 478d0364ff..93dc2d5b24 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -195,8 +195,10 @@ func TestGroup_Compact_e2e(t *testing.T) { comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil) testutil.Ok(t, err) + planner := NewTSDBBasedPlanner(logger, []int64{1000, 3000}) + grouper := NewDefaultGrouper(logger, bkt, false, false, reg, blocksMarkedForDeletion, garbageCollectedBlocks) - bComp, err := NewBucketCompactor(logger, sy, grouper, comp, dir, bkt, 2) + bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2) testutil.Ok(t, err) // Compaction on empty should not fail. 
diff --git a/pkg/compact/compact_test.go b/pkg/compact/compact_test.go index 3f2928b15f..47423f8812 100644 --- a/pkg/compact/compact_test.go +++ b/pkg/compact/compact_test.go @@ -6,7 +6,6 @@ package compact import ( "testing" - "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/prometheus/tsdb" @@ -116,21 +115,13 @@ func TestGroupKey(t *testing.T) { func TestGroupMaxMinTime(t *testing.T) { g := &Group{ - blocks: make(map[ulid.ULID]*metadata.Meta), + metasByMinTime: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 10}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 1, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 2, MaxTime: 30}}, + }, } - now := ulid.Now() - id1, err := ulid.New(now, nil) - testutil.Ok(t, err) - id2, err := ulid.New(now-10, nil) - testutil.Ok(t, err) - id3, err := ulid.New(now+10, nil) - testutil.Ok(t, err) - - g.blocks[id1] = &metadata.Meta{BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 10}} - g.blocks[id2] = &metadata.Meta{BlockMeta: tsdb.BlockMeta{MinTime: 1, MaxTime: 20}} - g.blocks[id3] = &metadata.Meta{BlockMeta: tsdb.BlockMeta{MinTime: 2, MaxTime: 30}} - testutil.Equals(t, int64(0), g.MinTime()) testutil.Equals(t, int64(30), g.MaxTime()) } diff --git a/pkg/compact/planner.go b/pkg/compact/planner.go new file mode 100644 index 0000000000..53fb628bbc --- /dev/null +++ b/pkg/compact/planner.go @@ -0,0 +1,163 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package compact + +import ( + "context" + + "github.com/go-kit/kit/log" + "github.com/thanos-io/thanos/pkg/block/metadata" +) + +type tsdbBasedPlanner struct { + logger log.Logger + + ranges []int64 +} + +var _ Planner = &tsdbBasedPlanner{} + +// NewTSDBBasedPlanner is planner with the same functionality as Prometheus' TSDB plus special handling of excluded blocks. +// TODO(bwplotka): Consider upstreaming this to Prometheus. +// It's the same functionality just without accessing filesystem, and special handling of excluded blocks. +func NewTSDBBasedPlanner(logger log.Logger, ranges []int64) *tsdbBasedPlanner { + return &tsdbBasedPlanner{logger: logger, ranges: ranges} +} + +func (p *tsdbBasedPlanner) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { + res := selectOverlappingMetas(metasByMinTime) + if len(res) > 0 { + return res, nil + } + + // No overlapping blocks, do compaction the usual way. + // We do not include a recently created block with max(minTime), so the block which was just created from WAL. + // This gives users a window of a full block size to piece-wise backup new data without having to care about data overlap. + metasByMinTime = metasByMinTime[:len(metasByMinTime)-1] + res = append(res, selectMetas(p.ranges, metasByMinTime)...) + if len(res) > 0 { + return res, nil + } + + // Compact any blocks with big enough time range that have >5% tombstones. + for i := len(metasByMinTime) - 1; i >= 0; i-- { + meta := metasByMinTime[i] + if meta.MaxTime-meta.MinTime < p.ranges[len(p.ranges)/2] { + break + } + if float64(meta.Stats.NumTombstones)/float64(meta.Stats.NumSeries+1) > 0.05 { + return []*metadata.Meta{metasByMinTime[i]}, nil + } + } + + return nil, nil +} + +// selectMetas returns the dir metas that should be compacted into a single new block. +// If only a single block range is configured, the result is always nil. 
+func selectMetas(ranges []int64, metasByMinTime []*metadata.Meta) []*metadata.Meta { + if len(ranges) < 2 || len(metasByMinTime) < 1 { + return nil + } + + highTime := metasByMinTime[len(metasByMinTime)-1].MinTime + + for _, iv := range ranges[1:] { + parts := splitByRange(metasByMinTime, iv) + if len(parts) == 0 { + continue + } + + Outer: + for _, p := range parts { + // Do not select the range if it has a block whose compaction failed. + for _, m := range p { + if m.Compaction.Failed { + continue Outer + } + } + + mint := p[0].MinTime + maxt := p[len(p)-1].MaxTime + // Pick the range of blocks if it spans the full range (potentially with gaps) + // or is before the most recent block. + // This ensures we don't compact blocks prematurely when another one of the same + // size still fits in the range. + if (maxt-mint == iv || maxt <= highTime) && len(p) > 1 { + return p + } + } + } + + return nil +} + +// selectOverlappingMetas returns all dirs with overlapping time ranges. +// It expects sorted input by mint and returns the overlapping dirs in the same order as received. +func selectOverlappingMetas(metasByMinTime []*metadata.Meta) []*metadata.Meta { + if len(metasByMinTime) < 2 { + return nil + } + var overlappingMetas []*metadata.Meta + globalMaxt := metasByMinTime[0].MaxTime + for i, m := range metasByMinTime[1:] { + if m.MinTime < globalMaxt { + if len(overlappingMetas) == 0 { + // When it is the first overlap, need to add the last one as well. + overlappingMetas = append(overlappingMetas, metasByMinTime[i]) + } + overlappingMetas = append(overlappingMetas, m) + } else if len(overlappingMetas) > 0 { + break + } + + if m.MaxTime > globalMaxt { + globalMaxt = m.MaxTime + } + } + return overlappingMetas +} + +// splitByRange splits the directories by the time range. The range sequence starts at 0. +// +// For example, if we have blocks [0-10, 10-20, 50-60, 90-100] and the split range tr is 30 +// it returns [0-10, 10-20], [50-60], [90-100]. +func splitByRange(metasByMinTime []*metadata.Meta, tr int64) [][]*metadata.Meta { + var splitDirs [][]*metadata.Meta + + for i := 0; i < len(metasByMinTime); { + var ( + group []*metadata.Meta + t0 int64 + m = metasByMinTime[i] + ) + // Compute start of aligned time range of size tr closest to the current block's start. + if m.MinTime >= 0 { + t0 = tr * (m.MinTime / tr) + } else { + t0 = tr * ((m.MinTime - tr + 1) / tr) + } + // Skip blocks that don't fall into the range. This can happen via mis-alignment or + // by being the multiple of the intended range. + if m.MaxTime > t0+tr { + i++ + continue + } + + // Add all dirs to the current group that are within [t0, t0+tr]. + for ; i < len(metasByMinTime); i++ { + // Either the block falls into the next range or doesn't fit at all (checked above). + if metasByMinTime[i].MaxTime > t0+tr { + break + } + group = append(group, metasByMinTime[i]) + } + + if len(group) > 0 { + splitDirs = append(splitDirs, group) + } + } + + return splitDirs +} diff --git a/pkg/compact/planner_test.go b/pkg/compact/planner_test.go new file mode 100644 index 0000000000..b8748b9029 --- /dev/null +++ b/pkg/compact/planner_test.go @@ -0,0 +1,410 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package compact + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "sort" + "testing" + + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/tsdb" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/testutil" +) + +type tsdbPlannerAdapter struct { + dir string + comp tsdb.Compactor +} + +func (p *tsdbPlannerAdapter) Plan(_ context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { + // TSDB planning works based on the meta.json files in the given dir. Mock it up. + for _, meta := range metasByMinTime { + bdir := filepath.Join(p.dir, meta.ULID.String()) + if err := os.MkdirAll(bdir, 0777); err != nil { + return nil, errors.Wrap(err, "create planning block dir") + } + if err := meta.WriteToDir(log.NewNopLogger(), bdir); err != nil { + return nil, errors.Wrap(err, "write planning meta file") + } + } + plan, err := p.comp.Plan(p.dir) + if err != nil { + return nil, err + } + + var res []*metadata.Meta + for _, pdir := range plan { + meta, err := metadata.Read(pdir) + if err != nil { + return nil, errors.Wrapf(err, "read meta from %s", pdir) + } + res = append(res, meta) + } + return res, nil +} + +// Adapted from https://github.com/prometheus/prometheus/blob/6c56a1faaaad07317ff585bda75b99bdba0517ad/tsdb/compact_test.go#L167 +func TestPlanners_Plan_Compatibility(t *testing.T) { + ranges := []int64{ + 20, + 60, + 180, + 540, + 1620, + } + + // This mimics our default ExponentialBlockRanges with min block size equals to 20. + tsdbComp, err := tsdb.NewLeveledCompactor(context.Background(), nil, nil, ranges, nil) + testutil.Ok(t, err) + tsdbPlanner := &tsdbPlannerAdapter{comp: tsdbComp} + tsdbBasedPlanner := NewTSDBBasedPlanner(log.NewNopLogger(), ranges) + + for _, c := range []struct { + name string + metas []*metadata.Meta + expected []*metadata.Meta + }{ + { + name: "Outside Range", + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + }, + }, + { + name: "We should wait for four blocks of size 20 to appear before compacting.", + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + }, + }, + { + name: `We should wait for a next block of size 20 to appear before compacting + the existing ones. 
We have three, but we ignore the fresh one from WAl`, + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, + }, + }, + { + name: "Block to fill the entire parent range appeared – should be compacted", + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}}, + }, + expected: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, + }, + }, + { + name: `Block for the next parent range appeared with gap with size 20. Nothing will happen in the first one + anymore but we ignore fresh one still, so no compaction`, + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}}, + }, + }, + { + name: `Block for the next parent range appeared, and we have a gap with size 20 between second and third block. + We will not get this missed gap anymore and we should compact just these two.`, + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 80, MaxTime: 100}}, + }, + expected: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + }, + }, + { + name: "We have 20, 20, 20, 60, 60 range blocks. '5' is marked as fresh one", + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 120, MaxTime: 180}}, + }, + expected: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, + }, + }, + {name: "We have 20, 60, 20, 60, 240 range blocks. 
We can compact 20 + 60 + 60", + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 960, MaxTime: 980}}, // Fresh one. + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 120, MaxTime: 180}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 720, MaxTime: 960}}, + }, + expected: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 120, MaxTime: 180}}, + }, + }, + { + name: "Do not select large blocks that have many tombstones when there is no fresh block", + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 540, Stats: tsdb.BlockStats{ + NumSeries: 10, + NumTombstones: 3, + }}}, + }, + }, + { + name: "Select large blocks that have many tombstones when fresh appears", + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 540, Stats: tsdb.BlockStats{ + NumSeries: 10, + NumTombstones: 3, + }}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 540, MaxTime: 560}}, + }, + expected: []*metadata.Meta{{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 540, Stats: tsdb.BlockStats{ + NumSeries: 10, + NumTombstones: 3, + }}}}, + }, + { + name: "For small blocks, do not compact tombstones, even when fresh appears.", + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 60, Stats: tsdb.BlockStats{ + NumSeries: 10, + NumTombstones: 3, + }}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 60, MaxTime: 80}}, + }, + }, + { + name: `Regression test: we were stuck in a compact loop where we always recompacted + the same block when tombstones and series counts were zero`, + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 540, Stats: tsdb.BlockStats{ + NumSeries: 0, + NumTombstones: 0, + }}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 540, MaxTime: 560}}, + }, + }, + { + name: `Regression test: we were wrongly assuming that new block is fresh from WAL when its ULID is newest. + We need to actually look on max time instead. + + With previous, wrong approach "8" block was ignored, so we were wrongly compacting 5 and 7 and introducing + block overlaps`, + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 0, MaxTime: 360}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 540, MaxTime: 560}}, // Fresh one. 
+ {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 360, MaxTime: 420}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 420, MaxTime: 540}}, + }, + expected: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 360, MaxTime: 420}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 420, MaxTime: 540}}, + }, + }, + // |--------------| + // |----------------| + // |--------------| + { + name: "Overlapping blocks 1", + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 19, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, + }, + expected: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 19, MaxTime: 40}}, + }, + }, + // |--------------| + // |--------------| + // |--------------| + { + name: "Overlapping blocks 2", + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 30, MaxTime: 50}}, + }, + expected: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 30, MaxTime: 50}}, + }, + }, + // |--------------| + // |---------------------| + // |--------------| + { + name: "Overlapping blocks 3", + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 10, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 30, MaxTime: 50}}, + }, + expected: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 10, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 30, MaxTime: 50}}, + }, + }, + // |--------------| + // |--------------------------------| + // |--------------| + // |--------------| + { + name: "Overlapping blocks 4", + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 0, MaxTime: 360}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 340, MaxTime: 560}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 360, MaxTime: 420}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 420, MaxTime: 540}}, + }, + expected: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 0, MaxTime: 360}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 340, MaxTime: 560}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(7, nil), MinTime: 360, MaxTime: 420}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(8, nil), MinTime: 420, MaxTime: 540}}, + }, + }, + // |--------------| + // 
|--------------| + // |--------------| + // |--------------| + { + name: "Overlapping blocks 5", + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 10}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 9, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 30, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 39, MaxTime: 50}}, + }, + expected: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 10}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 9, MaxTime: 20}}, + }, + }, + } { + t.Run(c.name, func(t *testing.T) { + // For compatibility. + t.Run("tsdbPlannerAdapter", func(t *testing.T) { + dir, err := ioutil.TempDir("", "test-compact") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + + metasByMinTime := make([]*metadata.Meta, len(c.metas)) + for i := range metasByMinTime { + metasByMinTime[i] = c.metas[i] + } + sort.Slice(metasByMinTime, func(i, j int) bool { + return metasByMinTime[i].MinTime < metasByMinTime[j].MinTime + }) + + tsdbPlanner.dir = dir + plan, err := tsdbPlanner.Plan(context.Background(), metasByMinTime) + testutil.Ok(t, err) + testutil.Equals(t, c.expected, plan) + }) + t.Run("tsdbBasedPlanner", func(t *testing.T) { + metasByMinTime := make([]*metadata.Meta, len(c.metas)) + for i := range metasByMinTime { + metasByMinTime[i] = c.metas[i] + } + sort.Slice(metasByMinTime, func(i, j int) bool { + return metasByMinTime[i].MinTime < metasByMinTime[j].MinTime + }) + + plan, err := tsdbBasedPlanner.Plan(context.Background(), metasByMinTime) + testutil.Ok(t, err) + testutil.Equals(t, c.expected, plan) + }) + }) + } +} + +// Adapted form: https://github.com/prometheus/prometheus/blob/6c56a1faaaad07317ff585bda75b99bdba0517ad/tsdb/compact_test.go#L377 +func TestRangeWithFailedCompactionWontGetSelected(t *testing.T) { + ranges := []int64{ + 20, + 60, + 180, + 540, + 1620, + } + + // This mimics our default ExponentialBlockRanges with min block size equals to 20. 
+ tsdbComp, err := tsdb.NewLeveledCompactor(context.Background(), nil, nil, ranges, nil) + testutil.Ok(t, err) + tsdbPlanner := &tsdbPlannerAdapter{comp: tsdbComp} + tsdbBasedPlanner := NewTSDBBasedPlanner(log.NewNopLogger(), ranges) + + for _, c := range []struct { + metas []*metadata.Meta + }{ + { + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}}, + }, + }, + { + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 80}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 80, MaxTime: 100}}, + }, + }, + { + metas: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(1, nil), MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(2, nil), MinTime: 20, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(3, nil), MinTime: 40, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(4, nil), MinTime: 60, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(5, nil), MinTime: 120, MaxTime: 180}}, + {BlockMeta: tsdb.BlockMeta{Version: 1, ULID: ulid.MustNew(6, nil), MinTime: 180, MaxTime: 200}}, + }, + }, + } { + t.Run("", func(t *testing.T) { + c.metas[1].Compaction.Failed = true + // For compatibility. + t.Run("tsdbPlannerAdapter", func(t *testing.T) { + dir, err := ioutil.TempDir("", "test-compact") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + + tsdbPlanner.dir = dir + plan, err := tsdbPlanner.Plan(context.Background(), c.metas) + testutil.Ok(t, err) + testutil.Equals(t, []*metadata.Meta(nil), plan) + }) + t.Run("tsdbBasedPlanner", func(t *testing.T) { + plan, err := tsdbBasedPlanner.Plan(context.Background(), c.metas) + testutil.Ok(t, err) + testutil.Equals(t, []*metadata.Meta(nil), plan) + }) + }) + } +} diff --git a/pkg/shipper/shipper_e2e_test.go b/pkg/shipper/shipper_e2e_test.go index bd5459cdad..f76ea5f76e 100644 --- a/pkg/shipper/shipper_e2e_test.go +++ b/pkg/shipper/shipper_e2e_test.go @@ -148,7 +148,6 @@ func TestShipper_SyncBlocks_e2e(t *testing.T) { {RelPath: "index", SizeBytes: 13}, {RelPath: "meta.json"}, } - meta.Thanos.Version = 1 buf := bytes.Buffer{} testutil.Ok(t, meta.Write(&buf)) @@ -305,7 +304,6 @@ func TestShipper_SyncBlocksWithMigrating_e2e(t *testing.T) { {RelPath: "index", SizeBytes: 13}, {RelPath: "meta.json"}, } - meta.Thanos.Version = 1 buf := bytes.Buffer{} testutil.Ok(t, meta.Write(&buf))
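
Note on the extension point introduced above: BucketCompactor now accepts any Planner, with NewTSDBBasedPlanner as the default implementation, so compaction planning can be swapped or wrapped without touching the Compactor. The sketch below is not part of this patch; it only illustrates, under assumptions, how a caller might decorate the default planner. The package name customplanner, the cappedPlanner type, and its inner/maxBlocks fields are hypothetical; Planner, metadata.Meta, NewTSDBBasedPlanner, and NewBucketCompactor are the identifiers added or changed by the patch.

package customplanner

import (
	"context"

	"github.com/thanos-io/thanos/pkg/block/metadata"
	"github.com/thanos-io/thanos/pkg/compact"
)

// cappedPlanner is an illustrative Planner decorator: it delegates planning to
// an inner Planner (e.g. the one returned by compact.NewTSDBBasedPlanner) and
// caps how many blocks a single compaction run may pick up.
type cappedPlanner struct {
	inner     compact.Planner
	maxBlocks int
}

// Plan satisfies compact.Planner. The input and output are ordered by minTime,
// so taking a prefix of the delegated plan keeps the selection contiguous; any
// remainder is revisited on a later run, since BucketCompactor re-plans groups
// that still report work to do.
func (p *cappedPlanner) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
	plan, err := p.inner.Plan(ctx, metasByMinTime)
	if err != nil {
		return nil, err
	}
	if p.maxBlocks > 0 && len(plan) > p.maxBlocks {
		plan = plan[:p.maxBlocks]
	}
	return plan, nil
}

Such a planner would be wired in the same way the default one is in cmd/thanos/compact.go, i.e. passed to compact.NewBucketCompactor in place of compact.NewTSDBBasedPlanner(logger, levels).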