Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: implement backup & restore policy info #33007

Merged
merged 27 commits into from
Mar 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions br/cmd/br/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func newFullBackupCommand() *cobra.Command {
Args: cobra.NoArgs,
RunE: func(command *cobra.Command, _ []string) error {
// empty db/table means full backup.
return runBackupCommand(command, "Full backup")
return runBackupCommand(command, task.FullBackupCmd)
},
}
task.DefineFilterFlags(command, acceptAllTables)
Expand All @@ -119,7 +119,7 @@ func newDBBackupCommand() *cobra.Command {
Short: "backup a database",
Args: cobra.NoArgs,
RunE: func(command *cobra.Command, _ []string) error {
return runBackupCommand(command, "Database backup")
return runBackupCommand(command, task.DBBackupCmd)
},
}
task.DefineDatabaseFlags(command)
Expand All @@ -133,7 +133,7 @@ func newTableBackupCommand() *cobra.Command {
Short: "backup a table",
Args: cobra.NoArgs,
RunE: func(command *cobra.Command, _ []string) error {
return runBackupCommand(command, "Table backup")
return runBackupCommand(command, task.TableBackupCmd)
},
}
task.DefineTableFlags(command)
Expand All @@ -148,7 +148,7 @@ func newRawBackupCommand() *cobra.Command {
Short: "(experimental) backup a raw kv range from TiKV cluster",
Args: cobra.NoArgs,
RunE: func(command *cobra.Command, _ []string) error {
return runBackupRawCommand(command, "Raw backup")
return runBackupRawCommand(command, task.RawBackupCmd)
},
}

Expand Down
8 changes: 4 additions & 4 deletions br/cmd/br/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func newFullRestoreCommand() *cobra.Command {
Short: "restore all tables",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, _ []string) error {
return runRestoreCommand(cmd, "Full restore")
return runRestoreCommand(cmd, task.FullRestoreCmd)
},
}
task.DefineFilterFlags(command, filterOutSysAndMemTables)
Expand All @@ -108,7 +108,7 @@ func newDBRestoreCommand() *cobra.Command {
Short: "restore tables in a database from the backup data",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, _ []string) error {
return runRestoreCommand(cmd, "Database restore")
return runRestoreCommand(cmd, task.DBRestoreCmd)
},
}
task.DefineDatabaseFlags(command)
Expand All @@ -121,7 +121,7 @@ func newTableRestoreCommand() *cobra.Command {
Short: "restore a table from the backup data",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, _ []string) error {
return runRestoreCommand(cmd, "Table restore")
return runRestoreCommand(cmd, task.TableRestoreCmd)
},
}
task.DefineTableFlags(command)
Expand All @@ -134,7 +134,7 @@ func newRawRestoreCommand() *cobra.Command {
Short: "(experimental) restore a raw kv range to TiKV cluster",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, _ []string) error {
return runRestoreRawCommand(cmd, "Raw restore")
return runRestoreRawCommand(cmd, task.RawRestoreCmd)
},
}

Expand Down
54 changes: 46 additions & 8 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,15 +282,34 @@ func BuildBackupRangeAndSchema(
storage kv.Storage,
tableFilter filter.Filter,
backupTS uint64,
) ([]rtree.Range, *Schemas, error) {
isFullBackup bool,
) ([]rtree.Range, *Schemas, []*backuppb.PlacementPolicy, error) {
snapshot := storage.GetSnapshot(kv.NewVersion(backupTS))
m := meta.NewSnapshotMeta(snapshot)

var policies []*backuppb.PlacementPolicy
if isFullBackup {
3pointer marked this conversation as resolved.
Show resolved Hide resolved
// according to https://github.com/pingcap/tidb/issues/32290
// only full backup will record policies in backupMeta.
policyList, err := m.ListPolicies()
if err != nil {
return nil, nil, nil, errors.Trace(err)
}
policies = make([]*backuppb.PlacementPolicy, 0, len(policies))
for _, policyInfo := range policyList {
p, err := json.Marshal(policyInfo)
if err != nil {
return nil, nil, nil, errors.Trace(err)
}
policies = append(policies, &backuppb.PlacementPolicy{Info: p})
}
}

ranges := make([]rtree.Range, 0)
backupSchemas := newBackupSchemas()
dbs, err := m.ListDatabases()
if err != nil {
return nil, nil, errors.Trace(err)
return nil, nil, nil, errors.Trace(err)
}

for _, dbInfo := range dbs {
Expand All @@ -301,7 +320,7 @@ func BuildBackupRangeAndSchema(

tables, err := m.ListTables(dbInfo.ID)
if err != nil {
return nil, nil, errors.Trace(err)
return nil, nil, nil, errors.Trace(err)
}

if len(tables) == 0 {
Expand All @@ -310,6 +329,12 @@ func BuildBackupRangeAndSchema(
continue
}

if !isFullBackup {
// according to https://github.com/pingcap/tidb/issues/32290.
// ignore placement policy when not in full backup
dbInfo.PlacementPolicyRef = nil
}

for _, tableInfo := range tables {
if !tableFilter.MatchTable(dbInfo.Name.O, tableInfo.Name.O) {
// Skip tables other than the given table.
Expand All @@ -336,16 +361,21 @@ func BuildBackupRangeAndSchema(
globalAutoID, err = idAlloc.NextGlobalAutoID()
}
if err != nil {
return nil, nil, errors.Trace(err)
return nil, nil, nil, errors.Trace(err)
}
tableInfo.AutoIncID = globalAutoID
if !isFullBackup {
3pointer marked this conversation as resolved.
Show resolved Hide resolved
// according to https://github.com/pingcap/tidb/issues/32290.
// ignore placement policy when not in full backup
tableInfo.PlacementPolicyRef = nil
}

if tableInfo.PKIsHandle && tableInfo.ContainsAutoRandomBits() {
// this table has auto_random id, we need backup and rebase in restoration
var globalAutoRandID int64
globalAutoRandID, err = randAlloc.NextGlobalAutoID()
if err != nil {
return nil, nil, errors.Trace(err)
return nil, nil, nil, errors.Trace(err)
}
tableInfo.AutoRandID = globalAutoRandID
logger.Debug("change table AutoRandID",
Expand All @@ -368,7 +398,7 @@ func BuildBackupRangeAndSchema(

tableRanges, err := BuildTableRanges(tableInfo)
if err != nil {
return nil, nil, errors.Trace(err)
return nil, nil, nil, errors.Trace(err)
}
for _, r := range tableRanges {
ranges = append(ranges, rtree.Range{
Expand All @@ -381,9 +411,9 @@ func BuildBackupRangeAndSchema(

if backupSchemas.Len() == 0 {
log.Info("nothing to backup")
return nil, nil, nil
return nil, nil, nil, nil
}
return ranges, backupSchemas, nil
return ranges, backupSchemas, policies, nil
}

func skipUnsupportedDDLJob(job *model.Job) bool {
Expand Down Expand Up @@ -442,6 +472,14 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, store kv.Storage, lastB

if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion) {
if job.BinlogInfo.DBInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.DBInfo.PlacementPolicyRef = nil
}
if job.BinlogInfo.TableInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.TableInfo.PlacementPolicyRef = nil
}
jobBytes, err := json.Marshal(job)
if err != nil {
return errors.Trace(err)
Expand Down
32 changes: 19 additions & 13 deletions br/pkg/backup/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,37 +97,41 @@ func TestBuildBackupRangeAndSchema(t *testing.T) {
// Table t1 is not exist.
testFilter, err := filter.Parse([]string{"test.t1"})
require.NoError(t, err)
_, backupSchemas, err := backup.BuildBackupRangeAndSchema(
m.Storage, testFilter, math.MaxUint64)
_, backupSchemas, _, err := backup.BuildBackupRangeAndSchema(
m.Storage, testFilter, math.MaxUint64, false)
require.NoError(t, err)
require.Nil(t, backupSchemas)

// Database is not exist.
fooFilter, err := filter.Parse([]string{"foo.t1"})
require.NoError(t, err)
_, backupSchemas, err = backup.BuildBackupRangeAndSchema(
m.Storage, fooFilter, math.MaxUint64)
_, backupSchemas, _, err = backup.BuildBackupRangeAndSchema(
m.Storage, fooFilter, math.MaxUint64, false)
require.NoError(t, err)
require.Nil(t, backupSchemas)

// Empty database.
// Filter out system tables manually.
noFilter, err := filter.Parse([]string{"*.*", "!mysql.*"})
require.NoError(t, err)
_, backupSchemas, err = backup.BuildBackupRangeAndSchema(
m.Storage, noFilter, math.MaxUint64)
_, backupSchemas, _, err = backup.BuildBackupRangeAndSchema(
m.Storage, noFilter, math.MaxUint64, false)
require.NoError(t, err)
require.Nil(t, backupSchemas)

tk.MustExec("use test")
tk.MustExec("drop table if exists t1;")
tk.MustExec("create table t1 (a int);")
tk.MustExec("insert into t1 values (10);")
tk.MustExec("create placement policy fivereplicas followers=4;")

_, backupSchemas, err = backup.BuildBackupRangeAndSchema(
m.Storage, testFilter, math.MaxUint64)
var policies []*backuppb.PlacementPolicy
_, backupSchemas, policies, err = backup.BuildBackupRangeAndSchema(
m.Storage, testFilter, math.MaxUint64, false)
require.NoError(t, err)
require.Equal(t, 1, backupSchemas.Len())
// we expect no policies collected, because it's not full backup.
require.Equal(t, 0, len(policies))
updateCh := new(simpleProgress)
skipChecksum := false
es := GetRandomStorage(t)
Expand Down Expand Up @@ -155,10 +159,12 @@ func TestBuildBackupRangeAndSchema(t *testing.T) {
tk.MustExec("insert into t2 values (10);")
tk.MustExec("insert into t2 values (11);")

_, backupSchemas, err = backup.BuildBackupRangeAndSchema(
m.Storage, noFilter, math.MaxUint64)
_, backupSchemas, policies, err = backup.BuildBackupRangeAndSchema(
m.Storage, noFilter, math.MaxUint64, true)
require.NoError(t, err)
require.Equal(t, 2, backupSchemas.Len())
// we expect the policy fivereplicas collected in full backup.
require.Equal(t, 1, len(policies))
updateCh.reset()

es2 := GetRandomStorage(t)
Expand Down Expand Up @@ -204,7 +210,7 @@ func TestBuildBackupRangeAndSchemaWithBrokenStats(t *testing.T) {
f, err := filter.Parse([]string{"test.t3"})
require.NoError(t, err)

_, backupSchemas, err := backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64)
_, backupSchemas, _, err := backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64, false)
require.NoError(t, err)
require.Equal(t, 1, backupSchemas.Len())

Expand Down Expand Up @@ -238,7 +244,7 @@ func TestBuildBackupRangeAndSchemaWithBrokenStats(t *testing.T) {
// recover the statistics.
tk.MustExec("analyze table t3;")

_, backupSchemas, err = backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64)
_, backupSchemas, _, err = backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64, false)
require.NoError(t, err)
require.Equal(t, 1, backupSchemas.Len())

Expand Down Expand Up @@ -280,7 +286,7 @@ func TestBackupSchemasForSystemTable(t *testing.T) {

f, err := filter.Parse([]string{"mysql.systable*"})
require.NoError(t, err)
_, backupSchemas, err := backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64)
_, backupSchemas, _, err := backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64, false)
require.NoError(t, err)
require.Equal(t, systemTablesCount, backupSchemas.Len())

Expand Down
1 change: 1 addition & 0 deletions br/pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Session interface {
ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error
CreateDatabase(ctx context.Context, schema *model.DBInfo) error
CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error
CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error
Close()
}

Expand Down
7 changes: 7 additions & 0 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo)

}

// CreatePlacementPolicy implements glue.Session.
func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error {
d := domain.GetDomain(gs.se).DDL()
// the default behaviour is ignoring duplicated policy during restore.
return d.CreatePlacementPolicyWithInfo(gs.se, policy, ddl.OnExistIgnore)
}

// CreateTables implements glue.BatchCreateTableSession.
func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error {
d := domain.GetDomain(gs.se).DDL()
Expand Down
Loading