Skip to content

Commit

Permalink
enhance: refine meta cache log and logic
Browse files Browse the repository at this point in the history
Signed-off-by: xiaofanluan <[email protected]>
  • Loading branch information
xiaofan-luan committed Oct 30, 2024
1 parent 7961568 commit 5e13bd7
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 33 deletions.
58 changes: 28 additions & 30 deletions internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,41 +120,39 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p

log.Info("received request to invalidate collection meta cache")

collectionName := request.CollectionName
collectionID := request.CollectionID
msgType := request.GetBase().GetMsgType()
var aliasName []string
var aliasNames []string

if globalMetaCache != nil {
switch msgType {
case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias, commonpb.MsgType_LoadCollection:
if collectionName != "" {
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached
globalMetaCache.DeprecateShardCache(request.GetDbName(), collectionName)
}
if request.CollectionID != UniqueID(0) {
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
}
log.Info("complete to invalidate collection meta cache with collection name", zap.String("collectionName", collectionName))
case commonpb.MsgType_DropPartition:
if collectionName != "" && request.GetPartitionName() != "" {
globalMetaCache.RemovePartition(ctx, request.GetDbName(), request.GetCollectionName(), request.GetPartitionName())
} else {
log.Warn("invalidate collection meta cache failed. collectionName or partitionName is empty",
zap.String("collectionName", collectionName),
zap.String("partitionName", request.GetPartitionName()))
return merr.Status(merr.WrapErrPartitionNotFound(request.GetPartitionName(), "partition name not specified")), nil
}
case commonpb.MsgType_LoadCollection:
// this is to update partial field load info.
aliasNames = globalMetaCache.RemoveCollectionsByID(ctx, request.GetCollectionID())
log.Info("complete to invalidate collection meta cache with on load", zap.Int64("collectionID", request.GetCollectionID()), zap.Strings("names", aliasNames))
case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_AlterCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias:
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), request.GetCollectionName()) // no need to return error, though collection may be not cached
globalMetaCache.DeprecateShardCache(request.GetDbName(), request.GetCollectionName())
log.Info("complete to invalidate collection meta cache with collection name", zap.String("collectionName", request.GetCollectionName()))
case commonpb.MsgType_CreatePartition, commonpb.MsgType_DropPartition:
// here we only clean meta cache but do not change shard cache. Because under both cases
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), request.GetCollectionName())
log.Info("complete to invalidate collection meta cache with collection name", zap.String("collectionName", request.GetCollectionName()))
case commonpb.MsgType_DropDatabase:
globalMetaCache.RemoveDatabase(ctx, request.GetDbName())
default:
log.Warn("receive unexpected msgType of invalidate collection meta cache", zap.String("msgType", request.GetBase().GetMsgType().String()))

if collectionName != "" {
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached
}
if request.CollectionID != UniqueID(0) {
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
log.Warn("receive unexpected msgType of invalidate collection meta cache",
zap.String("msgType", request.GetBase().GetMsgType().String()), zap.Int64("collectionID", request.GetCollectionID()),
zap.String("names", request.GetCollectionName()))

// just reset all cache if we don't know what is going on here, should not happen
if request.GetCollectionName() != "" {
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), request.GetCollectionName()) // no need to return error, though collection may be not cached
globalMetaCache.DeprecateShardCache(request.GetDbName(), request.GetCollectionName())
} else if request.GetCollectionID() != UniqueID(0) {
aliasNames = globalMetaCache.RemoveCollectionsByID(ctx, request.GetCollectionID())
for _, alias := range aliasNames {
globalMetaCache.DeprecateShardCache(request.GetDbName(), alias)
}
}
}
}
Expand All @@ -163,8 +161,8 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p
// no need to handle error, since this Proxy may not create dml stream for the collection.
node.chMgr.removeDMLStream(request.GetCollectionID())
// clean up collection level metrics
metrics.CleanupProxyCollectionMetrics(paramtable.GetNodeID(), collectionName)
for _, alias := range aliasName {
metrics.CleanupProxyCollectionMetrics(paramtable.GetNodeID(), request.GetCollectionName())
for _, alias := range aliasNames {
metrics.CleanupProxyCollectionMetrics(paramtable.GetNodeID(), alias)
}
DeregisterSubLabel(ratelimitutil.GetCollectionSubLabel(request.GetDbName(), request.GetCollectionName()))
Expand Down
12 changes: 9 additions & 3 deletions internal/proxy/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,18 +853,20 @@ func (m *MetaCache) RemoveCollection(ctx context.Context, database, collectionNa
_, dbOk := m.collInfo[database]
if dbOk {
delete(m.collInfo[database], collectionName)
log.Info("Removed collection from cache", zap.String("database", database), zap.String("collectionName", collectionName))
}
}

func (m *MetaCache) RemoveCollectionsByID(ctx context.Context, collectionID UniqueID) []string {
m.mu.Lock()
defer m.mu.Unlock()
var collNames []string
for database, db := range m.collInfo {
for k, v := range db {
for database, dbCollection := range m.collInfo {
for k, v := range dbCollection {
if v.collID == collectionID {
delete(m.collInfo[database], k)
collNames = append(collNames, k)
log.Info("Removed collection by ID from cache", zap.String("database", database), zap.String("collectionName", k))
}
}
}
Expand Down Expand Up @@ -896,6 +898,7 @@ func (m *MetaCache) RemovePartition(ctx context.Context, database, collectionNam
})

m.collInfo[database][collectionName].partInfo = parsePartitionsInfo(filteredInfos, collInfo.schema.hasPartitionKeyField)
log.Info("Removed partition from cache", zap.String("database", database), zap.String("collectionName", collectionName), zap.String("partitionName", partitionName))
}

// GetCredentialInfo returns the credential related to provided username
Expand Down Expand Up @@ -931,6 +934,7 @@ func (m *MetaCache) RemoveCredential(username string) {
defer m.credMut.Unlock()
// delete pair in credMap
delete(m.credMap, username)
log.Info("Removed credential from cache", zap.String("username", username))
}

func (m *MetaCache) UpdateCredential(credInfo *internalpb.CredentialInfo) {
Expand All @@ -945,6 +949,7 @@ func (m *MetaCache) UpdateCredential(credInfo *internalpb.CredentialInfo) {
// Do not cache encrypted password content
m.credMap[username].Username = username
m.credMap[username].Sha256Password = credInfo.Sha256Password
log.Info("Updated credential in cache", zap.String("username", username))
}

// GetShards update cache if withCache == false
Expand Down Expand Up @@ -1038,7 +1043,7 @@ func parseShardLeaderList2QueryNode(shardsLeaders []*querypb.ShardLeadersList) m

// DeprecateShardCache clear the shard leader cache of a collection
func (m *MetaCache) DeprecateShardCache(database, collectionName string) {
log.Info("clearing shard cache for collection", zap.String("collectionName", collectionName))
log.Info("Deprecate shard cache for collection", zap.String("collectionName", collectionName))
if shards, ok := m.getCollectionShardLeader(database, collectionName); ok {
shards.deprecated.Store(true)
}
Expand Down Expand Up @@ -1186,6 +1191,7 @@ func (m *MetaCache) RemoveDatabase(ctx context.Context, database string) {
m.leaderMut.Lock()
delete(m.collLeader, database)
m.leaderMut.Unlock()
log.Info("Removed database from cache", zap.String("database", database))
}

func (m *MetaCache) HasDatabase(ctx context.Context, database string) bool {
Expand Down
2 changes: 2 additions & 0 deletions internal/rootcoord/create_partition_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/log"
)
Expand Down Expand Up @@ -84,6 +85,7 @@ func (t *createPartitionTask) Execute(ctx context.Context) error {
collectionID: t.collMeta.CollectionID,
partitionName: t.Req.GetPartitionName(),
ts: t.GetTs(),
opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_CreatePartition)},
}, &nullStep{})

undoTask.AddStep(&addPartitionMetaStep{
Expand Down

0 comments on commit 5e13bd7

Please sign in to comment.