From 2d94ba153dbf90784d5bcaf3124eccc600609052 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 6 Dec 2022 11:43:12 +0800 Subject: [PATCH 1/5] add retry for azblob read file Signed-off-by: Leavrth --- br/cmd/br/cmd.go | 3 +- br/cmd/br/debug.go | 4 +- br/pkg/backup/client.go | 2 +- br/pkg/backup/schema.go | 8 ++-- br/pkg/backup/schema_test.go | 3 +- br/pkg/{utils => metautil}/schema.go | 13 +++-- br/pkg/{utils => metautil}/schema_test.go | 23 +++++---- br/pkg/restore/client.go | 26 +++++----- br/pkg/restore/client_test.go | 3 +- br/pkg/restore/db.go | 21 ++++----- br/pkg/restore/systable_restore.go | 22 ++++----- br/pkg/restore/tiflashrec/tiflash_recorder.go | 4 +- br/pkg/restore/util.go | 6 +-- br/pkg/storage/azblob.go | 22 +++++++-- br/pkg/task/common.go | 5 +- br/pkg/task/restore.go | 10 ++-- br/pkg/task/restore_test.go | 47 +++++++++---------- br/pkg/task/stream.go | 8 ++-- 18 files changed, 120 insertions(+), 110 deletions(-) rename br/pkg/{utils => metautil}/schema.go (90%) rename br/pkg/{utils => metautil}/schema_test.go (94%) diff --git a/br/cmd/br/cmd.go b/br/cmd/br/cmd.go index f3aeb3393df52..bbcce18fffb6c 100644 --- a/br/cmd/br/cmd.go +++ b/br/cmd/br/cmd.go @@ -14,6 +14,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/gluetidb" + "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/redact" "github.com/pingcap/tidb/br/pkg/summary" "github.com/pingcap/tidb/br/pkg/task" @@ -34,7 +35,7 @@ var ( filterOutSysAndMemTables = []string{ "*.*", - fmt.Sprintf("!%s.*", utils.TemporaryDBName("*")), + fmt.Sprintf("!%s.*", metautil.TemporaryDBName("*")), "!mysql.*", "mysql.user", "mysql.db", diff --git a/br/cmd/br/debug.go b/br/cmd/br/debug.go index c62dd677f9a66..179bebdd17eee 100644 --- a/br/cmd/br/debug.go +++ b/br/cmd/br/debug.go @@ -80,7 +80,7 @@ func newCheckSumCommand() *cobra.Command { } reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo) - dbs, err := utils.LoadBackupTables(ctx, reader) + dbs, err := metautil.LoadBackupTables(ctx, reader) if err != nil { return errors.Trace(err) } @@ -182,7 +182,7 @@ func newBackupMetaValidateCommand() *cobra.Command { return errors.Trace(err) } reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo) - dbs, err := utils.LoadBackupTables(ctx, reader) + dbs, err := metautil.LoadBackupTables(ctx, reader) if err != nil { log.Error("load tables failed", zap.Error(err)) return errors.Trace(err) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 0241789e65103..26310aa0c7db6 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -570,7 +570,7 @@ func BuildBackupRangeAndSchema( switch { case tableInfo.IsSequence(): globalAutoID, err = seqAlloc.NextGlobalAutoID() - case tableInfo.IsView() || !utils.NeedAutoID(tableInfo): + case tableInfo.IsView() || !metautil.NeedAutoID(tableInfo): // no auto ID for views or table without either rowID nor auto_increment ID. default: globalAutoID, err = idAlloc.NextGlobalAutoID() diff --git a/br/pkg/backup/schema.go b/br/pkg/backup/schema.go index bb0cf7f884189..9425f0aca2ad6 100644 --- a/br/pkg/backup/schema.go +++ b/br/pkg/backup/schema.go @@ -65,13 +65,13 @@ func (ss *Schemas) AddSchema( dbInfo *model.DBInfo, tableInfo *model.TableInfo, ) { if tableInfo == nil { - ss.schemas[utils.EncloseName(dbInfo.Name.L)] = &schemaInfo{ + ss.schemas[metautil.EncloseName(dbInfo.Name.L)] = &schemaInfo{ dbInfo: dbInfo, } return } name := fmt.Sprintf("%s.%s", - utils.EncloseName(dbInfo.Name.L), utils.EncloseName(tableInfo.Name.L)) + metautil.EncloseName(dbInfo.Name.L), metautil.EncloseName(tableInfo.Name.L)) ss.schemas[name] = &schemaInfo{ tableInfo: tableInfo, dbInfo: dbInfo, @@ -106,8 +106,8 @@ func (ss *Schemas) BackupSchemas( schema := s // Because schema.dbInfo is a pointer that many tables point to. // Remove "add Temporary-prefix into dbName" from closure to prevent concurrent operations. - if utils.IsSysDB(schema.dbInfo.Name.L) { - schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O) + if metautil.IsSysDB(schema.dbInfo.Name.L) { + schema.dbInfo.Name = metautil.TemporaryDBName(schema.dbInfo.Name.O) } var checksum *checkpoint.ChecksumItem diff --git a/br/pkg/backup/schema_test.go b/br/pkg/backup/schema_test.go index 08d560bf03c25..f02b3cbd271b2 100644 --- a/br/pkg/backup/schema_test.go +++ b/br/pkg/backup/schema_test.go @@ -17,7 +17,6 @@ import ( "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/mock" "github.com/pingcap/tidb/br/pkg/storage" - "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/testkit" filter "github.com/pingcap/tidb/util/table-filter" @@ -314,7 +313,7 @@ func TestBackupSchemasForSystemTable(t *testing.T) { schemas2 := GetSchemasFromMeta(t, es2) require.Len(t, schemas2, systemTablesCount) for _, schema := range schemas2 { - require.Equal(t, utils.TemporaryDBName("mysql"), schema.DB.Name) + require.Equal(t, metautil.TemporaryDBName("mysql"), schema.DB.Name) require.Equal(t, true, strings.HasPrefix(schema.Info.Name.O, tablePrefix)) } } diff --git a/br/pkg/utils/schema.go b/br/pkg/metautil/schema.go similarity index 90% rename from br/pkg/utils/schema.go rename to br/pkg/metautil/schema.go index 8ceba24e140ad..d5ffedd5a2dc9 100644 --- a/br/pkg/utils/schema.go +++ b/br/pkg/metautil/schema.go @@ -1,6 +1,6 @@ // Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. -package utils +package metautil import ( "context" @@ -9,7 +9,6 @@ import ( "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" - "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" ) @@ -27,11 +26,11 @@ func NeedAutoID(tblInfo *model.TableInfo) bool { // Database wraps the schema and tables of a database. type Database struct { Info *model.DBInfo - Tables []*metautil.Table + Tables []*Table } // GetTable returns a table of the database by name. -func (db *Database) GetTable(name string) *metautil.Table { +func (db *Database) GetTable(name string) *Table { for _, table := range db.Tables { if table.Info.Name.String() == name { return table @@ -41,8 +40,8 @@ func (db *Database) GetTable(name string) *metautil.Table { } // LoadBackupTables loads schemas from BackupMeta. -func LoadBackupTables(ctx context.Context, reader *metautil.MetaReader) (map[string]*Database, error) { - ch := make(chan *metautil.Table) +func LoadBackupTables(ctx context.Context, reader *MetaReader) (map[string]*Database, error) { + ch := make(chan *Table) errCh := make(chan error) go func() { if err := reader.ReadSchemasFiles(ctx, ch); err != nil { @@ -68,7 +67,7 @@ func LoadBackupTables(ctx context.Context, reader *metautil.MetaReader) (map[str if !ok { db = &Database{ Info: table.DB, - Tables: make([]*metautil.Table, 0), + Tables: make([]*Table, 0), } databases[dbName] = db } diff --git a/br/pkg/utils/schema_test.go b/br/pkg/metautil/schema_test.go similarity index 94% rename from br/pkg/utils/schema_test.go rename to br/pkg/metautil/schema_test.go index 95d649000d9ce..b36265da00bf9 100644 --- a/br/pkg/utils/schema_test.go +++ b/br/pkg/metautil/schema_test.go @@ -1,6 +1,6 @@ // Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. -package utils +package metautil import ( "context" @@ -11,7 +11,6 @@ import ( "github.com/golang/protobuf/proto" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/encryptionpb" - "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/statistics/handle" @@ -84,12 +83,12 @@ func TestLoadBackupMeta(t *testing.T) { require.NoError(t, err) ctx := context.Background() - err = store.WriteFile(ctx, metautil.MetaFile, data) + err = store.WriteFile(ctx, MetaFile, data) require.NoError(t, err) dbs, err := LoadBackupTables( ctx, - metautil.NewMetaReader( + NewMetaReader( meta, store, &backuppb.CipherInfo{ @@ -179,12 +178,12 @@ func TestLoadBackupMetaPartionTable(t *testing.T) { require.NoError(t, err) ctx := context.Background() - err = store.WriteFile(ctx, metautil.MetaFile, data) + err = store.WriteFile(ctx, MetaFile, data) require.NoError(t, err) dbs, err := LoadBackupTables( ctx, - metautil.NewMetaReader( + NewMetaReader( meta, store, &backuppb.CipherInfo{ @@ -265,12 +264,12 @@ func BenchmarkLoadBackupMeta64(b *testing.B) { require.NoError(b, err) ctx := context.Background() - err = store.WriteFile(ctx, metautil.MetaFile, data) + err = store.WriteFile(ctx, MetaFile, data) require.NoError(b, err) dbs, err := LoadBackupTables( ctx, - metautil.NewMetaReader( + NewMetaReader( meta, store, &backuppb.CipherInfo{ @@ -297,12 +296,12 @@ func BenchmarkLoadBackupMeta1024(b *testing.B) { require.NoError(b, err) ctx := context.Background() - err = store.WriteFile(ctx, metautil.MetaFile, data) + err = store.WriteFile(ctx, MetaFile, data) require.NoError(b, err) dbs, err := LoadBackupTables( ctx, - metautil.NewMetaReader( + NewMetaReader( meta, store, &backuppb.CipherInfo{ @@ -329,12 +328,12 @@ func BenchmarkLoadBackupMeta10240(b *testing.B) { require.NoError(b, err) ctx := context.Background() - err = store.WriteFile(ctx, metautil.MetaFile, data) + err = store.WriteFile(ctx, MetaFile, data) require.NoError(b, err) dbs, err := LoadBackupTables( ctx, - metautil.NewMetaReader( + NewMetaReader( meta, store, &backuppb.CipherInfo{ diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index d7811574915f2..c824a34ca64fa 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -89,7 +89,7 @@ type Client struct { tlsConf *tls.Config keepaliveConf keepalive.ClientParameters - databases map[string]*utils.Database + databases map[string]*metautil.Database ddlJobs []*model.Job // store tables need to rebase info like auto id and random id and so on after create table @@ -358,7 +358,7 @@ func (rc *Client) InitBackupMeta( backend *backuppb.StorageBackend, reader *metautil.MetaReader) error { if !backupMeta.IsRawKv { - databases, err := utils.LoadBackupTables(c, reader) + databases, err := metautil.LoadBackupTables(c, reader) if err != nil { return errors.Trace(err) } @@ -525,8 +525,8 @@ func (rc *Client) GetPlacementRules(ctx context.Context, pdAddrs []string) ([]pd } // GetDatabases returns all databases. -func (rc *Client) GetDatabases() []*utils.Database { - dbs := make([]*utils.Database, 0, len(rc.databases)) +func (rc *Client) GetDatabases() []*metautil.Database { + dbs := make([]*metautil.Database, 0, len(rc.databases)) for _, db := range rc.databases { dbs = append(dbs, db) } @@ -534,14 +534,14 @@ func (rc *Client) GetDatabases() []*utils.Database { } // GetDatabase returns a database by name. -func (rc *Client) GetDatabase(name string) *utils.Database { +func (rc *Client) GetDatabase(name string) *metautil.Database { return rc.databases[name] } // HasBackedUpSysDB whether we have backed up system tables // br backs system tables up since 5.1.0 func (rc *Client) HasBackedUpSysDB() bool { - temporaryDB := utils.TemporaryDBName(mysql.SystemDB) + temporaryDB := metautil.TemporaryDBName(mysql.SystemDB) _, backedUp := rc.databases[temporaryDB.O] return backedUp } @@ -927,8 +927,8 @@ func (rc *Client) CheckSysTableCompatibility(dom *domain.Domain, tables []*metau log.Info("checking target cluster system table compatibility with backed up data") privilegeTablesInBackup := make([]*metautil.Table, 0) for _, table := range tables { - decodedSysDBName, ok := utils.GetSysDBCIStrName(table.DB.Name) - if ok && utils.IsSysDB(decodedSysDBName.L) && sysPrivilegeTableMap[table.Info.Name.L] != "" { + decodedSysDBName, ok := metautil.GetSysDBCIStrName(table.DB.Name) + if ok && metautil.IsSysDB(decodedSysDBName.L) && sysPrivilegeTableMap[table.Info.Name.L] != "" { privilegeTablesInBackup = append(privilegeTablesInBackup, table) } } @@ -1792,8 +1792,8 @@ func (rc *Client) FixIndex(ctx context.Context, schema, table, index string) err } sql := fmt.Sprintf("ADMIN RECOVER INDEX %s %s;", - utils.EncloseDBAndTable(schema, table), - utils.EncloseName(index)) + metautil.EncloseDBAndTable(schema, table), + metautil.EncloseName(index)) log.Debug("Executing fix index sql.", zap.String("sql", sql)) err := rc.db.se.Execute(ctx, sql) if err != nil { @@ -1809,7 +1809,7 @@ func (rc *Client) FixIndicesOfTables( onProgress func(), ) error { for _, table := range fullBackupTables { - if name, ok := utils.GetSysDBName(table.DB.Name); utils.IsSysDB(name) && ok { + if name, ok := metautil.GetSysDBName(table.DB.Name); metautil.IsSysDB(name) && ok { // skip system table for now onProgress() continue @@ -2101,7 +2101,7 @@ func (rc *Client) InitSchemasReplaceForDDL( dbMap := make(map[stream.OldID]*stream.DBReplace) for _, t := range *tables { - name, _ := utils.GetSysDBName(t.DB.Name) + name, _ := metautil.GetSysDBName(t.DB.Name) dbName := model.NewCIStr(name) newDBInfo, exist := rc.GetDBSchema(rc.GetDomain(), dbName) if !exist { @@ -2647,7 +2647,7 @@ func (rc *Client) SetWithSysTable(withSysTable bool) { } // MockClient create a fake client used to test. -func MockClient(dbs map[string]*utils.Database) *Client { +func MockClient(dbs map[string]*metautil.Database) *Client { return &Client{databases: dbs} } diff --git a/br/pkg/restore/client_test.go b/br/pkg/restore/client_test.go index ae943a96f276b..55139f1534838 100644 --- a/br/pkg/restore/client_test.go +++ b/br/pkg/restore/client_test.go @@ -24,7 +24,6 @@ import ( "github.com/pingcap/tidb/br/pkg/restore" "github.com/pingcap/tidb/br/pkg/restore/tiflashrec" "github.com/pingcap/tidb/br/pkg/stream" - "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/utils/iter" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" @@ -190,7 +189,7 @@ func TestCheckSysTableCompatibility(t *testing.T) { dbSchema, isExist := info.SchemaByName(model.NewCIStr(mysql.SystemDB)) require.True(t, isExist) tmpSysDB := dbSchema.Clone() - tmpSysDB.Name = utils.TemporaryDBName(mysql.SystemDB) + tmpSysDB.Name = metautil.TemporaryDBName(mysql.SystemDB) sysDB := model.NewCIStr(mysql.SystemDB) userTI, err := client.GetTableSchema(cluster.Domain, sysDB, model.NewCIStr("user")) require.NoError(t, err) diff --git a/br/pkg/restore/db.go b/br/pkg/restore/db.go index ae62162c3e890..310f76d4820cf 100644 --- a/br/pkg/restore/db.go +++ b/br/pkg/restore/db.go @@ -12,7 +12,6 @@ import ( "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/metautil" prealloctableid "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id" - "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" @@ -116,7 +115,7 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { } if tableInfo != nil { - switchDBSQL := fmt.Sprintf("use %s;", utils.EncloseName(ddlJob.SchemaName)) + switchDBSQL := fmt.Sprintf("use %s;", metautil.EncloseName(ddlJob.SchemaName)) err = db.se.Execute(ctx, switchDBSQL) if err != nil { log.Error("switch db failed", @@ -183,8 +182,8 @@ func (db *DB) restoreSequence(ctx context.Context, table *metautil.Table) error var err error if table.Info.IsSequence() { setValFormat := fmt.Sprintf("do setval(%s.%s, %%d);", - utils.EncloseName(table.DB.Name.O), - utils.EncloseName(table.Info.Name.O)) + metautil.EncloseName(table.DB.Name.O), + metautil.EncloseName(table.Info.Name.O)) if table.Info.Sequence.Cycle { increment := table.Info.Sequence.Increment // TiDB sequence's behaviour is designed to keep the same pace @@ -193,8 +192,8 @@ func (db *DB) restoreSequence(ctx context.Context, table *metautil.Table) error // https://github.com/pingcap/br/pull/242#issuecomment-631307978 // TODO use sql to set cycle round nextSeqSQL := fmt.Sprintf("do nextval(%s.%s);", - utils.EncloseName(table.DB.Name.O), - utils.EncloseName(table.Info.Name.O)) + metautil.EncloseName(table.DB.Name.O), + metautil.EncloseName(table.Info.Name.O)) var setValSQL string if increment < 0 { setValSQL = fmt.Sprintf(setValFormat, table.Info.Sequence.MinValue) @@ -248,17 +247,17 @@ func (db *DB) CreateTablePostRestore(ctx context.Context, table *metautil.Table, } // only table exists in restored cluster during incremental restoration should do alter after creation. case toBeCorrectedTables[UniqueTableName{table.DB.Name.String(), table.Info.Name.String()}]: - if utils.NeedAutoID(table.Info) { + if metautil.NeedAutoID(table.Info) { restoreMetaSQL = fmt.Sprintf( "alter table %s.%s auto_increment = %d;", - utils.EncloseName(table.DB.Name.O), - utils.EncloseName(table.Info.Name.O), + metautil.EncloseName(table.DB.Name.O), + metautil.EncloseName(table.Info.Name.O), table.Info.AutoIncID) } else if table.Info.PKIsHandle && table.Info.ContainsAutoRandomBits() { restoreMetaSQL = fmt.Sprintf( "alter table %s.%s auto_random_base = %d", - utils.EncloseName(table.DB.Name.O), - utils.EncloseName(table.Info.Name.O), + metautil.EncloseName(table.DB.Name.O), + metautil.EncloseName(table.Info.Name.O), table.Info.AutoRandID) } else { log.Info("table exists in incremental ddl jobs, but don't need to be altered", diff --git a/br/pkg/restore/systable_restore.go b/br/pkg/restore/systable_restore.go index 02ea0860d5425..7502fea1c365c 100644 --- a/br/pkg/restore/systable_restore.go +++ b/br/pkg/restore/systable_restore.go @@ -11,7 +11,7 @@ import ( "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/logutil" - "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" filter "github.com/pingcap/tidb/util/table-filter" @@ -76,7 +76,7 @@ func isStatsTable(tableName string) bool { func (rc *Client) RestoreSystemSchemas(ctx context.Context, f filter.Filter) { sysDB := mysql.SystemDB - temporaryDB := utils.TemporaryDBName(sysDB) + temporaryDB := metautil.TemporaryDBName(sysDB) defer rc.cleanTemporaryDatabase(ctx, sysDB) if !f.MatchSchema(sysDB) || !rc.withSysTable { @@ -133,7 +133,7 @@ func (rc *Client) getDatabaseByName(name string) (*database, bool) { db := &database{ ExistingTables: map[string]*model.TableInfo{}, Name: model.NewCIStr(name), - TemporaryName: utils.TemporaryDBName(name), + TemporaryName: metautil.TemporaryDBName(name), } for _, t := range schema.Tables { db.ExistingTables[t.Name.L] = t @@ -210,7 +210,7 @@ func (rc *Client) replaceTemporaryTableToSystable(ctx context.Context, ti *model log.Info("full cluster restore, delete existing data", zap.String("table", tableName), zap.Stringer("schema", db.Name)) deleteSQL := fmt.Sprintf("DELETE FROM %s %s;", - utils.EncloseDBAndTable(db.Name.L, tableName), whereClause) + metautil.EncloseDBAndTable(db.Name.L, tableName), whereClause) if err := execSQL(deleteSQL); err != nil { return err } @@ -221,28 +221,28 @@ func (rc *Client) replaceTemporaryTableToSystable(ctx context.Context, ti *model // target column order may different with source cluster columnNames := make([]string, 0, len(ti.Columns)) for _, col := range ti.Columns { - columnNames = append(columnNames, utils.EncloseName(col.Name.L)) + columnNames = append(columnNames, metautil.EncloseName(col.Name.L)) } colListStr := strings.Join(columnNames, ",") replaceIntoSQL := fmt.Sprintf("REPLACE INTO %s(%s) SELECT %s FROM %s %s;", - utils.EncloseDBAndTable(db.Name.L, tableName), + metautil.EncloseDBAndTable(db.Name.L, tableName), colListStr, colListStr, - utils.EncloseDBAndTable(db.TemporaryName.L, tableName), + metautil.EncloseDBAndTable(db.TemporaryName.L, tableName), whereClause) return execSQL(replaceIntoSQL) } renameSQL := fmt.Sprintf("RENAME TABLE %s TO %s;", - utils.EncloseDBAndTable(db.TemporaryName.L, tableName), - utils.EncloseDBAndTable(db.Name.L, tableName), + metautil.EncloseDBAndTable(db.TemporaryName.L, tableName), + metautil.EncloseDBAndTable(db.Name.L, tableName), ) return execSQL(renameSQL) } func (rc *Client) cleanTemporaryDatabase(ctx context.Context, originDB string) { - database := utils.TemporaryDBName(originDB) + database := metautil.TemporaryDBName(originDB) log.Debug("dropping temporary database", zap.Stringer("database", database)) - sql := fmt.Sprintf("DROP DATABASE IF EXISTS %s", utils.EncloseName(database.L)) + sql := fmt.Sprintf("DROP DATABASE IF EXISTS %s", metautil.EncloseName(database.L)) if err := rc.db.se.Execute(ctx, sql); err != nil { logutil.WarnTerm("failed to drop temporary database, it should be dropped manually", zap.Stringer("database", database), diff --git a/br/pkg/restore/tiflashrec/tiflash_recorder.go b/br/pkg/restore/tiflashrec/tiflash_recorder.go index 31dde982a7b69..4adfdb08b0de9 100644 --- a/br/pkg/restore/tiflashrec/tiflash_recorder.go +++ b/br/pkg/restore/tiflashrec/tiflash_recorder.go @@ -20,7 +20,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/logutil" - "github.com/pingcap/tidb/br/pkg/utils" + "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/format" @@ -99,7 +99,7 @@ func (r *TiFlashRecorder) GenerateAlterTableDDLs(info infoschema.InfoSchema) []s } items = append(items, fmt.Sprintf( "ALTER TABLE %s %s", - utils.EncloseDBAndTable(schema.Name.O, table.Meta().Name.O), + metautil.EncloseDBAndTable(schema.Name.O, table.Meta().Name.O), altTableSpec), ) }) diff --git a/br/pkg/restore/util.go b/br/pkg/restore/util.go index 73a4411c445c1..af6fe657adfc2 100644 --- a/br/pkg/restore/util.go +++ b/br/pkg/restore/util.go @@ -20,10 +20,10 @@ import ( berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/logutil" + "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/redact" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/rtree" - "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" @@ -575,8 +575,8 @@ func ZapTables(tables []CreatedTable) zapcore.Field { names := make([]string, 0, len(tables)) for _, t := range tables { names = append(names, fmt.Sprintf("%s.%s", - utils.EncloseName(t.OldTable.DB.Name.String()), - utils.EncloseName(t.OldTable.Info.Name.String()))) + metautil.EncloseName(t.OldTable.DB.Name.String()), + metautil.EncloseName(t.OldTable.Info.Name.String()))) } return names }) diff --git a/br/pkg/storage/azblob.go b/br/pkg/storage/azblob.go index c557a79e3ac8f..2b9c33946a876 100644 --- a/br/pkg/storage/azblob.go +++ b/br/pkg/storage/azblob.go @@ -19,6 +19,7 @@ import ( backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/spf13/pflag" "go.uber.org/zap" ) @@ -30,6 +31,8 @@ const ( azblobAccountKey = "azblob.account-key" ) +const maxRetry = 5 + // AzblobBackendOptions is the options for Azure Blob storage. type AzblobBackendOptions struct { Endpoint string `json:"endpoint" toml:"endpoint"` @@ -280,10 +283,23 @@ func (s *AzureBlobStorage) WriteFile(ctx context.Context, name string, data []by // ReadFile reads a file from Azure Blob Storage. func (s *AzureBlobStorage) ReadFile(ctx context.Context, name string) ([]byte, error) { client := s.containerClient.NewBlockBlobClient(s.withPrefix(name)) - resp, err := client.Download(ctx, nil) - if err != nil { - return nil, errors.Annotatef(err, "Failed to download azure blob file, file info: bucket(container)='%s', key='%s'", s.options.Bucket, s.withPrefix(name)) + var resp *azblob.DownloadResponse + var err error + for retryTimes := 0; ; retryTimes++ { + if retryTimes == maxRetry { + return nil, errors.Annotatef(err, "Failed to retry to download azure blob file, file info: bucket(container)='%s', key='%s'", s.options.Bucket, s.withPrefix(name)) + } + resp, err = client.Download(ctx, nil) + if err != nil { + if utils.MessageIsRetryableStorageError(err.Error()) { + log.Warn("Failed to download azure blob file, file info", zap.String("bucket(container)", s.options.Bucket), zap.String("key", s.withPrefix(name)), zap.Int("retry", retryTimes), zap.Error(err)) + continue + } + return nil, errors.Annotatef(err, "Failed to download azure blob file, file info: bucket(container)='%s', key='%s'", s.options.Bucket, s.withPrefix(name)) + } + break } + defer resp.RawResponse.Body.Close() data, err := io.ReadAll(resp.Body(azblob.RetryReaderOptions{})) if err != nil { diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index 2d04f916d98ec..7b093e19fd7b5 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -27,7 +27,6 @@ import ( "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/storage" - "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/sessionctx/variable" filter "github.com/pingcap/tidb/util/table-filter" "github.com/spf13/cobra" @@ -506,13 +505,13 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error { if len(db) == 0 { return errors.Annotate(berrors.ErrInvalidArgument, "empty database name is not allowed") } - cfg.Schemas[utils.EncloseName(db)] = struct{}{} + cfg.Schemas[metautil.EncloseName(db)] = struct{}{} if tblFlag := flags.Lookup(flagTable); tblFlag != nil { tbl := tblFlag.Value.String() if len(tbl) == 0 { return errors.Annotate(berrors.ErrInvalidArgument, "empty table name is not allowed") } - cfg.Tables[utils.EncloseDBAndTable(db, tbl)] = struct{}{} + cfg.Tables[metautil.EncloseDBAndTable(db, tbl)] = struct{}{} cfg.TableFilter = filter.NewTablesFilter(filter.Table{ Schema: db, Name: tbl, diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 83c22a29e61db..0147f79b3ff0d 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -421,16 +421,16 @@ func CheckRestoreDBAndTable(client *restore.Client, cfg *RestoreConfig) error { tablesMap := make(map[string]struct{}) for _, db := range schemas { dbName := db.Info.Name.L - if dbCIStrName, ok := utils.GetSysDBCIStrName(db.Info.Name); utils.IsSysDB(dbCIStrName.O) && ok { + if dbCIStrName, ok := metautil.GetSysDBCIStrName(db.Info.Name); metautil.IsSysDB(dbCIStrName.O) && ok { dbName = dbCIStrName.L } - schemasMap[utils.EncloseName(dbName)] = struct{}{} + schemasMap[metautil.EncloseName(dbName)] = struct{}{} for _, table := range db.Tables { if table.Info == nil { // we may back up empty database. continue } - tablesMap[utils.EncloseDBAndTable(dbName, table.Info.Name.L)] = struct{}{} + tablesMap[metautil.EncloseDBAndTable(dbName, table.Info.Name.L)] = struct{}{} } } restoreSchemas := cfg.Schemas @@ -797,10 +797,10 @@ func dropToBlackhole( func filterRestoreFiles( client *restore.Client, cfg *RestoreConfig, -) (files []*backuppb.File, tables []*metautil.Table, dbs []*utils.Database) { +) (files []*backuppb.File, tables []*metautil.Table, dbs []*metautil.Database) { for _, db := range client.GetDatabases() { dbName := db.Info.Name.O - if name, ok := utils.GetSysDBName(db.Info.Name); utils.IsSysDB(name) && ok { + if name, ok := metautil.GetSysDBName(db.Info.Name); metautil.IsSysDB(name) && ok { dbName = name } if !cfg.TableFilter.MatchSchema(dbName) { diff --git a/br/pkg/task/restore_test.go b/br/pkg/task/restore_test.go index b13ecf0eccc08..66982a0626530 100644 --- a/br/pkg/task/restore_test.go +++ b/br/pkg/task/restore_test.go @@ -16,7 +16,6 @@ import ( "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/restore" "github.com/pingcap/tidb/br/pkg/storage" - "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/tablecodec" @@ -77,15 +76,15 @@ func TestCheckRestoreDBAndTable(t *testing.T) { cases := []struct { cfgSchemas map[string]struct{} cfgTables map[string]struct{} - backupDBs map[string]*utils.Database + backupDBs map[string]*metautil.Database }{ { cfgSchemas: map[string]struct{}{ - utils.EncloseName("test"): {}, + metautil.EncloseName("test"): {}, }, cfgTables: map[string]struct{}{ - utils.EncloseDBAndTable("test", "t"): {}, - utils.EncloseDBAndTable("test", "t2"): {}, + metautil.EncloseDBAndTable("test", "t"): {}, + metautil.EncloseDBAndTable("test", "t2"): {}, }, backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{ "test": {"T", "T2"}, @@ -93,11 +92,11 @@ func TestCheckRestoreDBAndTable(t *testing.T) { }, { cfgSchemas: map[string]struct{}{ - utils.EncloseName("mysql"): {}, + metautil.EncloseName("mysql"): {}, }, cfgTables: map[string]struct{}{ - utils.EncloseDBAndTable("mysql", "t"): {}, - utils.EncloseDBAndTable("mysql", "t2"): {}, + metautil.EncloseDBAndTable("mysql", "t"): {}, + metautil.EncloseDBAndTable("mysql", "t2"): {}, }, backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{ "__TiDB_BR_Temporary_mysql": {"T", "T2"}, @@ -105,11 +104,11 @@ func TestCheckRestoreDBAndTable(t *testing.T) { }, { cfgSchemas: map[string]struct{}{ - utils.EncloseName("test"): {}, + metautil.EncloseName("test"): {}, }, cfgTables: map[string]struct{}{ - utils.EncloseDBAndTable("test", "T"): {}, - utils.EncloseDBAndTable("test", "T2"): {}, + metautil.EncloseDBAndTable("test", "T"): {}, + metautil.EncloseDBAndTable("test", "T2"): {}, }, backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{ "test": {"t", "t2"}, @@ -117,11 +116,11 @@ func TestCheckRestoreDBAndTable(t *testing.T) { }, { cfgSchemas: map[string]struct{}{ - utils.EncloseName("TEST"): {}, + metautil.EncloseName("TEST"): {}, }, cfgTables: map[string]struct{}{ - utils.EncloseDBAndTable("TEST", "t"): {}, - utils.EncloseDBAndTable("TEST", "T2"): {}, + metautil.EncloseDBAndTable("TEST", "t"): {}, + metautil.EncloseDBAndTable("TEST", "T2"): {}, }, backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{ "test": {"t", "t2"}, @@ -129,11 +128,11 @@ func TestCheckRestoreDBAndTable(t *testing.T) { }, { cfgSchemas: map[string]struct{}{ - utils.EncloseName("TeSt"): {}, + metautil.EncloseName("TeSt"): {}, }, cfgTables: map[string]struct{}{ - utils.EncloseDBAndTable("TeSt", "tabLe"): {}, - utils.EncloseDBAndTable("TeSt", "taBle2"): {}, + metautil.EncloseDBAndTable("TeSt", "tabLe"): {}, + metautil.EncloseDBAndTable("TeSt", "taBle2"): {}, }, backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{ "TesT": {"TablE", "taBle2"}, @@ -141,13 +140,13 @@ func TestCheckRestoreDBAndTable(t *testing.T) { }, { cfgSchemas: map[string]struct{}{ - utils.EncloseName("TeSt"): {}, - utils.EncloseName("MYSQL"): {}, + metautil.EncloseName("TeSt"): {}, + metautil.EncloseName("MYSQL"): {}, }, cfgTables: map[string]struct{}{ - utils.EncloseDBAndTable("TeSt", "tabLe"): {}, - utils.EncloseDBAndTable("TeSt", "taBle2"): {}, - utils.EncloseDBAndTable("MYSQL", "taBle"): {}, + metautil.EncloseDBAndTable("TeSt", "tabLe"): {}, + metautil.EncloseDBAndTable("TeSt", "taBle2"): {}, + metautil.EncloseDBAndTable("MYSQL", "taBle"): {}, }, backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{ "TesT": {"table", "TaBLE2"}, @@ -167,7 +166,7 @@ func TestCheckRestoreDBAndTable(t *testing.T) { } } -func mockReadSchemasFromBackupMeta(t *testing.T, db2Tables map[string][]string) map[string]*utils.Database { +func mockReadSchemasFromBackupMeta(t *testing.T, db2Tables map[string][]string) map[string]*metautil.Database { testDir := t.TempDir() store, err := storage.NewLocalStorage(testDir) require.NoError(t, err) @@ -236,7 +235,7 @@ func mockReadSchemasFromBackupMeta(t *testing.T, db2Tables map[string][]string) err = store.WriteFile(ctx, metautil.MetaFile, data) require.NoError(t, err) - dbs, err := utils.LoadBackupTables( + dbs, err := metautil.LoadBackupTables( ctx, metautil.NewMetaReader( meta, diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index b33459bb98e2e..6fae8ddf563e0 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1526,7 +1526,7 @@ func initFullBackupTables( // read full backup databases to get map[table]table.Info reader := metautil.NewMetaReader(backupMeta, s, nil) - databases, err := utils.LoadBackupTables(ctx, reader) + databases, err := metautil.LoadBackupTables(ctx, reader) if err != nil { return nil, errors.Trace(err) } @@ -1534,7 +1534,7 @@ func initFullBackupTables( tables := make(map[int64]*metautil.Table) for _, db := range databases { dbName := db.Info.Name.O - if name, ok := utils.GetSysDBName(db.Info.Name); utils.IsSysDB(name) && ok { + if name, ok := metautil.GetSysDBName(db.Info.Name); metautil.IsSysDB(name) && ok { dbName = name } @@ -1562,7 +1562,7 @@ func initRewriteRules(client *restore.Client, tables map[int64]*metautil.Table) // compare table exists in cluster and map[table]table.Info to get rewrite rules. rules := make(map[int64]*restore.RewriteRules) for _, t := range tables { - if name, ok := utils.GetSysDBName(t.DB.Name); utils.IsSysDB(name) && ok { + if name, ok := metautil.GetSysDBName(t.DB.Name); metautil.IsSysDB(name) && ok { // skip system table for now continue } @@ -1604,7 +1604,7 @@ func updateRewriteRules(rules map[int64]*restore.RewriteRules, schemasReplace *s for _, dbReplace := range schemasReplace.DbMap { if dbReplace.OldDBInfo == nil || - utils.IsSysDB(dbReplace.OldDBInfo.Name.O) || + metautil.IsSysDB(dbReplace.OldDBInfo.Name.O) || !filter.MatchSchema(dbReplace.OldDBInfo.Name.O) { continue } From 41697c9a50425a6a0053993322c9621d4cc34e1f Mon Sep 17 00:00:00 2001 From: Leavrth Date: Tue, 6 Dec 2022 13:42:09 +0800 Subject: [PATCH 2/5] Revert "add retry for azblob read file" This reverts commit 8608f42dccb333b92c79ae9c54c2284a96e6b13f. --- br/cmd/br/cmd.go | 3 +- br/cmd/br/debug.go | 4 +- br/pkg/backup/client.go | 2 +- br/pkg/backup/schema.go | 8 ++-- br/pkg/backup/schema_test.go | 3 +- br/pkg/restore/client.go | 26 +++++----- br/pkg/restore/client_test.go | 3 +- br/pkg/restore/db.go | 21 +++++---- br/pkg/restore/systable_restore.go | 22 ++++----- br/pkg/restore/tiflashrec/tiflash_recorder.go | 4 +- br/pkg/restore/util.go | 6 +-- br/pkg/storage/azblob.go | 22 ++------- br/pkg/task/common.go | 5 +- br/pkg/task/restore.go | 10 ++-- br/pkg/task/restore_test.go | 47 ++++++++++--------- br/pkg/task/stream.go | 8 ++-- br/pkg/{metautil => utils}/schema.go | 13 ++--- br/pkg/{metautil => utils}/schema_test.go | 23 ++++----- 18 files changed, 110 insertions(+), 120 deletions(-) rename br/pkg/{metautil => utils}/schema.go (90%) rename br/pkg/{metautil => utils}/schema_test.go (94%) diff --git a/br/cmd/br/cmd.go b/br/cmd/br/cmd.go index bbcce18fffb6c..f3aeb3393df52 100644 --- a/br/cmd/br/cmd.go +++ b/br/cmd/br/cmd.go @@ -14,7 +14,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/gluetidb" - "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/redact" "github.com/pingcap/tidb/br/pkg/summary" "github.com/pingcap/tidb/br/pkg/task" @@ -35,7 +34,7 @@ var ( filterOutSysAndMemTables = []string{ "*.*", - fmt.Sprintf("!%s.*", metautil.TemporaryDBName("*")), + fmt.Sprintf("!%s.*", utils.TemporaryDBName("*")), "!mysql.*", "mysql.user", "mysql.db", diff --git a/br/cmd/br/debug.go b/br/cmd/br/debug.go index 179bebdd17eee..c62dd677f9a66 100644 --- a/br/cmd/br/debug.go +++ b/br/cmd/br/debug.go @@ -80,7 +80,7 @@ func newCheckSumCommand() *cobra.Command { } reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo) - dbs, err := metautil.LoadBackupTables(ctx, reader) + dbs, err := utils.LoadBackupTables(ctx, reader) if err != nil { return errors.Trace(err) } @@ -182,7 +182,7 @@ func newBackupMetaValidateCommand() *cobra.Command { return errors.Trace(err) } reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo) - dbs, err := metautil.LoadBackupTables(ctx, reader) + dbs, err := utils.LoadBackupTables(ctx, reader) if err != nil { log.Error("load tables failed", zap.Error(err)) return errors.Trace(err) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 26310aa0c7db6..0241789e65103 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -570,7 +570,7 @@ func BuildBackupRangeAndSchema( switch { case tableInfo.IsSequence(): globalAutoID, err = seqAlloc.NextGlobalAutoID() - case tableInfo.IsView() || !metautil.NeedAutoID(tableInfo): + case tableInfo.IsView() || !utils.NeedAutoID(tableInfo): // no auto ID for views or table without either rowID nor auto_increment ID. default: globalAutoID, err = idAlloc.NextGlobalAutoID() diff --git a/br/pkg/backup/schema.go b/br/pkg/backup/schema.go index 9425f0aca2ad6..bb0cf7f884189 100644 --- a/br/pkg/backup/schema.go +++ b/br/pkg/backup/schema.go @@ -65,13 +65,13 @@ func (ss *Schemas) AddSchema( dbInfo *model.DBInfo, tableInfo *model.TableInfo, ) { if tableInfo == nil { - ss.schemas[metautil.EncloseName(dbInfo.Name.L)] = &schemaInfo{ + ss.schemas[utils.EncloseName(dbInfo.Name.L)] = &schemaInfo{ dbInfo: dbInfo, } return } name := fmt.Sprintf("%s.%s", - metautil.EncloseName(dbInfo.Name.L), metautil.EncloseName(tableInfo.Name.L)) + utils.EncloseName(dbInfo.Name.L), utils.EncloseName(tableInfo.Name.L)) ss.schemas[name] = &schemaInfo{ tableInfo: tableInfo, dbInfo: dbInfo, @@ -106,8 +106,8 @@ func (ss *Schemas) BackupSchemas( schema := s // Because schema.dbInfo is a pointer that many tables point to. // Remove "add Temporary-prefix into dbName" from closure to prevent concurrent operations. - if metautil.IsSysDB(schema.dbInfo.Name.L) { - schema.dbInfo.Name = metautil.TemporaryDBName(schema.dbInfo.Name.O) + if utils.IsSysDB(schema.dbInfo.Name.L) { + schema.dbInfo.Name = utils.TemporaryDBName(schema.dbInfo.Name.O) } var checksum *checkpoint.ChecksumItem diff --git a/br/pkg/backup/schema_test.go b/br/pkg/backup/schema_test.go index f02b3cbd271b2..08d560bf03c25 100644 --- a/br/pkg/backup/schema_test.go +++ b/br/pkg/backup/schema_test.go @@ -17,6 +17,7 @@ import ( "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/mock" "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/testkit" filter "github.com/pingcap/tidb/util/table-filter" @@ -313,7 +314,7 @@ func TestBackupSchemasForSystemTable(t *testing.T) { schemas2 := GetSchemasFromMeta(t, es2) require.Len(t, schemas2, systemTablesCount) for _, schema := range schemas2 { - require.Equal(t, metautil.TemporaryDBName("mysql"), schema.DB.Name) + require.Equal(t, utils.TemporaryDBName("mysql"), schema.DB.Name) require.Equal(t, true, strings.HasPrefix(schema.Info.Name.O, tablePrefix)) } } diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index c824a34ca64fa..d7811574915f2 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -89,7 +89,7 @@ type Client struct { tlsConf *tls.Config keepaliveConf keepalive.ClientParameters - databases map[string]*metautil.Database + databases map[string]*utils.Database ddlJobs []*model.Job // store tables need to rebase info like auto id and random id and so on after create table @@ -358,7 +358,7 @@ func (rc *Client) InitBackupMeta( backend *backuppb.StorageBackend, reader *metautil.MetaReader) error { if !backupMeta.IsRawKv { - databases, err := metautil.LoadBackupTables(c, reader) + databases, err := utils.LoadBackupTables(c, reader) if err != nil { return errors.Trace(err) } @@ -525,8 +525,8 @@ func (rc *Client) GetPlacementRules(ctx context.Context, pdAddrs []string) ([]pd } // GetDatabases returns all databases. -func (rc *Client) GetDatabases() []*metautil.Database { - dbs := make([]*metautil.Database, 0, len(rc.databases)) +func (rc *Client) GetDatabases() []*utils.Database { + dbs := make([]*utils.Database, 0, len(rc.databases)) for _, db := range rc.databases { dbs = append(dbs, db) } @@ -534,14 +534,14 @@ func (rc *Client) GetDatabases() []*metautil.Database { } // GetDatabase returns a database by name. -func (rc *Client) GetDatabase(name string) *metautil.Database { +func (rc *Client) GetDatabase(name string) *utils.Database { return rc.databases[name] } // HasBackedUpSysDB whether we have backed up system tables // br backs system tables up since 5.1.0 func (rc *Client) HasBackedUpSysDB() bool { - temporaryDB := metautil.TemporaryDBName(mysql.SystemDB) + temporaryDB := utils.TemporaryDBName(mysql.SystemDB) _, backedUp := rc.databases[temporaryDB.O] return backedUp } @@ -927,8 +927,8 @@ func (rc *Client) CheckSysTableCompatibility(dom *domain.Domain, tables []*metau log.Info("checking target cluster system table compatibility with backed up data") privilegeTablesInBackup := make([]*metautil.Table, 0) for _, table := range tables { - decodedSysDBName, ok := metautil.GetSysDBCIStrName(table.DB.Name) - if ok && metautil.IsSysDB(decodedSysDBName.L) && sysPrivilegeTableMap[table.Info.Name.L] != "" { + decodedSysDBName, ok := utils.GetSysDBCIStrName(table.DB.Name) + if ok && utils.IsSysDB(decodedSysDBName.L) && sysPrivilegeTableMap[table.Info.Name.L] != "" { privilegeTablesInBackup = append(privilegeTablesInBackup, table) } } @@ -1792,8 +1792,8 @@ func (rc *Client) FixIndex(ctx context.Context, schema, table, index string) err } sql := fmt.Sprintf("ADMIN RECOVER INDEX %s %s;", - metautil.EncloseDBAndTable(schema, table), - metautil.EncloseName(index)) + utils.EncloseDBAndTable(schema, table), + utils.EncloseName(index)) log.Debug("Executing fix index sql.", zap.String("sql", sql)) err := rc.db.se.Execute(ctx, sql) if err != nil { @@ -1809,7 +1809,7 @@ func (rc *Client) FixIndicesOfTables( onProgress func(), ) error { for _, table := range fullBackupTables { - if name, ok := metautil.GetSysDBName(table.DB.Name); metautil.IsSysDB(name) && ok { + if name, ok := utils.GetSysDBName(table.DB.Name); utils.IsSysDB(name) && ok { // skip system table for now onProgress() continue @@ -2101,7 +2101,7 @@ func (rc *Client) InitSchemasReplaceForDDL( dbMap := make(map[stream.OldID]*stream.DBReplace) for _, t := range *tables { - name, _ := metautil.GetSysDBName(t.DB.Name) + name, _ := utils.GetSysDBName(t.DB.Name) dbName := model.NewCIStr(name) newDBInfo, exist := rc.GetDBSchema(rc.GetDomain(), dbName) if !exist { @@ -2647,7 +2647,7 @@ func (rc *Client) SetWithSysTable(withSysTable bool) { } // MockClient create a fake client used to test. -func MockClient(dbs map[string]*metautil.Database) *Client { +func MockClient(dbs map[string]*utils.Database) *Client { return &Client{databases: dbs} } diff --git a/br/pkg/restore/client_test.go b/br/pkg/restore/client_test.go index 55139f1534838..ae943a96f276b 100644 --- a/br/pkg/restore/client_test.go +++ b/br/pkg/restore/client_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/br/pkg/restore" "github.com/pingcap/tidb/br/pkg/restore/tiflashrec" "github.com/pingcap/tidb/br/pkg/stream" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/utils/iter" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" @@ -189,7 +190,7 @@ func TestCheckSysTableCompatibility(t *testing.T) { dbSchema, isExist := info.SchemaByName(model.NewCIStr(mysql.SystemDB)) require.True(t, isExist) tmpSysDB := dbSchema.Clone() - tmpSysDB.Name = metautil.TemporaryDBName(mysql.SystemDB) + tmpSysDB.Name = utils.TemporaryDBName(mysql.SystemDB) sysDB := model.NewCIStr(mysql.SystemDB) userTI, err := client.GetTableSchema(cluster.Domain, sysDB, model.NewCIStr("user")) require.NoError(t, err) diff --git a/br/pkg/restore/db.go b/br/pkg/restore/db.go index 310f76d4820cf..ae62162c3e890 100644 --- a/br/pkg/restore/db.go +++ b/br/pkg/restore/db.go @@ -12,6 +12,7 @@ import ( "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/metautil" prealloctableid "github.com/pingcap/tidb/br/pkg/restore/prealloc_table_id" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" @@ -115,7 +116,7 @@ func (db *DB) ExecDDL(ctx context.Context, ddlJob *model.Job) error { } if tableInfo != nil { - switchDBSQL := fmt.Sprintf("use %s;", metautil.EncloseName(ddlJob.SchemaName)) + switchDBSQL := fmt.Sprintf("use %s;", utils.EncloseName(ddlJob.SchemaName)) err = db.se.Execute(ctx, switchDBSQL) if err != nil { log.Error("switch db failed", @@ -182,8 +183,8 @@ func (db *DB) restoreSequence(ctx context.Context, table *metautil.Table) error var err error if table.Info.IsSequence() { setValFormat := fmt.Sprintf("do setval(%s.%s, %%d);", - metautil.EncloseName(table.DB.Name.O), - metautil.EncloseName(table.Info.Name.O)) + utils.EncloseName(table.DB.Name.O), + utils.EncloseName(table.Info.Name.O)) if table.Info.Sequence.Cycle { increment := table.Info.Sequence.Increment // TiDB sequence's behaviour is designed to keep the same pace @@ -192,8 +193,8 @@ func (db *DB) restoreSequence(ctx context.Context, table *metautil.Table) error // https://github.com/pingcap/br/pull/242#issuecomment-631307978 // TODO use sql to set cycle round nextSeqSQL := fmt.Sprintf("do nextval(%s.%s);", - metautil.EncloseName(table.DB.Name.O), - metautil.EncloseName(table.Info.Name.O)) + utils.EncloseName(table.DB.Name.O), + utils.EncloseName(table.Info.Name.O)) var setValSQL string if increment < 0 { setValSQL = fmt.Sprintf(setValFormat, table.Info.Sequence.MinValue) @@ -247,17 +248,17 @@ func (db *DB) CreateTablePostRestore(ctx context.Context, table *metautil.Table, } // only table exists in restored cluster during incremental restoration should do alter after creation. case toBeCorrectedTables[UniqueTableName{table.DB.Name.String(), table.Info.Name.String()}]: - if metautil.NeedAutoID(table.Info) { + if utils.NeedAutoID(table.Info) { restoreMetaSQL = fmt.Sprintf( "alter table %s.%s auto_increment = %d;", - metautil.EncloseName(table.DB.Name.O), - metautil.EncloseName(table.Info.Name.O), + utils.EncloseName(table.DB.Name.O), + utils.EncloseName(table.Info.Name.O), table.Info.AutoIncID) } else if table.Info.PKIsHandle && table.Info.ContainsAutoRandomBits() { restoreMetaSQL = fmt.Sprintf( "alter table %s.%s auto_random_base = %d", - metautil.EncloseName(table.DB.Name.O), - metautil.EncloseName(table.Info.Name.O), + utils.EncloseName(table.DB.Name.O), + utils.EncloseName(table.Info.Name.O), table.Info.AutoRandID) } else { log.Info("table exists in incremental ddl jobs, but don't need to be altered", diff --git a/br/pkg/restore/systable_restore.go b/br/pkg/restore/systable_restore.go index 7502fea1c365c..02ea0860d5425 100644 --- a/br/pkg/restore/systable_restore.go +++ b/br/pkg/restore/systable_restore.go @@ -11,7 +11,7 @@ import ( "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/logutil" - "github.com/pingcap/tidb/br/pkg/metautil" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" filter "github.com/pingcap/tidb/util/table-filter" @@ -76,7 +76,7 @@ func isStatsTable(tableName string) bool { func (rc *Client) RestoreSystemSchemas(ctx context.Context, f filter.Filter) { sysDB := mysql.SystemDB - temporaryDB := metautil.TemporaryDBName(sysDB) + temporaryDB := utils.TemporaryDBName(sysDB) defer rc.cleanTemporaryDatabase(ctx, sysDB) if !f.MatchSchema(sysDB) || !rc.withSysTable { @@ -133,7 +133,7 @@ func (rc *Client) getDatabaseByName(name string) (*database, bool) { db := &database{ ExistingTables: map[string]*model.TableInfo{}, Name: model.NewCIStr(name), - TemporaryName: metautil.TemporaryDBName(name), + TemporaryName: utils.TemporaryDBName(name), } for _, t := range schema.Tables { db.ExistingTables[t.Name.L] = t @@ -210,7 +210,7 @@ func (rc *Client) replaceTemporaryTableToSystable(ctx context.Context, ti *model log.Info("full cluster restore, delete existing data", zap.String("table", tableName), zap.Stringer("schema", db.Name)) deleteSQL := fmt.Sprintf("DELETE FROM %s %s;", - metautil.EncloseDBAndTable(db.Name.L, tableName), whereClause) + utils.EncloseDBAndTable(db.Name.L, tableName), whereClause) if err := execSQL(deleteSQL); err != nil { return err } @@ -221,28 +221,28 @@ func (rc *Client) replaceTemporaryTableToSystable(ctx context.Context, ti *model // target column order may different with source cluster columnNames := make([]string, 0, len(ti.Columns)) for _, col := range ti.Columns { - columnNames = append(columnNames, metautil.EncloseName(col.Name.L)) + columnNames = append(columnNames, utils.EncloseName(col.Name.L)) } colListStr := strings.Join(columnNames, ",") replaceIntoSQL := fmt.Sprintf("REPLACE INTO %s(%s) SELECT %s FROM %s %s;", - metautil.EncloseDBAndTable(db.Name.L, tableName), + utils.EncloseDBAndTable(db.Name.L, tableName), colListStr, colListStr, - metautil.EncloseDBAndTable(db.TemporaryName.L, tableName), + utils.EncloseDBAndTable(db.TemporaryName.L, tableName), whereClause) return execSQL(replaceIntoSQL) } renameSQL := fmt.Sprintf("RENAME TABLE %s TO %s;", - metautil.EncloseDBAndTable(db.TemporaryName.L, tableName), - metautil.EncloseDBAndTable(db.Name.L, tableName), + utils.EncloseDBAndTable(db.TemporaryName.L, tableName), + utils.EncloseDBAndTable(db.Name.L, tableName), ) return execSQL(renameSQL) } func (rc *Client) cleanTemporaryDatabase(ctx context.Context, originDB string) { - database := metautil.TemporaryDBName(originDB) + database := utils.TemporaryDBName(originDB) log.Debug("dropping temporary database", zap.Stringer("database", database)) - sql := fmt.Sprintf("DROP DATABASE IF EXISTS %s", metautil.EncloseName(database.L)) + sql := fmt.Sprintf("DROP DATABASE IF EXISTS %s", utils.EncloseName(database.L)) if err := rc.db.se.Execute(ctx, sql); err != nil { logutil.WarnTerm("failed to drop temporary database, it should be dropped manually", zap.Stringer("database", database), diff --git a/br/pkg/restore/tiflashrec/tiflash_recorder.go b/br/pkg/restore/tiflashrec/tiflash_recorder.go index 4adfdb08b0de9..31dde982a7b69 100644 --- a/br/pkg/restore/tiflashrec/tiflash_recorder.go +++ b/br/pkg/restore/tiflashrec/tiflash_recorder.go @@ -20,7 +20,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/logutil" - "github.com/pingcap/tidb/br/pkg/metautil" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/format" @@ -99,7 +99,7 @@ func (r *TiFlashRecorder) GenerateAlterTableDDLs(info infoschema.InfoSchema) []s } items = append(items, fmt.Sprintf( "ALTER TABLE %s %s", - metautil.EncloseDBAndTable(schema.Name.O, table.Meta().Name.O), + utils.EncloseDBAndTable(schema.Name.O, table.Meta().Name.O), altTableSpec), ) }) diff --git a/br/pkg/restore/util.go b/br/pkg/restore/util.go index af6fe657adfc2..73a4411c445c1 100644 --- a/br/pkg/restore/util.go +++ b/br/pkg/restore/util.go @@ -20,10 +20,10 @@ import ( berrors "github.com/pingcap/tidb/br/pkg/errors" "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/logutil" - "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/redact" "github.com/pingcap/tidb/br/pkg/restore/split" "github.com/pingcap/tidb/br/pkg/rtree" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" @@ -575,8 +575,8 @@ func ZapTables(tables []CreatedTable) zapcore.Field { names := make([]string, 0, len(tables)) for _, t := range tables { names = append(names, fmt.Sprintf("%s.%s", - metautil.EncloseName(t.OldTable.DB.Name.String()), - metautil.EncloseName(t.OldTable.Info.Name.String()))) + utils.EncloseName(t.OldTable.DB.Name.String()), + utils.EncloseName(t.OldTable.Info.Name.String()))) } return names }) diff --git a/br/pkg/storage/azblob.go b/br/pkg/storage/azblob.go index 2b9c33946a876..c557a79e3ac8f 100644 --- a/br/pkg/storage/azblob.go +++ b/br/pkg/storage/azblob.go @@ -19,7 +19,6 @@ import ( backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/log" berrors "github.com/pingcap/tidb/br/pkg/errors" - "github.com/pingcap/tidb/br/pkg/utils" "github.com/spf13/pflag" "go.uber.org/zap" ) @@ -31,8 +30,6 @@ const ( azblobAccountKey = "azblob.account-key" ) -const maxRetry = 5 - // AzblobBackendOptions is the options for Azure Blob storage. type AzblobBackendOptions struct { Endpoint string `json:"endpoint" toml:"endpoint"` @@ -283,23 +280,10 @@ func (s *AzureBlobStorage) WriteFile(ctx context.Context, name string, data []by // ReadFile reads a file from Azure Blob Storage. func (s *AzureBlobStorage) ReadFile(ctx context.Context, name string) ([]byte, error) { client := s.containerClient.NewBlockBlobClient(s.withPrefix(name)) - var resp *azblob.DownloadResponse - var err error - for retryTimes := 0; ; retryTimes++ { - if retryTimes == maxRetry { - return nil, errors.Annotatef(err, "Failed to retry to download azure blob file, file info: bucket(container)='%s', key='%s'", s.options.Bucket, s.withPrefix(name)) - } - resp, err = client.Download(ctx, nil) - if err != nil { - if utils.MessageIsRetryableStorageError(err.Error()) { - log.Warn("Failed to download azure blob file, file info", zap.String("bucket(container)", s.options.Bucket), zap.String("key", s.withPrefix(name)), zap.Int("retry", retryTimes), zap.Error(err)) - continue - } - return nil, errors.Annotatef(err, "Failed to download azure blob file, file info: bucket(container)='%s', key='%s'", s.options.Bucket, s.withPrefix(name)) - } - break + resp, err := client.Download(ctx, nil) + if err != nil { + return nil, errors.Annotatef(err, "Failed to download azure blob file, file info: bucket(container)='%s', key='%s'", s.options.Bucket, s.withPrefix(name)) } - defer resp.RawResponse.Body.Close() data, err := io.ReadAll(resp.Body(azblob.RetryReaderOptions{})) if err != nil { diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index 7b093e19fd7b5..2d04f916d98ec 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/sessionctx/variable" filter "github.com/pingcap/tidb/util/table-filter" "github.com/spf13/cobra" @@ -505,13 +506,13 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error { if len(db) == 0 { return errors.Annotate(berrors.ErrInvalidArgument, "empty database name is not allowed") } - cfg.Schemas[metautil.EncloseName(db)] = struct{}{} + cfg.Schemas[utils.EncloseName(db)] = struct{}{} if tblFlag := flags.Lookup(flagTable); tblFlag != nil { tbl := tblFlag.Value.String() if len(tbl) == 0 { return errors.Annotate(berrors.ErrInvalidArgument, "empty table name is not allowed") } - cfg.Tables[metautil.EncloseDBAndTable(db, tbl)] = struct{}{} + cfg.Tables[utils.EncloseDBAndTable(db, tbl)] = struct{}{} cfg.TableFilter = filter.NewTablesFilter(filter.Table{ Schema: db, Name: tbl, diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 0147f79b3ff0d..83c22a29e61db 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -421,16 +421,16 @@ func CheckRestoreDBAndTable(client *restore.Client, cfg *RestoreConfig) error { tablesMap := make(map[string]struct{}) for _, db := range schemas { dbName := db.Info.Name.L - if dbCIStrName, ok := metautil.GetSysDBCIStrName(db.Info.Name); metautil.IsSysDB(dbCIStrName.O) && ok { + if dbCIStrName, ok := utils.GetSysDBCIStrName(db.Info.Name); utils.IsSysDB(dbCIStrName.O) && ok { dbName = dbCIStrName.L } - schemasMap[metautil.EncloseName(dbName)] = struct{}{} + schemasMap[utils.EncloseName(dbName)] = struct{}{} for _, table := range db.Tables { if table.Info == nil { // we may back up empty database. continue } - tablesMap[metautil.EncloseDBAndTable(dbName, table.Info.Name.L)] = struct{}{} + tablesMap[utils.EncloseDBAndTable(dbName, table.Info.Name.L)] = struct{}{} } } restoreSchemas := cfg.Schemas @@ -797,10 +797,10 @@ func dropToBlackhole( func filterRestoreFiles( client *restore.Client, cfg *RestoreConfig, -) (files []*backuppb.File, tables []*metautil.Table, dbs []*metautil.Database) { +) (files []*backuppb.File, tables []*metautil.Table, dbs []*utils.Database) { for _, db := range client.GetDatabases() { dbName := db.Info.Name.O - if name, ok := metautil.GetSysDBName(db.Info.Name); metautil.IsSysDB(name) && ok { + if name, ok := utils.GetSysDBName(db.Info.Name); utils.IsSysDB(name) && ok { dbName = name } if !cfg.TableFilter.MatchSchema(dbName) { diff --git a/br/pkg/task/restore_test.go b/br/pkg/task/restore_test.go index 66982a0626530..b13ecf0eccc08 100644 --- a/br/pkg/task/restore_test.go +++ b/br/pkg/task/restore_test.go @@ -16,6 +16,7 @@ import ( "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/restore" "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/tablecodec" @@ -76,15 +77,15 @@ func TestCheckRestoreDBAndTable(t *testing.T) { cases := []struct { cfgSchemas map[string]struct{} cfgTables map[string]struct{} - backupDBs map[string]*metautil.Database + backupDBs map[string]*utils.Database }{ { cfgSchemas: map[string]struct{}{ - metautil.EncloseName("test"): {}, + utils.EncloseName("test"): {}, }, cfgTables: map[string]struct{}{ - metautil.EncloseDBAndTable("test", "t"): {}, - metautil.EncloseDBAndTable("test", "t2"): {}, + utils.EncloseDBAndTable("test", "t"): {}, + utils.EncloseDBAndTable("test", "t2"): {}, }, backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{ "test": {"T", "T2"}, @@ -92,11 +93,11 @@ func TestCheckRestoreDBAndTable(t *testing.T) { }, { cfgSchemas: map[string]struct{}{ - metautil.EncloseName("mysql"): {}, + utils.EncloseName("mysql"): {}, }, cfgTables: map[string]struct{}{ - metautil.EncloseDBAndTable("mysql", "t"): {}, - metautil.EncloseDBAndTable("mysql", "t2"): {}, + utils.EncloseDBAndTable("mysql", "t"): {}, + utils.EncloseDBAndTable("mysql", "t2"): {}, }, backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{ "__TiDB_BR_Temporary_mysql": {"T", "T2"}, @@ -104,11 +105,11 @@ func TestCheckRestoreDBAndTable(t *testing.T) { }, { cfgSchemas: map[string]struct{}{ - metautil.EncloseName("test"): {}, + utils.EncloseName("test"): {}, }, cfgTables: map[string]struct{}{ - metautil.EncloseDBAndTable("test", "T"): {}, - metautil.EncloseDBAndTable("test", "T2"): {}, + utils.EncloseDBAndTable("test", "T"): {}, + utils.EncloseDBAndTable("test", "T2"): {}, }, backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{ "test": {"t", "t2"}, @@ -116,11 +117,11 @@ func TestCheckRestoreDBAndTable(t *testing.T) { }, { cfgSchemas: map[string]struct{}{ - metautil.EncloseName("TEST"): {}, + utils.EncloseName("TEST"): {}, }, cfgTables: map[string]struct{}{ - metautil.EncloseDBAndTable("TEST", "t"): {}, - metautil.EncloseDBAndTable("TEST", "T2"): {}, + utils.EncloseDBAndTable("TEST", "t"): {}, + utils.EncloseDBAndTable("TEST", "T2"): {}, }, backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{ "test": {"t", "t2"}, @@ -128,11 +129,11 @@ func TestCheckRestoreDBAndTable(t *testing.T) { }, { cfgSchemas: map[string]struct{}{ - metautil.EncloseName("TeSt"): {}, + utils.EncloseName("TeSt"): {}, }, cfgTables: map[string]struct{}{ - metautil.EncloseDBAndTable("TeSt", "tabLe"): {}, - metautil.EncloseDBAndTable("TeSt", "taBle2"): {}, + utils.EncloseDBAndTable("TeSt", "tabLe"): {}, + utils.EncloseDBAndTable("TeSt", "taBle2"): {}, }, backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{ "TesT": {"TablE", "taBle2"}, @@ -140,13 +141,13 @@ func TestCheckRestoreDBAndTable(t *testing.T) { }, { cfgSchemas: map[string]struct{}{ - metautil.EncloseName("TeSt"): {}, - metautil.EncloseName("MYSQL"): {}, + utils.EncloseName("TeSt"): {}, + utils.EncloseName("MYSQL"): {}, }, cfgTables: map[string]struct{}{ - metautil.EncloseDBAndTable("TeSt", "tabLe"): {}, - metautil.EncloseDBAndTable("TeSt", "taBle2"): {}, - metautil.EncloseDBAndTable("MYSQL", "taBle"): {}, + utils.EncloseDBAndTable("TeSt", "tabLe"): {}, + utils.EncloseDBAndTable("TeSt", "taBle2"): {}, + utils.EncloseDBAndTable("MYSQL", "taBle"): {}, }, backupDBs: mockReadSchemasFromBackupMeta(t, map[string][]string{ "TesT": {"table", "TaBLE2"}, @@ -166,7 +167,7 @@ func TestCheckRestoreDBAndTable(t *testing.T) { } } -func mockReadSchemasFromBackupMeta(t *testing.T, db2Tables map[string][]string) map[string]*metautil.Database { +func mockReadSchemasFromBackupMeta(t *testing.T, db2Tables map[string][]string) map[string]*utils.Database { testDir := t.TempDir() store, err := storage.NewLocalStorage(testDir) require.NoError(t, err) @@ -235,7 +236,7 @@ func mockReadSchemasFromBackupMeta(t *testing.T, db2Tables map[string][]string) err = store.WriteFile(ctx, metautil.MetaFile, data) require.NoError(t, err) - dbs, err := metautil.LoadBackupTables( + dbs, err := utils.LoadBackupTables( ctx, metautil.NewMetaReader( meta, diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 6fae8ddf563e0..b33459bb98e2e 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1526,7 +1526,7 @@ func initFullBackupTables( // read full backup databases to get map[table]table.Info reader := metautil.NewMetaReader(backupMeta, s, nil) - databases, err := metautil.LoadBackupTables(ctx, reader) + databases, err := utils.LoadBackupTables(ctx, reader) if err != nil { return nil, errors.Trace(err) } @@ -1534,7 +1534,7 @@ func initFullBackupTables( tables := make(map[int64]*metautil.Table) for _, db := range databases { dbName := db.Info.Name.O - if name, ok := metautil.GetSysDBName(db.Info.Name); metautil.IsSysDB(name) && ok { + if name, ok := utils.GetSysDBName(db.Info.Name); utils.IsSysDB(name) && ok { dbName = name } @@ -1562,7 +1562,7 @@ func initRewriteRules(client *restore.Client, tables map[int64]*metautil.Table) // compare table exists in cluster and map[table]table.Info to get rewrite rules. rules := make(map[int64]*restore.RewriteRules) for _, t := range tables { - if name, ok := metautil.GetSysDBName(t.DB.Name); metautil.IsSysDB(name) && ok { + if name, ok := utils.GetSysDBName(t.DB.Name); utils.IsSysDB(name) && ok { // skip system table for now continue } @@ -1604,7 +1604,7 @@ func updateRewriteRules(rules map[int64]*restore.RewriteRules, schemasReplace *s for _, dbReplace := range schemasReplace.DbMap { if dbReplace.OldDBInfo == nil || - metautil.IsSysDB(dbReplace.OldDBInfo.Name.O) || + utils.IsSysDB(dbReplace.OldDBInfo.Name.O) || !filter.MatchSchema(dbReplace.OldDBInfo.Name.O) { continue } diff --git a/br/pkg/metautil/schema.go b/br/pkg/utils/schema.go similarity index 90% rename from br/pkg/metautil/schema.go rename to br/pkg/utils/schema.go index d5ffedd5a2dc9..8ceba24e140ad 100644 --- a/br/pkg/metautil/schema.go +++ b/br/pkg/utils/schema.go @@ -1,6 +1,6 @@ // Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. -package metautil +package utils import ( "context" @@ -9,6 +9,7 @@ import ( "github.com/pingcap/errors" backuppb "github.com/pingcap/kvproto/pkg/brpb" + "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" ) @@ -26,11 +27,11 @@ func NeedAutoID(tblInfo *model.TableInfo) bool { // Database wraps the schema and tables of a database. type Database struct { Info *model.DBInfo - Tables []*Table + Tables []*metautil.Table } // GetTable returns a table of the database by name. -func (db *Database) GetTable(name string) *Table { +func (db *Database) GetTable(name string) *metautil.Table { for _, table := range db.Tables { if table.Info.Name.String() == name { return table @@ -40,8 +41,8 @@ func (db *Database) GetTable(name string) *Table { } // LoadBackupTables loads schemas from BackupMeta. -func LoadBackupTables(ctx context.Context, reader *MetaReader) (map[string]*Database, error) { - ch := make(chan *Table) +func LoadBackupTables(ctx context.Context, reader *metautil.MetaReader) (map[string]*Database, error) { + ch := make(chan *metautil.Table) errCh := make(chan error) go func() { if err := reader.ReadSchemasFiles(ctx, ch); err != nil { @@ -67,7 +68,7 @@ func LoadBackupTables(ctx context.Context, reader *MetaReader) (map[string]*Data if !ok { db = &Database{ Info: table.DB, - Tables: make([]*Table, 0), + Tables: make([]*metautil.Table, 0), } databases[dbName] = db } diff --git a/br/pkg/metautil/schema_test.go b/br/pkg/utils/schema_test.go similarity index 94% rename from br/pkg/metautil/schema_test.go rename to br/pkg/utils/schema_test.go index b36265da00bf9..95d649000d9ce 100644 --- a/br/pkg/metautil/schema_test.go +++ b/br/pkg/utils/schema_test.go @@ -1,6 +1,6 @@ // Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0. -package metautil +package utils import ( "context" @@ -11,6 +11,7 @@ import ( "github.com/golang/protobuf/proto" backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/kvproto/pkg/encryptionpb" + "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/statistics/handle" @@ -83,12 +84,12 @@ func TestLoadBackupMeta(t *testing.T) { require.NoError(t, err) ctx := context.Background() - err = store.WriteFile(ctx, MetaFile, data) + err = store.WriteFile(ctx, metautil.MetaFile, data) require.NoError(t, err) dbs, err := LoadBackupTables( ctx, - NewMetaReader( + metautil.NewMetaReader( meta, store, &backuppb.CipherInfo{ @@ -178,12 +179,12 @@ func TestLoadBackupMetaPartionTable(t *testing.T) { require.NoError(t, err) ctx := context.Background() - err = store.WriteFile(ctx, MetaFile, data) + err = store.WriteFile(ctx, metautil.MetaFile, data) require.NoError(t, err) dbs, err := LoadBackupTables( ctx, - NewMetaReader( + metautil.NewMetaReader( meta, store, &backuppb.CipherInfo{ @@ -264,12 +265,12 @@ func BenchmarkLoadBackupMeta64(b *testing.B) { require.NoError(b, err) ctx := context.Background() - err = store.WriteFile(ctx, MetaFile, data) + err = store.WriteFile(ctx, metautil.MetaFile, data) require.NoError(b, err) dbs, err := LoadBackupTables( ctx, - NewMetaReader( + metautil.NewMetaReader( meta, store, &backuppb.CipherInfo{ @@ -296,12 +297,12 @@ func BenchmarkLoadBackupMeta1024(b *testing.B) { require.NoError(b, err) ctx := context.Background() - err = store.WriteFile(ctx, MetaFile, data) + err = store.WriteFile(ctx, metautil.MetaFile, data) require.NoError(b, err) dbs, err := LoadBackupTables( ctx, - NewMetaReader( + metautil.NewMetaReader( meta, store, &backuppb.CipherInfo{ @@ -328,12 +329,12 @@ func BenchmarkLoadBackupMeta10240(b *testing.B) { require.NoError(b, err) ctx := context.Background() - err = store.WriteFile(ctx, MetaFile, data) + err = store.WriteFile(ctx, metautil.MetaFile, data) require.NoError(b, err) dbs, err := LoadBackupTables( ctx, - NewMetaReader( + metautil.NewMetaReader( meta, store, &backuppb.CipherInfo{ From d7fb3d136f39396e66df6b27b0e26ef2c2e785be Mon Sep 17 00:00:00 2001 From: Leavrth Date: Fri, 9 Dec 2022 10:42:20 +0800 Subject: [PATCH 3/5] add retry when read stream the response --- br/pkg/storage/azblob.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/br/pkg/storage/azblob.go b/br/pkg/storage/azblob.go index c557a79e3ac8f..65c3da1637d58 100644 --- a/br/pkg/storage/azblob.go +++ b/br/pkg/storage/azblob.go @@ -285,7 +285,9 @@ func (s *AzureBlobStorage) ReadFile(ctx context.Context, name string) ([]byte, e return nil, errors.Annotatef(err, "Failed to download azure blob file, file info: bucket(container)='%s', key='%s'", s.options.Bucket, s.withPrefix(name)) } defer resp.RawResponse.Body.Close() - data, err := io.ReadAll(resp.Body(azblob.RetryReaderOptions{})) + data, err := io.ReadAll(resp.Body(azblob.RetryReaderOptions{ + MaxRetryRequests: 3, + })) if err != nil { return nil, errors.Annotatef(err, "Failed to read azure blob file, file info: bucket(container)='%s', key='%s'", s.options.Bucket, s.withPrefix(name)) } From 92e2558fab721756aacbb91a470a4d6f2930951a Mon Sep 17 00:00:00 2001 From: Leavrth Date: Fri, 9 Dec 2022 14:31:06 +0800 Subject: [PATCH 4/5] add retry when read stream the response Signed-off-by: Leavrth --- br/pkg/storage/azblob.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/br/pkg/storage/azblob.go b/br/pkg/storage/azblob.go index 65c3da1637d58..41d8fa88f559f 100644 --- a/br/pkg/storage/azblob.go +++ b/br/pkg/storage/azblob.go @@ -12,6 +12,7 @@ import ( "path" "strings" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" "github.com/google/uuid" @@ -30,6 +31,16 @@ const ( azblobAccountKey = "azblob.account-key" ) +const azblobRetryTimes int32 = 5 + +func getDefaultClientOptions() *azblob.ClientOptions { + return &azblob.ClientOptions{ + Retry: policy.RetryOptions{ + MaxRetries: azblobRetryTimes, + }, + } +} + // AzblobBackendOptions is the options for Azure Blob storage. type AzblobBackendOptions struct { Endpoint string `json:"endpoint" toml:"endpoint"` @@ -99,7 +110,7 @@ type sharedKeyClientBuilder struct { } func (b *sharedKeyClientBuilder) GetServiceClient() (azblob.ServiceClient, error) { - return azblob.NewServiceClientWithSharedKey(b.serviceURL, b.cred, nil) + return azblob.NewServiceClientWithSharedKey(b.serviceURL, b.cred, getDefaultClientOptions()) } func (b *sharedKeyClientBuilder) GetAccountName() string { @@ -114,7 +125,7 @@ type tokenClientBuilder struct { } func (b *tokenClientBuilder) GetServiceClient() (azblob.ServiceClient, error) { - return azblob.NewServiceClient(b.serviceURL, b.cred, nil) + return azblob.NewServiceClient(b.serviceURL, b.cred, getDefaultClientOptions()) } func (b *tokenClientBuilder) GetAccountName() string { @@ -286,7 +297,7 @@ func (s *AzureBlobStorage) ReadFile(ctx context.Context, name string) ([]byte, e } defer resp.RawResponse.Body.Close() data, err := io.ReadAll(resp.Body(azblob.RetryReaderOptions{ - MaxRetryRequests: 3, + MaxRetryRequests: int(azblobRetryTimes), })) if err != nil { return nil, errors.Annotatef(err, "Failed to read azure blob file, file info: bucket(container)='%s', key='%s'", s.options.Bucket, s.withPrefix(name)) From 1969647f27b42176c6de1d96c51094f07c635254 Mon Sep 17 00:00:00 2001 From: Leavrth Date: Fri, 9 Dec 2022 16:14:57 +0800 Subject: [PATCH 5/5] add unit test Signed-off-by: Leavrth --- br/pkg/storage/BUILD.bazel | 1 + br/pkg/storage/azblob_test.go | 53 +++++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/br/pkg/storage/BUILD.bazel b/br/pkg/storage/BUILD.bazel index c67a17713b2ca..8c98a13e59500 100644 --- a/br/pkg/storage/BUILD.bazel +++ b/br/pkg/storage/BUILD.bazel @@ -35,6 +35,7 @@ go_library( "@com_github_aws_aws_sdk_go//service/s3", "@com_github_aws_aws_sdk_go//service/s3/s3iface", "@com_github_aws_aws_sdk_go//service/s3/s3manager", + "@com_github_azure_azure_sdk_for_go_sdk_azcore//policy", "@com_github_azure_azure_sdk_for_go_sdk_azidentity//:azidentity", "@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//:azblob", "@com_github_golang_snappy//:snappy", diff --git a/br/pkg/storage/azblob_test.go b/br/pkg/storage/azblob_test.go index c099037ea51b2..74ddfa7125699 100644 --- a/br/pkg/storage/azblob_test.go +++ b/br/pkg/storage/azblob_test.go @@ -4,9 +4,13 @@ package storage import ( "context" + "fmt" "io" + "net/http" + "net/http/httptest" "os" "strings" + "sync" "testing" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" @@ -298,3 +302,52 @@ func TestNewAzblobStorage(t *testing.T) { require.Equal(t, "http://127.0.0.1:1000", b.serviceURL) } } + +type fakeClientBuilder struct { + Endpoint string +} + +func (b *fakeClientBuilder) GetServiceClient() (azblob.ServiceClient, error) { + connStr := fmt.Sprintf("DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=%s/devstoreaccount1;", b.Endpoint) + return azblob.NewServiceClientFromConnectionString(connStr, getDefaultClientOptions()) +} + +func (b *fakeClientBuilder) GetAccountName() string { + return "devstoreaccount1" +} + +func TestDownloadRetry(t *testing.T) { + var count int32 = 0 + var lock sync.Mutex + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Log(r.URL) + if strings.Contains(r.URL.String(), "restype=container") { + w.WriteHeader(201) + return + } + lock.Lock() + count += 1 + lock.Unlock() + header := w.Header() + header.Add("Etag", "0x1") + header.Add("Content-Length", "5") + w.WriteHeader(200) + w.Write([]byte("1234567")) + })) + + defer server.Close() + t.Log(server.URL) + + options := &backuppb.AzureBlobStorage{ + Bucket: "test", + Prefix: "a/b/", + } + + ctx := context.Background() + builder := &fakeClientBuilder{Endpoint: server.URL} + s, err := newAzureBlobStorageWithClientBuilder(ctx, options, builder) + require.NoError(t, err) + _, err = s.ReadFile(ctx, "c") + require.Error(t, err) + require.Less(t, azblobRetryTimes, count) +}