Skip to content

Commit

Permalink
Remove unused compactor code (#886)
Browse files Browse the repository at this point in the history
* Remove unused compactor code.

Signed-off-by: Peter Štibraný <[email protected]>

* Removed DefaultGrouper.

Signed-off-by: Peter Štibraný <[email protected]>

* Remove extra time range.

Signed-off-by: Peter Štibraný <[email protected]>
  • Loading branch information
pstibrany authored Jan 25, 2022
1 parent 5270b17 commit 4979b21
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 918 deletions.
47 changes: 0 additions & 47 deletions pkg/compactor/bucket_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"os"
"path"
"path/filepath"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -213,52 +212,6 @@ func defaultGroupKey(res int64, lbls labels.Labels) string {
return fmt.Sprintf("%d@%v", res, lbls.Hash())
}

// DefaultGrouper is the default grouper. It groups blocks based on downsample
// resolution and block's labels.
type DefaultGrouper struct {
userID string
hashFunc metadata.HashFunc
}

// NewDefaultGrouper makes a new DefaultGrouper.
func NewDefaultGrouper(userID string, hashFunc metadata.HashFunc) *DefaultGrouper {
return &DefaultGrouper{
userID: userID,
hashFunc: hashFunc,
}
}

// Groups implements Grouper.Groups.
func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Job, err error) {
groups := map[string]*Job{}
for _, m := range blocks {
groupKey := DefaultGroupKey(m.Thanos)
job, ok := groups[groupKey]
if !ok {
lbls := labels.FromMap(m.Thanos.Labels)
job = NewJob(
g.userID,
groupKey,
lbls,
m.Thanos.Downsample.Resolution,
g.hashFunc,
false, // No splitting.
0, // No splitting shards.
"", // No sharding.
)
groups[groupKey] = job
res = append(res, job)
}
if err := job.AppendMeta(m); err != nil {
return nil, errors.Wrap(err, "add compaction group")
}
}
sort.Slice(res, func(i, j int) bool {
return res[i].Key() < res[j].Key()
})
return res, nil
}

func minTime(metas []*metadata.Meta) time.Time {
if len(metas) == 0 {
return time.Time{}
Expand Down
38 changes: 27 additions & 11 deletions pkg/compactor/bucket_compactor_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
m.ULID = ulid.MustNew(uint64(i), nil)
m.Compaction.Sources = []ulid.ULID{m.ULID}
m.Compaction.Level = 1
m.MinTime = 0
m.MaxTime = 2 * time.Hour.Milliseconds()

ids = append(ids, m.ULID)
metas = append(metas, &m)
Expand All @@ -91,16 +93,29 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
m3.Compaction.Level = 3
m3.Compaction.Sources = ids[:9] // last source ID is not part of level 3 block.
m3.Thanos.Downsample.Resolution = 0
m3.MinTime = 0
m3.MaxTime = 2 * time.Hour.Milliseconds()

var m4 metadata.Meta
m4.Version = 1
m4.ULID = ulid.MustNew(400, nil)
m4.Compaction.Level = 2
m4.Compaction.Sources = ids[9:] // covers the last block but is a different resolution. Must not trigger deletion.
m4.Thanos.Downsample.Resolution = 1000
m4.MinTime = 0
m4.MaxTime = 2 * time.Hour.Milliseconds()

var m5 metadata.Meta
m5.Version = 1
m5.ULID = ulid.MustNew(500, nil)
m5.Compaction.Level = 2
m5.Compaction.Sources = ids[8:9] // built from block 8, but different resolution. Block 8 is already included in m3, can be deleted.
m5.Thanos.Downsample.Resolution = 1000
m5.MinTime = 0
m5.MaxTime = 2 * time.Hour.Milliseconds()

// Create all blocks in the bucket.
for _, m := range append(metas, &m1, &m2, &m3, &m4) {
for _, m := range append(metas, &m1, &m2, &m3, &m4, &m5) {
fmt.Println("create", m.ULID)
var buf bytes.Buffer
require.NoError(t, json.NewEncoder(&buf).Encode(&m))
Expand Down Expand Up @@ -147,21 +162,22 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
})

// Only the level 3 block, the last source block in both resolutions should be left.
assert.Equal(t, []ulid.ULID{metas[9].ULID, m3.ULID, m4.ULID}, rem)
assert.Equal(t, []ulid.ULID{metas[9].ULID, m3.ULID, m4.ULID, m5.ULID}, rem)

// After another sync the changes should also be reflected in the local groups.
require.NoError(t, sy.SyncMetas(ctx))
require.NoError(t, sy.GarbageCollect(ctx))

// Only the level 3 block, the last source block in both resolutions should be left.
grouper := NewDefaultGrouper("user-1", metadata.NoneFunc)
grouper := NewSplitAndMergeGrouper("user-1", []int64{2 * time.Hour.Milliseconds()}, 0, 0, log.NewNopLogger())
groups, err := grouper.Groups(sy.Metas())
require.NoError(t, err)

assert.Equal(t, "0@17241709254077376921", groups[0].Key())
assert.Equal(t, "0@17241709254077376921-merge--0-7200000", groups[0].Key())
assert.Equal(t, []ulid.ULID{metas[9].ULID, m3.ULID}, groups[0].IDs())
assert.Equal(t, "1000@17241709254077376921", groups[1].Key())
assert.Equal(t, []ulid.ULID{m4.ULID}, groups[1].IDs())

assert.Equal(t, "1000@17241709254077376921-merge--0-7200000", groups[1].Key())
assert.Equal(t, []ulid.ULID{m4.ULID, m5.ULID}, groups[1].IDs())
})
}

Expand Down Expand Up @@ -215,7 +231,7 @@ func TestGroupCompactE2E(t *testing.T) {

ignoreDeletionMarkFilter := NewExcludeMarkedForDeletionFilter(objstore.WithNoopInstr(bkt))
duplicateBlocksFilter := NewShardAwareDeduplicateFilter()
noCompactMarkerFilter := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt), false)
noCompactMarkerFilter := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt), true)
metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.WithNoopInstr(bkt), "", nil, []block.MetadataFilter{
ignoreDeletionMarkFilter,
duplicateBlocksFilter,
Expand All @@ -231,8 +247,8 @@ func TestGroupCompactE2E(t *testing.T) {
comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, nil)
require.NoError(t, err)

planner := NewPlanner(logger, []int64{1000, 3000}, noCompactMarkerFilter)
grouper := NewDefaultGrouper("user-1", metadata.NoneFunc)
planner := NewSplitAndMergePlanner([]int64{1000, 3000})
grouper := NewSplitAndMergeGrouper("user-1", []int64{1000, 3000}, 0, 0, logger)
metrics := NewBucketCompactorMetrics(blocksMarkedForDeletion, garbageCollectedBlocks, prometheus.NewPedanticRegistry())
bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2, true, ownAllJobs, sortJobsByNewestBlocksFirst, 4, metrics)
require.NoError(t, err)
Expand Down Expand Up @@ -344,8 +360,8 @@ func TestGroupCompactE2E(t *testing.T) {
assert.Equal(t, 1.0, promtest.ToFloat64(metrics.blocksMarkedForNoCompact))
assert.Equal(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures))
assert.Equal(t, 2.0, promtest.ToFloat64(metrics.groupCompactions))
assert.Equal(t, 12.0, promtest.ToFloat64(metrics.groupCompactionRunsStarted))
assert.Equal(t, 11.0, promtest.ToFloat64(metrics.groupCompactionRunsCompleted))
assert.Equal(t, 3.0, promtest.ToFloat64(metrics.groupCompactionRunsStarted))
assert.Equal(t, 2.0, promtest.ToFloat64(metrics.groupCompactionRunsCompleted))
assert.Equal(t, 1.0, promtest.ToFloat64(metrics.groupCompactionRunsFailed))

_, err = os.Stat(dir)
Expand Down
217 changes: 0 additions & 217 deletions pkg/compactor/planner.go

This file was deleted.

Loading

0 comments on commit 4979b21

Please sign in to comment.