diff --git a/internal/querycoordv2/observers/target_observer.go b/internal/querycoordv2/observers/target_observer.go index 8e69a329930af..83bddf48360bf 100644 --- a/internal/querycoordv2/observers/target_observer.go +++ b/internal/querycoordv2/observers/target_observer.go @@ -26,6 +26,8 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/params" @@ -390,6 +392,10 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect replicas := ob.meta.ReplicaManager.GetByCollection(collectionID) actions := make([]*querypb.SyncAction, 0, 1) + var collectionInfo *milvuspb.DescribeCollectionResponse + var partitions []int64 + var indexInfo []*indexpb.IndexInfo + var err error for _, replica := range replicas { leaders := ob.distMgr.ChannelDistManager.GetShardLeadersByReplica(replica) for ch, leaderID := range leaders { @@ -406,7 +412,34 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect if updateVersionAction != nil { actions = append(actions, updateVersionAction) } - if !ob.sync(ctx, replica, leaderView, actions) { + + if len(actions) == 0 { + continue + } + + // init all the meta information + if collectionInfo == nil { + collectionInfo, err = ob.broker.DescribeCollection(ctx, collectionID) + if err != nil { + log.Warn("failed to get collection info", zap.Error(err)) + return false + } + + partitions, err = utils.GetPartitions(ob.meta.CollectionManager, collectionID) + if err != nil { + log.Warn("failed to get partitions", zap.Error(err)) + return false + } + + // Get collection index info + indexInfo, err = ob.broker.ListIndexes(ctx, collectionID) + if err != nil { + log.Warn("fail to get index info of collection", zap.Error(err)) + return false + } + } + + if !ob.sync(ctx, replica, leaderView, actions, collectionInfo, partitions, indexInfo) { return false } } @@ -415,7 +448,9 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect return true } -func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leaderView *meta.LeaderView, diffs []*querypb.SyncAction) bool { +func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leaderView *meta.LeaderView, diffs []*querypb.SyncAction, + collectionInfo *milvuspb.DescribeCollectionResponse, partitions []int64, indexInfo []*indexpb.IndexInfo, +) bool { if len(diffs) == 0 { return true } @@ -427,24 +462,6 @@ func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leade zap.String("channel", leaderView.Channel), ) - collectionInfo, err := ob.broker.DescribeCollection(ctx, leaderView.CollectionID) - if err != nil { - log.Warn("failed to get collection info", zap.Error(err)) - return false - } - partitions, err := utils.GetPartitions(ob.meta.CollectionManager, leaderView.CollectionID) - if err != nil { - log.Warn("failed to get partitions", zap.Error(err)) - return false - } - - // Get collection index info - indexInfo, err := ob.broker.ListIndexes(ctx, collectionInfo.GetCollectionID()) - if err != nil { - log.Warn("fail to get index info of collection", zap.Error(err)) - return false - } - req := &querypb.SyncDistributionRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_SyncDistribution),