From 07a27f804fa1d0c298a896275b5e180d462d44f8 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Fri, 5 Nov 2021 18:45:03 +0800 Subject: [PATCH] br: Update mysql.stats_meta after checksum in restore (#29429) (#29479) --- br/pkg/glue/glue.go | 1 + br/pkg/gluetidb/glue.go | 5 ++++ br/pkg/restore/client.go | 53 +++++++++++++++++++++++++++------------- br/pkg/restore/db.go | 23 +++++++++++++++++ executor/brie.go | 6 +++++ 5 files changed, 71 insertions(+), 17 deletions(-) diff --git a/br/pkg/glue/glue.go b/br/pkg/glue/glue.go index 0d40bf648e53a..7f2be30a60d34 100644 --- a/br/pkg/glue/glue.go +++ b/br/pkg/glue/glue.go @@ -33,6 +33,7 @@ type Glue interface { // Session is an abstraction of the session.Session interface. type Session interface { Execute(ctx context.Context, sql string) error + ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error CreateDatabase(ctx context.Context, schema *model.DBInfo) error CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error Close() diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index cd06e9e770a85..c2cb64d21328a 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -108,6 +108,11 @@ func (gs *tidbSession) Execute(ctx context.Context, sql string) error { return errors.Trace(err) } +func (gs *tidbSession) ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error { + _, err := gs.se.ExecuteInternal(ctx, sql, args...) + return errors.Trace(err) +} + // CreateDatabase implements glue.Session. func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { d := domain.GetDomain(gs.se).DDL() diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index 62daee022c413..6c9e0b586845d 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -796,7 +796,7 @@ func (rc *Client) GoValidateChecksum( // run the stat loader go func() { defer wg.Done() - rc.statLoader(ctx, loadStatCh) + rc.updateMetaAndLoadStats(ctx, loadStatCh) }() workers := utils.NewWorkerPool(defaultChecksumConcurrency, "RestoreChecksum") go func() { @@ -842,7 +842,13 @@ func (rc *Client) GoValidateChecksum( return outCh } -func (rc *Client) execChecksum(ctx context.Context, tbl CreatedTable, kvClient kv.Client, concurrency uint, loadStatCh chan<- *CreatedTable) error { +func (rc *Client) execChecksum( + ctx context.Context, + tbl CreatedTable, + kvClient kv.Client, + concurrency uint, + loadStatCh chan<- *CreatedTable, +) error { logger := log.With( zap.String("db", tbl.OldTable.DB.Name.O), zap.String("table", tbl.OldTable.Info.Name.O), @@ -891,13 +897,12 @@ func (rc *Client) execChecksum(ctx context.Context, tbl CreatedTable, kvClient k ) return errors.Annotate(berrors.ErrRestoreChecksumMismatch, "failed to validate checksum") } - if table.Stats != nil { - loadStatCh <- &tbl - } + + loadStatCh <- &tbl return nil } -func (rc *Client) statLoader(ctx context.Context, input <-chan *CreatedTable) { +func (rc *Client) updateMetaAndLoadStats(ctx context.Context, input <-chan *CreatedTable) { for { select { case <-ctx.Done(): @@ -906,19 +911,33 @@ func (rc *Client) statLoader(ctx context.Context, input <-chan *CreatedTable) { if !ok { return } + + // Not need to return err when failed because of update analysis-meta + restoreTS, err := rc.GetTS(ctx) + if err != nil { + log.Error("getTS failed", zap.Error(err)) + } else { + err = rc.db.UpdateStatsMeta(ctx, tbl.Table.ID, restoreTS, tbl.OldTable.TotalKvs) + if err != nil { + log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(err)) + } + } + table := tbl.OldTable - log.Info("start loads analyze after validate checksum", - zap.Int64("old id", tbl.OldTable.Info.ID), - zap.Int64("new id", tbl.Table.ID), - ) - start := time.Now() - if err := rc.statsHandler.LoadStatsFromJSON(rc.dom.InfoSchema(), table.Stats); err != nil { - log.Error("analyze table failed", zap.Any("table", table.Stats), zap.Error(err)) + if table.Stats != nil { + log.Info("start loads analyze after validate checksum", + zap.Int64("old id", tbl.OldTable.Info.ID), + zap.Int64("new id", tbl.Table.ID), + ) + start := time.Now() + if err := rc.statsHandler.LoadStatsFromJSON(rc.dom.InfoSchema(), table.Stats); err != nil { + log.Error("analyze table failed", zap.Any("table", table.Stats), zap.Error(err)) + } + log.Info("restore stat done", + zap.String("table", table.Info.Name.L), + zap.String("db", table.DB.Name.L), + zap.Duration("cost", time.Since(start))) } - log.Info("restore stat done", - zap.String("table", table.Info.Name.L), - zap.String("db", table.DB.Name.L), - zap.Duration("cost", time.Since(start))) } } } diff --git a/br/pkg/restore/db.go b/br/pkg/restore/db.go index af66f3aecfc22..3e9d35a124dfd 100644 --- a/br/pkg/restore/db.go +++ b/br/pkg/restore/db.go @@ -14,6 +14,7 @@ import ( "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" "go.uber.org/zap" ) @@ -92,6 +93,28 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { return errors.Trace(err) } +// UpdateStatsMeta update count and snapshot ts in mysql.stats_meta +func (db *DB) UpdateStatsMeta(ctx context.Context, tableID int64, restoreTS uint64, count uint64) error { + sysDB := mysql.SystemDB + statsMetaTbl := "stats_meta" + + // set restoreTS to snapshot and version which is used to update stats_meta + err := db.se.ExecuteInternal( + ctx, + "update %n.%n set snapshot = %?, version = %?, count = %? where table_id = %?", + sysDB, + statsMetaTbl, + restoreTS, + restoreTS, + count, + tableID, + ) + if err != nil { + log.Error("execute update sql failed", zap.Error(err)) + } + return nil +} + // CreateDatabase executes a CREATE DATABASE SQL. func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { err := db.se.CreateDatabase(ctx, schema) diff --git a/executor/brie.go b/executor/brie.go index ddb12c13b9527..9be7349c494a8 100644 --- a/executor/brie.go +++ b/executor/brie.go @@ -470,6 +470,12 @@ func (gs *tidbGlueSession) Execute(ctx context.Context, sql string) error { return err } +func (gs *tidbGlueSession) ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error { + exec := gs.se.(sqlexec.SQLExecutor) + _, err := exec.ExecuteInternal(ctx, sql, args...) + return err +} + // CreateDatabase implements glue.Session func (gs *tidbGlueSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { d := domain.GetDomain(gs.se).DDL()