Skip to content

Commit

Permalink
enhance: reduce CPU usage when the number of collections is high
Browse files Browse the repository at this point in the history
Signed-off-by: xiaofanluan <[email protected]>
  • Loading branch information
xiaofan-luan committed Apr 25, 2024
1 parent b287fba commit 4e4e849
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 28 deletions.
4 changes: 2 additions & 2 deletions internal/querycoordv2/balance/rowcount_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (b *RowCountBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNode
func (b *RowCountBasedBalancer) genStoppingChannelPlan(replica *meta.Replica, onlineNodes []int64, offlineNodes []int64) []ChannelAssignPlan {
channelPlans := make([]ChannelAssignPlan, 0)
for _, nodeID := range offlineNodes {
dmChannels := b.dist.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(nodeID))
dmChannels := b.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(nodeID))
plans := b.AssignChannel(dmChannels, onlineNodes, false)
for i := range plans {
plans[i].From = nodeID
Expand All @@ -341,7 +341,7 @@ func (b *RowCountBasedBalancer) genChannelPlan(replica *meta.Replica, onlineNode
nodeWithLessChannel := make([]int64, 0)
channelsToMove := make([]*meta.DmChannel, 0)
for _, node := range onlineNodes {
channels := b.dist.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(node))
channels := b.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(node))

if len(channels) <= average {
nodeWithLessChannel = append(nodeWithLessChannel, node)
Expand Down
4 changes: 2 additions & 2 deletions internal/querycoordv2/balance/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func PrintCurrentReplicaDist(replica *meta.Replica,
// 3. print stopping nodes channel distribution
distInfo += "[stoppingNodesChannelDist:"
for stoppingNodeID := range stoppingNodesSegments {
stoppingNodeChannels := channelManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(stoppingNodeID))
stoppingNodeChannels := channelManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(stoppingNodeID))
distInfo += fmt.Sprintf("[nodeID:%d, count:%d,", stoppingNodeID, len(stoppingNodeChannels))
distInfo += "channels:["
for _, stoppingChan := range stoppingNodeChannels {
Expand All @@ -189,7 +189,7 @@ func PrintCurrentReplicaDist(replica *meta.Replica,
// 4. print normal nodes channel distribution
distInfo += "[normalNodesChannelDist:"
for normalNodeID := range nodeSegments {
normalNodeChannels := channelManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(normalNodeID))
normalNodeChannels := channelManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(normalNodeID))
distInfo += fmt.Sprintf("[nodeID:%d, count:%d,", normalNodeID, len(normalNodeChannels))
distInfo += "channels:["
for _, normalNodeChan := range normalNodeChannels {
Expand Down
8 changes: 2 additions & 6 deletions internal/querycoordv2/checkers/channel_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,7 @@ func (c *ChannelChecker) getDmChannelDiff(collectionID int64,
}

func (c *ChannelChecker) getChannelDist(replica *meta.Replica) []*meta.DmChannel {
dist := make([]*meta.DmChannel, 0)
for _, nodeID := range replica.GetNodes() {
dist = append(dist, c.dist.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(nodeID))...)
}
return dist
return c.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeIDsFilter(replica.GetNodes()))
}

func (c *ChannelChecker) findRepeatedChannels(ctx context.Context, replicaID int64) []*meta.DmChannel {
Expand All @@ -183,7 +179,7 @@ func (c *ChannelChecker) findRepeatedChannels(ctx context.Context, replicaID int
for _, ch := range dist {
leaderView := c.dist.LeaderViewManager.GetLeaderShardView(ch.Node, ch.GetChannelName())
if leaderView == nil {
log.Info("shard leadview is not ready, skip",
log.Info("shard leader view is not ready, skip",
zap.Int64("collectionID", replica.GetCollectionID()),
zap.Int64("replicaID", replicaID),
zap.Int64("leaderID", ch.Node),
Expand Down
6 changes: 6 additions & 0 deletions internal/querycoordv2/job/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ func waitCollectionReleased(dist *meta.DistributionManager, checkerController *c

if len(channels)+len(segments) == 0 {
break
} else {
log.Info("wait for release done", zap.Int64("collection", collection),
zap.Int64s("partitions", partitions),
zap.Int("channel", len(channels)),
zap.Int("segments", len(segments)),
)
}

// trigger check more frequently
Expand Down
60 changes: 55 additions & 5 deletions internal/querycoordv2/meta/channel_dist_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,20 @@ func WithNodeID2Channel(nodeID int64) ChannelDistFilter {
}
}

func WithReplica2Channel(replica *Replica) ChannelDistFilter {
func WithNodeIDsFilter(nodeIDs []int64) ChannelDistFilter {
return func(ch *DmChannel) bool {
return ch.GetCollectionID() == replica.GetCollectionID() && replica.Contains(ch.Node)
for _, id := range nodeIDs {
if ch.Node == id {
return true
}
}
return false
}
}

func WithChannelName2Channel(channelName string) ChannelDistFilter {
func WithReplica2Channel(replica *Replica) ChannelDistFilter {
return func(ch *DmChannel) bool {
return ch.GetChannelName() == channelName
return ch.GetCollectionID() == replica.GetCollectionID() && replica.Contains(ch.Node)
}
}

Expand Down Expand Up @@ -76,11 +81,15 @@ type ChannelDistManager struct {

// NodeID -> Channels
channels map[UniqueID][]*DmChannel

// CollectionID -> Channels
collectionIndex map[int64]map[string]*DmChannel
}

func NewChannelDistManager() *ChannelDistManager {
return &ChannelDistManager{
channels: make(map[UniqueID][]*DmChannel),
channels: make(map[UniqueID][]*DmChannel),
collectionIndex: make(map[int64]map[string]*DmChannel),
}
}

Expand Down Expand Up @@ -146,6 +155,31 @@ func (m *ChannelDistManager) GetByFilter(filters ...ChannelDistFilter) []*DmChan
return ret
}

// GetByCollectionAndFilter returns the channels stored in the collection
// index under collectionID that are accepted by every supplied filter.
// Nil filters are ignored. The lookup only visits the index entry for the
// given collection instead of walking the per-node channel lists.
//
// The returned slice is never nil; it is empty when the collection has no
// index entry or when no channel passes all filters.
func (m *ChannelDistManager) GetByCollectionAndFilter(collectionID int64, filters ...ChannelDistFilter) []*DmChannel {
	m.rwmutex.RLock()
	defer m.rwmutex.RUnlock()

	matched := make([]*DmChannel, 0)

	// Ranging over a missing index entry (nil map) simply yields nothing.
candidates:
	for _, candidate := range m.collectionIndex[collectionID] {
		for _, accept := range filters {
			if accept == nil {
				continue
			}
			if !accept(candidate) {
				// First rejecting filter wins; later filters are not evaluated.
				continue candidates
			}
		}
		matched = append(matched, candidate)
	}
	return matched
}

func (m *ChannelDistManager) Update(nodeID UniqueID, channels ...*DmChannel) {
m.rwmutex.Lock()
defer m.rwmutex.Unlock()
Expand All @@ -155,4 +189,20 @@ func (m *ChannelDistManager) Update(nodeID UniqueID, channels ...*DmChannel) {
}

m.channels[nodeID] = channels

m.updateCollectionIndex()
}

// updateCollectionIndex discards the current CollectionID -> channels
// secondary index and rebuilds it from the per-node channel lists held in
// m.channels.
//
// It takes no lock itself; the visible caller (Update) invokes it while
// holding the write lock.
//
// NOTE(review): within a collection the index is keyed by channel name, so
// if two nodes report a channel with the same collection ID and name, only
// the one visited last is kept — confirm this is intended.
func (m *ChannelDistManager) updateCollectionIndex() {
	index := make(map[int64]map[string]*DmChannel)
	m.collectionIndex = index

	for _, perNode := range m.channels {
		for _, ch := range perNode {
			cid := ch.GetCollectionID()
			byName, found := index[cid]
			if !found {
				// First channel seen for this collection: create its bucket.
				byName = make(map[string]*DmChannel)
				index[cid] = byName
			}
			byName[ch.GetChannelName()] = ch
		}
	}
}
10 changes: 5 additions & 5 deletions internal/querycoordv2/meta/channel_dist_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,26 +76,26 @@ func (suite *ChannelDistManagerSuite) TestGetBy() {
}

// Test GetByCollection
channels = dist.GetByFilter(WithCollectionID2Channel(suite.collection))
channels = dist.GetByCollectionAndFilter(suite.collection)
suite.Len(channels, 4)
suite.AssertCollection(channels, suite.collection)
channels = dist.GetByFilter(WithCollectionID2Channel(-1))
channels = dist.GetByCollectionAndFilter(-1)
suite.Len(channels, 0)

// Test GetByNodeAndCollection
// 1. Valid node and valid collection
for _, node := range suite.nodes {
channels := dist.GetByFilter(WithCollectionID2Channel(suite.collection), WithNodeID2Channel(node))
channels := dist.GetByCollectionAndFilter(suite.collection, WithNodeID2Channel(node))
suite.AssertNode(channels, node)
suite.AssertCollection(channels, suite.collection)
}

// 2. Valid node and invalid collection
channels = dist.GetByFilter(WithCollectionID2Channel(-1), WithNodeID2Channel(suite.nodes[1]))
channels = dist.GetByCollectionAndFilter(-1, WithNodeID2Channel(suite.nodes[1]))
suite.Len(channels, 0)

// 3. Invalid node and valid collection
channels = dist.GetByFilter(WithCollectionID2Channel(suite.collection), WithNodeID2Channel(-1))
channels = dist.GetByCollectionAndFilter(suite.collection, WithNodeID2Channel(-1))
suite.Len(channels, 0)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/observers/replica_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (ob *ReplicaObserver) checkNodesInReplica() {
)
removeNodes := make([]int64, 0, len(roNodes))
for _, node := range roNodes {
channels := ob.distMgr.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(node))
channels := ob.distMgr.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(node))
segments := ob.distMgr.SegmentDistManager.GetByFilter(meta.WithCollectionID(collectionID), meta.WithNodeID(node))
if len(channels) == 0 && len(segments) == 0 {
removeNodes = append(removeNodes, node)
Expand Down
8 changes: 4 additions & 4 deletions internal/querycoordv2/observers/resource_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (ob *ResourceObserver) checkAndRecoverResourceGroup() {
manager := ob.meta.ResourceManager
rgNames := manager.ListResourceGroups()
enableRGAutoRecover := params.Params.QueryCoordCfg.EnableRGAutoRecover.GetAsBool()
log.Info("start to check resource group", zap.Bool("enableRGAutoRecover", enableRGAutoRecover), zap.Int("resourceGroupNum", len(rgNames)))
log.Debug("start to check resource group", zap.Bool("enableRGAutoRecover", enableRGAutoRecover), zap.Int("resourceGroupNum", len(rgNames)))

// Check if there is any incoming node.
if manager.CheckIncomingNodeNum() > 0 {
Expand All @@ -100,10 +100,10 @@ func (ob *ResourceObserver) checkAndRecoverResourceGroup() {
}

// Remove all down nodes in resource group manager.
log.Info("remove all down nodes in resource group manager...")
log.Debug("remove all down nodes in resource group manager...")
ob.meta.RemoveAllDownNode()

log.Info("recover resource groups...")
log.Debug("recover resource groups...")
// Recover all resource group into expected configuration.
for _, rgName := range rgNames {
if err := manager.MeetRequirement(rgName); err != nil {
Expand All @@ -126,5 +126,5 @@ func (ob *ResourceObserver) checkAndRecoverResourceGroup() {
if enableRGAutoRecover {
utils.RecoverAllCollection(ob.meta)
}
log.Info("check resource group done", zap.Bool("enableRGAutoRecover", enableRGAutoRecover), zap.Int("resourceGroupNum", len(rgNames)))
log.Debug("check resource group done", zap.Bool("enableRGAutoRecover", enableRGAutoRecover), zap.Int("resourceGroupNum", len(rgNames)))
}
2 changes: 1 addition & 1 deletion internal/querycoordv2/ops_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (s *Server) TransferChannel(ctx context.Context, req *querypb.TransferChann
dstNodeSet.Remove(srcNode)

// check sealed segment list
channels := s.dist.ChannelDistManager.GetByFilter(meta.WithCollectionID2Channel(replica.GetCollectionID()), meta.WithNodeID2Channel(srcNode))
channels := s.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(srcNode))
toBalance := typeutil.NewSet[*meta.DmChannel]()
if req.GetTransferAll() {
toBalance.Insert(channels...)
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -1688,7 +1688,7 @@ func (p *queryCoordConfig) init(base *BaseTable) {
p.SegmentCheckInterval = ParamItem{
Key: "queryCoord.checkSegmentInterval",
Version: "2.3.0",
DefaultValue: "1000",
DefaultValue: "3000",
PanicIfEmpty: true,
Export: true,
}
Expand All @@ -1697,7 +1697,7 @@ func (p *queryCoordConfig) init(base *BaseTable) {
p.ChannelCheckInterval = ParamItem{
Key: "queryCoord.checkChannelInterval",
Version: "2.3.0",
DefaultValue: "1000",
DefaultValue: "3000",
PanicIfEmpty: true,
Export: true,
}
Expand Down

0 comments on commit 4e4e849

Please sign in to comment.