Skip to content

Commit

Permalink
enhance: optimize describe collection and index
Browse files Browse the repository at this point in the history
Signed-off-by: xiaofanluan <[email protected]>
  • Loading branch information
xiaofan-luan committed Nov 7, 2024
1 parent 5310d34 commit 6d29d39
Showing 1 changed file with 37 additions and 20 deletions.
57 changes: 37 additions & 20 deletions internal/querycoordv2/observers/target_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
Expand Down Expand Up @@ -390,6 +392,10 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect

replicas := ob.meta.ReplicaManager.GetByCollection(collectionID)
actions := make([]*querypb.SyncAction, 0, 1)
var collectionInfo *milvuspb.DescribeCollectionResponse
var partitions []int64
var indexInfo []*indexpb.IndexInfo
var err error
for _, replica := range replicas {
leaders := ob.distMgr.ChannelDistManager.GetShardLeadersByReplica(replica)
for ch, leaderID := range leaders {
Expand All @@ -406,7 +412,34 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
if updateVersionAction != nil {
actions = append(actions, updateVersionAction)
}
if !ob.sync(ctx, replica, leaderView, actions) {

if len(actions) == 0 {
continue
}

// init all the meta information
if collectionInfo == nil {
collectionInfo, err = ob.broker.DescribeCollection(ctx, collectionID)
if err != nil {
log.Warn("failed to get collection info", zap.Error(err))
return false
}

partitions, err = utils.GetPartitions(ob.meta.CollectionManager, collectionID)
if err != nil {
log.Warn("failed to get partitions", zap.Error(err))
return false
}

// Get collection index info
indexInfo, err = ob.broker.ListIndexes(ctx, collectionID)
if err != nil {
log.Warn("fail to get index info of collection", zap.Error(err))
return false
}
}

if !ob.sync(ctx, replica, leaderView, actions, collectionInfo, partitions, indexInfo) {
return false
}
}
Expand All @@ -415,7 +448,9 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
return true
}

func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leaderView *meta.LeaderView, diffs []*querypb.SyncAction) bool {
func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leaderView *meta.LeaderView, diffs []*querypb.SyncAction,
collectionInfo *milvuspb.DescribeCollectionResponse, partitions []int64, indexInfo []*indexpb.IndexInfo,
) bool {
if len(diffs) == 0 {
return true
}
Expand All @@ -427,24 +462,6 @@ func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leade
zap.String("channel", leaderView.Channel),
)

collectionInfo, err := ob.broker.DescribeCollection(ctx, leaderView.CollectionID)
if err != nil {
log.Warn("failed to get collection info", zap.Error(err))
return false
}
partitions, err := utils.GetPartitions(ob.meta.CollectionManager, leaderView.CollectionID)
if err != nil {
log.Warn("failed to get partitions", zap.Error(err))
return false
}

// Get collection index info
indexInfo, err := ob.broker.ListIndexes(ctx, collectionInfo.GetCollectionID())
if err != nil {
log.Warn("fail to get index info of collection", zap.Error(err))
return false
}

req := &querypb.SyncDistributionRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_SyncDistribution),
Expand Down

0 comments on commit 6d29d39

Please sign in to comment.