Skip to content

Commit

Permalink
fix: [10kcp] channel unbalance during stopping balance progress (#38972)
Browse files Browse the repository at this point in the history
issue: #38970,
#37630

Cause: the stopping-balance channel path still uses the row_count_based policy,
which may cause channel imbalance in the multi-collection case.

This PR implements a score-based stopping-balance channel policy.

pr: #38971

Signed-off-by: bigsheeper <[email protected]>
Co-authored-by: Wei Liu <[email protected]>
  • Loading branch information
bigsheeper and weiliu1031 authored Jan 3, 2025
1 parent 9b2b2a2 commit f5f4fed
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 13 deletions.
11 changes: 7 additions & 4 deletions internal/querycoordv2/balance/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@ func (segPlan *SegmentAssignPlan) String() string {
}

type ChannelAssignPlan struct {
Channel *meta.DmChannel
Replica *meta.Replica
From int64
To int64
Channel *meta.DmChannel
Replica *meta.Replica
From int64
To int64
FromScore int64
ToScore int64
ChannelScore int64
}

func (chanPlan *ChannelAssignPlan) String() string {
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/balance/rowcount_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func newNodeItem(currentScore int, nodeID int64) nodeItem {

func (b *nodeItem) getPriority() int {
// if node lacks more score between assignedScore and currentScore, then higher priority
return int(b.currentScore - b.assignedScore)
return int(math.Ceil(b.currentScore - b.assignedScore))
}

func (b *nodeItem) setPriority(priority int) {
Expand Down
30 changes: 22 additions & 8 deletions internal/querycoordv2/balance/score_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,19 +192,19 @@ func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64
}

from := int64(-1)
// fromScore := int64(0)
fromScore := int64(0)
if sourceNode != nil {
from = sourceNode.nodeID
// fromScore = int64(sourceNode.getPriority())
fromScore = int64(sourceNode.getPriority())
}

plan := ChannelAssignPlan{
From: from,
To: targetNode.nodeID,
Channel: ch,
// FromScore: fromScore,
// ToScore: int64(targetNode.getPriority()),
// SegmentScore: int64(scoreChanges),
From: from,
To: targetNode.nodeID,
Channel: ch,
FromScore: fromScore,
ToScore: int64(targetNode.getPriority()),
ChannelScore: int64(scoreChanges),
}
br.AddRecord(StrRecordf("add segment plan %s", plan))
plans = append(plans, plan)
Expand Down Expand Up @@ -486,6 +486,20 @@ func (b *ScoreBasedBalancer) BalanceReplica(replica *meta.Replica) (segmentPlans
return segmentPlans, channelPlans
}

// genStoppingChannelPlan builds the channel assignment plans for the given
// replica by walking every node in roNodes in order.
//
// For each such node it looks up the replica's collection channels that the
// channel distribution manager reports on that node, hands them to
// AssignChannel with rwNodes as the candidate targets, and then stamps every
// returned plan with the source node ID and the replica. The plans of all
// nodes are concatenated in iteration order; the result is an empty, non-nil
// slice when no plans are produced.
func (b *ScoreBasedBalancer) genStoppingChannelPlan(replica *meta.Replica, rwNodes []int64, roNodes []int64) []ChannelAssignPlan {
	result := make([]ChannelAssignPlan, 0)
	for _, srcNode := range roNodes {
		collectionID := replica.GetCollectionID()
		channelsOnNode := b.dist.ChannelDistManager.GetByCollectionAndFilter(collectionID, meta.WithNodeID2Channel(srcNode))

		// NOTE(review): the meaning of the trailing `false` flag is defined by
		// AssignChannel, which is not visible here — confirm against its signature.
		assigned := b.AssignChannel(collectionID, channelsOnNode, rwNodes, false)

		// AssignChannel's plans are completed here with their origin and replica.
		for idx := range assigned {
			plan := &assigned[idx]
			plan.From = srcNode
			plan.Replica = replica
		}
		result = append(result, assigned...)
	}
	return result
}

func (b *ScoreBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []SegmentAssignPlan {
segmentPlans := make([]SegmentAssignPlan, 0)
for _, nodeID := range offlineNodes {
Expand Down
1 change: 1 addition & 0 deletions internal/querycoordv2/balance/score_based_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"

etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
Expand Down

0 comments on commit f5f4fed

Please sign in to comment.