diff --git a/internal/querycoordv2/job/job_release.go b/internal/querycoordv2/job/job_release.go index 821f99c68eb48..99811082a4e1e 100644 --- a/internal/querycoordv2/job/job_release.go +++ b/internal/querycoordv2/job/job_release.go @@ -97,7 +97,6 @@ func (job *ReleaseCollectionJob) Execute() error { log.Warn(msg, zap.Error(err)) } - job.targetMgr.RemoveCollection(req.GetCollectionID()) job.targetObserver.ReleaseCollection(req.GetCollectionID()) waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID()) metrics.QueryCoordNumCollections.WithLabelValues().Dec() @@ -180,7 +179,6 @@ func (job *ReleasePartitionJob) Execute() error { if err != nil { log.Warn("failed to remove replicas", zap.Error(err)) } - job.targetMgr.RemoveCollection(req.GetCollectionID()) job.targetObserver.ReleaseCollection(req.GetCollectionID()) metrics.QueryCoordNumCollections.WithLabelValues().Dec() waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID()) @@ -191,7 +189,7 @@ func (job *ReleasePartitionJob) Execute() error { log.Warn(msg, zap.Error(err)) return errors.Wrap(err, msg) } - job.targetMgr.RemovePartition(req.GetCollectionID(), toRelease...) + job.targetObserver.ReleasePartition(req.GetCollectionID(), toRelease...) waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID(), toRelease...) } metrics.QueryCoordNumPartitions.WithLabelValues().Sub(float64(len(toRelease))) diff --git a/internal/querycoordv2/job/undo.go b/internal/querycoordv2/job/undo.go index 64b89bb78c2d2..21d29538639ac 100644 --- a/internal/querycoordv2/job/undo.go +++ b/internal/querycoordv2/job/undo.go @@ -78,10 +78,9 @@ func (u *UndoList) RollBack() { if u.IsTargetUpdated { if u.IsNewCollection { - u.targetMgr.RemoveCollection(u.CollectionID) u.targetObserver.ReleaseCollection(u.CollectionID) } else { - u.targetMgr.RemovePartition(u.CollectionID, u.LackPartitions...) + u.targetObserver.ReleasePartition(u.CollectionID, u.LackPartitions...) } } } diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index f440d12a20260..10d4cf65a4094 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -180,7 +180,7 @@ func (ob *CollectionObserver) observeTimeout() { zap.Duration("loadTime", time.Since(collection.CreatedAt))) ob.meta.CollectionManager.RemoveCollection(collection.GetCollectionID()) ob.meta.ReplicaManager.RemoveCollection(collection.GetCollectionID()) - ob.targetMgr.RemoveCollection(collection.GetCollectionID()) + ob.targetObserver.ReleaseCollection(collection.GetCollectionID()) ob.loadTasks.Remove(traceID) } case querypb.LoadType_LoadPartition: @@ -214,7 +214,7 @@ func (ob *CollectionObserver) observeTimeout() { zap.Int64s("partitionIDs", task.PartitionIDs)) for _, partition := range partitions { ob.meta.CollectionManager.RemovePartition(partition.CollectionID, partition.GetPartitionID()) - ob.targetMgr.RemovePartition(partition.GetCollectionID(), partition.GetPartitionID()) + ob.targetObserver.ReleasePartition(partition.GetCollectionID(), partition.GetPartitionID()) } // all partition timeout, remove collection @@ -223,7 +223,7 @@ func (ob *CollectionObserver) observeTimeout() { ob.meta.CollectionManager.RemoveCollection(task.CollectionID) ob.meta.ReplicaManager.RemoveCollection(task.CollectionID) - ob.targetMgr.RemoveCollection(task.CollectionID) + ob.targetObserver.ReleaseCollection(task.CollectionID) } } } diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index a430ed5f9f966..24bf930050e6a 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -38,15 +38,33 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) -type checkRequest struct { - CollectionID int64 - Notifier chan bool +type targetOp int + +func (op *targetOp) String() string { + switch *op { + case UpdateCollection: + return "UpdateCollection" + case ReleaseCollection: + return "ReleaseCollection" + case ReleasePartition: + return "ReleasePartition" + default: + return "Unknown" + } } +const ( + UpdateCollection targetOp = iota + 1 + ReleaseCollection + ReleasePartition +) + type targetUpdateRequest struct { CollectionID int64 + PartitionIDs []int64 Notifier chan error ReadyNotifier chan struct{} + opType targetOp } type initRequest struct{} @@ -60,8 +78,7 @@ type TargetObserver struct { broker meta.Broker cluster session.Cluster - initChan chan initRequest - manualCheck chan checkRequest + initChan chan initRequest // nextTargetLastUpdate map[int64]time.Time nextTargetLastUpdate *typeutil.ConcurrentMap[int64, time.Time] updateChan chan targetUpdateRequest @@ -88,9 +105,8 @@ func NewTargetObserver( distMgr: distMgr, broker: broker, cluster: cluster, - manualCheck: make(chan checkRequest, 10), nextTargetLastUpdate: typeutil.NewConcurrentMap[int64, time.Time](), - updateChan: make(chan targetUpdateRequest), + updateChan: make(chan targetUpdateRequest, 10), readyNotifiers: make(map[int64][]chan struct{}), initChan: make(chan initRequest), keylocks: lock.NewKeyLock[int64](), @@ -152,23 +168,44 @@ func (ob *TargetObserver) schedule(ctx context.Context) { ob.dispatcher.AddTask(ob.meta.GetAll()...) case req := <-ob.updateChan: - log := log.With(zap.Int64("collectionID", req.CollectionID)) - log.Info("manually trigger update next target") - ob.keylocks.Lock(req.CollectionID) - err := ob.updateNextTarget(req.CollectionID) - ob.keylocks.Unlock(req.CollectionID) - if err != nil { - log.Warn("failed to manually update next target", zap.Error(err)) - close(req.ReadyNotifier) - } else { + log.Info("manually trigger update target", + zap.Int64("collectionID", req.CollectionID), + zap.String("opType", req.opType.String()), + ) + switch req.opType { + case UpdateCollection: + ob.keylocks.Lock(req.CollectionID) + err := ob.updateNextTarget(req.CollectionID) + ob.keylocks.Unlock(req.CollectionID) + if err != nil { + log.Warn("failed to manually update next target", + zap.Int64("collectionID", req.CollectionID), + zap.String("opType", req.opType.String()), + zap.Error(err)) + close(req.ReadyNotifier) + } else { + ob.mut.Lock() + ob.readyNotifiers[req.CollectionID] = append(ob.readyNotifiers[req.CollectionID], req.ReadyNotifier) + ob.mut.Unlock() + } + req.Notifier <- err + case ReleaseCollection: ob.mut.Lock() - ob.readyNotifiers[req.CollectionID] = append(ob.readyNotifiers[req.CollectionID], req.ReadyNotifier) + for _, notifier := range ob.readyNotifiers[req.CollectionID] { + close(notifier) + } + delete(ob.readyNotifiers, req.CollectionID) ob.mut.Unlock() - } - log.Info("manually trigger update target done") - req.Notifier <- err - log.Info("notify manually trigger update target done") + ob.targetMgr.RemoveCollection(req.CollectionID) + req.Notifier <- nil + case ReleasePartition: + ob.targetMgr.RemovePartition(req.CollectionID, req.PartitionIDs...) + req.Notifier <- nil + } + log.Info("manually trigger update target done", + zap.Int64("collectionID", req.CollectionID), + zap.String("opType", req.opType.String())) } } } @@ -184,14 +221,6 @@ func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partiti } func (ob *TargetObserver) check(ctx context.Context, collectionID int64) { - if !ob.meta.Exist(collectionID) { - ob.ReleaseCollection(collectionID) - ob.targetMgr.RemoveCollection(collectionID) - log.Info("collection has been removed from target observer", - zap.Int64("collectionID", collectionID)) - return - } - ob.keylocks.Lock(collectionID) defer ob.keylocks.Unlock(collectionID) @@ -229,6 +258,7 @@ func (ob *TargetObserver) UpdateNextTarget(collectionID int64) (chan struct{}, e ob.updateChan <- targetUpdateRequest{ CollectionID: collectionID, + opType: UpdateCollection, Notifier: notifier, ReadyNotifier: readyCh, } @@ -236,12 +266,26 @@ func (ob *TargetObserver) UpdateNextTarget(collectionID int64) (chan struct{}, e } func (ob *TargetObserver) ReleaseCollection(collectionID int64) { - ob.mut.Lock() - defer ob.mut.Unlock() - for _, notifier := range ob.readyNotifiers[collectionID] { - close(notifier) + notifier := make(chan error) + defer close(notifier) + ob.updateChan <- targetUpdateRequest{ + CollectionID: collectionID, + opType: ReleaseCollection, + Notifier: notifier, + } + <-notifier +} + +func (ob *TargetObserver) ReleasePartition(collectionID int64, partitionID ...int64) { + notifier := make(chan error) + defer close(notifier) + ob.updateChan <- targetUpdateRequest{ + CollectionID: collectionID, + PartitionIDs: partitionID, + opType: ReleasePartition, + Notifier: notifier, } - delete(ob.readyNotifiers, collectionID) + <-notifier } func (ob *TargetObserver) clean() { diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go index 825a2b28bba3a..45d804b0f897e 100644 --- a/internal/querycoordv2/observers/target_observer_test.go +++ b/internal/querycoordv2/observers/target_observer_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -214,6 +215,20 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() { }, 7*time.Second, 1*time.Second) } +func (suite *TargetObserverSuite) TestTriggerRelease() { + // Manually update next target + _, err := suite.observer.UpdateNextTarget(suite.collectionID) + suite.NoError(err) + + // manually release partition + partitions := suite.meta.CollectionManager.GetPartitionsByCollection(suite.collectionID) + partitionIDs := lo.Map(partitions, func(partition *meta.Partition, _ int) int64 { return partition.PartitionID }) + suite.observer.ReleasePartition(suite.collectionID, partitionIDs[0]) + + // manually release collection + suite.observer.ReleaseCollection(suite.collectionID) +} + func (suite *TargetObserverSuite) TearDownTest() { suite.kv.Close() suite.observer.Stop()