From dc7851840dba293dc8deb240dabd92515e0dbc31 Mon Sep 17 00:00:00 2001 From: xiaofanluan Date: Fri, 15 Nov 2024 21:32:15 -0800 Subject: [PATCH] enhance: refine meta cache and add logs Signed-off-by: xiaofanluan --- internal/proxy/impl.go | 35 +++++-- internal/proxy/meta_cache.go | 101 ++++++++------------ internal/proxy/task.go | 2 - internal/rootcoord/alter_collection_task.go | 2 +- internal/rootcoord/create_partition_task.go | 4 +- internal/rootcoord/drop_collection_task.go | 2 +- internal/rootcoord/drop_partition_task.go | 2 +- 7 files changed, 70 insertions(+), 78 deletions(-) diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 0157c63c23028..c3d3d4f7dd75b 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -128,24 +128,35 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p if globalMetaCache != nil { switch msgType { - case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias, commonpb.MsgType_LoadCollection, commonpb.MsgType_ReleaseCollection: + case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias: if collectionName != "" { globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached globalMetaCache.DeprecateShardCache(request.GetDbName(), collectionName) } if request.CollectionID != UniqueID(0) { aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID) + for _, name := range aliasName { + globalMetaCache.DeprecateShardCache(request.GetDbName(), name) + } } - log.Info("complete to invalidate collection meta cache with collection name", zap.String("collectionName", collectionName)) - case commonpb.MsgType_DropPartition: - if collectionName != "" && request.GetPartitionName() != "" { - globalMetaCache.RemovePartition(ctx, request.GetDbName(), request.GetCollectionName(), request.GetPartitionName()) - } else { - log.Warn("invalidate collection meta cache failed. collectionName or partitionName is empty", - zap.String("collectionName", collectionName), - zap.String("partitionName", request.GetPartitionName())) - return merr.Status(merr.WrapErrPartitionNotFound(request.GetPartitionName(), "partition name not specified")), nil + log.Info("complete to invalidate collection meta cache with collection name", zap.String("type", request.GetBase().GetMsgType().String())) + case commonpb.MsgType_LoadCollection, commonpb.MsgType_ReleaseCollection: + // All the request from query use collectionID + if request.CollectionID != UniqueID(0) { + aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID) + for _, name := range aliasName { + globalMetaCache.DeprecateShardCache(request.GetDbName(), name) + } } + log.Info("complete to invalidate collection meta cache", zap.String("type", request.GetBase().GetMsgType().String())) + case commonpb.MsgType_CreatePartition, commonpb.MsgType_DropPartition: + if request.GetPartitionName() == "" { + log.Warn("invalidate collection meta cache failed. partitionName is empty") + return &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, nil + } + // no need to deprecate shard cache because shard won't change when create o drop partition + globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) + log.Info("complete to invalidate collection meta cache", zap.String("type", request.GetBase().GetMsgType().String())) case commonpb.MsgType_DropDatabase: globalMetaCache.RemoveDatabase(ctx, request.GetDbName()) default: @@ -153,9 +164,13 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p if collectionName != "" { globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached + globalMetaCache.DeprecateShardCache(request.GetDbName(), collectionName) } if request.CollectionID != UniqueID(0) { aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID) + for _, name := range aliasName { + globalMetaCache.DeprecateShardCache(request.GetDbName(), name) + } } } } diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go index 47d282fae56b6..8562c2c4b8583 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxy/meta_cache.go @@ -75,7 +75,6 @@ type Cache interface { ListShardLocation() map[int64]nodeInfo RemoveCollection(ctx context.Context, database, collectionName string) RemoveCollectionsByID(ctx context.Context, collectionID UniqueID) []string - RemovePartition(ctx context.Context, database, collectionName string, partitionName string) // GetCredentialInfo operate credential cache GetCredentialInfo(ctx context.Context, username string) (*internalpb.CredentialInfo, error) @@ -418,21 +417,6 @@ func (m *MetaCache) getCollection(database, collectionName string, collectionID return nil, false } -func (m *MetaCache) getCollectionShardLeader(database, collectionName string) *shardLeaders { - m.leaderMut.RLock() - defer m.leaderMut.RUnlock() - - db, ok := m.collLeader[database] - if !ok { - return nil - } - - if leaders, ok := db[collectionName]; ok { - return leaders - } - return nil -} - func (m *MetaCache) update(ctx context.Context, database, collectionName string, collectionID UniqueID) (*collectionInfo, error) { if collInfo, ok := m.getCollection(database, collectionName, collectionID); ok { return collInfo, nil @@ -497,7 +481,9 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string, } log.Info("meta update success", zap.String("database", database), zap.String("collectionName", collectionName), - zap.String("actual collection Name", collection.Schema.GetName()), zap.Int64("collectionID", collection.CollectionID)) + zap.String("actual collection Name", collection.Schema.GetName()), zap.Int64("collectionID", collection.CollectionID) + zap.Strings("partition", partitions.PartitionNames) + ) return m.collInfo[database][collectionName], nil } @@ -856,6 +842,7 @@ func (m *MetaCache) RemoveCollection(ctx context.Context, database, collectionNa if dbOk { delete(m.collInfo[database], collectionName) } + log.Debug("remove collection", zap.String("db", database), zap.String("collection", collectionName)) } func (m *MetaCache) RemoveCollectionsByID(ctx context.Context, collectionID UniqueID) []string { @@ -870,36 +857,10 @@ func (m *MetaCache) RemoveCollectionsByID(ctx context.Context, collectionID Uniq } } } + log.Debug("remove collection by id", zap.Int64("id", collectionID), zap.Strings("collection", collNames)) return collNames } -func (m *MetaCache) RemovePartition(ctx context.Context, database, collectionName, partitionName string) { - m.mu.Lock() - defer m.mu.Unlock() - - var ok bool - var collInfo *collectionInfo - - db, dbOk := m.collInfo[database] - if dbOk { - collInfo, ok = db[collectionName] - } - - if !ok { - return - } - - partInfo := m.collInfo[database][collectionName].partInfo - if partInfo == nil { - return - } - filteredInfos := lo.Filter(partInfo.partitionInfos, func(info *partitionInfo, idx int) bool { - return info.name != partitionName - }) - - m.collInfo[database][collectionName].partInfo = parsePartitionsInfo(filteredInfos, collInfo.schema.hasPartitionKeyField) -} - // GetCredentialInfo returns the credential related to provided username // If the cache missed, proxy will try to fetch from storage func (m *MetaCache) GetCredentialInfo(ctx context.Context, username string) (*internalpb.CredentialInfo, error) { @@ -953,10 +914,20 @@ func (m *MetaCache) UpdateCredential(credInfo *internalpb.CredentialInfo) { func (m *MetaCache) GetShards(ctx context.Context, withCache bool, database, collectionName string, collectionID int64) (map[string][]nodeInfo, error) { method := "GetShards" log := log.Ctx(ctx).With( + zap.String("db", database), zap.String("collectionName", collectionName), zap.Int64("collectionID", collectionID)) - cacheShardLeaders := m.getCollectionShardLeader(database, collectionName) + // check cache first + m.leaderMut.RLock() + var cacheShardLeaders *shardLeaders + db, ok := m.collLeader[database] + if !ok { + cacheShardLeaders = nil + } else { + cacheShardLeaders = db[collectionName] + } + m.leaderMut.RUnlock() if withCache { if cacheShardLeaders != nil { metrics.ProxyCacheStatsCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method, metrics.CacheHitLabel).Inc() @@ -1002,11 +973,16 @@ func (m *MetaCache) GetShards(ctx context.Context, withCache bool, database, col m.collLeader[database] = make(map[string]*shardLeaders) } m.collLeader[database][collectionName] = newShardLeaders - iterator := newShardLeaders.GetReader() ret := iterator.Shuffle() m.leaderMut.Unlock() - + nodeInfos := make([]string, 0) + for _, shardLeader := range newShardLeaders.shardLeaders { + for _, nodeInfo := range shardLeader { + nodeInfos = append(nodeInfos, fmt.Sprintf("nodeID: %d, nodeAddr: %s", nodeInfo.nodeID, nodeInfo.address)) + } + } + log.Debug("fill new collection shard leader", zap.Strings("nodeInfos", nodeInfos)) metrics.ProxyUpdateCacheLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return ret, nil } @@ -1027,20 +1003,6 @@ func parseShardLeaderList2QueryNode(shardsLeaders []*querypb.ShardLeadersList) m return shard2QueryNodes } -// DeprecateShardCache clear the shard leader cache of a collection -func (m *MetaCache) DeprecateShardCache(database, collectionName string) { - log.Info("clearing shard cache for collection", zap.String("collectionName", collectionName)) - m.leaderMut.Lock() - defer m.leaderMut.Unlock() - dbInfo, ok := m.collLeader[database] - if ok { - delete(dbInfo, collectionName) - if len(dbInfo) == 0 { - delete(m.collLeader, database) - } - } -} - // used for Garbage collection shard client func (m *MetaCache) ListShardLocation() map[int64]nodeInfo { m.leaderMut.RLock() @@ -1059,11 +1021,25 @@ func (m *MetaCache) ListShardLocation() map[int64]nodeInfo { return shardLeaderInfo } +// DeprecateShardCache clear the shard leader cache of a collection +func (m *MetaCache) DeprecateShardCache(database, collectionName string) { + log.Info("deprecate shard cache for collection", zap.String("collectionName", collectionName)) + m.leaderMut.Lock() + defer m.leaderMut.Unlock() + dbInfo, ok := m.collLeader[database] + if ok { + delete(dbInfo, collectionName) + if len(dbInfo) == 0 { + delete(m.collLeader, database) + } + } +} + +// InvalidateShardLeaderCache called when Shard leader balance happened func (m *MetaCache) InvalidateShardLeaderCache(collections []int64) { log.Info("Invalidate shard cache for collections", zap.Int64s("collectionIDs", collections)) m.leaderMut.Lock() defer m.leaderMut.Unlock() - collectionSet := typeutil.NewUniqueSet(collections...) for dbName, dbInfo := range m.collLeader { for collectionName, shardLeaders := range dbInfo { @@ -1202,6 +1178,7 @@ func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) (err error) { } func (m *MetaCache) RemoveDatabase(ctx context.Context, database string) { + log.Debug("remove database", zap.String("name", database)) m.mu.Lock() delete(m.collInfo, database) delete(m.dbInfo, database) diff --git a/internal/proxy/task.go b/internal/proxy/task.go index ce9aa746d2921..07b6cf6f864e1 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -1723,7 +1723,6 @@ func (t *releaseCollectionTask) Execute(ctx context.Context) (err error) { } func (t *releaseCollectionTask) PostExecute(ctx context.Context) error { - globalMetaCache.DeprecateShardCache(t.GetDbName(), t.CollectionName) return nil } @@ -1993,7 +1992,6 @@ func (t *releasePartitionsTask) Execute(ctx context.Context) (err error) { } func (t *releasePartitionsTask) PostExecute(ctx context.Context) error { - globalMetaCache.DeprecateShardCache(t.GetDbName(), t.CollectionName) return nil } diff --git a/internal/rootcoord/alter_collection_task.go b/internal/rootcoord/alter_collection_task.go index 9d7381c7f5668..f22fdb59ac468 100644 --- a/internal/rootcoord/alter_collection_task.go +++ b/internal/rootcoord/alter_collection_task.go @@ -89,7 +89,7 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error { baseStep: baseStep{core: a.core}, dbName: a.Req.GetDbName(), collectionNames: append(aliases, a.Req.GetCollectionName()), - collectionID: oldColl.CollectionID, + collectionID: InvalidCollectionID, opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_AlterCollection)}, }) diff --git a/internal/rootcoord/create_partition_task.go b/internal/rootcoord/create_partition_task.go index 7e3fe097aa935..a9cd31fca3168 100644 --- a/internal/rootcoord/create_partition_task.go +++ b/internal/rootcoord/create_partition_task.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/metastore/model" pb "github.com/milvus-io/milvus/internal/proto/etcdpb" + "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/log" ) @@ -81,9 +82,10 @@ func (t *createPartitionTask) Execute(ctx context.Context) error { baseStep: baseStep{core: t.core}, dbName: t.Req.GetDbName(), collectionNames: []string{t.collMeta.Name}, - collectionID: t.collMeta.CollectionID, + collectionID: InvalidCollectionID, partitionName: t.Req.GetPartitionName(), ts: t.GetTs(), + opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_CreatePartition)}, }, &nullStep{}) undoTask.AddStep(&addPartitionMetaStep{ diff --git a/internal/rootcoord/drop_collection_task.go b/internal/rootcoord/drop_collection_task.go index 4d443a1ad3ad7..5eee40ed7cd82 100644 --- a/internal/rootcoord/drop_collection_task.go +++ b/internal/rootcoord/drop_collection_task.go @@ -78,7 +78,7 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error { baseStep: baseStep{core: t.core}, dbName: t.Req.GetDbName(), collectionNames: append(aliases, collMeta.Name), - collectionID: collMeta.CollectionID, + collectionID: InvalidCollectionID, ts: ts, opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_DropCollection)}, }) diff --git a/internal/rootcoord/drop_partition_task.go b/internal/rootcoord/drop_partition_task.go index c65c91f9c1099..896cdb0dd9e06 100644 --- a/internal/rootcoord/drop_partition_task.go +++ b/internal/rootcoord/drop_partition_task.go @@ -73,7 +73,7 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error { baseStep: baseStep{core: t.core}, dbName: t.Req.GetDbName(), collectionNames: []string{t.collMeta.Name}, - collectionID: t.collMeta.CollectionID, + collectionID: InvalidCollectionID, partitionName: t.Req.GetPartitionName(), ts: t.GetTs(), opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_DropPartition)},