Skip to content

Commit

Permalink
enhance: [10kcp] Enable score based balance channel policy (#38301)
Browse files Browse the repository at this point in the history
issue: #38142
current balance channel policy only consider current collection's
distribution, so if all collections has 1 channel, and all channels has
been loaded on same querynode, after querynode num increase, balance
channel won't be triggered.

This PR enable score based balance channel policy, to achieve:

1. distribute all channels evenly across multiple querynodes
2. distribute each collection's channel evenly across multiple
querynodes.

pr: #38143

---------

Signed-off-by: bigsheeper <[email protected]>
Co-authored-by: Wei Liu <[email protected]>
  • Loading branch information
bigsheeper and weiliu1031 authored Dec 9, 2024
1 parent ae4e2b8 commit 3e65cc5
Show file tree
Hide file tree
Showing 15 changed files with 285 additions and 32 deletions.
3 changes: 3 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,9 @@ queryCoord:
rowCountFactor: 0.4 # the row count weight used when balancing segments among queryNodes
segmentCountFactor: 0.4 # the segment count weight used when balancing segments among queryNodes
globalSegmentCountFactor: 0.1 # the segment count weight used when balancing segments among queryNodes
# the channel count weight used when balancing channels among queryNodes,
# A higher value reduces the likelihood of assigning channels from the same collection to the same QueryNode. Set to 1 to disable this feature.
collectionChannelCountFactor: 10
segmentCountMaxSteps: 50 # segment count based plan generator max steps
rowCountMaxSteps: 50 # segment count based plan generator max steps
randomMaxSteps: 10 # segment count based plan generator max steps
Expand Down
4 changes: 2 additions & 2 deletions internal/querycoordv2/balance/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (chanPlan *ChannelAssignPlan) String() string {

type Balance interface {
AssignSegment(collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool) []SegmentAssignPlan
AssignChannel(channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan
AssignChannel(collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan
BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan)
}

Expand Down Expand Up @@ -103,7 +103,7 @@ func (b *RoundRobinBalancer) AssignSegment(collectionID int64, segments []*meta.
return ret
}

func (b *RoundRobinBalancer) AssignChannel(channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
func (b *RoundRobinBalancer) AssignChannel(collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
// skip out suspend node and stopping node during assignment, but skip this check for manual balance
if !manualBalance {
versionRangeFilter := semver.MustParseRange(">2.3.x")
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/balance/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (suite *BalanceTestSuite) TestAssignChannel() {
suite.mockScheduler.EXPECT().GetChannelTaskDelta(c.nodeIDs[i], int64(-1)).Return(c.deltaCnts[i])
}
}
plans := suite.roundRobinBalancer.AssignChannel(c.assignments, c.nodeIDs, false)
plans := suite.roundRobinBalancer.AssignChannel(1, c.assignments, c.nodeIDs, false)
suite.ElementsMatch(c.expectPlans, plans)
})
}
Expand Down
6 changes: 3 additions & 3 deletions internal/querycoordv2/balance/channel_level_score_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (b *ChannelLevelScoreBalancer) genStoppingChannelPlan(replica *meta.Replica
channelPlans := make([]ChannelAssignPlan, 0)
for _, nodeID := range offlineNodes {
dmChannels := b.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(nodeID), meta.WithChannelName2Channel(channelName))
plans := b.AssignChannel(dmChannels, onlineNodes, false)
plans := b.AssignChannel(replica.GetCollectionID(), dmChannels, onlineNodes, false)
for i := range plans {
plans[i].From = nodeID
plans[i].Replica = replica
Expand Down Expand Up @@ -172,7 +172,7 @@ func (b *ChannelLevelScoreBalancer) genStoppingSegmentPlan(replica *meta.Replica

func (b *ChannelLevelScoreBalancer) genSegmentPlan(br *balanceReport, replica *meta.Replica, channelName string, onlineNodes []int64) []SegmentAssignPlan {
segmentDist := make(map[int64][]*meta.Segment)
nodeItemsMap := b.convertToNodeItems(br, replica.GetCollectionID(), onlineNodes)
nodeItemsMap := b.convertToNodeItemsBySegment(br, replica.GetCollectionID(), onlineNodes)
if len(nodeItemsMap) == 0 {
return nil
}
Expand Down Expand Up @@ -258,7 +258,7 @@ func (b *ChannelLevelScoreBalancer) genChannelPlan(replica *meta.Replica, channe
return nil
}

channelPlans := b.AssignChannel(channelsToMove, nodeWithLessChannel, false)
channelPlans := b.AssignChannel(replica.GetCollectionID(), channelsToMove, nodeWithLessChannel, false)
for i := range channelPlans {
channelPlans[i].From = channelPlans[i].Channel.Node
channelPlans[i].Replica = replica
Expand Down
21 changes: 11 additions & 10 deletions internal/querycoordv2/balance/mock_balancer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions internal/querycoordv2/balance/rowcount_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (b *RowCountBasedBalancer) AssignSegment(collectionID int64, segments []*me

// AssignSegment, when row count based balancer assign segments, it will assign channel to node with least global channel count.
// try to make every query node has channel count
func (b *RowCountBasedBalancer) AssignChannel(channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
func (b *RowCountBasedBalancer) AssignChannel(collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
// skip out suspend node and stopping node during assignment, but skip this check for manual balance
if !manualBalance {
versionRangeFilter := semver.MustParseRange(">2.3.x")
Expand Down Expand Up @@ -308,7 +308,7 @@ func (b *RowCountBasedBalancer) genStoppingChannelPlan(replica *meta.Replica, rw
channelPlans := make([]ChannelAssignPlan, 0)
for _, nodeID := range roNodes {
dmChannels := b.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(nodeID))
plans := b.AssignChannel(dmChannels, rwNodes, false)
plans := b.AssignChannel(replica.GetCollectionID(), dmChannels, rwNodes, false)
for i := range plans {
plans[i].From = nodeID
plans[i].Replica = replica
Expand Down Expand Up @@ -346,7 +346,7 @@ func (b *RowCountBasedBalancer) genChannelPlan(br *balanceReport, replica *meta.
return nil
}

channelPlans := b.AssignChannel(channelsToMove, nodeWithLessChannel, false)
channelPlans := b.AssignChannel(replica.GetCollectionID(), channelsToMove, nodeWithLessChannel, false)
for i := range channelPlans {
channelPlans[i].From = channelPlans[i].Channel.Node
channelPlans[i].Replica = replica
Expand Down
Loading

0 comments on commit 3e65cc5

Please sign in to comment.