From 1bd502b585a5f29921d63b296ba5c5f2daa568aa Mon Sep 17 00:00:00 2001 From: wei liu Date: Fri, 15 Nov 2024 14:52:30 +0800 Subject: [PATCH] fix: Delegator stuck at unserviceable status (#37694) (#37702) issue: #37679 pr: #37694 pr #36549 introduce the logic error which update current target when only parts of channel is ready. This PR fix the logic error and let dist handler keep pull distribution on querynode until all delegator becomes serviceable. Signed-off-by: Wei Liu --- internal/querycoordv2/dist/dist_handler.go | 14 ++++++++++++-- internal/querycoordv2/observers/target_observer.go | 4 ++-- tests/integration/target/target_test.go | 11 ++++++++++- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go index 4140fec8753c2..ba26c5f200e19 100644 --- a/internal/querycoordv2/dist/dist_handler.go +++ b/internal/querycoordv2/dist/dist_handler.go @@ -38,6 +38,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) type TriggerUpdateTargetVersion = func(collectionID int64) @@ -194,6 +195,8 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons channels := lo.SliceToMap(resp.GetChannels(), func(channel *querypb.ChannelVersionInfo) (string, *querypb.ChannelVersionInfo) { return channel.GetChannel(), channel }) + + collectionsToSync := typeutil.NewUniqueSet() for _, lview := range resp.GetLeaderViews() { segments := make(map[int64]*meta.Segment) @@ -246,9 +249,10 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons err := merr.WrapErrServiceInternal(fmt.Sprintf("target version mismatch, collection: %d, channel: %s, current target version: %v, leader version: %v", lview.GetCollection(), lview.GetChannel(), currentTargetVersion, lview.TargetVersion)) - // segment and channel already loaded, trigger target observer to check target version - dh.syncTargetVersionFn(lview.GetCollection()) view.UnServiceableError = err + // make dist handler pull next distribution until all delegator is serviceable + dh.lastUpdateTs = 0 + collectionsToSync.Insert(lview.Collection) log.Info("leader is not available due to target version not ready", zap.Int64("collectionID", view.CollectionID), zap.Int64("nodeID", view.ID), @@ -258,6 +262,12 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons } dh.dist.LeaderViewManager.Update(resp.GetNodeID(), updates...) + + // segment and channel already loaded, trigger target observer to update + collectionsToSync.Range(func(collection int64) bool { + dh.syncTargetVersionFn(collection) + return true + }) } func (dh *distHandler) getDistribution(ctx context.Context) (*querypb.GetDataDistributionResponse, error) { diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index eaf6c0de3c2f5..87caa9e91c036 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -374,16 +374,16 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect channelReadyLeaders := lo.Filter(ob.distMgr.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel)), func(leader *meta.LeaderView, _ int) bool { return utils.CheckDelegatorDataReady(ob.nodeMgr, ob.targetMgr, leader, meta.NextTarget) == nil }) - collectionReadyLeaders = append(collectionReadyLeaders, channelReadyLeaders...) // to avoid stuck here in dynamic increase replica case, we just check available delegator number - if int32(len(collectionReadyLeaders)) < replicaNum { + if int32(len(channelReadyLeaders)) < replicaNum { log.RatedInfo(10, "channel not ready", zap.Int("readyReplicaNum", len(channelReadyLeaders)), zap.String("channelName", channel), ) return false } + collectionReadyLeaders = append(collectionReadyLeaders, channelReadyLeaders...) } var collectionInfo *milvuspb.DescribeCollectionResponse diff --git a/tests/integration/target/target_test.go b/tests/integration/target/target_test.go index 3434a6f2c2084..43166330503f2 100644 --- a/tests/integration/target/target_test.go +++ b/tests/integration/target/target_test.go @@ -147,7 +147,6 @@ func (s *TargetTestSuit) TestQueryCoordRestart() { s.initCollection(name, 1, 2, 2, 2000) ctx := context.Background() - info, err := s.Cluster.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ Base: commonpbutil.NewMsgBase(), CollectionName: name, @@ -156,6 +155,16 @@ func (s *TargetTestSuit) TestQueryCoordRestart() { s.True(merr.Ok(info.GetStatus())) collectionID := info.GetCollectionID() + // wait until all shards are ready + // cause showCollections won't just wait all collection becomes loaded, proxy will use retry to block until all shard are ready + s.Eventually(func() bool { + resp, err := s.Cluster.QueryCoord.GetShardLeaders(ctx, &querypb.GetShardLeadersRequest{ + Base: commonpbutil.NewMsgBase(), + CollectionID: collectionID, + }) + return err == nil && merr.Ok(resp.GetStatus()) && len(resp.Shards) == 2 + }, 60*time.Second, 1*time.Second) + // trigger old coord stop s.Cluster.StopQueryCoord()