From 10211ea0563e4f710e2559e209809610e2ac48a8 Mon Sep 17 00:00:00 2001 From: wei liu Date: Fri, 6 Sep 2024 10:49:05 +0800 Subject: [PATCH] fix: Fix dynamic release partition may fail search/query request (#35919) (#36019) issue: #33550 pr: #35919 Cause: a concurrency issue may occur between removing a partition in the target manager and syncing the segment list to the delegator. When it happens, some segments may be released in the delegator while those same segments are also synced to the delegator, which causes the delegator to become unserviceable due to the lack of necessary segments, and then search/query fails. This PR makes sure that all write access to target_manager is executed serially to avoid these concurrency issues. Signed-off-by: Wei Liu --- internal/querycoordv2/job/job_release.go | 4 +- internal/querycoordv2/job/undo.go | 3 +- .../observers/collection_observer.go | 6 +- .../querycoordv2/observers/target_observer.go | 112 ++++++++++++------ .../observers/target_observer_test.go | 15 +++ 5 files changed, 98 insertions(+), 42 deletions(-) diff --git a/internal/querycoordv2/job/job_release.go b/internal/querycoordv2/job/job_release.go index 821f99c68eb48..99811082a4e1e 100644 --- a/internal/querycoordv2/job/job_release.go +++ b/internal/querycoordv2/job/job_release.go @@ -97,7 +97,6 @@ func (job *ReleaseCollectionJob) Execute() error { log.Warn(msg, zap.Error(err)) } - job.targetMgr.RemoveCollection(req.GetCollectionID()) job.targetObserver.ReleaseCollection(req.GetCollectionID()) waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID()) metrics.QueryCoordNumCollections.WithLabelValues().Dec() @@ -180,7 +179,6 @@ func (job *ReleasePartitionJob) Execute() error { if err != nil { log.Warn("failed to remove replicas", zap.Error(err)) } - job.targetMgr.RemoveCollection(req.GetCollectionID()) job.targetObserver.ReleaseCollection(req.GetCollectionID()) metrics.QueryCoordNumCollections.WithLabelValues().Dec() waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID())
@@ -191,7 +189,7 @@ func (job *ReleasePartitionJob) Execute() error { log.Warn(msg, zap.Error(err)) return errors.Wrap(err, msg) } - job.targetMgr.RemovePartition(req.GetCollectionID(), toRelease...) + job.targetObserver.ReleasePartition(req.GetCollectionID(), toRelease...) waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID(), toRelease...) } metrics.QueryCoordNumPartitions.WithLabelValues().Sub(float64(len(toRelease))) diff --git a/internal/querycoordv2/job/undo.go b/internal/querycoordv2/job/undo.go index 64b89bb78c2d2..21d29538639ac 100644 --- a/internal/querycoordv2/job/undo.go +++ b/internal/querycoordv2/job/undo.go @@ -78,10 +78,9 @@ func (u *UndoList) RollBack() { if u.IsTargetUpdated { if u.IsNewCollection { - u.targetMgr.RemoveCollection(u.CollectionID) u.targetObserver.ReleaseCollection(u.CollectionID) } else { - u.targetMgr.RemovePartition(u.CollectionID, u.LackPartitions...) + u.targetObserver.ReleasePartition(u.CollectionID, u.LackPartitions...) } } } diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index f440d12a20260..10d4cf65a4094 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -180,7 +180,7 @@ func (ob *CollectionObserver) observeTimeout() { zap.Duration("loadTime", time.Since(collection.CreatedAt))) ob.meta.CollectionManager.RemoveCollection(collection.GetCollectionID()) ob.meta.ReplicaManager.RemoveCollection(collection.GetCollectionID()) - ob.targetMgr.RemoveCollection(collection.GetCollectionID()) + ob.targetObserver.ReleaseCollection(collection.GetCollectionID()) ob.loadTasks.Remove(traceID) } case querypb.LoadType_LoadPartition: @@ -214,7 +214,7 @@ func (ob *CollectionObserver) observeTimeout() { zap.Int64s("partitionIDs", task.PartitionIDs)) for _, partition := range partitions { ob.meta.CollectionManager.RemovePartition(partition.CollectionID, 
partition.GetPartitionID()) - ob.targetMgr.RemovePartition(partition.GetCollectionID(), partition.GetPartitionID()) + ob.targetObserver.ReleasePartition(partition.GetCollectionID(), partition.GetPartitionID()) } // all partition timeout, remove collection @@ -223,7 +223,7 @@ func (ob *CollectionObserver) observeTimeout() { ob.meta.CollectionManager.RemoveCollection(task.CollectionID) ob.meta.ReplicaManager.RemoveCollection(task.CollectionID) - ob.targetMgr.RemoveCollection(task.CollectionID) + ob.targetObserver.ReleaseCollection(task.CollectionID) } } } diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index a430ed5f9f966..24bf930050e6a 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -38,15 +38,33 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) -type checkRequest struct { - CollectionID int64 - Notifier chan bool +type targetOp int + +func (op *targetOp) String() string { + switch *op { + case UpdateCollection: + return "UpdateCollection" + case ReleaseCollection: + return "ReleaseCollection" + case ReleasePartition: + return "ReleasePartition" + default: + return "Unknown" + } } +const ( + UpdateCollection targetOp = iota + 1 + ReleaseCollection + ReleasePartition +) + type targetUpdateRequest struct { CollectionID int64 + PartitionIDs []int64 Notifier chan error ReadyNotifier chan struct{} + opType targetOp } type initRequest struct{} @@ -60,8 +78,7 @@ type TargetObserver struct { broker meta.Broker cluster session.Cluster - initChan chan initRequest - manualCheck chan checkRequest + initChan chan initRequest // nextTargetLastUpdate map[int64]time.Time nextTargetLastUpdate *typeutil.ConcurrentMap[int64, time.Time] updateChan chan targetUpdateRequest @@ -88,9 +105,8 @@ func NewTargetObserver( distMgr: distMgr, broker: broker, cluster: cluster, - manualCheck: make(chan checkRequest, 10), nextTargetLastUpdate: 
typeutil.NewConcurrentMap[int64, time.Time](), - updateChan: make(chan targetUpdateRequest), + updateChan: make(chan targetUpdateRequest, 10), readyNotifiers: make(map[int64][]chan struct{}), initChan: make(chan initRequest), keylocks: lock.NewKeyLock[int64](), @@ -152,23 +168,44 @@ func (ob *TargetObserver) schedule(ctx context.Context) { ob.dispatcher.AddTask(ob.meta.GetAll()...) case req := <-ob.updateChan: - log := log.With(zap.Int64("collectionID", req.CollectionID)) - log.Info("manually trigger update next target") - ob.keylocks.Lock(req.CollectionID) - err := ob.updateNextTarget(req.CollectionID) - ob.keylocks.Unlock(req.CollectionID) - if err != nil { - log.Warn("failed to manually update next target", zap.Error(err)) - close(req.ReadyNotifier) - } else { + log.Info("manually trigger update target", + zap.Int64("collectionID", req.CollectionID), + zap.String("opType", req.opType.String()), + ) + switch req.opType { + case UpdateCollection: + ob.keylocks.Lock(req.CollectionID) + err := ob.updateNextTarget(req.CollectionID) + ob.keylocks.Unlock(req.CollectionID) + if err != nil { + log.Warn("failed to manually update next target", + zap.Int64("collectionID", req.CollectionID), + zap.String("opType", req.opType.String()), + zap.Error(err)) + close(req.ReadyNotifier) + } else { + ob.mut.Lock() + ob.readyNotifiers[req.CollectionID] = append(ob.readyNotifiers[req.CollectionID], req.ReadyNotifier) + ob.mut.Unlock() + } + req.Notifier <- err + case ReleaseCollection: ob.mut.Lock() - ob.readyNotifiers[req.CollectionID] = append(ob.readyNotifiers[req.CollectionID], req.ReadyNotifier) + for _, notifier := range ob.readyNotifiers[req.CollectionID] { + close(notifier) + } + delete(ob.readyNotifiers, req.CollectionID) ob.mut.Unlock() - } - log.Info("manually trigger update target done") - req.Notifier <- err - log.Info("notify manually trigger update target done") + ob.targetMgr.RemoveCollection(req.CollectionID) + req.Notifier <- nil + case ReleasePartition: + 
ob.targetMgr.RemovePartition(req.CollectionID, req.PartitionIDs...) + req.Notifier <- nil + } + log.Info("manually trigger update target done", + zap.Int64("collectionID", req.CollectionID), + zap.String("opType", req.opType.String())) } } } @@ -184,14 +221,6 @@ func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partiti } func (ob *TargetObserver) check(ctx context.Context, collectionID int64) { - if !ob.meta.Exist(collectionID) { - ob.ReleaseCollection(collectionID) - ob.targetMgr.RemoveCollection(collectionID) - log.Info("collection has been removed from target observer", - zap.Int64("collectionID", collectionID)) - return - } - ob.keylocks.Lock(collectionID) defer ob.keylocks.Unlock(collectionID) @@ -229,6 +258,7 @@ func (ob *TargetObserver) UpdateNextTarget(collectionID int64) (chan struct{}, e ob.updateChan <- targetUpdateRequest{ CollectionID: collectionID, + opType: UpdateCollection, Notifier: notifier, ReadyNotifier: readyCh, } @@ -236,12 +266,26 @@ func (ob *TargetObserver) UpdateNextTarget(collectionID int64) (chan struct{}, e } func (ob *TargetObserver) ReleaseCollection(collectionID int64) { - ob.mut.Lock() - defer ob.mut.Unlock() - for _, notifier := range ob.readyNotifiers[collectionID] { - close(notifier) + notifier := make(chan error) + defer close(notifier) + ob.updateChan <- targetUpdateRequest{ + CollectionID: collectionID, + opType: ReleaseCollection, + Notifier: notifier, + } + <-notifier +} + +func (ob *TargetObserver) ReleasePartition(collectionID int64, partitionID ...int64) { + notifier := make(chan error) + defer close(notifier) + ob.updateChan <- targetUpdateRequest{ + CollectionID: collectionID, + PartitionIDs: partitionID, + opType: ReleasePartition, + Notifier: notifier, } - delete(ob.readyNotifiers, collectionID) + <-notifier } func (ob *TargetObserver) clean() { diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go index 
825a2b28bba3a..45d804b0f897e 100644 --- a/internal/querycoordv2/observers/target_observer_test.go +++ b/internal/querycoordv2/observers/target_observer_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -214,6 +215,20 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() { }, 7*time.Second, 1*time.Second) } +func (suite *TargetObserverSuite) TestTriggerRelease() { + // Manually update next target + _, err := suite.observer.UpdateNextTarget(suite.collectionID) + suite.NoError(err) + + // manually release partition + partitions := suite.meta.CollectionManager.GetPartitionsByCollection(suite.collectionID) + partitionIDs := lo.Map(partitions, func(partition *meta.Partition, _ int) int64 { return partition.PartitionID }) + suite.observer.ReleasePartition(suite.collectionID, partitionIDs[0]) + + // manually release collection + suite.observer.ReleaseCollection(suite.collectionID) +} + func (suite *TargetObserverSuite) TearDownTest() { suite.kv.Close() suite.observer.Stop()