diff --git a/configs/milvus.yaml b/configs/milvus.yaml index e4cd6d1193dd6..4124d5c36db6c 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -519,6 +519,10 @@ dataCoord: clientMaxSendSize: 268435456 clientMaxRecvSize: 536870912 syncSegmentsInterval: 300 + slot: + clusteringCompactionUsage: 16 + mixCompactionUsage: 8 + l0DeleteCompactionUsage: 8 dataNode: dataSync: @@ -567,10 +571,11 @@ dataNode: clientMaxSendSize: 268435456 clientMaxRecvSize: 536870912 slot: - slotCap: 2 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode. + slotCap: 16 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode. clusteringCompaction: memoryBufferRatio: 0.1 # The ratio of memory buffer of clustering compaction. Data larger than threshold will be spilled to storage. + workerPoolSize: 8 # Configures the system log output. log: diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index c4ae60b117ba8..a7d685e5ed1f3 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -163,7 +164,6 @@ func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64) if t.GetTriggerID() == triggerID { cnt += 1 } - // if t.GetPlanID() } c.queueGuard.RUnlock() c.executingGuard.RLock() @@ -606,10 +606,10 @@ func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) { } for _, t := range tasks { - nodeID := c.pickAnyNode(slots) + nodeID, useSlot := c.pickAnyNode(slots, t) if nodeID == NullNodeID { log.Info("cannot find datanode for compaction task", - zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel())) + zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()), zap.String("vchannel", t.GetChannel())) continue } err := t.SetNodeID(nodeID) @@ -617,6 +617,8 @@ func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) { log.Info("compactionHandler assignNodeID failed", zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()), zap.Error(err)) } else { + // update the input nodeSlots + slots[nodeID] = slots[nodeID] - useSlot log.Info("compactionHandler assignNodeID success", zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()), zap.Any("nodeID", nodeID)) metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Dec() @@ -663,18 +665,27 @@ func (c *compactionPlanHandler) checkCompaction() error { return nil } -func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64) int64 { - var ( - nodeID int64 = NullNodeID - maxSlots int64 = -1 - ) +func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) { + nodeID = NullNodeID + var maxSlots int64 = -1 + + switch task.GetType() { + case datapb.CompactionType_ClusteringCompaction: + useSlot = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64() + case datapb.CompactionType_MixCompaction: + useSlot = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64() + case datapb.CompactionType_Level0DeleteCompaction: + useSlot = paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64() + } + for id, slots := range nodeSlots { - if slots > 0 && slots > maxSlots { + if slots >= useSlot && slots > maxSlots { nodeID = id maxSlots = slots } } - return nodeID + + return nodeID, useSlot } func (c *compactionPlanHandler) pickShardNode(nodeSlots map[int64]int64, t CompactionTask) int64 { diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 230f4dcef21f8..314ae9e7e88a3 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -147,6 +147,7 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP PreferSegmentRows: t.GetPreferSegmentRows(), AnalyzeResultPath: path.Join(t.meta.(*meta).chunkManager.RootPath(), common.AnalyzeStatsPath, metautil.JoinIDPath(t.AnalyzeTaskID, t.AnalyzeVersion)), AnalyzeSegmentIds: t.GetInputSegments(), // todo: if need + SlotUsage: Params.DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64(), } log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID())) @@ -406,8 +407,10 @@ func (t *clusteringCompactionTask) doAnalyze() error { func (t *clusteringCompactionTask) doCompact() error { log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String())) if t.NeedReAssignNodeID() { - return errors.New("not assign nodeID") + log.RatedWarn(10, "not assign nodeID") + return nil } + log = log.With(zap.Int64("nodeID", t.GetNodeID())) // todo refine this logic: GetCompactionPlanResult return a fail result when this is no compaction in datanode which is weird // check whether the compaction plan is already submitted considering @@ -436,6 +439,11 @@ func (t *clusteringCompactionTask) doCompact() error { } err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan()) if err != nil { + if errors.Is(err, merr.ErrDataNodeSlotExhausted) { + log.Warn("fail to notify compaction tasks to DataNode because the node slots exhausted") + t.updateAndSaveTaskMeta(setNodeID(NullNodeID)) + return nil + } log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err)) t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) return err @@ -558,7 +566,7 @@ func (t *clusteringCompactionTask) GetLabel() string { } func (t *clusteringCompactionTask) NeedReAssignNodeID() bool { - return t.GetState() == datapb.CompactionTaskState_pipelining && t.GetNodeID() == 0 + return t.GetState() == datapb.CompactionTaskState_pipelining && (t.GetNodeID() == 0 || t.GetNodeID() == NullNodeID) } func (t *clusteringCompactionTask) CleanLogPath() { diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index 2af08b53177e1..2a0db8960e436 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -237,6 +237,7 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err CollectionTtl: t.GetCollectionTtl(), TotalRows: t.GetTotalRows(), Schema: t.GetSchema(), + SlotUsage: Params.DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64(), } log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID())) diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index d28c11944244c..3130176fa3f80 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -321,6 +321,7 @@ func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, er CollectionTtl: t.GetCollectionTtl(), TotalRows: t.GetTotalRows(), Schema: t.GetSchema(), + SlotUsage: Params.DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(), } log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID())) diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index fb31c7611b9a4..d40698377911a 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -343,15 +343,71 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() { } func (s *CompactionPlanHandlerSuite) TestPickAnyNode() { + s.SetupTest() + nodeSlots := map[int64]int64{ + 100: 16, + 101: 24, + } + node, useSlot := s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + Type: datapb.CompactionType_MixCompaction, + }, + }) + s.Equal(int64(101), node) + nodeSlots[node] = nodeSlots[node] - useSlot + + node, useSlot = s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + Type: datapb.CompactionType_MixCompaction, + }, + }) + s.Equal(int64(100), node) + nodeSlots[node] = nodeSlots[node] - useSlot + + node, useSlot = s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + Type: datapb.CompactionType_MixCompaction, + }, + }) + s.Equal(int64(101), node) + nodeSlots[node] = nodeSlots[node] - useSlot + + node, useSlot = s.handler.pickAnyNode(map[int64]int64{}, &mixCompactionTask{}) + s.Equal(int64(NullNodeID), node) +} + +func (s *CompactionPlanHandlerSuite) TestPickAnyNodeForClusteringTask() { s.SetupTest() nodeSlots := map[int64]int64{ 100: 2, - 101: 3, + 101: 16, + 102: 10, + } + executingTasks := make(map[int64]CompactionTask, 0) + executingTasks[1] = &clusteringCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + Type: datapb.CompactionType_ClusteringCompaction, + }, + } + executingTasks[2] = &clusteringCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + Type: datapb.CompactionType_ClusteringCompaction, + }, } - node := s.handler.pickAnyNode(nodeSlots) + s.handler.executingTasks = executingTasks + node, useSlot := s.handler.pickAnyNode(nodeSlots, &clusteringCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + Type: datapb.CompactionType_ClusteringCompaction, + }, + }) s.Equal(int64(101), node) + nodeSlots[node] = nodeSlots[node] - useSlot - node = s.handler.pickAnyNode(map[int64]int64{}) + node, useSlot = s.handler.pickAnyNode(nodeSlots, &clusteringCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + Type: datapb.CompactionType_ClusteringCompaction, + }, + }) s.Equal(int64(NullNodeID), node) } diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index e05fa1b1b8cc5..95f2022ddcff1 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -479,6 +479,7 @@ func Test_compactionTrigger_force(t *testing.T) { Channel: "ch1", TotalRows: 200, Schema: schema, + SlotUsage: 8, }, }, }, diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index 2d7126dd86fce..cc5c0d5219408 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -165,6 +165,10 @@ func (t *clusteringCompactionTask) GetChannelName() string { return t.plan.GetChannel() } +func (t *clusteringCompactionTask) GetCompactionType() datapb.CompactionType { + return t.plan.GetType() +} + func (t *clusteringCompactionTask) GetCollection() int64 { return t.plan.GetSegmentBinlogs()[0].GetCollectionID() } @@ -205,7 +209,6 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, erro log.Warn("compact wrong, illegal compaction type") return nil, merr.WrapErrIllegalCompactionPlan() } - log.Info("Clustering compaction", zap.Duration("wait in queue elapse", t.tr.RecordSpan())) if !funcutil.CheckCtxValid(ctx) { log.Warn("compact wrong, task context done or timeout") return nil, ctx.Err() @@ -1145,3 +1148,7 @@ func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) (b buffer.writer = writer return pack, nil } + +func (t *clusteringCompactionTask) GetSlotUsage() int64 { + return t.plan.GetSlotUsage() +} diff --git a/internal/datanode/compaction/compactor.go b/internal/datanode/compaction/compactor.go index 825723a98fd52..6d929bd30af9a 100644 --- a/internal/datanode/compaction/compactor.go +++ b/internal/datanode/compaction/compactor.go @@ -29,4 +29,6 @@ type Compactor interface { GetPlanID() typeutil.UniqueID GetCollection() typeutil.UniqueID GetChannelName() string + GetCompactionType() datapb.CompactionType + GetSlotUsage() int64 } diff --git a/internal/datanode/compaction/executor.go b/internal/datanode/compaction/executor.go index 167fc03acaaae..cfd2ad30c783a 100644 --- a/internal/datanode/compaction/executor.go +++ b/internal/datanode/compaction/executor.go @@ -21,11 +21,13 @@ import ( "sync" "github.com/samber/lo" + "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/sync/semaphore" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -37,7 +39,7 @@ const ( type Executor interface { Start(ctx context.Context) - Execute(task Compactor) + Execute(task Compactor) (bool, error) Slots() int64 RemoveTask(planID int64) GetResults(planID int64) []*datapb.CompactionPlanResult @@ -50,8 +52,10 @@ type executor struct { completedCompactor *typeutil.ConcurrentMap[int64, Compactor] // planID to compactor completed *typeutil.ConcurrentMap[int64, *datapb.CompactionPlanResult] // planID to CompactionPlanResult taskCh chan Compactor - taskSem *semaphore.Weighted + taskSem *semaphore.Weighted // todo remove this, unify with slot logic dropped *typeutil.ConcurrentSet[string] // vchannel dropped + usingSlots *atomic.Int64 + slotMu sync.Mutex // To prevent concurrency of release channel and compaction get results // all released channel's compaction tasks will be discarded @@ -66,27 +70,46 @@ func NewExecutor() *executor { taskCh: make(chan Compactor, maxTaskQueueNum), taskSem: semaphore.NewWeighted(maxParallelTaskNum), dropped: typeutil.NewConcurrentSet[string](), + usingSlots: atomic.NewInt64(0), } } -func (e *executor) Execute(task Compactor) { +func (e *executor) Execute(task Compactor) (bool, error) { + e.slotMu.Lock() + defer e.slotMu.Unlock() + if e.Slots() >= task.GetSlotUsage() { + e.usingSlots.Add(task.GetSlotUsage()) + } else { + return false, merr.WrapErrDataNodeSlotExhausted() + } _, ok := e.executing.GetOrInsert(task.GetPlanID(), task) if ok { log.Warn("duplicated compaction task", zap.Int64("planID", task.GetPlanID()), zap.String("channel", task.GetChannelName())) - return + return false, merr.WrapErrDuplicatedCompactionTask() } e.taskCh <- task + return true, nil } func (e *executor) Slots() int64 { - return paramtable.Get().DataNodeCfg.SlotCap.GetAsInt64() - int64(e.executing.Len()) + return paramtable.Get().DataNodeCfg.SlotCap.GetAsInt64() - e.usingSlots.Load() } func (e *executor) toCompleteState(task Compactor) { task.Complete() - e.executing.GetAndRemove(task.GetPlanID()) + e.getAndRemoveExecuting(task.GetPlanID()) +} + +func (e *executor) getAndRemoveExecuting(planID typeutil.UniqueID) (Compactor, bool) { + task, ok := e.executing.GetAndRemove(planID) + if ok { + e.slotMu.Lock() + e.usingSlots.Sub(task.GetSlotUsage()) + e.slotMu.Unlock() + } + return task, ok } func (e *executor) RemoveTask(planID int64) { @@ -140,7 +163,7 @@ func (e *executor) executeTask(task Compactor) { } func (e *executor) stopTask(planID int64) { - task, loaded := e.executing.GetAndRemove(planID) + task, loaded := e.getAndRemoveExecuting(planID) if loaded { log.Warn("compaction executor stop task", zap.Int64("planID", planID), zap.String("vChannelName", task.GetChannelName())) task.Stop() diff --git a/internal/datanode/compaction/executor_test.go b/internal/datanode/compaction/executor_test.go index 81b64556dafe9..40f5b0aabb5e5 100644 --- a/internal/datanode/compaction/executor_test.go +++ b/internal/datanode/compaction/executor_test.go @@ -33,6 +33,7 @@ func TestCompactionExecutor(t *testing.T) { mockC := NewMockCompactor(t) mockC.EXPECT().GetPlanID().Return(planID) mockC.EXPECT().GetChannelName().Return("ch1") + mockC.EXPECT().GetCompactionType().Return(datapb.CompactionType_MixCompaction) executor := NewExecutor() executor.Execute(mockC) executor.Execute(mockC) diff --git a/internal/datanode/compaction/l0_compactor.go b/internal/datanode/compaction/l0_compactor.go index f2edabbb3d4bd..611fb407a7161 100644 --- a/internal/datanode/compaction/l0_compactor.go +++ b/internal/datanode/compaction/l0_compactor.go @@ -98,6 +98,10 @@ func (t *LevelZeroCompactionTask) GetChannelName() string { return t.plan.GetChannel() } +func (t *LevelZeroCompactionTask) GetCompactionType() datapb.CompactionType { + return t.plan.GetType() +} + func (t *LevelZeroCompactionTask) GetCollection() int64 { // The length of SegmentBinlogs is checked before task enqueueing. return t.plan.GetSegmentBinlogs()[0].GetCollectionID() @@ -434,3 +438,7 @@ func (t *LevelZeroCompactionTask) loadBF(ctx context.Context, targetSegments []* err := conc.AwaitAll(futures...) return bfs, err } + +func (t *LevelZeroCompactionTask) GetSlotUsage() int64 { + return t.plan.GetSlotUsage() +} diff --git a/internal/datanode/compaction/mix_compactor.go b/internal/datanode/compaction/mix_compactor.go index 8144ed8e07366..4b853adfdac39 100644 --- a/internal/datanode/compaction/mix_compactor.go +++ b/internal/datanode/compaction/mix_compactor.go @@ -94,6 +94,10 @@ func (t *mixCompactionTask) GetChannelName() string { return t.plan.GetChannel() } +func (t *mixCompactionTask) GetCompactionType() datapb.CompactionType { + return t.plan.GetType() +} + // return num rows of all segment compaction from func (t *mixCompactionTask) getNumRows() int64 { numRows := int64(0) @@ -392,3 +396,7 @@ func (t *mixCompactionTask) isExpiredEntity(ts typeutil.Timestamp) bool { return entityT.Add(time.Duration(t.plan.GetCollectionTtl())).Before(nowT) } + +func (t *mixCompactionTask) GetSlotUsage() int64 { + return t.plan.GetSlotUsage() +} diff --git a/internal/datanode/compaction/mock_compactor.go b/internal/datanode/compaction/mock_compactor.go index 19a83bf2e1b9d..073a25dac8e25 100644 --- a/internal/datanode/compaction/mock_compactor.go +++ b/internal/datanode/compaction/mock_compactor.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.43.2. DO NOT EDIT. package compaction @@ -24,6 +24,10 @@ func (_m *MockCompactor) EXPECT() *MockCompactor_Expecter { func (_m *MockCompactor) Compact() (*datapb.CompactionPlanResult, error) { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Compact") + } + var r0 *datapb.CompactionPlanResult var r1 error if rf, ok := ret.Get(0).(func() (*datapb.CompactionPlanResult, error)); ok { @@ -109,6 +113,10 @@ func (_c *MockCompactor_Complete_Call) RunAndReturn(run func()) *MockCompactor_C func (_m *MockCompactor) GetChannelName() string { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetChannelName") + } + var r0 string if rf, ok := ret.Get(0).(func() string); ok { r0 = rf() @@ -150,6 +158,10 @@ func (_c *MockCompactor_GetChannelName_Call) RunAndReturn(run func() string) *Mo func (_m *MockCompactor) GetCollection() int64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetCollection") + } + var r0 int64 if rf, ok := ret.Get(0).(func() int64); ok { r0 = rf() @@ -187,10 +199,59 @@ func (_c *MockCompactor_GetCollection_Call) RunAndReturn(run func() int64) *Mock return _c } +// GetCompactionType provides a mock function with given fields: +func (_m *MockCompactor) GetCompactionType() datapb.CompactionType { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetCompactionType") + } + + var r0 datapb.CompactionType + if rf, ok := ret.Get(0).(func() datapb.CompactionType); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(datapb.CompactionType) + } + + return r0 +} + +// MockCompactor_GetCompactionType_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCompactionType' +type MockCompactor_GetCompactionType_Call struct { + *mock.Call +} + +// GetCompactionType is a helper method to define mock.On call +func (_e *MockCompactor_Expecter) GetCompactionType() *MockCompactor_GetCompactionType_Call { + return &MockCompactor_GetCompactionType_Call{Call: _e.mock.On("GetCompactionType")} +} + +func (_c *MockCompactor_GetCompactionType_Call) Run(run func()) *MockCompactor_GetCompactionType_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockCompactor_GetCompactionType_Call) Return(_a0 datapb.CompactionType) *MockCompactor_GetCompactionType_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCompactor_GetCompactionType_Call) RunAndReturn(run func() datapb.CompactionType) *MockCompactor_GetCompactionType_Call { + _c.Call.Return(run) + return _c +} + // GetPlanID provides a mock function with given fields: func (_m *MockCompactor) GetPlanID() int64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetPlanID") + } + var r0 int64 if rf, ok := ret.Get(0).(func() int64); ok { r0 = rf() @@ -228,6 +289,51 @@ func (_c *MockCompactor_GetPlanID_Call) RunAndReturn(run func() int64) *MockComp return _c } +// GetSlotUsage provides a mock function with given fields: +func (_m *MockCompactor) GetSlotUsage() int64 { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetSlotUsage") + } + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// MockCompactor_GetSlotUsage_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSlotUsage' +type MockCompactor_GetSlotUsage_Call struct { + *mock.Call +} + +// GetSlotUsage is a helper method to define mock.On call +func (_e *MockCompactor_Expecter) GetSlotUsage() *MockCompactor_GetSlotUsage_Call { + return &MockCompactor_GetSlotUsage_Call{Call: _e.mock.On("GetSlotUsage")} +} + +func (_c *MockCompactor_GetSlotUsage_Call) Run(run func()) *MockCompactor_GetSlotUsage_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockCompactor_GetSlotUsage_Call) Return(_a0 int64) *MockCompactor_GetSlotUsage_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCompactor_GetSlotUsage_Call) RunAndReturn(run func() int64) *MockCompactor_GetSlotUsage_Call { + _c.Call.Return(run) + return _c +} + // Stop provides a mock function with given fields: func (_m *MockCompactor) Stop() { _m.Called() diff --git a/internal/datanode/services.go b/internal/datanode/services.go index ee235f2de2909..432e429fe3f45 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -249,8 +249,12 @@ func (node *DataNode) CompactionV2(ctx context.Context, req *datapb.CompactionPl return merr.Status(merr.WrapErrParameterInvalidMsg("Unknown compaction type: %v", req.GetType().String())), nil } - node.compactionExecutor.Execute(task) - return merr.Success(), nil + succeed, err := node.compactionExecutor.Execute(task) + if succeed { + return merr.Success(), nil + } else { + return merr.Status(err), nil + } } // GetCompactionState called by DataCoord diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index 05ee57b474db4..2e47caf4c125a 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -171,6 +171,7 @@ func (s *DataNodeServicesSuite) TestGetCompactionState() { mockC.EXPECT().GetPlanID().Return(int64(1)) mockC.EXPECT().GetCollection().Return(collection) mockC.EXPECT().GetChannelName().Return(channel) + mockC.EXPECT().GetCompactionType().Return(datapb.CompactionType_MixCompaction) mockC.EXPECT().Complete().Return() mockC.EXPECT().Compact().Return(&datapb.CompactionPlanResult{ PlanID: 1, diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index c2b5a8e5e237d..cde40ff8ddd37 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -560,6 +560,7 @@ message CompactionPlan { string analyze_result_path = 14; repeated int64 analyze_segment_ids = 15; int32 state = 16; + int64 slot_usage = 17; } message CompactionSegment { diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index c5433a5881fd3..d31fdc8044fbf 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -207,6 +207,9 @@ var ( ErrBuildCompactionRequestFail = newMilvusError("fail to build CompactionRequest", 2312, true) ErrGetCompactionPlanResultFail = newMilvusError("fail to get compaction plan", 2313, true) ErrCompactionResult = newMilvusError("illegal compaction results", 2314, false) + ErrDuplicatedCompactionTask = newMilvusError("duplicated compaction task", 2315, false) + + ErrDataNodeSlotExhausted = newMilvusError("datanode slot exhausted", 2401, false) // General ErrOperationNotSupported = newMilvusError("unsupported operation", 3000, false) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index ad074c120c72a..1c84e51d02976 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -1183,3 +1183,19 @@ func WrapErrCompactionResult(msg ...string) error { } return err } + +func WrapErrDataNodeSlotExhausted(msg ...string) error { + err := error(ErrDataNodeSlotExhausted) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "->")) + } + return err +} + +func WrapErrDuplicatedCompactionTask(msg ...string) error { + err := error(ErrDuplicatedCompactionTask) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "->")) + } + return err +} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index f7b01120f1ff4..6544c52b4ce04 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2947,6 +2947,10 @@ type dataCoordConfig struct { WaitForIndex ParamItem `refreshable:"true"` GracefulStopTimeout ParamItem `refreshable:"true"` + + ClusteringCompactionSlotUsage ParamItem `refreshable:"true"` + MixCompactionSlotUsage ParamItem `refreshable:"true"` + L0DeleteCompactionSlotUsage ParamItem `refreshable:"true"` } func (p *dataCoordConfig) init(base *BaseTable) { @@ -3705,6 +3709,36 @@ During compaction, the size of segment # of rows is able to exceed segment max # Export: true, } p.GracefulStopTimeout.Init(base.mgr) + + p.ClusteringCompactionSlotUsage = ParamItem{ + Key: "dataCoord.slot.clusteringCompactionUsage", + Version: "2.4.6", + Doc: "slot usage of clustering compaction job.", + DefaultValue: "16", + PanicIfEmpty: false, + Export: true, + } + p.ClusteringCompactionSlotUsage.Init(base.mgr) + + p.MixCompactionSlotUsage = ParamItem{ + Key: "dataCoord.slot.mixCompactionUsage", + Version: "2.4.6", + Doc: "slot usage of mix compaction job.", + DefaultValue: "8", + PanicIfEmpty: false, + Export: true, + } + p.MixCompactionSlotUsage.Init(base.mgr) + + p.L0DeleteCompactionSlotUsage = ParamItem{ + Key: "dataCoord.slot.l0DeleteCompactionUsage", + Version: "2.4.6", + Doc: "slot usage of l0 compaction job.", + DefaultValue: "8", + PanicIfEmpty: false, + Export: true, + } + p.L0DeleteCompactionSlotUsage.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// @@ -4084,7 +4118,7 @@ if this parameter <= 0, will set it as 10`, p.SlotCap = ParamItem{ Key: "dataNode.slot.slotCap", Version: "2.4.2", - DefaultValue: "2", + DefaultValue: "16", Doc: "The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode", Export: true, } @@ -4104,7 +4138,7 @@ if this parameter <= 0, will set it as 10`, Key: "dataNode.clusteringCompaction.workPoolSize", Version: "2.4.6", Doc: "worker pool size for one clustering compaction job.", - DefaultValue: "1", + DefaultValue: "8", PanicIfEmpty: false, Export: true, } diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index e00fb841c1b1e..195608a0643a2 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -465,6 +465,12 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, int64(100*1024*1024), Params.ClusteringCompactionMaxSegmentSize.GetAsSize()) params.Save("dataCoord.compaction.clustering.preferSegmentSize", "10m") assert.Equal(t, int64(10*1024*1024), Params.ClusteringCompactionPreferSegmentSize.GetAsSize()) + params.Save("dataCoord.slot.clusteringCompactionUsage", "10") + assert.Equal(t, 10, Params.ClusteringCompactionSlotUsage.GetAsInt()) + params.Save("dataCoord.slot.mixCompactionUsage", "5") + assert.Equal(t, 5, Params.MixCompactionSlotUsage.GetAsInt()) + params.Save("dataCoord.slot.l0DeleteCompactionUsage", "4") + assert.Equal(t, 4, Params.L0DeleteCompactionSlotUsage.GetAsInt()) }) t.Run("test dataNodeConfig", func(t *testing.T) { @@ -517,10 +523,13 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 16, Params.ReadBufferSizeInMB.GetAsInt()) params.Save("datanode.gracefulStopTimeout", "100") assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) - assert.Equal(t, 2, Params.SlotCap.GetAsInt()) + assert.Equal(t, 16, Params.SlotCap.GetAsInt()) + // clustering compaction params.Save("datanode.clusteringCompaction.memoryBufferRatio", "0.1") assert.Equal(t, 0.1, Params.ClusteringCompactionMemoryBufferRatio.GetAsFloat()) + params.Save("datanode.clusteringCompaction.workPoolSize", "2") + assert.Equal(t, int64(2), Params.ClusteringCompactionWorkerPoolSize.GetAsInt64()) assert.Equal(t, 4, Params.BloomFilterApplyParallelFactor.GetAsInt()) })