From 89f66bc7782d2d46ff04267cab24e53b546e1ce3 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Tue, 14 May 2024 17:34:34 +0800 Subject: [PATCH 1/2] snapshot (ticdc): reduce list tables time consumption Signed-off-by: dongmen <414110582@qq.com> --- cdc/entry/schema/snapshot.go | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/cdc/entry/schema/snapshot.go b/cdc/entry/schema/snapshot.go index daf9e3078cb..70d6f50be9f 100644 --- a/cdc/entry/schema/snapshot.go +++ b/cdc/entry/schema/snapshot.go @@ -167,16 +167,30 @@ func NewSnapshotFromMeta( vname := newVersionedEntityName(-1, dbinfo.Name.O, tag) // -1 means the entity is a schema. vname.target = dbinfo.ID snap.inner.schemaNameToID.ReplaceOrInsert(vname) - - tableInfos, err := meta.ListTables(dbinfo.ID) + // get all tables Name + tableNames, err := meta.ListSimpleTables(dbinfo.ID) if err != nil { return nil, cerror.WrapError(cerror.ErrMetaListDatabases, err) } - for _, tableInfo := range tableInfos { - if filter.ShouldIgnoreTable(dbinfo.Name.O, tableInfo.Name.O) { - log.Debug("ignore table", zap.String("table", tableInfo.Name.O)) + tableNeeded := make([]*timodel.TableNameInfo, 0, len(tableNames)) + // filter tables + for _, table := range tableNames { + if filter.ShouldIgnoreTable(dbinfo.Name.O, table.Name.O) { + log.Debug("ignore table", zap.String("table", table.Name.O)) continue } + tableNeeded = append(tableNeeded, table) + } + tableInfos := make([]*timodel.TableInfo, 0, len(tableNeeded)) + for _, table := range tableNeeded { + tableInfo, err := meta.GetTable(dbinfo.ID, table.ID) + if err != nil { + return nil, errors.Trace(err) + } + tableInfos = append(tableInfos, tableInfo) + } + + for _, tableInfo := range tableInfos { tableInfo := model.WrapTableInfo(dbinfo.ID, dbinfo.Name.O, currentTs, tableInfo) snap.inner.tables.ReplaceOrInsert(versionedID{ id: tableInfo.ID, From 4e508e88f2e925eba3af37b5e522fd47c7d58a46 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Fri, 17 May 2024 09:18:36 +0800 Subject: [PATCH 2/2] snapshot (ticdc): remove verbose logs Signed-off-by: dongmen <414110582@qq.com> --- cdc/entry/schema/snapshot.go | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/cdc/entry/schema/snapshot.go b/cdc/entry/schema/snapshot.go index 70d6f50be9f..e01821ef551 100644 --- a/cdc/entry/schema/snapshot.go +++ b/cdc/entry/schema/snapshot.go @@ -18,6 +18,7 @@ import ( "math" "strings" "sync" + "time" "github.com/google/btree" "github.com/pingcap/errors" @@ -146,6 +147,7 @@ func NewSnapshotFromMeta( forceReplicate bool, filter filter.Filter, ) (*Snapshot, error) { + start := time.Now() snap := NewEmptySnapshot(forceReplicate) dbinfos, err := meta.ListDatabases() if err != nil { @@ -153,8 +155,6 @@ func NewSnapshotFromMeta( } // `tag` is used to reverse sort all versions in the generated snapshot. tag := negative(currentTs) - // record all tables to be replicated for logging use - tables := make([]*model.TableInfo, 0, 1024) for _, dbinfo := range dbinfos { if filter.ShouldIgnoreSchema(dbinfo.Name.O) { log.Debug("ignore database", zap.String("db", dbinfo.Name.O)) @@ -207,8 +207,6 @@ func NewSnapshotFromMeta( ineligible := !tableInfo.IsEligible(forceReplicate) if ineligible { snap.inner.ineligibleTables.ReplaceOrInsert(versionedID{id: tableInfo.ID, tag: tag}) - } else { - tables = append(tables, tableInfo) } if pi := tableInfo.GetPartitionInfo(); pi != nil { for _, partition := range pi.Definitions { @@ -223,15 +221,10 @@ func NewSnapshotFromMeta( } } snap.inner.currentTs = currentTs - var sb strings.Builder - sb.WriteString(fmt.Sprintf("%d tables to be replicated: ", len(tables))) - for _, table := range tables { - sb.WriteString(fmt.Sprintf("%s.%s, ", table.TableName.Schema, table.TableName.Table)) - } log.Info("schema snapshot created", zap.Stringer("changefeed", id), zap.Uint64("currentTs", currentTs), - zap.String("tables", sb.String())) + zap.Any("duration", time.Since(start).Seconds())) return snap, nil }