Remove unused compactor code #886

Merged
merged 3 commits on Jan 25, 2022
Changes from 2 commits
47 changes: 0 additions & 47 deletions pkg/compactor/bucket_compactor.go
@@ -12,7 +12,6 @@ import (
"os"
"path"
"path/filepath"
"sort"
"sync"
"time"

@@ -213,52 +212,6 @@ func defaultGroupKey(res int64, lbls labels.Labels) string {
 	return fmt.Sprintf("%d@%v", res, lbls.Hash())
 }

-// DefaultGrouper is the default grouper. It groups blocks based on downsample
-// resolution and block's labels.
-type DefaultGrouper struct {
-	userID   string
-	hashFunc metadata.HashFunc
-}
-
-// NewDefaultGrouper makes a new DefaultGrouper.
-func NewDefaultGrouper(userID string, hashFunc metadata.HashFunc) *DefaultGrouper {
-	return &DefaultGrouper{
-		userID:   userID,
-		hashFunc: hashFunc,
-	}
-}
-
-// Groups implements Grouper.Groups.
-func (g *DefaultGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Job, err error) {
-	groups := map[string]*Job{}
-	for _, m := range blocks {
-		groupKey := DefaultGroupKey(m.Thanos)
-		job, ok := groups[groupKey]
-		if !ok {
-			lbls := labels.FromMap(m.Thanos.Labels)
-			job = NewJob(
-				g.userID,
-				groupKey,
-				lbls,
-				m.Thanos.Downsample.Resolution,
-				g.hashFunc,
-				false, // No splitting.
-				0,     // No splitting shards.
-				"",    // No sharding.
-			)
-			groups[groupKey] = job
-			res = append(res, job)
-		}
-		if err := job.AppendMeta(m); err != nil {
-			return nil, errors.Wrap(err, "add compaction group")
-		}
-	}
-	sort.Slice(res, func(i, j int) bool {
-		return res[i].Key() < res[j].Key()
-	})
-	return res, nil
-}
-
 func minTime(metas []*metadata.Meta) time.Time {
 	if len(metas) == 0 {
 		return time.Time{}
38 changes: 27 additions & 11 deletions pkg/compactor/bucket_compactor_e2e_test.go
@@ -66,6 +66,8 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
 		m.ULID = ulid.MustNew(uint64(i), nil)
 		m.Compaction.Sources = []ulid.ULID{m.ULID}
 		m.Compaction.Level = 1
+		m.MinTime = 0
+		m.MaxTime = 2 * time.Hour.Milliseconds()

 		ids = append(ids, m.ULID)
 		metas = append(metas, &m)
@@ -91,16 +93,29 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
 	m3.Compaction.Level = 3
 	m3.Compaction.Sources = ids[:9] // last source ID is not part of level 3 block.
 	m3.Thanos.Downsample.Resolution = 0
+	m3.MinTime = 0
Collaborator: Why don't we need it for m1 and m2?

Member Author: We only need it for blocks that are passed to SplitAndMergeGrouper, that is metas[9], m3, m4 and m5. I can set it for all blocks if you prefer.

Member Author: (Sorry for sending an obsolete answer too. It's deleted now.)

+	m3.MaxTime = 2 * time.Hour.Milliseconds()

 	var m4 metadata.Meta
 	m4.Version = 1
 	m4.ULID = ulid.MustNew(400, nil)
 	m4.Compaction.Level = 2
 	m4.Compaction.Sources = ids[9:] // covers the last block but is a different resolution. Must not trigger deletion.
 	m4.Thanos.Downsample.Resolution = 1000
+	m4.MinTime = 0
+	m4.MaxTime = 2 * time.Hour.Milliseconds()

+	var m5 metadata.Meta
Member Author: Added extra block, otherwise m4 would have nothing to merge with.

+	m5.Version = 1
+	m5.ULID = ulid.MustNew(500, nil)
+	m5.Compaction.Level = 2
+	m5.Compaction.Sources = ids[8:9] // built from block 8, but different resolution. Block 8 is already included in m3, can be deleted.
+	m5.Thanos.Downsample.Resolution = 1000
+	m5.MinTime = 0
+	m5.MaxTime = 2 * time.Hour.Milliseconds()
+
 	// Create all blocks in the bucket.
-	for _, m := range append(metas, &m1, &m2, &m3, &m4) {
+	for _, m := range append(metas, &m1, &m2, &m3, &m4, &m5) {
 		fmt.Println("create", m.ULID)
 		var buf bytes.Buffer
 		require.NoError(t, json.NewEncoder(&buf).Encode(&m))
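
Aside on the review thread above: the split-and-merge grouper buckets blocks by time range, which is presumably why the test now sets MinTime/MaxTime on every block handed to SplitAndMergeGrouper. Below is a minimal sketch, not code from this PR, of how such a range could be derived from a block's MinTime; the helper name and the exact rounding are illustrative assumptions.

package main

import (
	"fmt"
	"time"
)

// rangeForBlock aligns a block's MinTime down to a multiple of rangeMillis and
// returns the half-open [start, end) range that contains the block.
func rangeForBlock(minTime, rangeMillis int64) (start, end int64) {
	start = (minTime / rangeMillis) * rangeMillis
	return start, start + rangeMillis
}

func main() {
	twoHours := 2 * time.Hour.Milliseconds()

	// The blocks in this test cover [0, 2h), so they all fall into the
	// 0-7200000 range that shows up in the job keys asserted further down.
	start, end := rangeForBlock(0, twoHours)
	fmt.Printf("block range: %d-%d\n", start, end)
}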
@@ -147,21 +162,22 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
 	})

 	// Only the level 3 block, the last source block in both resolutions should be left.
-	assert.Equal(t, []ulid.ULID{metas[9].ULID, m3.ULID, m4.ULID}, rem)
+	assert.Equal(t, []ulid.ULID{metas[9].ULID, m3.ULID, m4.ULID, m5.ULID}, rem)

 	// After another sync the changes should also be reflected in the local groups.
 	require.NoError(t, sy.SyncMetas(ctx))
 	require.NoError(t, sy.GarbageCollect(ctx))

 	// Only the level 3 block, the last source block in both resolutions should be left.
-	grouper := NewDefaultGrouper("user-1", metadata.NoneFunc)
+	grouper := NewSplitAndMergeGrouper("user-1", []int64{2 * time.Hour.Milliseconds(), 12 * time.Hour.Milliseconds()}, 0, 0, log.NewNopLogger())
 	groups, err := grouper.Groups(sy.Metas())
 	require.NoError(t, err)

-	assert.Equal(t, "0@17241709254077376921", groups[0].Key())
+	assert.Equal(t, "0@17241709254077376921-merge--0-7200000", groups[0].Key())
 	assert.Equal(t, []ulid.ULID{metas[9].ULID, m3.ULID}, groups[0].IDs())
-	assert.Equal(t, "1000@17241709254077376921", groups[1].Key())
-	assert.Equal(t, []ulid.ULID{m4.ULID}, groups[1].IDs())
+
+	assert.Equal(t, "1000@17241709254077376921-merge--0-7200000", groups[1].Key())
+	assert.Equal(t, []ulid.ULID{m4.ULID, m5.ULID}, groups[1].IDs())
 })
 }
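
The updated keys asserted above suggest what a split-and-merge job key encodes: resolution, labels hash, compaction stage, an optional shard ID, and the job's time range. A hypothetical helper reproducing that layout follows; the name and signature are assumptions for illustration, not the repository's actual constructor.

package main

import "fmt"

// jobKey mirrors the layout seen in the assertions above:
// <resolution>@<labels hash>-<stage>-<shard ID>-<min time>-<max time>.
func jobKey(resolution int64, labelsHash uint64, stage, shardID string, minT, maxT int64) string {
	return fmt.Sprintf("%d@%d-%s-%s-%d-%d", resolution, labelsHash, stage, shardID, minT, maxT)
}

func main() {
	// With splitting disabled the shard ID is empty, which explains the double
	// dash in the asserted key "0@17241709254077376921-merge--0-7200000".
	fmt.Println(jobKey(0, 17241709254077376921, "merge", "", 0, 7200000))
}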

@@ -215,7 +231,7 @@ func TestGroupCompactE2E(t *testing.T) {

 	ignoreDeletionMarkFilter := NewExcludeMarkedForDeletionFilter(objstore.WithNoopInstr(bkt))
 	duplicateBlocksFilter := NewShardAwareDeduplicateFilter()
-	noCompactMarkerFilter := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt), false)
+	noCompactMarkerFilter := NewNoCompactionMarkFilter(objstore.WithNoopInstr(bkt), true)
 	metaFetcher, err := block.NewMetaFetcher(nil, 32, objstore.WithNoopInstr(bkt), "", nil, []block.MetadataFilter{
 		ignoreDeletionMarkFilter,
 		duplicateBlocksFilter,
@@ -231,8 +247,8 @@
 	comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, nil)
 	require.NoError(t, err)

-	planner := NewPlanner(logger, []int64{1000, 3000}, noCompactMarkerFilter)
-	grouper := NewDefaultGrouper("user-1", metadata.NoneFunc)
+	planner := NewSplitAndMergePlanner([]int64{1000, 3000})
+	grouper := NewSplitAndMergeGrouper("user-1", []int64{1000, 3000}, 0, 0, logger)
 	metrics := NewBucketCompactorMetrics(blocksMarkedForDeletion, garbageCollectedBlocks, prometheus.NewPedanticRegistry())
 	bComp, err := NewBucketCompactor(logger, sy, grouper, planner, comp, dir, bkt, 2, true, ownAllJobs, sortJobsByNewestBlocksFirst, 4, metrics)
 	require.NoError(t, err)
@@ -344,8 +360,8 @@ func TestGroupCompactE2E(t *testing.T) {
 	assert.Equal(t, 1.0, promtest.ToFloat64(metrics.blocksMarkedForNoCompact))
 	assert.Equal(t, 0.0, promtest.ToFloat64(sy.metrics.garbageCollectionFailures))
 	assert.Equal(t, 2.0, promtest.ToFloat64(metrics.groupCompactions))
-	assert.Equal(t, 12.0, promtest.ToFloat64(metrics.groupCompactionRunsStarted))
-	assert.Equal(t, 11.0, promtest.ToFloat64(metrics.groupCompactionRunsCompleted))
+	assert.Equal(t, 3.0, promtest.ToFloat64(metrics.groupCompactionRunsStarted))
+	assert.Equal(t, 2.0, promtest.ToFloat64(metrics.groupCompactionRunsCompleted))
 	assert.Equal(t, 1.0, promtest.ToFloat64(metrics.groupCompactionRunsFailed))

 	_, err = os.Stat(dir)
217 changes: 0 additions & 217 deletions pkg/compactor/planner.go

This file was deleted.
