Skip to content

Commit

Permalink
fix: Fix load slowly (milvus-io#37454)
Browse files Browse the repository at this point in the history
When there're a lot of loaded collections, they would occupy the target
observer scheduler’s pool. This prevents loading collections from
updating the current target in time, slowing down the load process.
This PR adds a separate target dispatcher for loading collections.

issue: milvus-io#37166

---------

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper committed Nov 21, 2024
1 parent 0bd2617 commit 9473ba8
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 11 deletions.
30 changes: 21 additions & 9 deletions internal/querycoordv2/observers/target_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,12 @@ type TargetObserver struct {
mut sync.Mutex // Guard readyNotifiers
readyNotifiers map[int64][]chan struct{} // CollectionID -> Notifiers

dispatcher *taskDispatcher[int64]
keylocks *lock.KeyLock[int64]
// loadingDispatcher updates targets for collections that are loading (also collections without a current target).
loadingDispatcher *taskDispatcher[int64]
// loadedDispatcher updates targets for loaded collections.
loadedDispatcher *taskDispatcher[int64]

keylocks *lock.KeyLock[int64]

startOnce sync.Once
stopOnce sync.Once
Expand Down Expand Up @@ -117,8 +121,8 @@ func NewTargetObserver(
keylocks: lock.NewKeyLock[int64](),
}

dispatcher := newTaskDispatcher(result.check)
result.dispatcher = dispatcher
result.loadingDispatcher = newTaskDispatcher(result.check)
result.loadedDispatcher = newTaskDispatcher(result.check)
return result
}

Expand All @@ -127,7 +131,8 @@ func (ob *TargetObserver) Start() {
ctx, cancel := context.WithCancel(context.Background())
ob.cancel = cancel

ob.dispatcher.Start()
ob.loadingDispatcher.Start()
ob.loadedDispatcher.Start()

ob.wg.Add(1)
go func() {
Expand All @@ -147,7 +152,8 @@ func (ob *TargetObserver) Stop() {
}
ob.wg.Wait()

ob.dispatcher.Stop()
ob.loadingDispatcher.Stop()
ob.loadedDispatcher.Stop()
})
}

Expand All @@ -170,7 +176,13 @@ func (ob *TargetObserver) schedule(ctx context.Context) {

case <-ticker.C:
ob.clean()
ob.dispatcher.AddTask(ob.meta.GetAll()...)
loaded := lo.FilterMap(ob.meta.GetAllCollections(), func(collection *meta.Collection, _ int) (int64, bool) {
if collection.GetStatus() == querypb.LoadStatus_Loaded {
return collection.GetCollectionID(), true
}
return 0, false
})
ob.loadedDispatcher.AddTask(loaded...)

case req := <-ob.updateChan:
log.Info("manually trigger update target",
Expand Down Expand Up @@ -220,13 +232,13 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partitionID int64) bool {
result := ob.targetMgr.IsCurrentTargetExist(collectionID, partitionID)
if !result {
ob.dispatcher.AddTask(collectionID)
ob.loadingDispatcher.AddTask(collectionID)
}
return result
}

func (ob *TargetObserver) TriggerUpdateCurrentTarget(collectionID int64) {
ob.dispatcher.AddTask(collectionID)
ob.loadingDispatcher.AddTask(collectionID)
}

func (ob *TargetObserver) check(ctx context.Context, collectionID int64) {
Expand Down
7 changes: 5 additions & 2 deletions internal/querycoordv2/observers/target_observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ func (suite *TargetObserverSuite) SetupTest() {
suite.collectionID = int64(1000)
suite.partitionID = int64(100)

err = suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(suite.collectionID, 1))
testCollection := utils.CreateTestCollection(suite.collectionID, 1)
testCollection.Status = querypb.LoadStatus_Loaded
err = suite.meta.CollectionManager.PutCollection(testCollection)
suite.NoError(err)
err = suite.meta.CollectionManager.PutPartition(utils.CreateTestPartition(suite.collectionID, suite.partitionID))
suite.NoError(err)
Expand Down Expand Up @@ -318,7 +320,8 @@ func (suite *TargetObserverCheckSuite) SetupTest() {
func (s *TargetObserverCheckSuite) TestCheck() {
r := s.observer.Check(context.Background(), s.collectionID, common.AllPartitionsID)
s.False(r)
s.True(s.observer.dispatcher.tasks.Contain(s.collectionID))
s.False(s.observer.loadedDispatcher.tasks.Contain(s.collectionID))
s.True(s.observer.loadingDispatcher.tasks.Contain(s.collectionID))
}

func TestTargetObserver(t *testing.T) {
Expand Down

0 comments on commit 9473ba8

Please sign in to comment.