enhance: Add compaction task slot usage logic #34581

Merged Jul 18, 2024 (1 commit)
7 changes: 6 additions & 1 deletion configs/milvus.yaml
@@ -519,6 +519,10 @@ dataCoord:
 clientMaxSendSize: 268435456
 clientMaxRecvSize: 536870912
 syncSegmentsInterval: 300
+slot:
+  clusteringCompactionUsage: 16
+  mixCompactionUsage: 8
+  l0DeleteCompactionUsage: 8

dataNode:
dataSync:
@@ -567,10 +571,11 @@ dataNode:
 clientMaxSendSize: 268435456
 clientMaxRecvSize: 536870912
 slot:
-  slotCap: 2 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode.
+  slotCap: 16 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode.

 clusteringCompaction:
   memoryBufferRatio: 0.1 # The ratio of memory buffer of clustering compaction. Data larger than threshold will be spilled to storage.
+  workPoolSize: 8

# Configures the system log output.
log:
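A quick note on what these defaults mean together: with dataNode.slot.slotCap raised from 2 to 16, one clustering compaction (16 slots) saturates a datanode, while mix and L0 delete compactions (8 slots each) fit two at a time. A throwaway sketch of the arithmetic, using the values from the hunks above as constants:

```go
package main

import "fmt"

// Defaults taken from the milvus.yaml changes above.
const (
	slotCap                   = int64(16) // dataNode.slot.slotCap
	clusteringCompactionUsage = int64(16) // dataCoord.slot.clusteringCompactionUsage
	mixCompactionUsage        = int64(8)  // dataCoord.slot.mixCompactionUsage
	l0DeleteCompactionUsage   = int64(8)  // dataCoord.slot.l0DeleteCompactionUsage
)

func main() {
	// Maximum concurrent tasks of a single type on one idle datanode.
	fmt.Println(slotCap / clusteringCompactionUsage) // 1
	fmt.Println(slotCap / mixCompactionUsage)        // 2
	fmt.Println(slotCap / l0DeleteCompactionUsage)   // 2
}
```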
31 changes: 21 additions & 10 deletions internal/datacoord/compaction.go
@@ -37,6 +37,7 @@
 	"github.com/milvus-io/milvus/pkg/util/conc"
 	"github.com/milvus-io/milvus/pkg/util/lock"
 	"github.com/milvus-io/milvus/pkg/util/merr"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )

@@ -163,7 +164,6 @@
 		if t.GetTriggerID() == triggerID {
 			cnt += 1
 		}
-		// if t.GetPlanID()
 	}
 	c.queueGuard.RUnlock()
 	c.executingGuard.RLock()
@@ -617,17 +617,19 @@
 	}

 	for _, t := range tasks {
-		nodeID := c.pickAnyNode(slots)
+		nodeID, useSlot := c.pickAnyNode(slots, t)
 		if nodeID == NullNodeID {
 			log.Info("compactionHandler cannot find datanode for compaction task",
-				zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()))
+				zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()), zap.String("vchannel", t.GetChannel()))
 			continue
 		}
 		err := t.SetNodeID(nodeID)
 		if err != nil {
 			log.Info("compactionHandler assignNodeID failed",
 				zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()), zap.Error(err))
 		} else {
+			// update the input nodeSlots
+			slots[nodeID] = slots[nodeID] - useSlot
 			log.Info("compactionHandler assignNodeID success",
 				zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()), zap.Any("nodeID", nodeID))
 			metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Dec()
@@ -674,18 +676,27 @@
 	return nil
 }

-func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64) int64 {
-	var (
-		nodeID   int64 = NullNodeID
-		maxSlots int64 = -1
-	)
+func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) {
+	nodeID = NullNodeID
+	var maxSlots int64 = -1
+
+	switch task.GetType() {
+	case datapb.CompactionType_ClusteringCompaction:
+		useSlot = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64()
+	case datapb.CompactionType_MixCompaction:
+		useSlot = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
+	case datapb.CompactionType_Level0DeleteCompaction:
+		useSlot = paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64()
+	}

 	for id, slots := range nodeSlots {
-		if slots > 0 && slots > maxSlots {
+		if slots >= useSlot && slots > maxSlots {
 			nodeID = id
 			maxSlots = slots
 		}
 	}
-	return nodeID
+
+	return nodeID, useSlot
 }

 func (c *compactionPlanHandler) pickShardNode(nodeSlots map[int64]int64, t CompactionTask) int64 {
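A note for readers skimming this hunk: the new rule picks the node with the most free slots among those that can still afford the task, and returns the task's cost so the scheduling loop can deduct it from its local view. Below is a self-contained sketch under simplifying assumptions: string task types and hard-coded default costs stand in for CompactionTask and the paramtable lookups, and nullNodeID stands in for the package's NullNodeID constant.

```go
package main

import "fmt"

const nullNodeID = int64(-1) // stand-in for datacoord's NullNodeID

// Hypothetical per-type slot costs, mirroring the milvus.yaml defaults above.
var slotUsageByType = map[string]int64{
	"clustering": 16,
	"mix":        8,
	"l0delete":   8,
}

// pickAnyNode returns the node with the most free slots among those that can
// still afford the task, plus the task's cost so the caller can deduct it.
func pickAnyNode(nodeSlots map[int64]int64, taskType string) (nodeID, useSlot int64) {
	nodeID = nullNodeID
	useSlot = slotUsageByType[taskType]
	var maxSlots int64 = -1
	for id, slots := range nodeSlots {
		if slots >= useSlot && slots > maxSlots {
			nodeID, maxSlots = id, slots
		}
	}
	return nodeID, useSlot
}

func main() {
	slots := map[int64]int64{100: 8, 101: 24}

	node, cost := pickAnyNode(slots, "mix")
	fmt.Println(node, cost) // 101 8: node 101 has the most free slots
	slots[node] -= cost     // the scheduling loop deducts the cost the same way

	node, _ = pickAnyNode(slots, "clustering")
	fmt.Println(node) // 101: the only node with at least 16 free slots
}
```

Ties on free slots are broken by map iteration order, in the sketch as in the handler above, which is why the updated tests deduct useSlot after every pick.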
12 changes: 10 additions & 2 deletions internal/datacoord/compaction_task_clustering.go
@@ -151,12 +151,13 @@
 	MaxSegmentRows:    t.GetMaxSegmentRows(),
 	PreferSegmentRows: t.GetPreferSegmentRows(),
 	AnalyzeResultPath: path.Join(t.meta.(*meta).chunkManager.RootPath(), common.AnalyzeStatsPath, metautil.JoinIDPath(t.AnalyzeTaskID, t.AnalyzeVersion)),
-	AnalyzeSegmentIds: t.GetInputSegments(), // todo: if need
+	AnalyzeSegmentIds: t.GetInputSegments(),
 	BeginLogID:        beginLogID,
 	PreAllocatedSegments: &datapb.IDRange{
 		Begin: t.GetResultSegments()[0],
 		End:   t.GetResultSegments()[1],
 	},
+	SlotUsage: Params.DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64(),
 }
 log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))

@@ -416,8 +417,10 @@
 func (t *clusteringCompactionTask) doCompact() error {
 	log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
 	if t.NeedReAssignNodeID() {
-		return errors.New("not assign nodeID")
+		log.RatedWarn(10, "not assign nodeID")
+		return nil
 	}
+	log = log.With(zap.Int64("nodeID", t.GetNodeID()))

 	// todo refine this logic: GetCompactionPlanResult return a fail result when this is no compaction in datanode which is weird
 	// check whether the compaction plan is already submitted considering
@@ -446,6 +449,11 @@
 	}
 	err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan())
 	if err != nil {
+		if errors.Is(err, merr.ErrDataNodeSlotExhausted) {
+			log.Warn("fail to notify compaction tasks to DataNode because the node slots exhausted")
+			t.updateAndSaveTaskMeta(setNodeID(NullNodeID))
+			return nil
+		}
 		log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
 		t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
 		return err
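Taken together, these two hunks change clustering-task failure handling from fail-fast to wait-and-retry: a task with no node assigned only logs a rate-limited warning, and a node rejecting the plan with ErrDataNodeSlotExhausted makes the task drop its node binding so the datacoord can reschedule it. A minimal sketch of that control flow, assuming simplified task and error types (errSlotExhausted is a stand-in for merr.ErrDataNodeSlotExhausted):

```go
package main

import (
	"errors"
	"fmt"
)

const nullNodeID = int64(-1)

// errSlotExhausted stands in for merr.ErrDataNodeSlotExhausted.
var errSlotExhausted = errors.New("datanode slot exhausted")

type task struct{ nodeID int64 }

// dispatch sketches doCompact's slot-aware control flow: an unassigned task is
// left for the next scheduling pass, and a slot-exhausted node is released so
// the datacoord can assign a different node later.
func dispatch(t *task, submit func(nodeID int64) error) error {
	if t.nodeID == nullNodeID {
		return nil // not an error: the scheduler retries on its next pass
	}
	err := submit(t.nodeID)
	if errors.Is(err, errSlotExhausted) {
		t.nodeID = nullNodeID // back to the unassigned pool
		return nil
	}
	return err // nil on success; any other failure propagates
}

func main() {
	t := &task{nodeID: 100}
	busy := func(int64) error { return errSlotExhausted }
	fmt.Println(dispatch(t, busy), t.nodeID) // <nil> -1: released for reassignment
}
```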
1 change: 1 addition & 0 deletions internal/datacoord/compaction_task_l0.go
@@ -246,6 +246,7 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
 	TotalRows: t.GetTotalRows(),
 	Schema:    t.GetSchema(),
 	BeginLogID: beginLogID,
+	SlotUsage: Params.DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64(),
 }

log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))
1 change: 1 addition & 0 deletions internal/datacoord/compaction_task_mix.go
@@ -347,6 +347,7 @@ func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
 	PreAllocatedSegments: &datapb.IDRange{
 		Begin: t.GetResultSegments()[0],
 	},
+	SlotUsage: Params.DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(),
 }
 log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))

62 changes: 59 additions & 3 deletions internal/datacoord/compaction_test.go
@@ -343,15 +343,71 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() {
}

 func (s *CompactionPlanHandlerSuite) TestPickAnyNode() {
+	s.SetupTest()
+	nodeSlots := map[int64]int64{
+		100: 16,
+		101: 24,
+	}
+	node, useSlot := s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{
+		CompactionTask: &datapb.CompactionTask{
+			Type: datapb.CompactionType_MixCompaction,
+		},
+	})
+	s.Equal(int64(101), node)
+	nodeSlots[node] = nodeSlots[node] - useSlot
+
+	node, useSlot = s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{
+		CompactionTask: &datapb.CompactionTask{
+			Type: datapb.CompactionType_MixCompaction,
+		},
+	})
+	s.Equal(int64(100), node)
+	nodeSlots[node] = nodeSlots[node] - useSlot
+
+	node, useSlot = s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{
+		CompactionTask: &datapb.CompactionTask{
+			Type: datapb.CompactionType_MixCompaction,
+		},
+	})
+	s.Equal(int64(101), node)
+	nodeSlots[node] = nodeSlots[node] - useSlot
+
+	node, useSlot = s.handler.pickAnyNode(map[int64]int64{}, &mixCompactionTask{})
+	s.Equal(int64(NullNodeID), node)
+}
+
+func (s *CompactionPlanHandlerSuite) TestPickAnyNodeForClusteringTask() {
+	s.SetupTest()
 	nodeSlots := map[int64]int64{
-		100: 2,
-		101: 3,
+		101: 16,
+		102: 10,
 	}
+	executingTasks := make(map[int64]CompactionTask, 0)
+	executingTasks[1] = &clusteringCompactionTask{
+		CompactionTask: &datapb.CompactionTask{
+			Type: datapb.CompactionType_ClusteringCompaction,
+		},
+	}
+	executingTasks[2] = &clusteringCompactionTask{
+		CompactionTask: &datapb.CompactionTask{
+			Type: datapb.CompactionType_ClusteringCompaction,
+		},
+	}
-	node := s.handler.pickAnyNode(nodeSlots)
+	s.handler.executingTasks = executingTasks
+	node, useSlot := s.handler.pickAnyNode(nodeSlots, &clusteringCompactionTask{
+		CompactionTask: &datapb.CompactionTask{
+			Type: datapb.CompactionType_ClusteringCompaction,
+		},
+	})
 	s.Equal(int64(101), node)
+	nodeSlots[node] = nodeSlots[node] - useSlot

-	node = s.handler.pickAnyNode(map[int64]int64{})
+	node, useSlot = s.handler.pickAnyNode(nodeSlots, &clusteringCompactionTask{
+		CompactionTask: &datapb.CompactionTask{
+			Type: datapb.CompactionType_ClusteringCompaction,
+		},
+	})
 	s.Equal(int64(NullNodeID), node)
 }

1 change: 1 addition & 0 deletions internal/datacoord/compaction_trigger_test.go
@@ -483,6 +483,7 @@ func Test_compactionTrigger_force(t *testing.T) {
TotalRows: 200,
Schema: schema,
PreAllocatedSegments: &datapb.IDRange{Begin: 100},
+SlotUsage: 8,
},
},
},
9 changes: 8 additions & 1 deletion internal/datanode/compaction/clustering_compactor.go
@@ -169,6 +169,10 @@ func (t *clusteringCompactionTask) GetChannelName() string {
return t.plan.GetChannel()
}

+func (t *clusteringCompactionTask) GetCompactionType() datapb.CompactionType {
+	return t.plan.GetType()
+}

func (t *clusteringCompactionTask) GetCollection() int64 {
return t.plan.GetSegmentBinlogs()[0].GetCollectionID()
}
@@ -209,7 +213,6 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, error) {
 		log.Warn("compact wrong, illegal compaction type")
 		return nil, merr.WrapErrIllegalCompactionPlan()
 	}
-	log.Info("Clustering compaction", zap.Duration("wait in queue elapse", t.tr.RecordSpan()))
if !funcutil.CheckCtxValid(ctx) {
log.Warn("compact wrong, task context done or timeout")
return nil, ctx.Err()
@@ -1158,3 +1161,7 @@ func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) (bool, error) {
 	buffer.writer = writer
 	return pack, nil
 }
+
+func (t *clusteringCompactionTask) GetSlotUsage() int64 {
+	return t.plan.GetSlotUsage()
+}
2 changes: 2 additions & 0 deletions internal/datanode/compaction/compactor.go
@@ -29,4 +29,6 @@ type Compactor interface {
 	GetPlanID() typeutil.UniqueID
 	GetCollection() typeutil.UniqueID
 	GetChannelName() string
+	GetCompactionType() datapb.CompactionType
+	GetSlotUsage() int64
 }
55 changes: 48 additions & 7 deletions internal/datanode/compaction/executor.go
@@ -26,6 +26,7 @@

 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -37,7 +38,7 @@

 type Executor interface {
 	Start(ctx context.Context)
-	Execute(task Compactor)
+	Execute(task Compactor) (bool, error)
 	Slots() int64
 	RemoveTask(planID int64)
 	GetResults(planID int64) []*datapb.CompactionPlanResult

Review thread on the new Execute signature:
  Contributor: just error?
  Author: yes, if the slots are used up it returns a DataNodeSlotExhausted error.
@@ -50,8 +51,10 @@
 	completedCompactor *typeutil.ConcurrentMap[int64, Compactor] // planID to compactor
 	completed *typeutil.ConcurrentMap[int64, *datapb.CompactionPlanResult] // planID to CompactionPlanResult
 	taskCh chan Compactor
-	taskSem *semaphore.Weighted
+	taskSem *semaphore.Weighted // todo remove this, unify with slot logic
 	dropped *typeutil.ConcurrentSet[string] // vchannel dropped
+	usingSlots int64
+	slotMu sync.RWMutex

 	// To prevent concurrency of release channel and compaction get results
 	// all released channel's compaction tasks will be discarded

@@ -66,27 +69,65 @@
 		taskCh: make(chan Compactor, maxTaskQueueNum),
 		taskSem: semaphore.NewWeighted(maxParallelTaskNum),
 		dropped: typeutil.NewConcurrentSet[string](),
+		usingSlots: 0,
 	}
 }
-func (e *executor) Execute(task Compactor) {
+func (e *executor) Execute(task Compactor) (bool, error) {
+	e.slotMu.Lock()
+	defer e.slotMu.Unlock()
+	if paramtable.Get().DataNodeCfg.SlotCap.GetAsInt64()-e.usingSlots >= task.GetSlotUsage() {
+		newSlotUsage := task.GetSlotUsage()
+		// compatible for old datacoord or unexpected request
+		if task.GetSlotUsage() <= 0 {
+			switch task.GetCompactionType() {
+			case datapb.CompactionType_ClusteringCompaction:
+				newSlotUsage = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64()
+			case datapb.CompactionType_MixCompaction:
+				newSlotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
+			case datapb.CompactionType_Level0DeleteCompaction:
+				newSlotUsage = paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64()
+			}
+			log.Warn("illegal task slot usage, change it to a default value", zap.Int64("illegalSlotUsage", task.GetSlotUsage()), zap.Int64("newSlotUsage", newSlotUsage))
+		}
+		e.usingSlots = e.usingSlots + newSlotUsage
+	} else {
+		return false, merr.WrapErrDataNodeSlotExhausted()
+	}
 	_, ok := e.executing.GetOrInsert(task.GetPlanID(), task)
 	if ok {
 		log.Warn("duplicated compaction task",
 			zap.Int64("planID", task.GetPlanID()),
 			zap.String("channel", task.GetChannelName()))
-		return
+		return false, merr.WrapErrDuplicatedCompactionTask()
 	}
 	e.taskCh <- task
+	return true, nil
 }

 func (e *executor) Slots() int64 {
-	return paramtable.Get().DataNodeCfg.SlotCap.GetAsInt64() - int64(e.executing.Len())
+	return paramtable.Get().DataNodeCfg.SlotCap.GetAsInt64() - e.getUsingSlots()
 }
+
+func (e *executor) getUsingSlots() int64 {
+	e.slotMu.RLock()
+	defer e.slotMu.RUnlock()
+	return e.usingSlots
+}

 func (e *executor) toCompleteState(task Compactor) {
 	task.Complete()
-	e.executing.GetAndRemove(task.GetPlanID())
+	e.getAndRemoveExecuting(task.GetPlanID())
 }

+func (e *executor) getAndRemoveExecuting(planID typeutil.UniqueID) (Compactor, bool) {
+	task, ok := e.executing.GetAndRemove(planID)
+	if ok {
+		e.slotMu.Lock()
+		e.usingSlots = e.usingSlots - task.GetSlotUsage()
+		e.slotMu.Unlock()
+	}
+	return task, ok
+}

func (e *executor) RemoveTask(planID int64) {
@@ -140,7 +181,7 @@
 }

 func (e *executor) stopTask(planID int64) {
-	task, loaded := e.executing.GetAndRemove(planID)
+	task, loaded := e.getAndRemoveExecuting(planID)
if loaded {
log.Warn("compaction executor stop task", zap.Int64("planID", planID), zap.String("vChannelName", task.GetChannelName()))
task.Stop()
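The executor side now behaves like a small admission controller: Execute deducts the plan's slot cost under a mutex (falling back to per-type defaults when an older datacoord sends SlotUsage <= 0), Slots reports the remaining budget, and getAndRemoveExecuting returns the cost when a task completes or is stopped. A condensed, self-contained sketch of that accounting, assuming a fixed slotCap and a bare-bones compactor type in place of the real Compactor interface:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

const slotCap = int64(16) // stand-in for DataNodeCfg.SlotCap

var errSlotExhausted = errors.New("datanode slot exhausted")

type compactor struct {
	planID    int64
	slotUsage int64 // carried on the plan, as SlotUsage is above
}

type executor struct {
	mu         sync.Mutex
	usingSlots int64
	executing  map[int64]*compactor
}

// execute admits a task only if the remaining budget covers its cost.
func (e *executor) execute(t *compactor) error {
	e.mu.Lock()
	defer e.mu.Unlock()
	if slotCap-e.usingSlots < t.slotUsage {
		return errSlotExhausted
	}
	e.usingSlots += t.slotUsage
	e.executing[t.planID] = t
	return nil
}

// finish releases the task's slots, mirroring getAndRemoveExecuting.
func (e *executor) finish(planID int64) {
	e.mu.Lock()
	defer e.mu.Unlock()
	if t, ok := e.executing[planID]; ok {
		e.usingSlots -= t.slotUsage
		delete(e.executing, planID)
	}
}

func main() {
	e := &executor{executing: map[int64]*compactor{}}
	fmt.Println(e.execute(&compactor{planID: 1, slotUsage: 8}))  // <nil>
	fmt.Println(e.execute(&compactor{planID: 2, slotUsage: 16})) // slot exhausted: only 8 left
	e.finish(1)                                                  // task 1 completes, slots return
	fmt.Println(e.execute(&compactor{planID: 2, slotUsage: 16})) // <nil>
}
```

Tracking a cost counter behind a mutex, rather than counting entries in the executing map as before, is what lets differently sized tasks share one budget.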