From f5f4fed88908c52ef1996ed6877f4b22d1ca295e Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Fri, 3 Jan 2025 12:05:03 +0800 Subject: [PATCH] fix: [10kcp] channel unbalance during stopping balance progress (#38972) issue: https://github.com/milvus-io/milvus/issues/38970, https://github.com/milvus-io/milvus/issues/37630 Cause: the stopping-balance channel path still uses the row_count_based policy, which may cause channel imbalance in the multi-collection case. This PR implements a score-based stopping-balance channel policy. pr: https://github.com/milvus-io/milvus/pull/38971 Signed-off-by: bigsheeper Co-authored-by: Wei Liu --- internal/querycoordv2/balance/balance.go | 11 ++++--- .../balance/rowcount_based_balancer.go | 2 +- .../balance/score_based_balancer.go | 30 ++++++++++++++----- .../balance/score_based_balancer_test.go | 1 + 4 files changed, 31 insertions(+), 13 deletions(-) diff --git a/internal/querycoordv2/balance/balance.go b/internal/querycoordv2/balance/balance.go index f95b09a965a52..a9c8dfd4eb366 100644 --- a/internal/querycoordv2/balance/balance.go +++ b/internal/querycoordv2/balance/balance.go @@ -45,10 +45,13 @@ func (segPlan *SegmentAssignPlan) String() string { } type ChannelAssignPlan struct { - Channel *meta.DmChannel - Replica *meta.Replica - From int64 - To int64 + Channel *meta.DmChannel + Replica *meta.Replica + From int64 + To int64 + FromScore int64 + ToScore int64 + ChannelScore int64 } func (chanPlan *ChannelAssignPlan) String() string { diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go index 692bbd66f7f1a..ad6f7fdf86ea3 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer.go @@ -391,7 +391,7 @@ func newNodeItem(currentScore int, nodeID int64) nodeItem { func (b *nodeItem) getPriority() int { // if node lacks more score between assignedScore and currentScore, then higher priority - return
int(b.currentScore - b.assignedScore) + return int(math.Ceil(b.currentScore - b.assignedScore)) } func (b *nodeItem) setPriority(priority int) { diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go index 9c549c3a791a9..cd97e77f02c73 100644 --- a/internal/querycoordv2/balance/score_based_balancer.go +++ b/internal/querycoordv2/balance/score_based_balancer.go @@ -192,19 +192,19 @@ func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64 } from := int64(-1) - // fromScore := int64(0) + fromScore := int64(0) if sourceNode != nil { from = sourceNode.nodeID - // fromScore = int64(sourceNode.getPriority()) + fromScore = int64(sourceNode.getPriority()) } plan := ChannelAssignPlan{ - From: from, - To: targetNode.nodeID, - Channel: ch, - // FromScore: fromScore, - // ToScore: int64(targetNode.getPriority()), - // SegmentScore: int64(scoreChanges), + From: from, + To: targetNode.nodeID, + Channel: ch, + FromScore: fromScore, + ToScore: int64(targetNode.getPriority()), + ChannelScore: int64(scoreChanges), } br.AddRecord(StrRecordf("add segment plan %s", plan)) plans = append(plans, plan) @@ -486,6 +486,20 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) (segmentPlans return segmentPlans, channelPlans } +func (b *ScoreBasedBalancer) genStoppingChannelPlan(replica *meta.Replica, rwNodes []int64, roNodes []int64) []ChannelAssignPlan { + channelPlans := make([]ChannelAssignPlan, 0) + for _, nodeID := range roNodes { + dmChannels := b.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(nodeID)) + plans := b.AssignChannel(replica.GetCollectionID(), dmChannels, rwNodes, false) + for i := range plans { + plans[i].From = nodeID + plans[i].Replica = replica + } + channelPlans = append(channelPlans, plans...) 
+ } + return channelPlans +} + func (b *ScoreBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []SegmentAssignPlan { segmentPlans := make([]SegmentAssignPlan, 0) for _, nodeID := range offlineNodes { diff --git a/internal/querycoordv2/balance/score_based_balancer_test.go b/internal/querycoordv2/balance/score_based_balancer_test.go index 75975507f8e26..692774e3a6f20 100644 --- a/internal/querycoordv2/balance/score_based_balancer_test.go +++ b/internal/querycoordv2/balance/score_based_balancer_test.go @@ -21,6 +21,7 @@ import ( "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" + "go.uber.org/atomic" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/metastore/kv/querycoord"