diff --git a/internal/querycoordv2/meta/collection_manager.go b/internal/querycoordv2/meta/collection_manager.go index 8bef5b7f5a33f..f95d18b0658f3 100644 --- a/internal/querycoordv2/meta/collection_manager.go +++ b/internal/querycoordv2/meta/collection_manager.go @@ -548,13 +548,13 @@ func (m *CollectionManager) updateLoadMetrics() { metrics.QueryCoordNumPartitions.WithLabelValues().Set(float64(len(lo.Filter(lo.Values(m.partitions), func(part *Partition, _ int) bool { return part.LoadPercentage == 100 })))) } -func (m *CollectionManager) UpdateLoadPercent(ctx context.Context, partitionID int64, loadPercent int32) (int32, error) { +func (m *CollectionManager) UpdatePartitionLoadPercent(ctx context.Context, partitionID int64, loadPercent int32) error { m.rwmutex.Lock() defer m.rwmutex.Unlock() oldPartition, ok := m.partitions[partitionID] if !ok { - return 0, merr.WrapErrPartitionNotFound(partitionID) + return merr.WrapErrPartitionNotFound(partitionID) } // update partition load percentage @@ -562,7 +562,7 @@ func (m *CollectionManager) UpdateLoadPercent(ctx context.Context, partitionID i newPartition.LoadPercentage = loadPercent savePartition := false if loadPercent == 100 { - savePartition = true + savePartition = newPartition.Status != querypb.LoadStatus_Loaded || newPartition.RecoverTimes != 0 newPartition.Status = querypb.LoadStatus_Loaded // if partition becomes loaded, clear it's recoverTimes in load info newPartition.RecoverTimes = 0 @@ -570,22 +570,24 @@ func (m *CollectionManager) UpdateLoadPercent(ctx context.Context, partitionID i metrics.QueryCoordLoadLatency.WithLabelValues().Observe(float64(elapsed.Milliseconds())) eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("Partition %d loaded", partitionID))) } - err := m.putPartition(ctx, []*Partition{newPartition}, savePartition) - if err != nil { - return 0, err - } + return m.putPartition(ctx, []*Partition{newPartition}, savePartition) +} + +func (m *CollectionManager) UpdateCollectionLoadPercent(ctx context.Context, collectionID int64) (int32, error) { + m.rwmutex.Lock() + defer m.rwmutex.Unlock() // update collection load percentage - oldCollection, ok := m.collections[newPartition.CollectionID] + oldCollection, ok := m.collections[collectionID] if !ok { - return 0, merr.WrapErrCollectionNotFound(newPartition.CollectionID) + return 0, merr.WrapErrCollectionNotFound(collectionID) } collectionPercent := m.calculateLoadPercentage(oldCollection.CollectionID) newCollection := oldCollection.Clone() newCollection.LoadPercentage = collectionPercent saveCollection := false if collectionPercent == 100 { - saveCollection = true + saveCollection = newCollection.Status != querypb.LoadStatus_Loaded || newCollection.RecoverTimes != 0 if newCollection.LoadSpan != nil { newCollection.LoadSpan.End() newCollection.LoadSpan = nil diff --git a/internal/querycoordv2/meta/collection_manager_test.go b/internal/querycoordv2/meta/collection_manager_test.go index 2ee5e99dcc318..2a8b5823bffd9 100644 --- a/internal/querycoordv2/meta/collection_manager_test.go +++ b/internal/querycoordv2/meta/collection_manager_test.go @@ -383,8 +383,11 @@ func (suite *CollectionManagerSuite) TestRecoverLoadingCollection() { // update load percent, then recover for second time for _, collectionID := range suite.collections { for _, partitionID := range suite.partitions[collectionID] { - mgr.UpdateLoadPercent(ctx, partitionID, 10) + err = mgr.UpdatePartitionLoadPercent(ctx, partitionID, 10) + suite.NoError(err) } + _, err = mgr.UpdateCollectionLoadPercent(ctx, collectionID) + suite.NoError(err) } suite.clearMemory() err = mgr.Recover(ctx, suite.broker) @@ -444,27 +447,32 @@ func (suite *CollectionManagerSuite) TestUpdateLoadPercentage() { }) } // test update partition load percentage - mgr.UpdateLoadPercent(ctx, 1, 30) + err := mgr.UpdatePartitionLoadPercent(ctx, 1, 30) + suite.NoError(err) partition := mgr.GetPartition(ctx, 1) suite.Equal(int32(30), partition.LoadPercentage) suite.Equal(int32(30), mgr.GetPartitionLoadPercentage(ctx, partition.PartitionID)) suite.Equal(querypb.LoadStatus_Loading, partition.Status) collection := mgr.GetCollection(ctx, 1) - suite.Equal(int32(15), collection.LoadPercentage) + suite.Equal(int32(0), collection.LoadPercentage) suite.Equal(querypb.LoadStatus_Loading, collection.Status) // test update partition load percentage to 100 - mgr.UpdateLoadPercent(ctx, 1, 100) + err = mgr.UpdatePartitionLoadPercent(ctx, 1, 100) + suite.NoError(err) partition = mgr.GetPartition(ctx, 1) suite.Equal(int32(100), partition.LoadPercentage) suite.Equal(querypb.LoadStatus_Loaded, partition.Status) collection = mgr.GetCollection(ctx, 1) - suite.Equal(int32(50), collection.LoadPercentage) + suite.Equal(int32(0), collection.LoadPercentage) suite.Equal(querypb.LoadStatus_Loading, collection.Status) // test update collection load percentage - mgr.UpdateLoadPercent(ctx, 2, 100) + err = mgr.UpdatePartitionLoadPercent(ctx, 2, 100) + suite.NoError(err) partition = mgr.GetPartition(ctx, 1) suite.Equal(int32(100), partition.LoadPercentage) suite.Equal(querypb.LoadStatus_Loaded, partition.Status) + _, err = mgr.UpdateCollectionLoadPercent(ctx, 1) + suite.NoError(err) collection = mgr.GetCollection(ctx, 1) suite.Equal(int32(100), collection.LoadPercentage) suite.Equal(querypb.LoadStatus_Loaded, collection.Status) diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index f36cf2da4d262..99f8c2f06a341 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -261,19 +261,31 @@ func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) { } loaded := true + hasUpdate := false + + channelTargetNum, subChannelCount := ob.observeChannelStatus(ctx, task.CollectionID) + for _, partition := range partitions { if partition.LoadPercentage == 100 { continue } if ob.readyToObserve(ctx, partition.CollectionID) { replicaNum := ob.meta.GetReplicaNumber(ctx, partition.GetCollectionID()) - ob.observePartitionLoadStatus(ctx, partition, replicaNum) + has := ob.observePartitionLoadStatus(ctx, partition, replicaNum, channelTargetNum, subChannelCount) + if has { + hasUpdate = true + } } partition = ob.meta.GetPartition(ctx, partition.PartitionID) if partition != nil && partition.LoadPercentage != 100 { loaded = false } } + + if hasUpdate { + ob.observeCollectionLoadStatus(ctx, task.CollectionID) + } + // all partition loaded, finish task if len(partitions) > 0 && loaded { log.Info("Load task finish", @@ -293,37 +305,48 @@ func (ob *CollectionObserver) observeLoadStatus(ctx context.Context) { } } -func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, partition *meta.Partition, replicaNum int32) { +func (ob *CollectionObserver) observeChannelStatus(ctx context.Context, collectionID int64) (int, int) { + channelTargets := ob.targetMgr.GetDmChannelsByCollection(ctx, collectionID, meta.NextTarget) + + channelTargetNum := len(channelTargets) + if channelTargetNum == 0 { + log.Info("channels in target is empty, waiting for new target content") + return 0, 0 + } + + subChannelCount := 0 + for _, channel := range channelTargets { + views := ob.dist.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel.GetChannelName())) + nodes := lo.Map(views, func(v *meta.LeaderView, _ int) int64 { return v.ID }) + group := utils.GroupNodesByReplica(ctx, ob.meta.ReplicaManager, collectionID, nodes) + subChannelCount += len(group) + } + return channelTargetNum, subChannelCount +} + +func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, partition *meta.Partition, replicaNum int32, channelTargetNum, subChannelCount int) bool { log := log.Ctx(ctx).WithRateGroup("qcv2.observePartitionLoadStatus", 1, 60).With( zap.Int64("collectionID", partition.GetCollectionID()), zap.Int64("partitionID", partition.GetPartitionID()), ) segmentTargets := ob.targetMgr.GetSealedSegmentsByPartition(ctx, partition.GetCollectionID(), partition.GetPartitionID(), meta.NextTarget) - channelTargets := ob.targetMgr.GetDmChannelsByCollection(ctx, partition.GetCollectionID(), meta.NextTarget) - targetNum := len(segmentTargets) + len(channelTargets) + targetNum := len(segmentTargets) + channelTargetNum if targetNum == 0 { log.Info("segments and channels in target are both empty, waiting for new target content") - return + return false } log.RatedInfo(10, "partition targets", zap.Int("segmentTargetNum", len(segmentTargets)), - zap.Int("channelTargetNum", len(channelTargets)), + zap.Int("channelTargetNum", channelTargetNum), zap.Int("totalTargetNum", targetNum), zap.Int32("replicaNum", replicaNum), ) - loadedCount := 0 + loadedCount := subChannelCount loadPercentage := int32(0) - for _, channel := range channelTargets { - views := ob.dist.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel.GetChannelName())) - nodes := lo.Map(views, func(v *meta.LeaderView, _ int) int64 { return v.ID }) - group := utils.GroupNodesByReplica(ctx, ob.meta.ReplicaManager, partition.GetCollectionID(), nodes) - loadedCount += len(group) - } - subChannelCount := loadedCount for _, segment := range segmentTargets { views := ob.dist.LeaderViewManager.GetByFilter( meta.WithChannelName2LeaderView(segment.GetInsertChannel()), @@ -341,29 +364,42 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa if loadedCount <= ob.partitionLoadedCount[partition.GetPartitionID()] && loadPercentage != 100 { ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount - return + return false } ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount if loadPercentage == 100 { if !ob.targetObserver.Check(ctx, partition.GetCollectionID(), partition.PartitionID) { log.Warn("failed to manual check current target, skip update load status") - return + return false } delete(ob.partitionLoadedCount, partition.GetPartitionID()) } - collectionPercentage, err := ob.meta.CollectionManager.UpdateLoadPercent(ctx, partition.PartitionID, loadPercentage) + err := ob.meta.CollectionManager.UpdatePartitionLoadPercent(ctx, partition.PartitionID, loadPercentage) if err != nil { - log.Warn("failed to update load percentage") + log.Warn("failed to update partition load percentage") } - log.Info("load status updated", + log.Info("partition load status updated", zap.Int32("partitionLoadPercentage", loadPercentage), + ) + eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("partition %d load percentage update: %d", partition.PartitionID, loadPercentage))) + return true +} + +func (ob *CollectionObserver) observeCollectionLoadStatus(ctx context.Context, collectionID int64) { + log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID)) + + collectionPercentage, err := ob.meta.CollectionManager.UpdateCollectionLoadPercent(ctx, collectionID) + if err != nil { + log.Warn("failed to update collection load percentage") + } + log.Info("collection load status updated", zap.Int32("collectionLoadPercentage", collectionPercentage), ) if collectionPercentage == 100 { - ob.invalidateCache(ctx, partition.GetCollectionID()) + ob.invalidateCache(ctx, collectionID) } - eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("collection %d load percentage update: %d", partition.CollectionID, loadPercentage))) + eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("collection %d load percentage update: %d", collectionID, collectionPercentage))) } func (ob *CollectionObserver) invalidateCache(ctx context.Context, collectionID int64) {