diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 87caa9e91c036..3bb1c31924a40 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -88,8 +88,12 @@ type TargetObserver struct { mut sync.Mutex // Guard readyNotifiers readyNotifiers map[int64][]chan struct{} // CollectionID -> Notifiers - dispatcher *taskDispatcher[int64] - keylocks *lock.KeyLock[int64] + // loadingDispatcher updates targets for collections that are loading (also collections without a current target). + loadingDispatcher *taskDispatcher[int64] + // loadedDispatcher updates targets for loaded collections. + loadedDispatcher *taskDispatcher[int64] + + keylocks *lock.KeyLock[int64] startOnce sync.Once stopOnce sync.Once @@ -117,8 +121,8 @@ func NewTargetObserver( keylocks: lock.NewKeyLock[int64](), } - dispatcher := newTaskDispatcher(result.check) - result.dispatcher = dispatcher + result.loadingDispatcher = newTaskDispatcher(result.check) + result.loadedDispatcher = newTaskDispatcher(result.check) return result } @@ -127,7 +131,8 @@ func (ob *TargetObserver) Start() { ctx, cancel := context.WithCancel(context.Background()) ob.cancel = cancel - ob.dispatcher.Start() + ob.loadingDispatcher.Start() + ob.loadedDispatcher.Start() ob.wg.Add(1) go func() { @@ -147,7 +152,8 @@ func (ob *TargetObserver) Stop() { } ob.wg.Wait() - ob.dispatcher.Stop() + ob.loadingDispatcher.Stop() + ob.loadedDispatcher.Stop() }) } @@ -170,7 +176,13 @@ func (ob *TargetObserver) schedule(ctx context.Context) { case <-ticker.C: ob.clean() - ob.dispatcher.AddTask(ob.meta.GetAll()...) + loaded := lo.FilterMap(ob.meta.GetAllCollections(), func(collection *meta.Collection, _ int) (int64, bool) { + if collection.GetStatus() == querypb.LoadStatus_Loaded { + return collection.GetCollectionID(), true + } + return 0, false + }) + ob.loadedDispatcher.AddTask(loaded...) case req := <-ob.updateChan: log.Info("manually trigger update target", @@ -220,13 +232,13 @@ func (ob *TargetObserver) schedule(ctx context.Context) { func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partitionID int64) bool { result := ob.targetMgr.IsCurrentTargetExist(collectionID, partitionID) if !result { - ob.dispatcher.AddTask(collectionID) + ob.loadingDispatcher.AddTask(collectionID) } return result } func (ob *TargetObserver) TriggerUpdateCurrentTarget(collectionID int64) { - ob.dispatcher.AddTask(collectionID) + ob.loadingDispatcher.AddTask(collectionID) } func (ob *TargetObserver) check(ctx context.Context, collectionID int64) { diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go index dc269264d8a82..dcbd8ec5f247e 100644 --- a/internal/querycoordv2/observers/target_observer_test.go +++ b/internal/querycoordv2/observers/target_observer_test.go @@ -98,7 +98,9 @@ func (suite *TargetObserverSuite) SetupTest() { suite.collectionID = int64(1000) suite.partitionID = int64(100) - err = suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(suite.collectionID, 1)) + testCollection := utils.CreateTestCollection(suite.collectionID, 1) + testCollection.Status = querypb.LoadStatus_Loaded + err = suite.meta.CollectionManager.PutCollection(testCollection) suite.NoError(err) err = suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collectionID, suite.partitionID)) suite.NoError(err) @@ -318,7 +320,8 @@ func (suite *TargetObserverCheckSuite) SetupTest() { func (s *TargetObserverCheckSuite) TestCheck() { r := s.observer.Check(context.Background(), s.collectionID, common.AllPartitionsID) s.False(r) - s.True(s.observer.dispatcher.tasks.Contain(s.collectionID)) + s.False(s.observer.loadedDispatcher.tasks.Contain(s.collectionID)) + s.True(s.observer.loadingDispatcher.tasks.Contain(s.collectionID)) } func TestTargetObserver(t *testing.T) {