Skip to content

Commit

Permalink
enhance: Enable score based balance channel policy (#38143) (#38378)
Browse files Browse the repository at this point in the history
issue: #38142
pr: #38143
current balance channel policy only consider current collection's
distribution, so if all collections has 1 channel, and all channels has
been loaded on same querynode, after querynode num increase, balance
channel won't be triggered.

This PR enable score based balance channel policy, to achieve:
1. distribute all channels evenly across multiple querynodes
2. distribute each collection's channel evenly across multiple
querynodes.

---------

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Dec 13, 2024
1 parent df73f93 commit 83e162f
Show file tree
Hide file tree
Showing 18 changed files with 547 additions and 49 deletions.
3 changes: 3 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,9 @@ queryCoord:
rowCountFactor: 0.4 # the row count weight used when balancing segments among queryNodes
segmentCountFactor: 0.4 # the segment count weight used when balancing segments among queryNodes
globalSegmentCountFactor: 0.1 # the segment count weight used when balancing segments among queryNodes
# the channel count weight used when balancing channels among queryNodes,
# A higher value reduces the likelihood of assigning channels from the same collection to the same QueryNode. Set to 1 to disable this feature.
collectionChannelCountFactor: 10
segmentCountMaxSteps: 50 # segment count based plan generator max steps
rowCountMaxSteps: 50 # segment count based plan generator max steps
randomMaxSteps: 10 # segment count based plan generator max steps
Expand Down
1 change: 0 additions & 1 deletion internal/datacoord/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,6 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
assert.ElementsMatch(t, []int64{7, 8, 9, 10, 4, 5, 6}, vchan.FlushedSegmentIds)
assert.ElementsMatch(t, []int64{1, 2}, vchan.DroppedSegmentIds)
})

}

func TestShouldDropChannel(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions internal/querycoordv2/balance/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (chanPlan *ChannelAssignPlan) String() string {

type Balance interface {
AssignSegment(collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool) []SegmentAssignPlan
AssignChannel(channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan
AssignChannel(collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan
BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan)
}

Expand Down Expand Up @@ -103,7 +103,7 @@ func (b *RoundRobinBalancer) AssignSegment(collectionID int64, segments []*meta.
return ret
}

func (b *RoundRobinBalancer) AssignChannel(channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
func (b *RoundRobinBalancer) AssignChannel(collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
// skip out suspend node and stopping node during assignment, but skip this check for manual balance
if !manualBalance {
versionRangeFilter := semver.MustParseRange(">2.3.x")
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/balance/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (suite *BalanceTestSuite) TestAssignChannel() {
suite.mockScheduler.EXPECT().GetChannelTaskDelta(c.nodeIDs[i], int64(-1)).Return(c.deltaCnts[i])
}
}
plans := suite.roundRobinBalancer.AssignChannel(c.assignments, c.nodeIDs, false)
plans := suite.roundRobinBalancer.AssignChannel(1, c.assignments, c.nodeIDs, false)
suite.ElementsMatch(c.expectPlans, plans)
})
}
Expand Down
6 changes: 3 additions & 3 deletions internal/querycoordv2/balance/channel_level_score_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (b *ChannelLevelScoreBalancer) genStoppingChannelPlan(replica *meta.Replica
channelPlans := make([]ChannelAssignPlan, 0)
for _, nodeID := range offlineNodes {
dmChannels := b.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(nodeID), meta.WithChannelName2Channel(channelName))
plans := b.AssignChannel(dmChannels, onlineNodes, false)
plans := b.AssignChannel(replica.GetCollectionID(), dmChannels, onlineNodes, false)
for i := range plans {
plans[i].From = nodeID
plans[i].Replica = replica
Expand Down Expand Up @@ -175,7 +175,7 @@ func (b *ChannelLevelScoreBalancer) genStoppingSegmentPlan(replica *meta.Replica

func (b *ChannelLevelScoreBalancer) genSegmentPlan(br *balanceReport, replica *meta.Replica, channelName string, onlineNodes []int64) []SegmentAssignPlan {
segmentDist := make(map[int64][]*meta.Segment)
nodeItemsMap := b.convertToNodeItems(br, replica.GetCollectionID(), onlineNodes)
nodeItemsMap := b.convertToNodeItemsBySegment(br, replica.GetCollectionID(), onlineNodes)
if len(nodeItemsMap) == 0 {
return nil
}
Expand Down Expand Up @@ -261,7 +261,7 @@ func (b *ChannelLevelScoreBalancer) genChannelPlan(replica *meta.Replica, channe
return nil
}

channelPlans := b.AssignChannel(channelsToMove, nodeWithLessChannel, false)
channelPlans := b.AssignChannel(replica.GetCollectionID(), channelsToMove, nodeWithLessChannel, false)
for i := range channelPlans {
channelPlans[i].From = channelPlans[i].Channel.Node
channelPlans[i].Replica = replica
Expand Down
21 changes: 11 additions & 10 deletions internal/querycoordv2/balance/mock_balancer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions internal/querycoordv2/balance/rowcount_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (b *RowCountBasedBalancer) AssignSegment(collectionID int64, segments []*me

// AssignSegment, when row count based balancer assign segments, it will assign channel to node with least global channel count.
// try to make every query node has channel count
func (b *RowCountBasedBalancer) AssignChannel(channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
func (b *RowCountBasedBalancer) AssignChannel(collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
// skip out suspend node and stopping node during assignment, but skip this check for manual balance
if !manualBalance {
versionRangeFilter := semver.MustParseRange(">2.3.x")
Expand Down Expand Up @@ -311,7 +311,7 @@ func (b *RowCountBasedBalancer) genStoppingChannelPlan(replica *meta.Replica, rw
channelPlans := make([]ChannelAssignPlan, 0)
for _, nodeID := range roNodes {
dmChannels := b.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(nodeID))
plans := b.AssignChannel(dmChannels, rwNodes, false)
plans := b.AssignChannel(replica.GetCollectionID(), dmChannels, rwNodes, false)
for i := range plans {
plans[i].From = nodeID
plans[i].Replica = replica
Expand Down Expand Up @@ -349,7 +349,7 @@ func (b *RowCountBasedBalancer) genChannelPlan(br *balanceReport, replica *meta.
return nil
}

channelPlans := b.AssignChannel(channelsToMove, nodeWithLessChannel, false)
channelPlans := b.AssignChannel(replica.GetCollectionID(), channelsToMove, nodeWithLessChannel, false)
for i := range channelPlans {
channelPlans[i].From = channelPlans[i].Channel.Node
channelPlans[i].Replica = replica
Expand Down
Loading

0 comments on commit 83e162f

Please sign in to comment.