From 0b5a9060495ea95e08d58bc219d27d535ac077e3 Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Thu, 26 Sep 2024 19:02:40 +0800 Subject: [PATCH] fix e2e Signed-off-by: Wei Liu --- .../querycoordv2/observers/target_observer.go | 17 ++++++++++++----- internal/querycoordv2/services.go | 17 +++++++++++++++++ internal/querycoordv2/utils/util.go | 4 ++-- 3 files changed, 31 insertions(+), 7 deletions(-) diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 8e69a329930af..f35c6bfa3a3b0 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -403,12 +403,19 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect continue } updateVersionAction := ob.checkNeedUpdateTargetVersion(ctx, leaderView) - if updateVersionAction != nil { - actions = append(actions, updateVersionAction) - } - if !ob.sync(ctx, replica, leaderView, actions) { - return false + if updateVersionAction == nil { + continue } + + actions = append(actions, updateVersionAction) + log.RatedInfo(10, "try to sync target version to delegator", + zap.Int64("collectionID", leaderView.CollectionID), + zap.String("channelName", leaderView.Channel), + zap.Int64("leaderID", leaderView.ID), + ) + ob.sync(ctx, replica, leaderView, actions) + + return false } } diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 95ca6cead963a..9cd17e0da8793 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -35,6 +35,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/internal/util/componentutil" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/merr" @@ -112,6 +113,14 @@ func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectio }, nil } + if !s.targetMgr.IsCurrentTargetExist(collectionID, common.AllPartitionsID) { + err := merr.WrapErrCollectionNotFullyLoaded(collectionID) + log.Warn("show collection failed", zap.Error(err)) + return &querypb.ShowCollectionsResponse{ + Status: merr.Status(err), + }, nil + } + if collection.IsRefreshed() { refreshProgress = 100 } @@ -174,6 +183,14 @@ func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitions percentages = append(percentages, int64(percentage)) } + if !s.targetMgr.IsCurrentTargetExist(req.GetCollectionID(), common.AllPartitionsID) { + err := merr.WrapErrCollectionNotFullyLoaded(req.GetCollectionID()) + log.Warn("show partitions failed", zap.Error(err)) + return &querypb.ShowPartitionsResponse{ + Status: merr.Status(err), + }, nil + } + collection := s.meta.GetCollection(req.GetCollectionID()) if collection != nil && collection.IsRefreshed() { refreshProgress = 100 diff --git a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go index f27b2843eb258..abfa7cb77d3db 100644 --- a/internal/querycoordv2/utils/util.go +++ b/internal/querycoordv2/utils/util.go @@ -72,10 +72,10 @@ func CheckLeaderAvailable(nodeMgr *session.NodeManager, targetMgr meta.TargetMan // if target version hasn't been synced, delegator will get empty readable segment list // so shard leader should be unserviceable until target version is synced currentTargetVersion := targetMgr.GetCollectionTargetVersion(leader.CollectionID, meta.CurrentTarget) - if leader.TargetVersion <= 0 || currentTargetVersion <= 0 || leader.TargetVersion != currentTargetVersion { + if leader.TargetVersion <= 0 { return merr.WrapErrServiceInternal( fmt.Sprintf("target version mismatch, collection: %d, current target version: %v, leader version: %v", - leader.CollectionID, currentTargetVersion, leader)) + leader.CollectionID, currentTargetVersion, leader.TargetVersion)) } segmentDist := targetMgr.GetSealedSegmentsByChannel(leader.CollectionID, leader.Channel, meta.CurrentTarget)