Skip to content

Commit

Permalink
br: implement backup & restore policy info (pingcap#33007)
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer authored Mar 15, 2022
1 parent 911cf53 commit e0db77e
Show file tree
Hide file tree
Showing 18 changed files with 563 additions and 142 deletions.
8 changes: 4 additions & 4 deletions br/cmd/br/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func newFullBackupCommand() *cobra.Command {
Args: cobra.NoArgs,
RunE: func(command *cobra.Command, _ []string) error {
// empty db/table means full backup.
return runBackupCommand(command, "Full backup")
return runBackupCommand(command, task.FullBackupCmd)
},
}
task.DefineFilterFlags(command, acceptAllTables)
Expand All @@ -119,7 +119,7 @@ func newDBBackupCommand() *cobra.Command {
Short: "backup a database",
Args: cobra.NoArgs,
RunE: func(command *cobra.Command, _ []string) error {
return runBackupCommand(command, "Database backup")
return runBackupCommand(command, task.DBBackupCmd)
},
}
task.DefineDatabaseFlags(command)
Expand All @@ -133,7 +133,7 @@ func newTableBackupCommand() *cobra.Command {
Short: "backup a table",
Args: cobra.NoArgs,
RunE: func(command *cobra.Command, _ []string) error {
return runBackupCommand(command, "Table backup")
return runBackupCommand(command, task.TableBackupCmd)
},
}
task.DefineTableFlags(command)
Expand All @@ -148,7 +148,7 @@ func newRawBackupCommand() *cobra.Command {
Short: "(experimental) backup a raw kv range from TiKV cluster",
Args: cobra.NoArgs,
RunE: func(command *cobra.Command, _ []string) error {
return runBackupRawCommand(command, "Raw backup")
return runBackupRawCommand(command, task.RawBackupCmd)
},
}

Expand Down
8 changes: 4 additions & 4 deletions br/cmd/br/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func newFullRestoreCommand() *cobra.Command {
Short: "restore all tables",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, _ []string) error {
return runRestoreCommand(cmd, "Full restore")
return runRestoreCommand(cmd, task.FullRestoreCmd)
},
}
task.DefineFilterFlags(command, filterOutSysAndMemTables)
Expand All @@ -108,7 +108,7 @@ func newDBRestoreCommand() *cobra.Command {
Short: "restore tables in a database from the backup data",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, _ []string) error {
return runRestoreCommand(cmd, "Database restore")
return runRestoreCommand(cmd, task.DBRestoreCmd)
},
}
task.DefineDatabaseFlags(command)
Expand All @@ -121,7 +121,7 @@ func newTableRestoreCommand() *cobra.Command {
Short: "restore a table from the backup data",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, _ []string) error {
return runRestoreCommand(cmd, "Table restore")
return runRestoreCommand(cmd, task.TableRestoreCmd)
},
}
task.DefineTableFlags(command)
Expand All @@ -134,7 +134,7 @@ func newRawRestoreCommand() *cobra.Command {
Short: "(experimental) restore a raw kv range to TiKV cluster",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, _ []string) error {
return runRestoreRawCommand(cmd, "Raw restore")
return runRestoreRawCommand(cmd, task.RawRestoreCmd)
},
}

Expand Down
54 changes: 46 additions & 8 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,15 +282,34 @@ func BuildBackupRangeAndSchema(
storage kv.Storage,
tableFilter filter.Filter,
backupTS uint64,
) ([]rtree.Range, *Schemas, error) {
isFullBackup bool,
) ([]rtree.Range, *Schemas, []*backuppb.PlacementPolicy, error) {
snapshot := storage.GetSnapshot(kv.NewVersion(backupTS))
m := meta.NewSnapshotMeta(snapshot)

var policies []*backuppb.PlacementPolicy
if isFullBackup {
// according to https://github.com/pingcap/tidb/issues/32290
// only full backup will record policies in backupMeta.
policyList, err := m.ListPolicies()
if err != nil {
return nil, nil, nil, errors.Trace(err)
}
policies = make([]*backuppb.PlacementPolicy, 0, len(policies))
for _, policyInfo := range policyList {
p, err := json.Marshal(policyInfo)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}
policies = append(policies, &backuppb.PlacementPolicy{Info: p})
}
}

ranges := make([]rtree.Range, 0)
backupSchemas := newBackupSchemas()
dbs, err := m.ListDatabases()
if err != nil {
return nil, nil, errors.Trace(err)
return nil, nil, nil, errors.Trace(err)
}

for _, dbInfo := range dbs {
Expand All @@ -301,7 +320,7 @@ func BuildBackupRangeAndSchema(

tables, err := m.ListTables(dbInfo.ID)
if err != nil {
return nil, nil, errors.Trace(err)
return nil, nil, nil, errors.Trace(err)
}

if len(tables) == 0 {
Expand All @@ -310,6 +329,12 @@ func BuildBackupRangeAndSchema(
continue
}

if !isFullBackup {
// according to https://github.com/pingcap/tidb/issues/32290.
// ignore placement policy when not in full backup
dbInfo.PlacementPolicyRef = nil
}

for _, tableInfo := range tables {
if !tableFilter.MatchTable(dbInfo.Name.O, tableInfo.Name.O) {
// Skip tables other than the given table.
Expand All @@ -336,16 +361,21 @@ func BuildBackupRangeAndSchema(
globalAutoID, err = idAlloc.NextGlobalAutoID()
}
if err != nil {
return nil, nil, errors.Trace(err)
return nil, nil, nil, errors.Trace(err)
}
tableInfo.AutoIncID = globalAutoID
if !isFullBackup {
// according to https://github.com/pingcap/tidb/issues/32290.
// ignore placement policy when not in full backup
tableInfo.PlacementPolicyRef = nil
}

if tableInfo.PKIsHandle && tableInfo.ContainsAutoRandomBits() {
// this table has auto_random id, we need backup and rebase in restoration
var globalAutoRandID int64
globalAutoRandID, err = randAlloc.NextGlobalAutoID()
if err != nil {
return nil, nil, errors.Trace(err)
return nil, nil, nil, errors.Trace(err)
}
tableInfo.AutoRandID = globalAutoRandID
logger.Debug("change table AutoRandID",
Expand All @@ -368,7 +398,7 @@ func BuildBackupRangeAndSchema(

tableRanges, err := BuildTableRanges(tableInfo)
if err != nil {
return nil, nil, errors.Trace(err)
return nil, nil, nil, errors.Trace(err)
}
for _, r := range tableRanges {
ranges = append(ranges, rtree.Range{
Expand All @@ -381,9 +411,9 @@ func BuildBackupRangeAndSchema(

if backupSchemas.Len() == 0 {
log.Info("nothing to backup")
return nil, nil, nil
return nil, nil, nil, nil
}
return ranges, backupSchemas, nil
return ranges, backupSchemas, policies, nil
}

func skipUnsupportedDDLJob(job *model.Job) bool {
Expand Down Expand Up @@ -442,6 +472,14 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, store kv.Storage, lastB

if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion) {
if job.BinlogInfo.DBInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.DBInfo.PlacementPolicyRef = nil
}
if job.BinlogInfo.TableInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.TableInfo.PlacementPolicyRef = nil
}
jobBytes, err := json.Marshal(job)
if err != nil {
return errors.Trace(err)
Expand Down
32 changes: 19 additions & 13 deletions br/pkg/backup/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,37 +97,41 @@ func TestBuildBackupRangeAndSchema(t *testing.T) {
// Table t1 is not exist.
testFilter, err := filter.Parse([]string{"test.t1"})
require.NoError(t, err)
_, backupSchemas, err := backup.BuildBackupRangeAndSchema(
m.Storage, testFilter, math.MaxUint64)
_, backupSchemas, _, err := backup.BuildBackupRangeAndSchema(
m.Storage, testFilter, math.MaxUint64, false)
require.NoError(t, err)
require.Nil(t, backupSchemas)

// Database is not exist.
fooFilter, err := filter.Parse([]string{"foo.t1"})
require.NoError(t, err)
_, backupSchemas, err = backup.BuildBackupRangeAndSchema(
m.Storage, fooFilter, math.MaxUint64)
_, backupSchemas, _, err = backup.BuildBackupRangeAndSchema(
m.Storage, fooFilter, math.MaxUint64, false)
require.NoError(t, err)
require.Nil(t, backupSchemas)

// Empty database.
// Filter out system tables manually.
noFilter, err := filter.Parse([]string{"*.*", "!mysql.*"})
require.NoError(t, err)
_, backupSchemas, err = backup.BuildBackupRangeAndSchema(
m.Storage, noFilter, math.MaxUint64)
_, backupSchemas, _, err = backup.BuildBackupRangeAndSchema(
m.Storage, noFilter, math.MaxUint64, false)
require.NoError(t, err)
require.Nil(t, backupSchemas)

tk.MustExec("use test")
tk.MustExec("drop table if exists t1;")
tk.MustExec("create table t1 (a int);")
tk.MustExec("insert into t1 values (10);")
tk.MustExec("create placement policy fivereplicas followers=4;")

_, backupSchemas, err = backup.BuildBackupRangeAndSchema(
m.Storage, testFilter, math.MaxUint64)
var policies []*backuppb.PlacementPolicy
_, backupSchemas, policies, err = backup.BuildBackupRangeAndSchema(
m.Storage, testFilter, math.MaxUint64, false)
require.NoError(t, err)
require.Equal(t, 1, backupSchemas.Len())
// we expect no policies collected, because it's not full backup.
require.Equal(t, 0, len(policies))
updateCh := new(simpleProgress)
skipChecksum := false
es := GetRandomStorage(t)
Expand Down Expand Up @@ -155,10 +159,12 @@ func TestBuildBackupRangeAndSchema(t *testing.T) {
tk.MustExec("insert into t2 values (10);")
tk.MustExec("insert into t2 values (11);")

_, backupSchemas, err = backup.BuildBackupRangeAndSchema(
m.Storage, noFilter, math.MaxUint64)
_, backupSchemas, policies, err = backup.BuildBackupRangeAndSchema(
m.Storage, noFilter, math.MaxUint64, true)
require.NoError(t, err)
require.Equal(t, 2, backupSchemas.Len())
// we expect the policy fivereplicas collected in full backup.
require.Equal(t, 1, len(policies))
updateCh.reset()

es2 := GetRandomStorage(t)
Expand Down Expand Up @@ -204,7 +210,7 @@ func TestBuildBackupRangeAndSchemaWithBrokenStats(t *testing.T) {
f, err := filter.Parse([]string{"test.t3"})
require.NoError(t, err)

_, backupSchemas, err := backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64)
_, backupSchemas, _, err := backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64, false)
require.NoError(t, err)
require.Equal(t, 1, backupSchemas.Len())

Expand Down Expand Up @@ -238,7 +244,7 @@ func TestBuildBackupRangeAndSchemaWithBrokenStats(t *testing.T) {
// recover the statistics.
tk.MustExec("analyze table t3;")

_, backupSchemas, err = backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64)
_, backupSchemas, _, err = backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64, false)
require.NoError(t, err)
require.Equal(t, 1, backupSchemas.Len())

Expand Down Expand Up @@ -280,7 +286,7 @@ func TestBackupSchemasForSystemTable(t *testing.T) {

f, err := filter.Parse([]string{"mysql.systable*"})
require.NoError(t, err)
_, backupSchemas, err := backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64)
_, backupSchemas, _, err := backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64, false)
require.NoError(t, err)
require.Equal(t, systemTablesCount, backupSchemas.Len())

Expand Down
1 change: 1 addition & 0 deletions br/pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Session interface {
ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error
CreateDatabase(ctx context.Context, schema *model.DBInfo) error
CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error
CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error
Close()
}

Expand Down
7 changes: 7 additions & 0 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo)

}

// CreatePlacementPolicy implements glue.Session.
func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error {
d := domain.GetDomain(gs.se).DDL()
// the default behaviour is ignoring duplicated policy during restore.
return d.CreatePlacementPolicyWithInfo(gs.se, policy, ddl.OnExistIgnore)
}

// CreateTables implements glue.BatchCreateTableSession.
func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error {
d := domain.GetDomain(gs.se).DDL()
Expand Down
Loading

0 comments on commit e0db77e

Please sign in to comment.