Skip to content

Commit

Permalink
enhance: refine meta cache and add logs
Browse files Browse the repository at this point in the history
Signed-off-by: xiaofanluan <[email protected]>
  • Loading branch information
xiaofan-luan committed Nov 16, 2024
1 parent caf207f commit dc78518
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 78 deletions.
35 changes: 25 additions & 10 deletions internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,34 +128,49 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p

if globalMetaCache != nil {
switch msgType {
case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias, commonpb.MsgType_LoadCollection, commonpb.MsgType_ReleaseCollection:
case commonpb.MsgType_DropCollection, commonpb.MsgType_RenameCollection, commonpb.MsgType_DropAlias, commonpb.MsgType_AlterAlias:
if collectionName != "" {
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached
globalMetaCache.DeprecateShardCache(request.GetDbName(), collectionName)
}
if request.CollectionID != UniqueID(0) {
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
for _, name := range aliasName {
globalMetaCache.DeprecateShardCache(request.GetDbName(), name)
}
}
log.Info("complete to invalidate collection meta cache with collection name", zap.String("collectionName", collectionName))
case commonpb.MsgType_DropPartition:
if collectionName != "" && request.GetPartitionName() != "" {
globalMetaCache.RemovePartition(ctx, request.GetDbName(), request.GetCollectionName(), request.GetPartitionName())
} else {
log.Warn("invalidate collection meta cache failed. collectionName or partitionName is empty",
zap.String("collectionName", collectionName),
zap.String("partitionName", request.GetPartitionName()))
return merr.Status(merr.WrapErrPartitionNotFound(request.GetPartitionName(), "partition name not specified")), nil
log.Info("complete to invalidate collection meta cache with collection name", zap.String("type", request.GetBase().GetMsgType().String()))
case commonpb.MsgType_LoadCollection, commonpb.MsgType_ReleaseCollection:
// All the request from query use collectionID
if request.CollectionID != UniqueID(0) {
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
for _, name := range aliasName {
globalMetaCache.DeprecateShardCache(request.GetDbName(), name)
}
}
log.Info("complete to invalidate collection meta cache", zap.String("type", request.GetBase().GetMsgType().String()))
case commonpb.MsgType_CreatePartition, commonpb.MsgType_DropPartition:
if request.GetPartitionName() == "" {
log.Warn("invalidate collection meta cache failed. partitionName is empty")
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, nil
}
// no need to deprecate shard cache because shard won't change when create o drop partition
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName)
log.Info("complete to invalidate collection meta cache", zap.String("type", request.GetBase().GetMsgType().String()))
case commonpb.MsgType_DropDatabase:
globalMetaCache.RemoveDatabase(ctx, request.GetDbName())
default:
log.Warn("receive unexpected msgType of invalidate collection meta cache", zap.String("msgType", request.GetBase().GetMsgType().String()))

if collectionName != "" {
globalMetaCache.RemoveCollection(ctx, request.GetDbName(), collectionName) // no need to return error, though collection may be not cached
globalMetaCache.DeprecateShardCache(request.GetDbName(), collectionName)
}
if request.CollectionID != UniqueID(0) {
aliasName = globalMetaCache.RemoveCollectionsByID(ctx, collectionID)
for _, name := range aliasName {
globalMetaCache.DeprecateShardCache(request.GetDbName(), name)
}
}
}
}
Expand Down
101 changes: 39 additions & 62 deletions internal/proxy/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ type Cache interface {
ListShardLocation() map[int64]nodeInfo
RemoveCollection(ctx context.Context, database, collectionName string)
RemoveCollectionsByID(ctx context.Context, collectionID UniqueID) []string
RemovePartition(ctx context.Context, database, collectionName string, partitionName string)

// GetCredentialInfo operate credential cache
GetCredentialInfo(ctx context.Context, username string) (*internalpb.CredentialInfo, error)
Expand Down Expand Up @@ -418,21 +417,6 @@ func (m *MetaCache) getCollection(database, collectionName string, collectionID
return nil, false
}

func (m *MetaCache) getCollectionShardLeader(database, collectionName string) *shardLeaders {
m.leaderMut.RLock()
defer m.leaderMut.RUnlock()

db, ok := m.collLeader[database]
if !ok {
return nil
}

if leaders, ok := db[collectionName]; ok {
return leaders
}
return nil
}

func (m *MetaCache) update(ctx context.Context, database, collectionName string, collectionID UniqueID) (*collectionInfo, error) {
if collInfo, ok := m.getCollection(database, collectionName, collectionID); ok {
return collInfo, nil
Expand Down Expand Up @@ -497,7 +481,9 @@ func (m *MetaCache) update(ctx context.Context, database, collectionName string,
}

log.Info("meta update success", zap.String("database", database), zap.String("collectionName", collectionName),
zap.String("actual collection Name", collection.Schema.GetName()), zap.Int64("collectionID", collection.CollectionID))
zap.String("actual collection Name", collection.Schema.GetName()), zap.Int64("collectionID", collection.CollectionID)
zap.Strings("partition", partitions.PartitionNames)
)
return m.collInfo[database][collectionName], nil
}

Expand Down Expand Up @@ -856,6 +842,7 @@ func (m *MetaCache) RemoveCollection(ctx context.Context, database, collectionNa
if dbOk {
delete(m.collInfo[database], collectionName)
}
log.Debug("remove collection", zap.String("db", database), zap.String("collection", collectionName))
}

func (m *MetaCache) RemoveCollectionsByID(ctx context.Context, collectionID UniqueID) []string {
Expand All @@ -870,36 +857,10 @@ func (m *MetaCache) RemoveCollectionsByID(ctx context.Context, collectionID Uniq
}
}
}
log.Debug("remove collection by id", zap.Int64("id", collectionID), zap.Strings("collection", collNames))
return collNames
}

func (m *MetaCache) RemovePartition(ctx context.Context, database, collectionName, partitionName string) {
m.mu.Lock()
defer m.mu.Unlock()

var ok bool
var collInfo *collectionInfo

db, dbOk := m.collInfo[database]
if dbOk {
collInfo, ok = db[collectionName]
}

if !ok {
return
}

partInfo := m.collInfo[database][collectionName].partInfo
if partInfo == nil {
return
}
filteredInfos := lo.Filter(partInfo.partitionInfos, func(info *partitionInfo, idx int) bool {
return info.name != partitionName
})

m.collInfo[database][collectionName].partInfo = parsePartitionsInfo(filteredInfos, collInfo.schema.hasPartitionKeyField)
}

// GetCredentialInfo returns the credential related to provided username
// If the cache missed, proxy will try to fetch from storage
func (m *MetaCache) GetCredentialInfo(ctx context.Context, username string) (*internalpb.CredentialInfo, error) {
Expand Down Expand Up @@ -953,10 +914,20 @@ func (m *MetaCache) UpdateCredential(credInfo *internalpb.CredentialInfo) {
func (m *MetaCache) GetShards(ctx context.Context, withCache bool, database, collectionName string, collectionID int64) (map[string][]nodeInfo, error) {
method := "GetShards"
log := log.Ctx(ctx).With(
zap.String("db", database),
zap.String("collectionName", collectionName),
zap.Int64("collectionID", collectionID))

cacheShardLeaders := m.getCollectionShardLeader(database, collectionName)
// check cache first
m.leaderMut.RLock()
var cacheShardLeaders *shardLeaders
db, ok := m.collLeader[database]
if !ok {
cacheShardLeaders = nil
} else {
cacheShardLeaders = db[collectionName]
}
m.leaderMut.RUnlock()
if withCache {
if cacheShardLeaders != nil {
metrics.ProxyCacheStatsCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method, metrics.CacheHitLabel).Inc()
Expand Down Expand Up @@ -1002,11 +973,16 @@ func (m *MetaCache) GetShards(ctx context.Context, withCache bool, database, col
m.collLeader[database] = make(map[string]*shardLeaders)
}
m.collLeader[database][collectionName] = newShardLeaders

iterator := newShardLeaders.GetReader()
ret := iterator.Shuffle()
m.leaderMut.Unlock()

nodeInfos := make([]string, 0)
for _, shardLeader := range newShardLeaders.shardLeaders {
for _, nodeInfo := range shardLeader {
nodeInfos = append(nodeInfos, fmt.Sprintf("nodeID: %d, nodeAddr: %s", nodeInfo.nodeID, nodeInfo.address))
}
}
log.Debug("fill new collection shard leader", zap.Strings("nodeInfos", nodeInfos))
metrics.ProxyUpdateCacheLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
return ret, nil
}
Expand All @@ -1027,20 +1003,6 @@ func parseShardLeaderList2QueryNode(shardsLeaders []*querypb.ShardLeadersList) m
return shard2QueryNodes
}

// DeprecateShardCache clear the shard leader cache of a collection
func (m *MetaCache) DeprecateShardCache(database, collectionName string) {
log.Info("clearing shard cache for collection", zap.String("collectionName", collectionName))
m.leaderMut.Lock()
defer m.leaderMut.Unlock()
dbInfo, ok := m.collLeader[database]
if ok {
delete(dbInfo, collectionName)
if len(dbInfo) == 0 {
delete(m.collLeader, database)
}
}
}

// used for Garbage collection shard client
func (m *MetaCache) ListShardLocation() map[int64]nodeInfo {
m.leaderMut.RLock()
Expand All @@ -1059,11 +1021,25 @@ func (m *MetaCache) ListShardLocation() map[int64]nodeInfo {
return shardLeaderInfo
}

// DeprecateShardCache clear the shard leader cache of a collection
func (m *MetaCache) DeprecateShardCache(database, collectionName string) {
log.Info("deprecate shard cache for collection", zap.String("collectionName", collectionName))
m.leaderMut.Lock()
defer m.leaderMut.Unlock()
dbInfo, ok := m.collLeader[database]
if ok {
delete(dbInfo, collectionName)
if len(dbInfo) == 0 {
delete(m.collLeader, database)
}
}
}

// InvalidateShardLeaderCache called when Shard leader balance happened
func (m *MetaCache) InvalidateShardLeaderCache(collections []int64) {
log.Info("Invalidate shard cache for collections", zap.Int64s("collectionIDs", collections))
m.leaderMut.Lock()
defer m.leaderMut.Unlock()

collectionSet := typeutil.NewUniqueSet(collections...)
for dbName, dbInfo := range m.collLeader {
for collectionName, shardLeaders := range dbInfo {
Expand Down Expand Up @@ -1202,6 +1178,7 @@ func (m *MetaCache) RefreshPolicyInfo(op typeutil.CacheOp) (err error) {
}

func (m *MetaCache) RemoveDatabase(ctx context.Context, database string) {
log.Debug("remove database", zap.String("name", database))
m.mu.Lock()
delete(m.collInfo, database)
delete(m.dbInfo, database)
Expand Down
2 changes: 0 additions & 2 deletions internal/proxy/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1723,7 +1723,6 @@ func (t *releaseCollectionTask) Execute(ctx context.Context) (err error) {
}

func (t *releaseCollectionTask) PostExecute(ctx context.Context) error {
globalMetaCache.DeprecateShardCache(t.GetDbName(), t.CollectionName)
return nil
}

Expand Down Expand Up @@ -1993,7 +1992,6 @@ func (t *releasePartitionsTask) Execute(ctx context.Context) (err error) {
}

func (t *releasePartitionsTask) PostExecute(ctx context.Context) error {
globalMetaCache.DeprecateShardCache(t.GetDbName(), t.CollectionName)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion internal/rootcoord/alter_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (a *alterCollectionTask) Execute(ctx context.Context) error {
baseStep: baseStep{core: a.core},
dbName: a.Req.GetDbName(),
collectionNames: append(aliases, a.Req.GetCollectionName()),
collectionID: oldColl.CollectionID,
collectionID: InvalidCollectionID,
opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_AlterCollection)},
})

Expand Down
4 changes: 3 additions & 1 deletion internal/rootcoord/create_partition_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/metastore/model"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/log"
)
Expand Down Expand Up @@ -81,9 +82,10 @@ func (t *createPartitionTask) Execute(ctx context.Context) error {
baseStep: baseStep{core: t.core},
dbName: t.Req.GetDbName(),
collectionNames: []string{t.collMeta.Name},
collectionID: t.collMeta.CollectionID,
collectionID: InvalidCollectionID,
partitionName: t.Req.GetPartitionName(),
ts: t.GetTs(),
opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_CreatePartition)},
}, &nullStep{})

undoTask.AddStep(&addPartitionMetaStep{
Expand Down
2 changes: 1 addition & 1 deletion internal/rootcoord/drop_collection_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error {
baseStep: baseStep{core: t.core},
dbName: t.Req.GetDbName(),
collectionNames: append(aliases, collMeta.Name),
collectionID: collMeta.CollectionID,
collectionID: InvalidCollectionID,
ts: ts,
opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_DropCollection)},
})
Expand Down
2 changes: 1 addition & 1 deletion internal/rootcoord/drop_partition_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error {
baseStep: baseStep{core: t.core},
dbName: t.Req.GetDbName(),
collectionNames: []string{t.collMeta.Name},
collectionID: t.collMeta.CollectionID,
collectionID: InvalidCollectionID,
partitionName: t.Req.GetPartitionName(),
ts: t.GetTs(),
opts: []proxyutil.ExpireCacheOpt{proxyutil.SetMsgType(commonpb.MsgType_DropPartition)},
Expand Down

0 comments on commit dc78518

Please sign in to comment.