Skip to content

Commit

Permalink
Add compaction task slot usage logic
Browse files Browse the repository at this point in the history
Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink committed Jul 15, 2024
1 parent 8b5754f commit afc4110
Show file tree
Hide file tree
Showing 20 changed files with 327 additions and 26 deletions.
7 changes: 6 additions & 1 deletion configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,10 @@ dataCoord:
clientMaxSendSize: 268435456
clientMaxRecvSize: 536870912
syncSegmentsInterval: 300
slot:
clusteringCompactionUsage: 16
mixCompactionUsage: 8
l0DeleteCompactionUsage: 8

dataNode:
dataSync:
Expand Down Expand Up @@ -567,10 +571,11 @@ dataNode:
clientMaxSendSize: 268435456
clientMaxRecvSize: 536870912
slot:
slotCap: 2 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode.
slotCap: 16 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode.

clusteringCompaction:
memoryBufferRatio: 0.1 # The ratio of memory buffer of clustering compaction. Data larger than threshold will be spilled to storage.
workerPoolSize: 8

# Configures the system log output.
log:
Expand Down
27 changes: 20 additions & 7 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -163,7 +164,6 @@ func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64)
if t.GetTriggerID() == triggerID {
cnt += 1
}
// if t.GetPlanID()
}
c.queueGuard.RUnlock()
c.executingGuard.RLock()
Expand Down Expand Up @@ -606,10 +606,10 @@ func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) {
}

for _, t := range tasks {
nodeID := c.pickAnyNode(slots)
nodeID := c.pickAnyNode(slots, t)
if nodeID == NullNodeID {
log.Info("cannot find datanode for compaction task",
zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()))
zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()), zap.String("vchannel", t.GetChannel()))
continue
}
err := t.SetNodeID(nodeID)
Expand Down Expand Up @@ -663,17 +663,30 @@ func (c *compactionPlanHandler) checkCompaction() error {
return nil
}

func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64) int64 {
func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) int64 {
var (
nodeID int64 = NullNodeID
maxSlots int64 = -1
nodeID int64 = NullNodeID
maxSlots int64 = -1
acquireSlot int64 = 0
)

switch task.GetType() {
case datapb.CompactionType_ClusteringCompaction:
acquireSlot = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64()
case datapb.CompactionType_MixCompaction:
acquireSlot = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
case datapb.CompactionType_Level0DeleteCompaction:
acquireSlot = paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64()
}

for id, slots := range nodeSlots {
if slots > 0 && slots > maxSlots {
if slots >= acquireSlot && slots > maxSlots {
nodeID = id
maxSlots = slots
}
}
// update the input nodeSlots
nodeSlots[nodeID] = nodeSlots[nodeID] - acquireSlot
return nodeID
}

Expand Down
12 changes: 10 additions & 2 deletions internal/datacoord/compaction_task_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP
PreferSegmentRows: t.GetPreferSegmentRows(),
AnalyzeResultPath: path.Join(t.meta.(*meta).chunkManager.RootPath(), common.AnalyzeStatsPath, metautil.JoinIDPath(t.AnalyzeTaskID, t.AnalyzeVersion)),
AnalyzeSegmentIds: t.GetInputSegments(), // todo: if need
SlotUsage: Params.DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64(),
}
log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))

Expand Down Expand Up @@ -406,8 +407,10 @@ func (t *clusteringCompactionTask) doAnalyze() error {
func (t *clusteringCompactionTask) doCompact() error {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
if t.NeedReAssignNodeID() {
return errors.New("not assign nodeID")
log.RatedWarn(10, "not assign nodeID")
return nil
}
log = log.With(zap.Int64("nodeID", t.GetNodeID()))

// todo refine this logic: GetCompactionPlanResult return a fail result when this is no compaction in datanode which is weird
// check whether the compaction plan is already submitted considering
Expand Down Expand Up @@ -436,6 +439,11 @@ func (t *clusteringCompactionTask) doCompact() error {
}
err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan())
if err != nil {
if errors.Is(err, merr.ErrDataNodeSlotExhausted) {
log.Warn("fail to notify compaction tasks to DataNode because the node slots exhausted")
t.updateAndSaveTaskMeta(setNodeID(NullNodeID))
return nil
}
log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return err
Expand Down Expand Up @@ -558,7 +566,7 @@ func (t *clusteringCompactionTask) GetLabel() string {
}

func (t *clusteringCompactionTask) NeedReAssignNodeID() bool {
return t.GetState() == datapb.CompactionTaskState_pipelining && t.GetNodeID() == 0
return t.GetState() == datapb.CompactionTaskState_pipelining && (t.GetNodeID() == 0 || t.GetNodeID() == NullNodeID)
}

func (t *clusteringCompactionTask) CleanLogPath() {
Expand Down
1 change: 1 addition & 0 deletions internal/datacoord/compaction_task_l0.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
CollectionTtl: t.GetCollectionTtl(),
TotalRows: t.GetTotalRows(),
Schema: t.GetSchema(),
SlotUsage: Params.DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64(),
}

log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))
Expand Down
1 change: 1 addition & 0 deletions internal/datacoord/compaction_task_mix.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, er
CollectionTtl: t.GetCollectionTtl(),
TotalRows: t.GetTotalRows(),
Schema: t.GetSchema(),
SlotUsage: Params.DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(),
}
log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))

Expand Down
54 changes: 52 additions & 2 deletions internal/datacoord/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,14 +344,64 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() {

func (s *CompactionPlanHandlerSuite) TestPickAnyNode() {
s.SetupTest()
Params.Save(Params.DataCoordCfg.MixCompactionSlotUsage.Key, "1")
nodeSlots := map[int64]int64{
100: 2,
101: 3,
}
node := s.handler.pickAnyNode(nodeSlots)
node := s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_MixCompaction,
},
})
s.Equal(int64(101), node)
node = s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_MixCompaction,
},
})
s.Equal(int64(100), node)
node = s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_MixCompaction,
},
})
s.Equal(int64(101), node)

node = s.handler.pickAnyNode(map[int64]int64{})
node = s.handler.pickAnyNode(map[int64]int64{}, &mixCompactionTask{})
s.Equal(int64(NullNodeID), node)
}

func (s *CompactionPlanHandlerSuite) TestPickAnyNodeForClusteringTask() {
s.SetupTest()
nodeSlots := map[int64]int64{
100: 2,
101: 16,
102: 10,
}
executingTasks := make(map[int64]CompactionTask, 0)
executingTasks[1] = &clusteringCompactionTask{
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_ClusteringCompaction,
},
}
executingTasks[2] = &clusteringCompactionTask{
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_ClusteringCompaction,
},
}
s.handler.executingTasks = executingTasks
node := s.handler.pickAnyNode(nodeSlots, &clusteringCompactionTask{
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_ClusteringCompaction,
},
})
s.Equal(int64(101), node)
node = s.handler.pickAnyNode(nodeSlots, &clusteringCompactionTask{
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_ClusteringCompaction,
},
})
s.Equal(int64(NullNodeID), node)
}

Expand Down
9 changes: 8 additions & 1 deletion internal/datanode/compaction/clustering_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ func (t *clusteringCompactionTask) GetChannelName() string {
return t.plan.GetChannel()
}

func (t *clusteringCompactionTask) GetCompactionType() datapb.CompactionType {
return t.plan.GetType()
}

func (t *clusteringCompactionTask) GetCollection() int64 {
return t.plan.GetSegmentBinlogs()[0].GetCollectionID()
}
Expand Down Expand Up @@ -205,7 +209,6 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, erro
log.Warn("compact wrong, illegal compaction type")
return nil, merr.WrapErrIllegalCompactionPlan()
}
log.Info("Clustering compaction", zap.Duration("wait in queue elapse", t.tr.RecordSpan()))
if !funcutil.CheckCtxValid(ctx) {
log.Warn("compact wrong, task context done or timeout")
return nil, ctx.Err()
Expand Down Expand Up @@ -1145,3 +1148,7 @@ func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) (b
buffer.writer = writer
return pack, nil
}

func (t *clusteringCompactionTask) GetSlotUsage() int64 {
return t.plan.GetSlotUsage()
}
2 changes: 2 additions & 0 deletions internal/datanode/compaction/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ type Compactor interface {
GetPlanID() typeutil.UniqueID
GetCollection() typeutil.UniqueID
GetChannelName() string
GetCompactionType() datapb.CompactionType
GetSlotUsage() int64
}
37 changes: 30 additions & 7 deletions internal/datanode/compaction/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import (
"sync"

"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand All @@ -37,7 +39,7 @@ const (

type Executor interface {
Start(ctx context.Context)
Execute(task Compactor)
Execute(task Compactor) (bool, error)
Slots() int64
RemoveTask(planID int64)
GetResults(planID int64) []*datapb.CompactionPlanResult
Expand All @@ -50,8 +52,10 @@ type executor struct {
completedCompactor *typeutil.ConcurrentMap[int64, Compactor] // planID to compactor
completed *typeutil.ConcurrentMap[int64, *datapb.CompactionPlanResult] // planID to CompactionPlanResult
taskCh chan Compactor
taskSem *semaphore.Weighted
taskSem *semaphore.Weighted // todo remove this, unify with slot logic
dropped *typeutil.ConcurrentSet[string] // vchannel dropped
usingSlots *atomic.Int64
slotMu sync.Mutex

// To prevent concurrency of release channel and compaction get results
// all released channel's compaction tasks will be discarded
Expand All @@ -66,27 +70,46 @@ func NewExecutor() *executor {
taskCh: make(chan Compactor, maxTaskQueueNum),
taskSem: semaphore.NewWeighted(maxParallelTaskNum),
dropped: typeutil.NewConcurrentSet[string](),
usingSlots: atomic.NewInt64(0),
}
}

func (e *executor) Execute(task Compactor) {
func (e *executor) Execute(task Compactor) (bool, error) {
e.slotMu.Lock()
defer e.slotMu.Unlock()
if e.Slots() >= task.GetSlotUsage() {
e.usingSlots.Add(task.GetSlotUsage())
} else {
return false, merr.WrapErrDataNodeSlotExhausted()
}
_, ok := e.executing.GetOrInsert(task.GetPlanID(), task)
if ok {
log.Warn("duplicated compaction task",
zap.Int64("planID", task.GetPlanID()),
zap.String("channel", task.GetChannelName()))
return
return false, merr.WrapErrDuplicatedCompactionTask()
}
e.taskCh <- task
return true, nil
}

func (e *executor) Slots() int64 {
return paramtable.Get().DataNodeCfg.SlotCap.GetAsInt64() - int64(e.executing.Len())
return paramtable.Get().DataNodeCfg.SlotCap.GetAsInt64() - e.usingSlots.Load()
}

func (e *executor) toCompleteState(task Compactor) {
task.Complete()
e.executing.GetAndRemove(task.GetPlanID())
e.getAndRemoveExecuting(task.GetPlanID())
}

func (e *executor) getAndRemoveExecuting(planID typeutil.UniqueID) (Compactor, bool) {
task, ok := e.executing.GetAndRemove(planID)
if ok {
e.slotMu.Lock()
e.usingSlots.Sub(task.GetSlotUsage())
e.slotMu.Unlock()
}
return task, ok
}

func (e *executor) RemoveTask(planID int64) {
Expand Down Expand Up @@ -140,7 +163,7 @@ func (e *executor) executeTask(task Compactor) {
}

func (e *executor) stopTask(planID int64) {
task, loaded := e.executing.GetAndRemove(planID)
task, loaded := e.getAndRemoveExecuting(planID)
if loaded {
log.Warn("compaction executor stop task", zap.Int64("planID", planID), zap.String("vChannelName", task.GetChannelName()))
task.Stop()
Expand Down
1 change: 1 addition & 0 deletions internal/datanode/compaction/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestCompactionExecutor(t *testing.T) {
mockC := NewMockCompactor(t)
mockC.EXPECT().GetPlanID().Return(planID)
mockC.EXPECT().GetChannelName().Return("ch1")
mockC.EXPECT().GetCompactionType().Return(datapb.CompactionType_MixCompaction)
executor := NewExecutor()
executor.Execute(mockC)
executor.Execute(mockC)
Expand Down
8 changes: 8 additions & 0 deletions internal/datanode/compaction/l0_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ func (t *LevelZeroCompactionTask) GetChannelName() string {
return t.plan.GetChannel()
}

func (t *LevelZeroCompactionTask) GetCompactionType() datapb.CompactionType {
return t.plan.GetType()
}

func (t *LevelZeroCompactionTask) GetCollection() int64 {
// The length of SegmentBinlogs is checked before task enqueueing.
return t.plan.GetSegmentBinlogs()[0].GetCollectionID()
Expand Down Expand Up @@ -434,3 +438,7 @@ func (t *LevelZeroCompactionTask) loadBF(ctx context.Context, targetSegments []*
err := conc.AwaitAll(futures...)
return bfs, err
}

func (t *LevelZeroCompactionTask) GetSlotUsage() int64 {
return t.plan.GetSlotUsage()
}
8 changes: 8 additions & 0 deletions internal/datanode/compaction/mix_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ func (t *mixCompactionTask) GetChannelName() string {
return t.plan.GetChannel()
}

func (t *mixCompactionTask) GetCompactionType() datapb.CompactionType {
return t.plan.GetType()
}

// return num rows of all segment compaction from
func (t *mixCompactionTask) getNumRows() int64 {
numRows := int64(0)
Expand Down Expand Up @@ -392,3 +396,7 @@ func (t *mixCompactionTask) isExpiredEntity(ts typeutil.Timestamp) bool {

return entityT.Add(time.Duration(t.plan.GetCollectionTtl())).Before(nowT)
}

func (t *mixCompactionTask) GetSlotUsage() int64 {
return t.plan.GetSlotUsage()
}
Loading

0 comments on commit afc4110

Please sign in to comment.