Skip to content

Commit

Permalink
Support set compaction task slot usage in scheduling
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink committed Jul 11, 2024
1 parent 9b37d3f commit 9b0e8db
Show file tree
Hide file tree
Showing 17 changed files with 268 additions and 24 deletions.
6 changes: 5 additions & 1 deletion configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -563,10 +563,14 @@ dataNode:
clientMaxSendSize: 268435456
clientMaxRecvSize: 536870912
slot:
slotCap: 2 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode.
slotCap: 16 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode.
clusteringCompactionUsage: 16 # slot usage of clustering compaction job.
mixCompactionUsage: 8 # slot usage of mix compaction job.
l0DeleteCompactionUsage: 8 # slot usage of l0 delete compaction job.

clusteringCompaction:
memoryBufferRatio: 0.1 # The ratio of memory buffer for clustering compaction. Data larger than this threshold will be spilled to storage.
workerPoolSize: 8

# Configures the system log output.
log:
Expand Down
34 changes: 26 additions & 8 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package datacoord
import (
"context"
"fmt"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -162,7 +163,6 @@ func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64)
if t.GetTriggerID() == triggerID {
cnt += 1
}
// if t.GetPlanID()
}
c.mu.RUnlock()
c.executingMu.RLock()
Expand Down Expand Up @@ -591,10 +591,15 @@ func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) {
}

for _, t := range tasks {
nodeID := c.pickAnyNode(slots)
nodeID := c.pickAnyNode(slots, t)
if nodeID == NullNodeID {
log.Info("cannot find datanode for compaction task",
zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()))
if t.GetType() == datapb.CompactionType_ClusteringCompaction {
log.Info("cannot find datanode for clustering compaction task, largely because all the datanodes are busy",
zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()))
} else {
log.Info("cannot find datanode for compaction task",
zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()))
}
continue
}
err := t.SetNodeID(nodeID)
Expand Down Expand Up @@ -644,17 +649,30 @@ func (c *compactionPlanHandler) checkCompaction() error {
return nil
}

func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64) int64 {
func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) int64 {
var (
nodeID int64 = NullNodeID
maxSlots int64 = -1
nodeID int64 = NullNodeID
maxSlots int64 = -1
acquireSlot int64 = 0
)

switch task.GetType() {
case datapb.CompactionType_ClusteringCompaction:
acquireSlot = paramtable.Get().DataNodeCfg.ClusteringCompactionSlotUsage.GetAsInt64()
case datapb.CompactionType_MixCompaction:
acquireSlot = paramtable.Get().DataNodeCfg.MixCompactionSlotUsage.GetAsInt64()
case datapb.CompactionType_Level0DeleteCompaction:
acquireSlot = paramtable.Get().DataNodeCfg.L0DeleteCompactionSlotUsage.GetAsInt64()
}

for id, slots := range nodeSlots {
if slots > 0 && slots > maxSlots {
if slots >= acquireSlot && slots > maxSlots {
nodeID = id
maxSlots = slots
}
}
// update the input nodeSlots
nodeSlots[nodeID] = nodeSlots[nodeID] - acquireSlot
return nodeID
}

Expand Down
11 changes: 9 additions & 2 deletions internal/datacoord/compaction_task_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,10 @@ func (t *clusteringCompactionTask) doAnalyze() error {
func (t *clusteringCompactionTask) doCompact() error {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
if t.NeedReAssignNodeID() {
return errors.New("not assign nodeID")
log.RatedWarn(10, "not assign nodeID")
return nil
}
log = log.With(zap.Int64("nodeID", t.GetNodeID()))

// todo refine this logic: GetCompactionPlanResult return a fail result when this is no compaction in datanode which is weird
// check whether the compaction plan is already submitted considering
Expand Down Expand Up @@ -434,6 +436,11 @@ func (t *clusteringCompactionTask) doCompact() error {
}
err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan())
if err != nil {
if errors.Is(err, merr.ErrTooManyClusteringCompaction) {
log.Warn("fail to notify compaction tasks to DataNode because the node has many clustering compaction executing")
t.updateAndSaveTaskMeta(setNodeID(NullNodeID))
return nil
}
log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return err
Expand Down Expand Up @@ -556,7 +563,7 @@ func (t *clusteringCompactionTask) GetLabel() string {
}

// NeedReAssignNodeID reports whether this pipelining task still lacks a
// valid datanode assignment (never assigned, or reset to NullNodeID).
func (t *clusteringCompactionTask) NeedReAssignNodeID() bool {
	if t.GetState() != datapb.CompactionTaskState_pipelining {
		return false
	}
	nodeID := t.GetNodeID()
	return nodeID == 0 || nodeID == NullNodeID
}

func (t *clusteringCompactionTask) CleanLogPath() {
Expand Down
59 changes: 57 additions & 2 deletions internal/datacoord/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,14 +344,69 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() {

// TestPickAnyNode verifies that pickAnyNode always prefers the node with the
// most free slots, charges the configured per-type slot usage on every
// successful pick, and returns NullNodeID for an empty node set.
func (s *CompactionPlanHandlerSuite) TestPickAnyNode() {
	s.SetupTest()
	// Use a usage of 2 and capacities 3/4 so consecutive picks alternate
	// between the two nodes without ever producing a tie. With the previous
	// values (usage 1, slots 2/3) the second pick hit a 2-vs-2 tie, whose
	// winner depended on random map iteration order, making the test flaky.
	Params.Save(Params.DataNodeCfg.MixCompactionSlotUsage.Key, "2")
	defer Params.Reset(Params.DataNodeCfg.MixCompactionSlotUsage.Key)

	nodeSlots := map[int64]int64{
		100: 3,
		101: 4,
	}
	task := &mixCompactionTask{
		CompactionTask: &datapb.CompactionTask{
			Type: datapb.CompactionType_MixCompaction,
		},
	}

	// 101 leads with 4 free slots -> charged down to 2.
	node := s.handler.pickAnyNode(nodeSlots, task)
	s.Equal(int64(101), node)

	// 100 now leads with 3 -> charged down to 1.
	node = s.handler.pickAnyNode(nodeSlots, task)
	s.Equal(int64(100), node)

	// 101 leads again with 2 -> charged down to 0.
	node = s.handler.pickAnyNode(nodeSlots, task)
	s.Equal(int64(101), node)

	// An empty node set yields no assignment.
	node = s.handler.pickAnyNode(map[int64]int64{}, &mixCompactionTask{})
	s.Equal(int64(NullNodeID), node)
}

// TestPickAnyNodeForClusteringTask checks that a clustering compaction task
// is only scheduled onto a node whose free slots cover the (large) clustering
// slot usage, and that once that budget is spent no node remains eligible.
func (s *CompactionPlanHandlerSuite) TestPickAnyNodeForClusteringTask() {
	s.SetupTest()
	slots := map[int64]int64{
		100: 2,
		101: 16,
		102: 10,
	}
	s.handler.executingTasks = map[int64]CompactionTask{
		1: &clusteringCompactionTask{
			CompactionTask: &datapb.CompactionTask{
				Type:   datapb.CompactionType_ClusteringCompaction,
				NodeID: 100,
			},
		},
		2: &clusteringCompactionTask{
			CompactionTask: &datapb.CompactionTask{
				Type:   datapb.CompactionType_ClusteringCompaction,
				NodeID: 101,
			},
		},
	}

	newClusteringTask := func() *clusteringCompactionTask {
		return &clusteringCompactionTask{
			CompactionTask: &datapb.CompactionTask{
				Type: datapb.CompactionType_ClusteringCompaction,
			},
		}
	}

	// Only node 101 has at least the default clustering usage (16) free.
	picked := s.handler.pickAnyNode(slots, newClusteringTask())
	s.Equal(int64(101), picked)

	// Node 101's budget is now exhausted; no node qualifies anymore.
	picked = s.handler.pickAnyNode(slots, newClusteringTask())
	s.Equal(int64(NullNodeID), picked)
}

Expand Down
5 changes: 4 additions & 1 deletion internal/datanode/compaction/clustering_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ func (t *clusteringCompactionTask) GetChannelName() string {
return t.plan.GetChannel()
}

// GetCompactionType returns the compaction type recorded in the task's plan.
func (t *clusteringCompactionTask) GetCompactionType() datapb.CompactionType {
return t.plan.GetType()
}

func (t *clusteringCompactionTask) GetCollection() int64 {
return t.plan.GetSegmentBinlogs()[0].GetCollectionID()
}
Expand Down Expand Up @@ -205,7 +209,6 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, erro
log.Warn("compact wrong, illegal compaction type")
return nil, merr.WrapErrIllegalCompactionPlan()
}
log.Info("Clustering compaction", zap.Duration("wait in queue elapse", t.tr.RecordSpan()))
if !funcutil.CheckCtxValid(ctx) {
log.Warn("compact wrong, task context done or timeout")
return nil, ctx.Err()
Expand Down
1 change: 1 addition & 0 deletions internal/datanode/compaction/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ type Compactor interface {
GetPlanID() typeutil.UniqueID
GetCollection() typeutil.UniqueID
GetChannelName() string
GetCompactionType() datapb.CompactionType
}
32 changes: 28 additions & 4 deletions internal/datanode/compaction/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand All @@ -37,7 +38,7 @@ const (

type Executor interface {
Start(ctx context.Context)
Execute(task Compactor)
Execute(task Compactor) (bool, error)
Slots() int64
RemoveTask(planID int64)
GetResults(planID int64) []*datapb.CompactionPlanResult
Expand Down Expand Up @@ -69,19 +70,42 @@ func NewExecutor() *executor {
}
}

func (e *executor) Execute(task Compactor) {
func (e *executor) Execute(task Compactor) (bool, error) {
clusteringTaskCnt := 0
e.executing.Range(func(key int64, comp Compactor) bool {
if comp.GetCompactionType() == datapb.CompactionType_ClusteringCompaction {
clusteringTaskCnt++
}
return true
})
if clusteringTaskCnt > 0 {
return false, merr.WrapErrTooManyClusteringCompaction()
}
_, ok := e.executing.GetOrInsert(task.GetPlanID(), task)
if ok {
log.Warn("duplicated compaction task",
zap.Int64("planID", task.GetPlanID()),
zap.String("channel", task.GetChannelName()))
return
return false, merr.WrapErrDuplicatedCompactionTask()
}
e.taskCh <- task
return true, nil
}

// Slots returns the number of free task slots on this datanode: the
// configured slot capacity minus the per-type slot usage of every
// compaction task currently executing.
func (e *executor) Slots() int64 {
	cfg := paramtable.Get().DataNodeCfg
	used := int64(0)
	e.executing.Range(func(planID int64, comp Compactor) bool {
		switch comp.GetCompactionType() {
		case datapb.CompactionType_ClusteringCompaction:
			used += cfg.ClusteringCompactionSlotUsage.GetAsInt64()
		case datapb.CompactionType_MixCompaction:
			used += cfg.MixCompactionSlotUsage.GetAsInt64()
		case datapb.CompactionType_Level0DeleteCompaction:
			used += cfg.L0DeleteCompactionSlotUsage.GetAsInt64()
		}
		return true
	})
	return cfg.SlotCap.GetAsInt64() - used
}

func (e *executor) toCompleteState(task Compactor) {
Expand Down
1 change: 1 addition & 0 deletions internal/datanode/compaction/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestCompactionExecutor(t *testing.T) {
mockC := NewMockCompactor(t)
mockC.EXPECT().GetPlanID().Return(planID)
mockC.EXPECT().GetChannelName().Return("ch1")
mockC.EXPECT().GetCompactionType().Return(datapb.CompactionType_MixCompaction)
executor := NewExecutor()
executor.Execute(mockC)
executor.Execute(mockC)
Expand Down
4 changes: 4 additions & 0 deletions internal/datanode/compaction/l0_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ func (t *LevelZeroCompactionTask) GetChannelName() string {
return t.plan.GetChannel()
}

// GetCompactionType returns the compaction type recorded in the task's plan.
func (t *LevelZeroCompactionTask) GetCompactionType() datapb.CompactionType {
return t.plan.GetType()
}

func (t *LevelZeroCompactionTask) GetCollection() int64 {
// The length of SegmentBinlogs is checked before task enqueueing.
return t.plan.GetSegmentBinlogs()[0].GetCollectionID()
Expand Down
4 changes: 4 additions & 0 deletions internal/datanode/compaction/mix_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ func (t *mixCompactionTask) GetChannelName() string {
return t.plan.GetChannel()
}

// GetCompactionType returns the compaction type recorded in the task's plan.
func (t *mixCompactionTask) GetCompactionType() datapb.CompactionType {
return t.plan.GetType()
}

// return num rows of all segment compaction from
func (t *mixCompactionTask) getNumRows() int64 {
numRows := int64(0)
Expand Down
Loading

0 comments on commit 9b0e8db

Please sign in to comment.