Skip to content

Commit

Permalink
enhance: adding mix compaction first prioritizer (milvus-io#36956)
Browse files Browse the repository at this point in the history
Signed-off-by: Ted Xu <[email protected]>
  • Loading branch information
tedxu authored Oct 18, 2024
1 parent b7ffa83 commit 50da48a
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 8 deletions.
6 changes: 5 additions & 1 deletion configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,11 @@ dataCoord:
# This configuration takes effect only when dataCoord.enableCompaction is set as true.
enableAutoCompaction: true
indexBasedCompaction: true
taskPrioritizer: default # compaction task prioritizer, options: [default, level]. Default is FIFO, level is prioritized by level: L0 compactions first, then mix compactions, then major compactions.
# compaction task prioritizer, options: [default, level, mix].
# default is FIFO.
# level is prioritized by level: L0 compactions first, then mix compactions, then clustering compactions.
# mix is prioritized by level: mix compactions first, then L0 compactions, then clustering compactions.
taskPrioritizer: default
rpcTimeout: 10
maxParallelTaskNum: 10
workerMaxParallelTaskNum: 2
Expand Down
19 changes: 16 additions & 3 deletions internal/datacoord/compaction_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,22 @@ var (
return 10
case datapb.CompactionType_ClusteringCompaction:
return 100
case datapb.CompactionType_MinorCompaction:
case datapb.CompactionType_MajorCompaction:
default:
return 1000
}
}

MixFirstPrioritizer Prioritizer = func(task CompactionTask) int {
switch task.GetType() {
case datapb.CompactionType_Level0DeleteCompaction:
return 10
case datapb.CompactionType_MixCompaction:
return 1
case datapb.CompactionType_ClusteringCompaction:
return 100
default:
return 1000
}
return 0xffff
}
)

Expand All @@ -181,6 +192,8 @@ func getPrioritizer() Prioritizer {
switch p {
case "level":
return LevelPrioritizer
case "mix":
return MixFirstPrioritizer
default:
return DefaultPrioritizer
}
Expand Down
26 changes: 24 additions & 2 deletions internal/datacoord/compaction_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestCompactionQueue(t *testing.T) {
t3 := &clusteringCompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 2,
Type: datapb.CompactionType_MajorCompaction,
Type: datapb.CompactionType_ClusteringCompaction,
},
}

Expand Down Expand Up @@ -88,7 +88,29 @@ func TestCompactionQueue(t *testing.T) {
assert.Equal(t, datapb.CompactionType_MixCompaction, task.GetType())
task, err = cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, datapb.CompactionType_MajorCompaction, task.GetType())
assert.Equal(t, datapb.CompactionType_ClusteringCompaction, task.GetType())
})

t.Run("mix first prioritizer", func(t *testing.T) {
cq := NewCompactionQueue(3, MixFirstPrioritizer)
err := cq.Enqueue(t1)
assert.NoError(t, err)
err = cq.Enqueue(t2)
assert.NoError(t, err)
err = cq.Enqueue(t3)
assert.NoError(t, err)
err = cq.Enqueue(&mixCompactionTask{})
assert.Error(t, err)

task, err := cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, datapb.CompactionType_MixCompaction, task.GetType())
task, err = cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, datapb.CompactionType_Level0DeleteCompaction, task.GetType())
task, err = cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, datapb.CompactionType_ClusteringCompaction, task.GetType())
})

t.Run("update prioritizer", func(t *testing.T) {
Expand Down
7 changes: 5 additions & 2 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -3465,8 +3465,11 @@ This configuration takes effect only when dataCoord.enableCompaction is set as t
Key: "dataCoord.compaction.taskPrioritizer",
Version: "2.5.0",
DefaultValue: "default",
Doc: "compaction task prioritizer, options: [default, level]. Default is FIFO, level is prioritized by level: L0 compactions first, then mix compactions, then major compactions.",
Export: true,
Doc: `compaction task prioritizer, options: [default, level, mix].
default is FIFO.
level is prioritized by level: L0 compactions first, then mix compactions, then clustering compactions.
mix is prioritized by level: mix compactions first, then L0 compactions, then clustering compactions.`,
Export: true,
}
p.CompactionTaskPrioritizer.Init(base.mgr)

Expand Down

0 comments on commit 50da48a

Please sign in to comment.