From eefcccb5b789c8a838707d6202c05cdc527ff05d Mon Sep 17 00:00:00 2001 From: wayblink Date: Wed, 17 Jul 2024 14:20:08 +0800 Subject: [PATCH] enhance: Add compaction task slot usage logic Signed-off-by: wayblink --- configs/milvus.yaml | 7 +- internal/datacoord/compaction.go | 31 +++-- .../datacoord/compaction_task_clustering.go | 12 +- internal/datacoord/compaction_task_l0.go | 1 + internal/datacoord/compaction_task_mix.go | 1 + internal/datacoord/compaction_test.go | 62 +++++++++- internal/datacoord/compaction_trigger_test.go | 1 + .../compaction/clustering_compactor.go | 9 +- internal/datanode/compaction/compactor.go | 2 + internal/datanode/compaction/executor.go | 55 +++++++-- internal/datanode/compaction/executor_test.go | 70 +++++++++++- internal/datanode/compaction/l0_compactor.go | 8 ++ internal/datanode/compaction/mix_compactor.go | 8 ++ .../datanode/compaction/mock_compactor.go | 108 +++++++++++++++++- internal/datanode/services.go | 8 +- internal/datanode/services_test.go | 2 + internal/proto/data_coord.proto | 1 + pkg/util/merr/errors.go | 3 + pkg/util/merr/utils.go | 16 +++ pkg/util/paramtable/component_param.go | 38 +++++- pkg/util/paramtable/component_param_test.go | 11 +- 21 files changed, 422 insertions(+), 32 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index e4cd6d1193dd6..d689bff328275 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -519,6 +519,10 @@ dataCoord: clientMaxSendSize: 268435456 clientMaxRecvSize: 536870912 syncSegmentsInterval: 300 + slot: + clusteringCompactionUsage: 16 + mixCompactionUsage: 8 + l0DeleteCompactionUsage: 8 dataNode: dataSync: @@ -567,10 +571,11 @@ dataNode: clientMaxSendSize: 268435456 clientMaxRecvSize: 536870912 slot: - slotCap: 2 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode. + slotCap: 16 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode. clusteringCompaction: memoryBufferRatio: 0.1 # The ratio of memory buffer of clustering compaction. Data larger than threshold will be spilled to storage. + workPoolSize: 8 # Configures the system log output. log: diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 40fe73e46f4ab..69a0357e6c6f7 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -163,7 +164,6 @@ func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64) if t.GetTriggerID() == triggerID { cnt += 1 } - // if t.GetPlanID() } c.queueGuard.RUnlock() c.executingGuard.RLock() @@ -617,10 +617,10 @@ func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) { } for _, t := range tasks { - nodeID := c.pickAnyNode(slots) + nodeID, useSlot := c.pickAnyNode(slots, t) if nodeID == NullNodeID { log.Info("compactionHandler cannot find datanode for compaction task", - zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel())) + zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()), zap.String("vchannel", t.GetChannel())) continue } err := t.SetNodeID(nodeID) @@ -628,6 +628,8 @@ func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) { log.Info("compactionHandler assignNodeID failed", zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()), zap.Error(err)) } else { + // update the input nodeSlots + slots[nodeID] = slots[nodeID] - useSlot log.Info("compactionHandler assignNodeID success", zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()), zap.Any("nodeID", nodeID)) metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Dec() @@ -674,18 +676,27 @@ func (c *compactionPlanHandler) checkCompaction() error { return nil } -func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64) int64 { - var ( - nodeID int64 = NullNodeID - maxSlots int64 = -1 - ) +func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) { + nodeID = NullNodeID + var maxSlots int64 = -1 + + switch task.GetType() { + case datapb.CompactionType_ClusteringCompaction: + useSlot = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64() + case datapb.CompactionType_MixCompaction: + useSlot = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64() + case datapb.CompactionType_Level0DeleteCompaction: + useSlot = paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64() + } + for id, slots := range nodeSlots { - if slots > 0 && slots > maxSlots { + if slots >= useSlot && slots > maxSlots { nodeID = id maxSlots = slots } } - return nodeID + + return nodeID, useSlot } func (c *compactionPlanHandler) pickShardNode(nodeSlots map[int64]int64, t CompactionTask) int64 { diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 2866f6dc2404f..19d7890528a80 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -151,12 +151,13 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP MaxSegmentRows: t.GetMaxSegmentRows(), PreferSegmentRows: t.GetPreferSegmentRows(), AnalyzeResultPath: path.Join(t.meta.(*meta).chunkManager.RootPath(), common.AnalyzeStatsPath, metautil.JoinIDPath(t.AnalyzeTaskID, t.AnalyzeVersion)), - AnalyzeSegmentIds: t.GetInputSegments(), // todo: if need + AnalyzeSegmentIds: t.GetInputSegments(), BeginLogID: beginLogID, PreAllocatedSegments: &datapb.IDRange{ Begin: t.GetResultSegments()[0], End: t.GetResultSegments()[1], }, + SlotUsage: Params.DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64(), } log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID())) @@ -416,8 +417,10 @@ func (t *clusteringCompactionTask) doAnalyze() error { func (t *clusteringCompactionTask) doCompact() error { log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String())) if t.NeedReAssignNodeID() { - return errors.New("not assign nodeID") + log.RatedWarn(10, "not assign nodeID") + return nil } + log = log.With(zap.Int64("nodeID", t.GetNodeID())) // todo refine this logic: GetCompactionPlanResult return a fail result when this is no compaction in datanode which is weird // check whether the compaction plan is already submitted considering @@ -446,6 +449,11 @@ func (t *clusteringCompactionTask) doCompact() error { } err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan()) if err != nil { + if errors.Is(err, merr.ErrDataNodeSlotExhausted) { + log.Warn("fail to notify compaction tasks to DataNode because the node slots exhausted") + t.updateAndSaveTaskMeta(setNodeID(NullNodeID)) + return nil + } log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err)) t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) return err diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index dd10181da9cf9..7bd7bc00fae45 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -246,6 +246,7 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err TotalRows: t.GetTotalRows(), Schema: t.GetSchema(), BeginLogID: beginLogID, + SlotUsage: Params.DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64(), } log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID())) diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index c27cd2e35ab93..154d45a16b45b 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -347,6 +347,7 @@ func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, er PreAllocatedSegments: &datapb.IDRange{ Begin: t.GetResultSegments()[0], }, + SlotUsage: Params.DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(), } log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID())) diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index fb31c7611b9a4..d40698377911a 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -343,15 +343,71 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() { } func (s *CompactionPlanHandlerSuite) TestPickAnyNode() { + s.SetupTest() + nodeSlots := map[int64]int64{ + 100: 16, + 101: 24, + } + node, useSlot := s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + Type: datapb.CompactionType_MixCompaction, + }, + }) + s.Equal(int64(101), node) + nodeSlots[node] = nodeSlots[node] - useSlot + + node, useSlot = s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + Type: datapb.CompactionType_MixCompaction, + }, + }) + s.Equal(int64(100), node) + nodeSlots[node] = nodeSlots[node] - useSlot + + node, useSlot = s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + Type: datapb.CompactionType_MixCompaction, + }, + }) + s.Equal(int64(101), node) + nodeSlots[node] = nodeSlots[node] - useSlot + + node, useSlot = s.handler.pickAnyNode(map[int64]int64{}, &mixCompactionTask{}) + s.Equal(int64(NullNodeID), node) +} + +func (s *CompactionPlanHandlerSuite) TestPickAnyNodeForClusteringTask() { s.SetupTest() nodeSlots := map[int64]int64{ 100: 2, - 101: 3, + 101: 16, + 102: 10, + } + executingTasks := make(map[int64]CompactionTask, 0) + executingTasks[1] = &clusteringCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + Type: datapb.CompactionType_ClusteringCompaction, + }, + } + executingTasks[2] = &clusteringCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + Type: datapb.CompactionType_ClusteringCompaction, + }, } - node := s.handler.pickAnyNode(nodeSlots) + s.handler.executingTasks = executingTasks + node, useSlot := s.handler.pickAnyNode(nodeSlots, &clusteringCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + Type: datapb.CompactionType_ClusteringCompaction, + }, + }) s.Equal(int64(101), node) + nodeSlots[node] = nodeSlots[node] - useSlot - node = s.handler.pickAnyNode(map[int64]int64{}) + node, useSlot = s.handler.pickAnyNode(nodeSlots, &clusteringCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + Type: datapb.CompactionType_ClusteringCompaction, + }, + }) s.Equal(int64(NullNodeID), node) } diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index cf0ce1eab9c4a..11cc20df5c28e 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -483,6 +483,7 @@ func Test_compactionTrigger_force(t *testing.T) { TotalRows: 200, Schema: schema, PreAllocatedSegments: &datapb.IDRange{Begin: 100}, + SlotUsage: 8, }, }, }, diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index f348f7ba21398..250559d38fbfc 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -169,6 +169,10 @@ func (t *clusteringCompactionTask) GetChannelName() string { return t.plan.GetChannel() } +func (t *clusteringCompactionTask) GetCompactionType() datapb.CompactionType { + return t.plan.GetType() +} + func (t *clusteringCompactionTask) GetCollection() int64 { return t.plan.GetSegmentBinlogs()[0].GetCollectionID() } @@ -209,7 +213,6 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, erro log.Warn("compact wrong, illegal compaction type") return nil, merr.WrapErrIllegalCompactionPlan() } - log.Info("Clustering compaction", zap.Duration("wait in queue elapse", t.tr.RecordSpan())) if !funcutil.CheckCtxValid(ctx) { log.Warn("compact wrong, task context done or timeout") return nil, ctx.Err() @@ -1158,3 +1161,7 @@ func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) (b buffer.writer = writer return pack, nil } + +func (t *clusteringCompactionTask) GetSlotUsage() int64 { + return t.plan.GetSlotUsage() +} diff --git a/internal/datanode/compaction/compactor.go b/internal/datanode/compaction/compactor.go index 825723a98fd52..6d929bd30af9a 100644 --- a/internal/datanode/compaction/compactor.go +++ b/internal/datanode/compaction/compactor.go @@ -29,4 +29,6 @@ type Compactor interface { GetPlanID() typeutil.UniqueID GetCollection() typeutil.UniqueID GetChannelName() string + GetCompactionType() datapb.CompactionType + GetSlotUsage() int64 } diff --git a/internal/datanode/compaction/executor.go b/internal/datanode/compaction/executor.go index 167fc03acaaae..3caa27ef1d561 100644 --- a/internal/datanode/compaction/executor.go +++ b/internal/datanode/compaction/executor.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -37,7 +38,7 @@ const ( type Executor interface { Start(ctx context.Context) - Execute(task Compactor) + Execute(task Compactor) (bool, error) Slots() int64 RemoveTask(planID int64) GetResults(planID int64) []*datapb.CompactionPlanResult @@ -50,8 +51,10 @@ type executor struct { completedCompactor *typeutil.ConcurrentMap[int64, Compactor] // planID to compactor completed *typeutil.ConcurrentMap[int64, *datapb.CompactionPlanResult] // planID to CompactionPlanResult taskCh chan Compactor - taskSem *semaphore.Weighted + taskSem *semaphore.Weighted // todo remove this, unify with slot logic dropped *typeutil.ConcurrentSet[string] // vchannel dropped + usingSlots int64 + slotMu sync.RWMutex // To prevent concurrency of release channel and compaction get results // all released channel's compaction tasks will be discarded @@ -66,27 +69,65 @@ func NewExecutor() *executor { taskCh: make(chan Compactor, maxTaskQueueNum), taskSem: semaphore.NewWeighted(maxParallelTaskNum), dropped: typeutil.NewConcurrentSet[string](), + usingSlots: 0, } } -func (e *executor) Execute(task Compactor) { +func (e *executor) Execute(task Compactor) (bool, error) { + e.slotMu.Lock() + defer e.slotMu.Unlock() + if paramtable.Get().DataNodeCfg.SlotCap.GetAsInt64()-e.usingSlots >= task.GetSlotUsage() { + newSlotUsage := task.GetSlotUsage() + // compatible for old datacoord or unexpected request + if task.GetSlotUsage() <= 0 { + switch task.GetCompactionType() { + case datapb.CompactionType_ClusteringCompaction: + newSlotUsage = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64() + case datapb.CompactionType_MixCompaction: + newSlotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64() + case datapb.CompactionType_Level0DeleteCompaction: + newSlotUsage = paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64() + } + log.Warn("illegal task slot usage, change it to a default value", zap.Int64("illegalSlotUsage", task.GetSlotUsage()), zap.Int64("newSlotUsage", newSlotUsage)) + } + e.usingSlots = e.usingSlots + newSlotUsage + } else { + return false, merr.WrapErrDataNodeSlotExhausted() + } _, ok := e.executing.GetOrInsert(task.GetPlanID(), task) if ok { log.Warn("duplicated compaction task", zap.Int64("planID", task.GetPlanID()), zap.String("channel", task.GetChannelName())) - return + return false, merr.WrapErrDuplicatedCompactionTask() } e.taskCh <- task + return true, nil } func (e *executor) Slots() int64 { - return paramtable.Get().DataNodeCfg.SlotCap.GetAsInt64() - int64(e.executing.Len()) + return paramtable.Get().DataNodeCfg.SlotCap.GetAsInt64() - e.getUsingSlots() +} + +func (e *executor) getUsingSlots() int64 { + e.slotMu.RLock() + defer e.slotMu.RUnlock() + return e.usingSlots } func (e *executor) toCompleteState(task Compactor) { task.Complete() - e.executing.GetAndRemove(task.GetPlanID()) + e.getAndRemoveExecuting(task.GetPlanID()) +} + +func (e *executor) getAndRemoveExecuting(planID typeutil.UniqueID) (Compactor, bool) { + task, ok := e.executing.GetAndRemove(planID) + if ok { + e.slotMu.Lock() + e.usingSlots = e.usingSlots - task.GetSlotUsage() + e.slotMu.Unlock() + } + return task, ok } func (e *executor) RemoveTask(planID int64) { @@ -140,7 +181,7 @@ func (e *executor) executeTask(task Compactor) { } func (e *executor) stopTask(planID int64) { - task, loaded := e.executing.GetAndRemove(planID) + task, loaded := e.getAndRemoveExecuting(planID) if loaded { log.Warn("compaction executor stop task", zap.Int64("planID", planID), zap.String("vChannelName", task.GetChannelName())) task.Stop() diff --git a/internal/datanode/compaction/executor_test.go b/internal/datanode/compaction/executor_test.go index 81b64556dafe9..dc491d3afd448 100644 --- a/internal/datanode/compaction/executor_test.go +++ b/internal/datanode/compaction/executor_test.go @@ -25,17 +25,82 @@ import ( "github.com/stretchr/testify/require" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) func TestCompactionExecutor(t *testing.T) { t.Run("Test execute", func(t *testing.T) { + paramtable.Get().Init(paramtable.NewBaseTable()) planID := int64(1) mockC := NewMockCompactor(t) mockC.EXPECT().GetPlanID().Return(planID) mockC.EXPECT().GetChannelName().Return("ch1") + mockC.EXPECT().GetSlotUsage().Return(8) executor := NewExecutor() - executor.Execute(mockC) - executor.Execute(mockC) + succeed, err := executor.Execute(mockC) + assert.Equal(t, true, succeed) + assert.NoError(t, err) + assert.EqualValues(t, 1, len(executor.taskCh)) + assert.EqualValues(t, 1, executor.executing.Len()) + + mockC.EXPECT().Stop().Return().Once() + executor.stopTask(planID) + }) + + t.Run("Test deplicate execute", func(t *testing.T) { + paramtable.Get().Init(paramtable.NewBaseTable()) + planID := int64(1) + mockC := NewMockCompactor(t) + mockC.EXPECT().GetPlanID().Return(planID) + mockC.EXPECT().GetChannelName().Return("ch1") + mockC.EXPECT().GetSlotUsage().Return(8) + executor := NewExecutor() + succeed, err := executor.Execute(mockC) + assert.Equal(t, true, succeed) + assert.NoError(t, err) + + succeed2, err2 := executor.Execute(mockC) + assert.Equal(t, false, succeed2) + assert.Error(t, err2) + assert.True(t, errors.Is(err2, merr.ErrDuplicatedCompactionTask)) + + assert.EqualValues(t, 1, len(executor.taskCh)) + assert.EqualValues(t, 1, executor.executing.Len()) + + mockC.EXPECT().Stop().Return().Once() + executor.stopTask(planID) + }) + + t.Run("Test execute task slot usage larger than free slop", func(t *testing.T) { + paramtable.Get().Init(paramtable.NewBaseTable()) + mockC := NewMockCompactor(t) + mockC.EXPECT().GetSlotUsage().Return(100) + executor := NewExecutor() + + succeed, err := executor.Execute(mockC) + assert.Equal(t, false, succeed) + assert.True(t, errors.Is(err, merr.ErrDataNodeSlotExhausted)) + + assert.EqualValues(t, 0, len(executor.taskCh)) + assert.EqualValues(t, 0, executor.executing.Len()) + }) + + t.Run("Test execute task with slot=0", func(t *testing.T) { + paramtable.Get().Init(paramtable.NewBaseTable()) + planID := int64(1) + mockC := NewMockCompactor(t) + mockC.EXPECT().GetPlanID().Return(planID) + mockC.EXPECT().GetChannelName().Return("ch1") + mockC.EXPECT().GetCompactionType().Return(datapb.CompactionType_MixCompaction) + mockC.EXPECT().GetSlotUsage().Return(0) + executor := NewExecutor() + + succeed, err := executor.Execute(mockC) + assert.Equal(t, true, succeed) + assert.NoError(t, err) + assert.Equal(t, int64(8), executor.Slots()) + assert.Equal(t, int64(8), executor.usingSlots) assert.EqualValues(t, 1, len(executor.taskCh)) assert.EqualValues(t, 1, executor.executing.Len()) @@ -115,6 +180,7 @@ func TestCompactionExecutor(t *testing.T) { mc.EXPECT().GetPlanID().Return(int64(1)) mc.EXPECT().GetChannelName().Return("mock") mc.EXPECT().Compact().Return(&datapb.CompactionPlanResult{PlanID: 1}, nil).Maybe() + mc.EXPECT().GetSlotUsage().Return(8) mc.EXPECT().Stop().Return().Once() ex.Execute(mc) diff --git a/internal/datanode/compaction/l0_compactor.go b/internal/datanode/compaction/l0_compactor.go index 8fc62ad52c57f..e42239f050d86 100644 --- a/internal/datanode/compaction/l0_compactor.go +++ b/internal/datanode/compaction/l0_compactor.go @@ -98,6 +98,10 @@ func (t *LevelZeroCompactionTask) GetChannelName() string { return t.plan.GetChannel() } +func (t *LevelZeroCompactionTask) GetCompactionType() datapb.CompactionType { + return t.plan.GetType() +} + func (t *LevelZeroCompactionTask) GetCollection() int64 { // The length of SegmentBinlogs is checked before task enqueueing. return t.plan.GetSegmentBinlogs()[0].GetCollectionID() @@ -434,3 +438,7 @@ func (t *LevelZeroCompactionTask) loadBF(ctx context.Context, targetSegments []* err := conc.AwaitAll(futures...) return bfs, err } + +func (t *LevelZeroCompactionTask) GetSlotUsage() int64 { + return t.plan.GetSlotUsage() +} diff --git a/internal/datanode/compaction/mix_compactor.go b/internal/datanode/compaction/mix_compactor.go index f22072c7673bd..657b0bb55a1dc 100644 --- a/internal/datanode/compaction/mix_compactor.go +++ b/internal/datanode/compaction/mix_compactor.go @@ -95,6 +95,10 @@ func (t *mixCompactionTask) GetChannelName() string { return t.plan.GetChannel() } +func (t *mixCompactionTask) GetCompactionType() datapb.CompactionType { + return t.plan.GetType() +} + // return num rows of all segment compaction from func (t *mixCompactionTask) getNumRows() int64 { numRows := int64(0) @@ -388,3 +392,7 @@ func (t *mixCompactionTask) isExpiredEntity(ts typeutil.Timestamp) bool { return entityT.Add(time.Duration(t.plan.GetCollectionTtl())).Before(nowT) } + +func (t *mixCompactionTask) GetSlotUsage() int64 { + return t.plan.GetSlotUsage() +} diff --git a/internal/datanode/compaction/mock_compactor.go b/internal/datanode/compaction/mock_compactor.go index 19a83bf2e1b9d..073a25dac8e25 100644 --- a/internal/datanode/compaction/mock_compactor.go +++ b/internal/datanode/compaction/mock_compactor.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.43.2. DO NOT EDIT. package compaction @@ -24,6 +24,10 @@ func (_m *MockCompactor) EXPECT() *MockCompactor_Expecter { func (_m *MockCompactor) Compact() (*datapb.CompactionPlanResult, error) { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Compact") + } + var r0 *datapb.CompactionPlanResult var r1 error if rf, ok := ret.Get(0).(func() (*datapb.CompactionPlanResult, error)); ok { @@ -109,6 +113,10 @@ func (_c *MockCompactor_Complete_Call) RunAndReturn(run func()) *MockCompactor_C func (_m *MockCompactor) GetChannelName() string { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetChannelName") + } + var r0 string if rf, ok := ret.Get(0).(func() string); ok { r0 = rf() @@ -150,6 +158,10 @@ func (_c *MockCompactor_GetChannelName_Call) RunAndReturn(run func() string) *Mo func (_m *MockCompactor) GetCollection() int64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetCollection") + } + var r0 int64 if rf, ok := ret.Get(0).(func() int64); ok { r0 = rf() @@ -187,10 +199,59 @@ func (_c *MockCompactor_GetCollection_Call) RunAndReturn(run func() int64) *Mock return _c } +// GetCompactionType provides a mock function with given fields: +func (_m *MockCompactor) GetCompactionType() datapb.CompactionType { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetCompactionType") + } + + var r0 datapb.CompactionType + if rf, ok := ret.Get(0).(func() datapb.CompactionType); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(datapb.CompactionType) + } + + return r0 +} + +// MockCompactor_GetCompactionType_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCompactionType' +type MockCompactor_GetCompactionType_Call struct { + *mock.Call +} + +// GetCompactionType is a helper method to define mock.On call +func (_e *MockCompactor_Expecter) GetCompactionType() *MockCompactor_GetCompactionType_Call { + return &MockCompactor_GetCompactionType_Call{Call: _e.mock.On("GetCompactionType")} +} + +func (_c *MockCompactor_GetCompactionType_Call) Run(run func()) *MockCompactor_GetCompactionType_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockCompactor_GetCompactionType_Call) Return(_a0 datapb.CompactionType) *MockCompactor_GetCompactionType_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCompactor_GetCompactionType_Call) RunAndReturn(run func() datapb.CompactionType) *MockCompactor_GetCompactionType_Call { + _c.Call.Return(run) + return _c +} + // GetPlanID provides a mock function with given fields: func (_m *MockCompactor) GetPlanID() int64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetPlanID") + } + var r0 int64 if rf, ok := ret.Get(0).(func() int64); ok { r0 = rf() @@ -228,6 +289,51 @@ func (_c *MockCompactor_GetPlanID_Call) RunAndReturn(run func() int64) *MockComp return _c } +// GetSlotUsage provides a mock function with given fields: +func (_m *MockCompactor) GetSlotUsage() int64 { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetSlotUsage") + } + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// MockCompactor_GetSlotUsage_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSlotUsage' +type MockCompactor_GetSlotUsage_Call struct { + *mock.Call +} + +// GetSlotUsage is a helper method to define mock.On call +func (_e *MockCompactor_Expecter) GetSlotUsage() *MockCompactor_GetSlotUsage_Call { + return &MockCompactor_GetSlotUsage_Call{Call: _e.mock.On("GetSlotUsage")} +} + +func (_c *MockCompactor_GetSlotUsage_Call) Run(run func()) *MockCompactor_GetSlotUsage_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockCompactor_GetSlotUsage_Call) Return(_a0 int64) *MockCompactor_GetSlotUsage_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCompactor_GetSlotUsage_Call) RunAndReturn(run func() int64) *MockCompactor_GetSlotUsage_Call { + _c.Call.Return(run) + return _c +} + // Stop provides a mock function with given fields: func (_m *MockCompactor) Stop() { _m.Called() diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 5263a37acf01d..a160c849aab1d 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -256,8 +256,12 @@ func (node *DataNode) CompactionV2(ctx context.Context, req *datapb.CompactionPl return merr.Status(merr.WrapErrParameterInvalidMsg("Unknown compaction type: %v", req.GetType().String())), nil } - node.compactionExecutor.Execute(task) - return merr.Success(), nil + succeed, err := node.compactionExecutor.Execute(task) + if succeed { + return merr.Success(), nil + } else { + return merr.Status(err), nil + } } // GetCompactionState called by DataCoord diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index 59d823cfcdf3b..881d915a8cca9 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -171,6 +171,7 @@ func (s *DataNodeServicesSuite) TestGetCompactionState() { mockC.EXPECT().GetPlanID().Return(int64(1)) mockC.EXPECT().GetCollection().Return(collection) mockC.EXPECT().GetChannelName().Return(channel) + mockC.EXPECT().GetSlotUsage().Return(8) mockC.EXPECT().Complete().Return() mockC.EXPECT().Compact().Return(&datapb.CompactionPlanResult{ PlanID: 1, @@ -182,6 +183,7 @@ func (s *DataNodeServicesSuite) TestGetCompactionState() { mockC2.EXPECT().GetPlanID().Return(int64(2)) mockC2.EXPECT().GetCollection().Return(collection) mockC2.EXPECT().GetChannelName().Return(channel) + mockC2.EXPECT().GetSlotUsage().Return(8) mockC2.EXPECT().Complete().Return() mockC2.EXPECT().Compact().Return(&datapb.CompactionPlanResult{ PlanID: 2, diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index 97573f05a7262..e2b5afb4eaea1 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -562,6 +562,7 @@ message CompactionPlan { int32 state = 16; int64 begin_logID = 17; IDRange pre_allocated_segments = 18; // only for clustering compaction + int64 slot_usage = 19; } message CompactionSegment { diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index c5433a5881fd3..d31fdc8044fbf 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -207,6 +207,9 @@ var ( ErrBuildCompactionRequestFail = newMilvusError("fail to build CompactionRequest", 2312, true) ErrGetCompactionPlanResultFail = newMilvusError("fail to get compaction plan", 2313, true) ErrCompactionResult = newMilvusError("illegal compaction results", 2314, false) + ErrDuplicatedCompactionTask = newMilvusError("duplicated compaction task", 2315, false) + + ErrDataNodeSlotExhausted = newMilvusError("datanode slot exhausted", 2401, false) // General ErrOperationNotSupported = newMilvusError("unsupported operation", 3000, false) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index ad074c120c72a..1c84e51d02976 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -1183,3 +1183,19 @@ func WrapErrCompactionResult(msg ...string) error { } return err } + +func WrapErrDataNodeSlotExhausted(msg ...string) error { + err := error(ErrDataNodeSlotExhausted) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "->")) + } + return err +} + +func WrapErrDuplicatedCompactionTask(msg ...string) error { + err := error(ErrDuplicatedCompactionTask) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "->")) + } + return err +} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 80703b478928c..af755bef802f1 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2958,6 +2958,10 @@ type dataCoordConfig struct { WaitForIndex ParamItem `refreshable:"true"` GracefulStopTimeout ParamItem `refreshable:"true"` + + ClusteringCompactionSlotUsage ParamItem `refreshable:"true"` + MixCompactionSlotUsage ParamItem `refreshable:"true"` + L0DeleteCompactionSlotUsage ParamItem `refreshable:"true"` } func (p *dataCoordConfig) init(base *BaseTable) { @@ -3716,6 +3720,36 @@ During compaction, the size of segment # of rows is able to exceed segment max # Export: true, } p.GracefulStopTimeout.Init(base.mgr) + + p.ClusteringCompactionSlotUsage = ParamItem{ + Key: "dataCoord.slot.clusteringCompactionUsage", + Version: "2.4.6", + Doc: "slot usage of clustering compaction job.", + DefaultValue: "16", + PanicIfEmpty: false, + Export: true, + } + p.ClusteringCompactionSlotUsage.Init(base.mgr) + + p.MixCompactionSlotUsage = ParamItem{ + Key: "dataCoord.slot.mixCompactionUsage", + Version: "2.4.6", + Doc: "slot usage of mix compaction job.", + DefaultValue: "8", + PanicIfEmpty: false, + Export: true, + } + p.MixCompactionSlotUsage.Init(base.mgr) + + p.L0DeleteCompactionSlotUsage = ParamItem{ + Key: "dataCoord.slot.l0DeleteCompactionUsage", + Version: "2.4.6", + Doc: "slot usage of l0 compaction job.", + DefaultValue: "8", + PanicIfEmpty: false, + Export: true, + } + p.L0DeleteCompactionSlotUsage.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// @@ -4095,7 +4129,7 @@ if this parameter <= 0, will set it as 10`, p.SlotCap = ParamItem{ Key: "dataNode.slot.slotCap", Version: "2.4.2", - DefaultValue: "2", + DefaultValue: "16", Doc: "The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode", Export: true, } @@ -4115,7 +4149,7 @@ if this parameter <= 0, will set it as 10`, Key: "dataNode.clusteringCompaction.workPoolSize", Version: "2.4.6", Doc: "worker pool size for one clustering compaction job.", - DefaultValue: "1", + DefaultValue: "8", PanicIfEmpty: false, Export: true, } diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 6f0131fbff87c..96bb625ff311c 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -467,6 +467,12 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, int64(100*1024*1024), Params.ClusteringCompactionMaxSegmentSize.GetAsSize()) params.Save("dataCoord.compaction.clustering.preferSegmentSize", "10m") assert.Equal(t, int64(10*1024*1024), Params.ClusteringCompactionPreferSegmentSize.GetAsSize()) + params.Save("dataCoord.slot.clusteringCompactionUsage", "10") + assert.Equal(t, 10, Params.ClusteringCompactionSlotUsage.GetAsInt()) + params.Save("dataCoord.slot.mixCompactionUsage", "5") + assert.Equal(t, 5, Params.MixCompactionSlotUsage.GetAsInt()) + params.Save("dataCoord.slot.l0DeleteCompactionUsage", "4") + assert.Equal(t, 4, Params.L0DeleteCompactionSlotUsage.GetAsInt()) }) t.Run("test dataNodeConfig", func(t *testing.T) { @@ -519,10 +525,13 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 16, Params.ReadBufferSizeInMB.GetAsInt()) params.Save("datanode.gracefulStopTimeout", "100") assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) - assert.Equal(t, 2, Params.SlotCap.GetAsInt()) + assert.Equal(t, 16, Params.SlotCap.GetAsInt()) + // clustering compaction params.Save("datanode.clusteringCompaction.memoryBufferRatio", "0.1") assert.Equal(t, 0.1, Params.ClusteringCompactionMemoryBufferRatio.GetAsFloat()) + params.Save("datanode.clusteringCompaction.workPoolSize", "2") + assert.Equal(t, int64(2), Params.ClusteringCompactionWorkerPoolSize.GetAsInt64()) assert.Equal(t, 4, Params.BloomFilterApplyParallelFactor.GetAsInt()) })