diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go
index 4140fec8753c2..ba26c5f200e19 100644
--- a/internal/querycoordv2/dist/dist_handler.go
+++ b/internal/querycoordv2/dist/dist_handler.go
@@ -38,6 +38,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/commonpbutil"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 type TriggerUpdateTargetVersion = func(collectionID int64)
@@ -194,6 +195,8 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons
 	channels := lo.SliceToMap(resp.GetChannels(), func(channel *querypb.ChannelVersionInfo) (string, *querypb.ChannelVersionInfo) {
 		return channel.GetChannel(), channel
 	})
+
+	collectionsToSync := typeutil.NewUniqueSet()
 
 	for _, lview := range resp.GetLeaderViews() {
 		segments := make(map[int64]*meta.Segment)
@@ -246,9 +249,10 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons
 			err := merr.WrapErrServiceInternal(fmt.Sprintf("target version mismatch, collection: %d, channel: %s, current target version: %v, leader version: %v",
 				lview.GetCollection(), lview.GetChannel(), currentTargetVersion, lview.TargetVersion))
 
-			// segment and channel already loaded, trigger target observer to check target version
-			dh.syncTargetVersionFn(lview.GetCollection())
 			view.UnServiceableError = err
+			// force the dist handler to pull the next distribution until all delegators are serviceable
+			dh.lastUpdateTs = 0
+			collectionsToSync.Insert(lview.Collection)
 			log.Info("leader is not available due to target version not ready",
 				zap.Int64("collectionID", view.CollectionID),
 				zap.Int64("nodeID", view.ID),
@@ -258,6 +262,12 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons
 	}
 
 	dh.dist.LeaderViewManager.Update(resp.GetNodeID(), updates...)
+
+	// segments and channels are already loaded; trigger the target observer to update the current target
+	collectionsToSync.Range(func(collection int64) bool {
+		dh.syncTargetVersionFn(collection)
+		return true
+	})
 }
 
 func (dh *distHandler) getDistribution(ctx context.Context) (*querypb.GetDataDistributionResponse, error) {
diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go
index eaf6c0de3c2f5..87caa9e91c036 100644
--- a/internal/querycoordv2/observers/target_observer.go
+++ b/internal/querycoordv2/observers/target_observer.go
@@ -374,16 +374,16 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
 		channelReadyLeaders := lo.Filter(ob.distMgr.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel)), func(leader *meta.LeaderView, _ int) bool {
 			return utils.CheckDelegatorDataReady(ob.nodeMgr, ob.targetMgr, leader, meta.NextTarget) == nil
 		})
-		collectionReadyLeaders = append(collectionReadyLeaders, channelReadyLeaders...)
 
 		// to avoid stuck here in dynamic increase replica case, we just check available delegator number
-		if int32(len(collectionReadyLeaders)) < replicaNum {
+		if int32(len(channelReadyLeaders)) < replicaNum {
 			log.RatedInfo(10, "channel not ready",
 				zap.Int("readyReplicaNum", len(channelReadyLeaders)),
 				zap.String("channelName", channel),
 			)
 			return false
 		}
+		collectionReadyLeaders = append(collectionReadyLeaders, channelReadyLeaders...)
 	}
 
 	var collectionInfo *milvuspb.DescribeCollectionResponse
diff --git a/tests/integration/target/target_test.go b/tests/integration/target/target_test.go
index 3434a6f2c2084..43166330503f2 100644
--- a/tests/integration/target/target_test.go
+++ b/tests/integration/target/target_test.go
@@ -147,7 +147,6 @@ func (s *TargetTestSuit) TestQueryCoordRestart() {
 	s.initCollection(name, 1, 2, 2, 2000)
 
 	ctx := context.Background()
-
 	info, err := s.Cluster.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
 		Base:           commonpbutil.NewMsgBase(),
 		CollectionName: name,
@@ -156,6 +155,16 @@ func (s *TargetTestSuit) TestQueryCoordRestart() {
 	s.True(merr.Ok(info.GetStatus()))
 	collectionID := info.GetCollectionID()
 
+	// wait until all shards are ready: ShowCollections only waits for the collection to
+	// become loaded, and the proxy relies on retries to block until all shards are ready
+	s.Eventually(func() bool {
+		resp, err := s.Cluster.QueryCoord.GetShardLeaders(ctx, &querypb.GetShardLeadersRequest{
+			Base:         commonpbutil.NewMsgBase(),
+			CollectionID: collectionID,
+		})
+		return err == nil && merr.Ok(resp.GetStatus()) && len(resp.Shards) == 2
+	}, 60*time.Second, 1*time.Second)
+
 	// trigger old coord stop
 	s.Cluster.StopQueryCoord()
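
Note on the dist_handler change above: instead of calling syncTargetVersionFn inside the leader-view loop, the handler now collects the affected collection IDs and fires the callback once per collection after the whole distribution response has been processed. A minimal sketch of that dedupe-then-sync pattern, using a plain map in place of typeutil.UniqueSet; leaderView and syncCollections are simplified stand-ins, not the real Milvus types:

package main

import "fmt"

type leaderView struct {
	collectionID int64
	serviceable  bool
}

// syncCollections records every collection that owns an unserviceable
// delegator, then runs syncFn once per collection after the scan,
// rather than once per leader view.
func syncCollections(views []leaderView, syncFn func(collectionID int64)) {
	toSync := make(map[int64]struct{}) // plays the role of typeutil.UniqueSet
	for _, v := range views {
		if !v.serviceable {
			toSync[v.collectionID] = struct{}{}
		}
	}
	for id := range toSync {
		syncFn(id)
	}
}

func main() {
	views := []leaderView{
		{collectionID: 100, serviceable: false},
		{collectionID: 100, serviceable: false}, // second shard of the same collection
		{collectionID: 200, serviceable: true},
	}
	// prints a single sync line for collection 100
	syncCollections(views, func(id int64) {
		fmt.Println("sync target version for collection", id)
	})
}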
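
The target_observer fix moves the append below the per-channel check, so every channel must independently reach replicaNum ready delegators; previously one fully replicated channel could satisfy the accumulated count and mask a channel with none. A sketch of the corrected control flow, where readyLeadersByChannel is a hypothetical input standing in for the LeaderViewManager filter:

package main

import "fmt"

// collectionReady accumulates per-channel ready delegators only after each
// channel has individually passed the replicaNum check.
func collectionReady(readyLeadersByChannel map[string][]int64, replicaNum int32) ([]int64, bool) {
	var collectionReadyLeaders []int64
	for channel, channelReadyLeaders := range readyLeadersByChannel {
		// judge each channel on its own; appending before this check (the old
		// order) let a well-replicated channel hide an unready one
		if int32(len(channelReadyLeaders)) < replicaNum {
			fmt.Println("channel not ready:", channel)
			return nil, false
		}
		collectionReadyLeaders = append(collectionReadyLeaders, channelReadyLeaders...)
	}
	return collectionReadyLeaders, true
}

func main() {
	byChannel := map[string][]int64{
		"dml-ch0": {1, 2}, // two ready delegators
		"dml-ch1": {3},    // only one ready delegator
	}
	if _, ok := collectionReady(byChannel, 2); !ok {
		fmt.Println("collection target should not advance yet")
	}
}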
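
The new wait in target_test.go relies on testify's s.Eventually, which re-runs a probe at a fixed tick until it returns true or the timeout expires. A bare-bones version of that polling loop (not testify's implementation), with checkShardsReady as a hypothetical stand-in for the GetShardLeaders call:

package main

import (
	"fmt"
	"time"
)

// waitUntil polls check every tick until it returns true or timeout elapses.
func waitUntil(check func() bool, timeout, tick time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if check() {
			return true
		}
		time.Sleep(tick)
	}
	return false
}

func main() {
	start := time.Now()
	// hypothetical probe; the real test calls QueryCoord.GetShardLeaders and
	// checks that both shards report a status-OK leader list
	checkShardsReady := func() bool { return time.Since(start) > 3*time.Second }
	fmt.Println("shards ready:", waitUntil(checkShardsReady, 10*time.Second, time.Second))
}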