Skip to content

Commit

Permalink
br: Update mysql.stats_meta after checksum in restore (#29429) (#29479)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Nov 5, 2021
1 parent 8ec9a51 commit 07a27f8
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 17 deletions.
1 change: 1 addition & 0 deletions br/pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Glue interface {
// Session is an abstraction of the session.Session interface.
type Session interface {
Execute(ctx context.Context, sql string) error
ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error
CreateDatabase(ctx context.Context, schema *model.DBInfo) error
CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error
Close()
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ func (gs *tidbSession) Execute(ctx context.Context, sql string) error {
return errors.Trace(err)
}

func (gs *tidbSession) ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error {
_, err := gs.se.ExecuteInternal(ctx, sql, args...)
return errors.Trace(err)
}

// CreateDatabase implements glue.Session.
func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {
d := domain.GetDomain(gs.se).DDL()
Expand Down
53 changes: 36 additions & 17 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ func (rc *Client) GoValidateChecksum(
// run the stat loader
go func() {
defer wg.Done()
rc.statLoader(ctx, loadStatCh)
rc.updateMetaAndLoadStats(ctx, loadStatCh)
}()
workers := utils.NewWorkerPool(defaultChecksumConcurrency, "RestoreChecksum")
go func() {
Expand Down Expand Up @@ -842,7 +842,13 @@ func (rc *Client) GoValidateChecksum(
return outCh
}

func (rc *Client) execChecksum(ctx context.Context, tbl CreatedTable, kvClient kv.Client, concurrency uint, loadStatCh chan<- *CreatedTable) error {
func (rc *Client) execChecksum(
ctx context.Context,
tbl CreatedTable,
kvClient kv.Client,
concurrency uint,
loadStatCh chan<- *CreatedTable,
) error {
logger := log.With(
zap.String("db", tbl.OldTable.DB.Name.O),
zap.String("table", tbl.OldTable.Info.Name.O),
Expand Down Expand Up @@ -891,13 +897,12 @@ func (rc *Client) execChecksum(ctx context.Context, tbl CreatedTable, kvClient k
)
return errors.Annotate(berrors.ErrRestoreChecksumMismatch, "failed to validate checksum")
}
if table.Stats != nil {
loadStatCh <- &tbl
}

loadStatCh <- &tbl
return nil
}

func (rc *Client) statLoader(ctx context.Context, input <-chan *CreatedTable) {
func (rc *Client) updateMetaAndLoadStats(ctx context.Context, input <-chan *CreatedTable) {
for {
select {
case <-ctx.Done():
Expand All @@ -906,19 +911,33 @@ func (rc *Client) statLoader(ctx context.Context, input <-chan *CreatedTable) {
if !ok {
return
}

// Not need to return err when failed because of update analysis-meta
restoreTS, err := rc.GetTS(ctx)
if err != nil {
log.Error("getTS failed", zap.Error(err))
} else {
err = rc.db.UpdateStatsMeta(ctx, tbl.Table.ID, restoreTS, tbl.OldTable.TotalKvs)
if err != nil {
log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(err))
}
}

table := tbl.OldTable
log.Info("start loads analyze after validate checksum",
zap.Int64("old id", tbl.OldTable.Info.ID),
zap.Int64("new id", tbl.Table.ID),
)
start := time.Now()
if err := rc.statsHandler.LoadStatsFromJSON(rc.dom.InfoSchema(), table.Stats); err != nil {
log.Error("analyze table failed", zap.Any("table", table.Stats), zap.Error(err))
if table.Stats != nil {
log.Info("start loads analyze after validate checksum",
zap.Int64("old id", tbl.OldTable.Info.ID),
zap.Int64("new id", tbl.Table.ID),
)
start := time.Now()
if err := rc.statsHandler.LoadStatsFromJSON(rc.dom.InfoSchema(), table.Stats); err != nil {
log.Error("analyze table failed", zap.Any("table", table.Stats), zap.Error(err))
}
log.Info("restore stat done",
zap.String("table", table.Info.Name.L),
zap.String("db", table.DB.Name.L),
zap.Duration("cost", time.Since(start)))
}
log.Info("restore stat done",
zap.String("table", table.Info.Name.L),
zap.String("db", table.DB.Name.L),
zap.Duration("cost", time.Since(start)))
}
}
}
Expand Down
23 changes: 23 additions & 0 deletions br/pkg/restore/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -92,6 +93,28 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error {
return errors.Trace(err)
}

// UpdateStatsMeta update count and snapshot ts in mysql.stats_meta
func (db *DB) UpdateStatsMeta(ctx context.Context, tableID int64, restoreTS uint64, count uint64) error {
sysDB := mysql.SystemDB
statsMetaTbl := "stats_meta"

// set restoreTS to snapshot and version which is used to update stats_meta
err := db.se.ExecuteInternal(
ctx,
"update %n.%n set snapshot = %?, version = %?, count = %? where table_id = %?",
sysDB,
statsMetaTbl,
restoreTS,
restoreTS,
count,
tableID,
)
if err != nil {
log.Error("execute update sql failed", zap.Error(err))
}
return nil
}

// CreateDatabase executes a CREATE DATABASE SQL.
func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {
err := db.se.CreateDatabase(ctx, schema)
Expand Down
6 changes: 6 additions & 0 deletions executor/brie.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,12 @@ func (gs *tidbGlueSession) Execute(ctx context.Context, sql string) error {
return err
}

func (gs *tidbGlueSession) ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error {
exec := gs.se.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(ctx, sql, args...)
return err
}

// CreateDatabase implements glue.Session
func (gs *tidbGlueSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {
d := domain.GetDomain(gs.se).DDL()
Expand Down

0 comments on commit 07a27f8

Please sign in to comment.