From 7ee41a84f24d512475ce7ed74c405c4e07d9723f Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Tue, 3 Sep 2024 11:48:05 +0800 Subject: [PATCH] fix: Fix dynamic release partition may fail search/query request cause concurrent issue may occur between remove parition in target manager and sync segment list to delegator. when it happens, some segment may be released in delegator, and those segment may also be synced to delegator, which cause delegator become unserviceable due to lack of necessary segments, then search/query fails. this PR make sure that all write access to target_manager will be executed in serial to avoid the concurrent issues. Signed-off-by: Wei Liu --- internal/querycoordv2/job/job_release.go | 4 +- internal/querycoordv2/job/undo.go | 3 +- .../observers/collection_observer.go | 6 +- .../querycoordv2/observers/target_observer.go | 97 ++++++++++++++----- .../observers/target_observer_test.go | 15 +++ 5 files changed, 95 insertions(+), 30 deletions(-) diff --git a/internal/querycoordv2/job/job_release.go b/internal/querycoordv2/job/job_release.go index 49841510312f1..4551b504dccc6 100644 --- a/internal/querycoordv2/job/job_release.go +++ b/internal/querycoordv2/job/job_release.go @@ -95,7 +95,6 @@ func (job *ReleaseCollectionJob) Execute() error { log.Warn(msg, zap.Error(err)) } - job.targetMgr.RemoveCollection(req.GetCollectionID()) job.targetObserver.ReleaseCollection(req.GetCollectionID()) waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID()) metrics.QueryCoordNumCollections.WithLabelValues().Dec() @@ -178,7 +177,6 @@ func (job *ReleasePartitionJob) Execute() error { if err != nil { log.Warn("failed to remove replicas", zap.Error(err)) } - job.targetMgr.RemoveCollection(req.GetCollectionID()) job.targetObserver.ReleaseCollection(req.GetCollectionID()) metrics.QueryCoordNumCollections.WithLabelValues().Dec() waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID()) @@ -189,7 +187,7 @@ func (job *ReleasePartitionJob) Execute() error { log.Warn(msg, zap.Error(err)) return errors.Wrap(err, msg) } - job.targetMgr.RemovePartition(req.GetCollectionID(), toRelease...) + job.targetObserver.ReleasePartition(req.GetCollectionID(), toRelease...) waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID(), toRelease...) } metrics.QueryCoordNumPartitions.WithLabelValues().Sub(float64(len(toRelease))) diff --git a/internal/querycoordv2/job/undo.go b/internal/querycoordv2/job/undo.go index 6ac0b62296b3b..e1314f0aec6e0 100644 --- a/internal/querycoordv2/job/undo.go +++ b/internal/querycoordv2/job/undo.go @@ -78,10 +78,9 @@ func (u *UndoList) RollBack() { if u.IsTargetUpdated { if u.IsNewCollection { - u.targetMgr.RemoveCollection(u.CollectionID) u.targetObserver.ReleaseCollection(u.CollectionID) } else { - u.targetMgr.RemovePartition(u.CollectionID, u.LackPartitions...) + u.targetObserver.ReleasePartition(u.CollectionID, u.LackPartitions...) } } } diff --git a/internal/querycoordv2/observers/collection_observer.go b/internal/querycoordv2/observers/collection_observer.go index 08c77858d3752..843e3a4d55065 100644 --- a/internal/querycoordv2/observers/collection_observer.go +++ b/internal/querycoordv2/observers/collection_observer.go @@ -180,7 +180,7 @@ func (ob *CollectionObserver) observeTimeout() { zap.Duration("loadTime", time.Since(collection.CreatedAt))) ob.meta.CollectionManager.RemoveCollection(collection.GetCollectionID()) ob.meta.ReplicaManager.RemoveCollection(collection.GetCollectionID()) - ob.targetMgr.RemoveCollection(collection.GetCollectionID()) + ob.targetObserver.ReleaseCollection(collection.GetCollectionID()) ob.loadTasks.Remove(traceID) } case querypb.LoadType_LoadPartition: @@ -214,7 +214,7 @@ func (ob *CollectionObserver) observeTimeout() { zap.Int64s("partitionIDs", task.PartitionIDs)) for _, partition := range partitions { ob.meta.CollectionManager.RemovePartition(partition.CollectionID, partition.GetPartitionID()) - ob.targetMgr.RemovePartition(partition.GetCollectionID(), partition.GetPartitionID()) + ob.targetObserver.ReleasePartition(partition.GetCollectionID(), partition.GetPartitionID()) } // all partition timeout, remove collection @@ -223,7 +223,7 @@ func (ob *CollectionObserver) observeTimeout() { ob.meta.CollectionManager.RemoveCollection(task.CollectionID) ob.meta.ReplicaManager.RemoveCollection(task.CollectionID) - ob.targetMgr.RemoveCollection(task.CollectionID) + ob.targetObserver.ReleaseCollection(task.CollectionID) } } } diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 5e6f7ede3c8b6..8b07415ebad69 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -38,15 +38,33 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) -type checkRequest struct { - CollectionID int64 - Notifier chan bool +type targetOp int + +func (op *targetOp) String() string { + switch *op { + case UpdateCollection: + return "UpdateCollection" + case ReleaseCollection: + return "ReleaseCollection" + case ReleasePartition: + return "ReleasePartition" + default: + return "Unknown" + } } +const ( + UpdateCollection targetOp = iota + 1 + ReleaseCollection + ReleasePartition +) + type targetUpdateRequest struct { CollectionID int64 + PartitionIDs []int64 Notifier chan error ReadyNotifier chan struct{} + opType targetOp } type initRequest struct{} @@ -60,8 +78,7 @@ type TargetObserver struct { broker meta.Broker cluster session.Cluster - initChan chan initRequest - manualCheck chan checkRequest + initChan chan initRequest // nextTargetLastUpdate map[int64]time.Time nextTargetLastUpdate *typeutil.ConcurrentMap[int64, time.Time] updateChan chan targetUpdateRequest @@ -88,7 +105,6 @@ func NewTargetObserver( distMgr: distMgr, broker: broker, cluster: cluster, - manualCheck: make(chan checkRequest, 10), nextTargetLastUpdate: typeutil.NewConcurrentMap[int64, time.Time](), updateChan: make(chan targetUpdateRequest), readyNotifiers: make(map[int64][]chan struct{}), @@ -152,23 +168,37 @@ func (ob *TargetObserver) schedule(ctx context.Context) { ob.dispatcher.AddTask(ob.meta.GetAll()...) case req := <-ob.updateChan: - log := log.With(zap.Int64("collectionID", req.CollectionID)) - log.Info("manually trigger update next target") - ob.keylocks.Lock(req.CollectionID) - err := ob.updateNextTarget(req.CollectionID) - ob.keylocks.Unlock(req.CollectionID) - if err != nil { - log.Warn("failed to manually update next target", zap.Error(err)) - close(req.ReadyNotifier) - } else { - ob.mut.Lock() - ob.readyNotifiers[req.CollectionID] = append(ob.readyNotifiers[req.CollectionID], req.ReadyNotifier) - ob.mut.Unlock() + log.Info("manually trigger update target", + zap.Int64("collectionID", req.CollectionID), + zap.String("opType", req.opType.String()), + ) + switch req.opType { + case UpdateCollection: + ob.keylocks.Lock(req.CollectionID) + err := ob.updateNextTarget(req.CollectionID) + ob.keylocks.Unlock(req.CollectionID) + if err != nil { + log.Warn("failed to manually update next target", + zap.Int64("collectionID", req.CollectionID), + zap.String("opType", req.opType.String()), + zap.Error(err)) + close(req.ReadyNotifier) + } else { + ob.mut.Lock() + ob.readyNotifiers[req.CollectionID] = append(ob.readyNotifiers[req.CollectionID], req.ReadyNotifier) + ob.mut.Unlock() + } + req.Notifier <- err + case ReleaseCollection: + ob.targetMgr.RemoveCollection(req.CollectionID) + req.Notifier <- nil + case ReleasePartition: + ob.targetMgr.RemovePartition(req.CollectionID, req.PartitionIDs...) + req.Notifier <- nil } - - log.Info("manually trigger update target done") - req.Notifier <- err - log.Info("notify manually trigger update target done") + log.Info("manually trigger update target done", + zap.Int64("collectionID", req.CollectionID), + zap.String("opType", req.opType.String())) } } } @@ -229,6 +259,7 @@ func (ob *TargetObserver) UpdateNextTarget(collectionID int64) (chan struct{}, e ob.updateChan <- targetUpdateRequest{ CollectionID: collectionID, + opType: UpdateCollection, Notifier: notifier, ReadyNotifier: readyCh, } @@ -242,6 +273,28 @@ func (ob *TargetObserver) ReleaseCollection(collectionID int64) { close(notifier) } delete(ob.readyNotifiers, collectionID) + + notifier := make(chan error) + ob.updateChan <- targetUpdateRequest{ + CollectionID: collectionID, + opType: ReleaseCollection, + Notifier: notifier, + } + <-notifier +} + +func (ob *TargetObserver) ReleasePartition(collectionID int64, partitionID ...int64) { + ob.mut.Lock() + defer ob.mut.Unlock() + + notifier := make(chan error) + ob.updateChan <- targetUpdateRequest{ + CollectionID: collectionID, + PartitionIDs: partitionID, + opType: ReleasePartition, + Notifier: notifier, + } + <-notifier } func (ob *TargetObserver) clean() { diff --git a/internal/querycoordv2/observers/target_observer_test.go b/internal/querycoordv2/observers/target_observer_test.go index 825a2b28bba3a..45d804b0f897e 100644 --- a/internal/querycoordv2/observers/target_observer_test.go +++ b/internal/querycoordv2/observers/target_observer_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -214,6 +215,20 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() { }, 7*time.Second, 1*time.Second) } +func (suite *TargetObserverSuite) TestTriggerRelease() { + // Manually update next target + _, err := suite.observer.UpdateNextTarget(suite.collectionID) + suite.NoError(err) + + // manually release partition + partitions := suite.meta.CollectionManager.GetPartitionsByCollection(suite.collectionID) + partitionIDs := lo.Map(partitions, func(partition *meta.Partition, _ int) int64 { return partition.PartitionID }) + suite.observer.ReleasePartition(suite.collectionID, partitionIDs[0]) + + // manually release collection + suite.observer.ReleaseCollection(suite.collectionID) +} + func (suite *TargetObserverSuite) TearDownTest() { suite.kv.Close() suite.observer.Stop()