diff --git a/br/cmd/br/backup.go b/br/cmd/br/backup.go index 442672fad871e..c6b49b43f21b1 100644 --- a/br/cmd/br/backup.go +++ b/br/cmd/br/backup.go @@ -105,7 +105,7 @@ func newFullBackupCommand() *cobra.Command { Args: cobra.NoArgs, RunE: func(command *cobra.Command, _ []string) error { // empty db/table means full backup. - return runBackupCommand(command, "Full backup") + return runBackupCommand(command, task.FullBackupCmd) }, } task.DefineFilterFlags(command, acceptAllTables) @@ -119,7 +119,7 @@ func newDBBackupCommand() *cobra.Command { Short: "backup a database", Args: cobra.NoArgs, RunE: func(command *cobra.Command, _ []string) error { - return runBackupCommand(command, "Database backup") + return runBackupCommand(command, task.DBBackupCmd) }, } task.DefineDatabaseFlags(command) @@ -133,7 +133,7 @@ func newTableBackupCommand() *cobra.Command { Short: "backup a table", Args: cobra.NoArgs, RunE: func(command *cobra.Command, _ []string) error { - return runBackupCommand(command, "Table backup") + return runBackupCommand(command, task.TableBackupCmd) }, } task.DefineTableFlags(command) @@ -148,7 +148,7 @@ func newRawBackupCommand() *cobra.Command { Short: "(experimental) backup a raw kv range from TiKV cluster", Args: cobra.NoArgs, RunE: func(command *cobra.Command, _ []string) error { - return runBackupRawCommand(command, "Raw backup") + return runBackupRawCommand(command, task.RawBackupCmd) }, } diff --git a/br/cmd/br/restore.go b/br/cmd/br/restore.go index c57712ba8bddc..32d04994c793e 100644 --- a/br/cmd/br/restore.go +++ b/br/cmd/br/restore.go @@ -95,7 +95,7 @@ func newFullRestoreCommand() *cobra.Command { Short: "restore all tables", Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, _ []string) error { - return runRestoreCommand(cmd, "Full restore") + return runRestoreCommand(cmd, task.FullRestoreCmd) }, } task.DefineFilterFlags(command, filterOutSysAndMemTables) @@ -108,7 +108,7 @@ func newDBRestoreCommand() *cobra.Command { Short: "restore tables in a database from the backup data", Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, _ []string) error { - return runRestoreCommand(cmd, "Database restore") + return runRestoreCommand(cmd, task.DBRestoreCmd) }, } task.DefineDatabaseFlags(command) @@ -121,7 +121,7 @@ func newTableRestoreCommand() *cobra.Command { Short: "restore a table from the backup data", Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, _ []string) error { - return runRestoreCommand(cmd, "Table restore") + return runRestoreCommand(cmd, task.TableRestoreCmd) }, } task.DefineTableFlags(command) @@ -134,7 +134,7 @@ func newRawRestoreCommand() *cobra.Command { Short: "(experimental) restore a raw kv range to TiKV cluster", Args: cobra.NoArgs, RunE: func(cmd *cobra.Command, _ []string) error { - return runRestoreRawCommand(cmd, "Raw restore") + return runRestoreRawCommand(cmd, task.RawRestoreCmd) }, } diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 8933945e115c4..d6f59944b8564 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -282,15 +282,34 @@ func BuildBackupRangeAndSchema( storage kv.Storage, tableFilter filter.Filter, backupTS uint64, -) ([]rtree.Range, *Schemas, error) { + isFullBackup bool, +) ([]rtree.Range, *Schemas, []*backuppb.PlacementPolicy, error) { snapshot := storage.GetSnapshot(kv.NewVersion(backupTS)) m := meta.NewSnapshotMeta(snapshot) + var policies []*backuppb.PlacementPolicy + if isFullBackup { + // according to https://github.com/pingcap/tidb/issues/32290 + // only full backup will record policies in 
backupMeta. + policyList, err := m.ListPolicies() + if err != nil { + return nil, nil, nil, errors.Trace(err) + } + policies = make([]*backuppb.PlacementPolicy, 0, len(policies)) + for _, policyInfo := range policyList { + p, err := json.Marshal(policyInfo) + if err != nil { + return nil, nil, nil, errors.Trace(err) + } + policies = append(policies, &backuppb.PlacementPolicy{Info: p}) + } + } + ranges := make([]rtree.Range, 0) backupSchemas := newBackupSchemas() dbs, err := m.ListDatabases() if err != nil { - return nil, nil, errors.Trace(err) + return nil, nil, nil, errors.Trace(err) } for _, dbInfo := range dbs { @@ -301,7 +320,7 @@ func BuildBackupRangeAndSchema( tables, err := m.ListTables(dbInfo.ID) if err != nil { - return nil, nil, errors.Trace(err) + return nil, nil, nil, errors.Trace(err) } if len(tables) == 0 { @@ -310,6 +329,12 @@ func BuildBackupRangeAndSchema( continue } + if !isFullBackup { + // according to https://github.com/pingcap/tidb/issues/32290. + // ignore placement policy when not in full backup + dbInfo.PlacementPolicyRef = nil + } + for _, tableInfo := range tables { if !tableFilter.MatchTable(dbInfo.Name.O, tableInfo.Name.O) { // Skip tables other than the given table. @@ -336,16 +361,21 @@ func BuildBackupRangeAndSchema( globalAutoID, err = idAlloc.NextGlobalAutoID() } if err != nil { - return nil, nil, errors.Trace(err) + return nil, nil, nil, errors.Trace(err) } tableInfo.AutoIncID = globalAutoID + if !isFullBackup { + // according to https://github.com/pingcap/tidb/issues/32290. + // ignore placement policy when not in full backup + tableInfo.PlacementPolicyRef = nil + } if tableInfo.PKIsHandle && tableInfo.ContainsAutoRandomBits() { // this table has auto_random id, we need backup and rebase in restoration var globalAutoRandID int64 globalAutoRandID, err = randAlloc.NextGlobalAutoID() if err != nil { - return nil, nil, errors.Trace(err) + return nil, nil, nil, errors.Trace(err) } tableInfo.AutoRandID = globalAutoRandID logger.Debug("change table AutoRandID", @@ -368,7 +398,7 @@ func BuildBackupRangeAndSchema( tableRanges, err := BuildTableRanges(tableInfo) if err != nil { - return nil, nil, errors.Trace(err) + return nil, nil, nil, errors.Trace(err) } for _, r := range tableRanges { ranges = append(ranges, rtree.Range{ @@ -381,9 +411,9 @@ func BuildBackupRangeAndSchema( if backupSchemas.Len() == 0 { log.Info("nothing to backup") - return nil, nil, nil + return nil, nil, nil, nil } - return ranges, backupSchemas, nil + return ranges, backupSchemas, policies, nil } func skipUnsupportedDDLJob(job *model.Job) bool { @@ -442,6 +472,14 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, store kv.Storage, lastB if (job.State == model.JobStateDone || job.State == model.JobStateSynced) && (job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion) { + if job.BinlogInfo.DBInfo != nil { + // ignore all placement policy info during incremental backup for now. + job.BinlogInfo.DBInfo.PlacementPolicyRef = nil + } + if job.BinlogInfo.TableInfo != nil { + // ignore all placement policy info during incremental backup for now. + job.BinlogInfo.TableInfo.PlacementPolicyRef = nil + } jobBytes, err := json.Marshal(job) if err != nil { return errors.Trace(err) diff --git a/br/pkg/backup/schema_test.go b/br/pkg/backup/schema_test.go index dc4038bf98051..9446027f1cd61 100644 --- a/br/pkg/backup/schema_test.go +++ b/br/pkg/backup/schema_test.go @@ -97,16 +97,16 @@ func TestBuildBackupRangeAndSchema(t *testing.T) { // Table t1 is not exist. 
testFilter, err := filter.Parse([]string{"test.t1"}) require.NoError(t, err) - _, backupSchemas, err := backup.BuildBackupRangeAndSchema( - m.Storage, testFilter, math.MaxUint64) + _, backupSchemas, _, err := backup.BuildBackupRangeAndSchema( + m.Storage, testFilter, math.MaxUint64, false) require.NoError(t, err) require.Nil(t, backupSchemas) // Database is not exist. fooFilter, err := filter.Parse([]string{"foo.t1"}) require.NoError(t, err) - _, backupSchemas, err = backup.BuildBackupRangeAndSchema( - m.Storage, fooFilter, math.MaxUint64) + _, backupSchemas, _, err = backup.BuildBackupRangeAndSchema( + m.Storage, fooFilter, math.MaxUint64, false) require.NoError(t, err) require.Nil(t, backupSchemas) @@ -114,8 +114,8 @@ func TestBuildBackupRangeAndSchema(t *testing.T) { // Filter out system tables manually. noFilter, err := filter.Parse([]string{"*.*", "!mysql.*"}) require.NoError(t, err) - _, backupSchemas, err = backup.BuildBackupRangeAndSchema( - m.Storage, noFilter, math.MaxUint64) + _, backupSchemas, _, err = backup.BuildBackupRangeAndSchema( + m.Storage, noFilter, math.MaxUint64, false) require.NoError(t, err) require.Nil(t, backupSchemas) @@ -123,11 +123,15 @@ func TestBuildBackupRangeAndSchema(t *testing.T) { tk.MustExec("drop table if exists t1;") tk.MustExec("create table t1 (a int);") tk.MustExec("insert into t1 values (10);") + tk.MustExec("create placement policy fivereplicas followers=4;") - _, backupSchemas, err = backup.BuildBackupRangeAndSchema( - m.Storage, testFilter, math.MaxUint64) + var policies []*backuppb.PlacementPolicy + _, backupSchemas, policies, err = backup.BuildBackupRangeAndSchema( + m.Storage, testFilter, math.MaxUint64, false) require.NoError(t, err) require.Equal(t, 1, backupSchemas.Len()) + // we expect no policies collected, because it's not full backup. + require.Equal(t, 0, len(policies)) updateCh := new(simpleProgress) skipChecksum := false es := GetRandomStorage(t) @@ -155,10 +159,12 @@ func TestBuildBackupRangeAndSchema(t *testing.T) { tk.MustExec("insert into t2 values (10);") tk.MustExec("insert into t2 values (11);") - _, backupSchemas, err = backup.BuildBackupRangeAndSchema( - m.Storage, noFilter, math.MaxUint64) + _, backupSchemas, policies, err = backup.BuildBackupRangeAndSchema( + m.Storage, noFilter, math.MaxUint64, true) require.NoError(t, err) require.Equal(t, 2, backupSchemas.Len()) + // we expect the policy fivereplicas collected in full backup. + require.Equal(t, 1, len(policies)) updateCh.reset() es2 := GetRandomStorage(t) @@ -204,7 +210,7 @@ func TestBuildBackupRangeAndSchemaWithBrokenStats(t *testing.T) { f, err := filter.Parse([]string{"test.t3"}) require.NoError(t, err) - _, backupSchemas, err := backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64) + _, backupSchemas, _, err := backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64, false) require.NoError(t, err) require.Equal(t, 1, backupSchemas.Len()) @@ -238,7 +244,7 @@ func TestBuildBackupRangeAndSchemaWithBrokenStats(t *testing.T) { // recover the statistics. 
tk.MustExec("analyze table t3;") - _, backupSchemas, err = backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64) + _, backupSchemas, _, err = backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64, false) require.NoError(t, err) require.Equal(t, 1, backupSchemas.Len()) @@ -280,7 +286,7 @@ func TestBackupSchemasForSystemTable(t *testing.T) { f, err := filter.Parse([]string{"mysql.systable*"}) require.NoError(t, err) - _, backupSchemas, err := backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64) + _, backupSchemas, _, err := backup.BuildBackupRangeAndSchema(m.Storage, f, math.MaxUint64, false) require.NoError(t, err) require.Equal(t, systemTablesCount, backupSchemas.Len()) diff --git a/br/pkg/glue/glue.go b/br/pkg/glue/glue.go index 118b3ce3f8982..4c203179146f5 100644 --- a/br/pkg/glue/glue.go +++ b/br/pkg/glue/glue.go @@ -36,6 +36,7 @@ type Session interface { ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error CreateDatabase(ctx context.Context, schema *model.DBInfo) error CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error + CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error Close() } diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index d95655489a080..a9be0de744ee7 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -131,6 +131,13 @@ func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) } +// CreatePlacementPolicy implements glue.Session. +func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error { + d := domain.GetDomain(gs.se).DDL() + // the default behaviour is ignoring duplicated policy during restore. + return d.CreatePlacementPolicyWithInfo(gs.se, policy, ddl.OnExistIgnore) +} + // CreateTables implements glue.BatchCreateTableSession. func (gs *tidbSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error { d := domain.GetDomain(gs.se).DDL() diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index deaa52beb7d4d..71483b6059e53 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -53,8 +53,14 @@ import ( // defaultChecksumConcurrency is the default number of the concurrent // checksum tasks. const defaultChecksumConcurrency = 64 +const defaultDDLConcurrency = 16 const minBatchDdlSize = 1 +const ( + strictPlacementPolicyMode = "STRICT" + ignorePlacementPolicyMode = "IGNORE" +) + // Client sends requests to restore files. type Client struct { pdClient pd.Client @@ -78,7 +84,10 @@ type Client struct { // Before you do it, you can firstly read discussions at // https://github.com/pingcap/br/pull/377#discussion_r446594501, // this probably isn't as easy as it seems like (however, not hard, too :D) - db *DB + db *DB + + // use db pool to speed up restoration in BR binary mode. + dbPool []*DB rateLimit uint64 isOnline bool noSchema bool @@ -87,8 +96,6 @@ type Client struct { restoreStores []uint64 cipher *backuppb.CipherInfo - storage storage.ExternalStorage - backend *backuppb.StorageBackend switchModeInterval time.Duration switchCh chan struct{} @@ -99,42 +106,85 @@ type Client struct { dom *domain.Domain batchDdlSize uint + + // correspond to --tidb-placement-mode config. + // STRICT(default) means policy related SQL can be executed in tidb. + // IGNORE means policy related SQL will be ignored. 
+ policyMode string + + // policy name -> policy info + policyMap *sync.Map + + supportPolicy bool } // NewRestoreClient returns a new RestoreClient. func NewRestoreClient( - g glue.Glue, pdClient pd.Client, - store kv.Storage, tlsConf *tls.Config, keepaliveConf keepalive.ClientParameters, isRawKv bool, -) (*Client, error) { - db, err := NewDB(g, store) +) *Client { + return &Client{ + pdClient: pdClient, + toolClient: NewSplitClient(pdClient, tlsConf, isRawKv), + tlsConf: tlsConf, + keepaliveConf: keepaliveConf, + switchCh: make(chan struct{}), + } +} + +// Init create db connection and domain for storage. +func (rc *Client) Init(g glue.Glue, store kv.Storage) error { + // setDB must happen after set PolicyMode. + // we will use policyMode to set session variables. + var err error + rc.db, rc.supportPolicy, err = NewDB(g, store, rc.policyMode) if err != nil { - return nil, errors.Trace(err) + return errors.Trace(err) } - dom, err := g.GetDomain(store) + rc.dom, err = g.GetDomain(store) if err != nil { - return nil, errors.Trace(err) + return errors.Trace(err) } - - var statsHandle *handle.Handle // tikv.Glue will return nil, tidb.Glue will return available domain - if dom != nil { - statsHandle = dom.StatsHandle() + if rc.dom != nil { + rc.statsHandler = rc.dom.StatsHandle() + } + + // Only in binary we can use multi-thread sessions to create tables. + // so use OwnStorage() to tell whether we are use binary or SQL. + if g.OwnsStorage() { + // Maybe allow user modify the DDL concurrency isn't necessary, + // because executing DDL is really I/O bound (or, algorithm bound?), + // and we cost most of time at waiting DDL jobs be enqueued. + // So these jobs won't be faster or slower when machine become faster or slower, + // hence make it a fixed value would be fine. + rc.dbPool, err = makeDBPool(defaultDDLConcurrency, func() (*DB, error) { + db, _, err := NewDB(g, store, rc.policyMode) + return db, err + }) + if err != nil { + log.Warn("create session pool failed, we will send DDLs only by created sessions", + zap.Error(err), + zap.Int("sessionCount", len(rc.dbPool)), + ) + } } + return errors.Trace(err) +} - return &Client{ - pdClient: pdClient, - toolClient: NewSplitClient(pdClient, tlsConf, isRawKv), - db: db, - tlsConf: tlsConf, - keepaliveConf: keepaliveConf, - switchCh: make(chan struct{}), - dom: dom, - statsHandler: statsHandle, - }, nil +// SetPlacementPolicyMode to policy mode. +func (rc *Client) SetPlacementPolicyMode(withPlacementPolicy string) { + switch strings.ToUpper(withPlacementPolicy) { + case strictPlacementPolicyMode: + rc.policyMode = strictPlacementPolicyMode + case ignorePlacementPolicyMode: + rc.policyMode = ignorePlacementPolicyMode + default: + rc.policyMode = strictPlacementPolicyMode + } + log.Info("set placement policy mode", zap.String("mode", rc.policyMode)) } // SetRateLimit to set rateLimit. @@ -146,15 +196,19 @@ func (rc *Client) SetCrypter(crypter *backuppb.CipherInfo) { rc.cipher = crypter } -// SetStorage set ExternalStorage for client. -func (rc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, opts *storage.ExternalStorageOptions) error { - var err error - rc.storage, err = storage.New(ctx, backend, opts) - if err != nil { - return errors.Trace(err) - } - rc.backend = backend - return nil +// SetPolicyMap set policyMap. +func (rc *Client) SetPolicyMap(p *sync.Map) { + rc.policyMap = p +} + +// GetPolicyMap set policyMap. 
+func (rc *Client) GetPolicyMap() *sync.Map { + return rc.policyMap +} + +// GetSupportPolicy tells whether target tidb support placement policy. +func (rc *Client) GetSupportPolicy() bool { + return rc.supportPolicy } // GetPDClient returns a pd client. @@ -352,6 +406,20 @@ func (rc *Client) GetDatabase(name string) *utils.Database { return rc.databases[name] } +// GetPlacementPolicies returns policies. +func (rc *Client) GetPlacementPolicies() (*sync.Map, error) { + policies := &sync.Map{} + for _, p := range rc.backupMeta.Policies { + policyInfo := &model.PolicyInfo{} + err := json.Unmarshal(p.Info, policyInfo) + if err != nil { + return nil, errors.Trace(err) + } + policies.Store(policyInfo.Name.L, policyInfo) + } + return policies, nil +} + // GetDDLJobs returns ddl jobs. func (rc *Client) GetDDLJobs() []*model.Job { return rc.ddlJobs @@ -371,12 +439,41 @@ func (rc *Client) GetTableSchema( return table.Meta(), nil } +// CreatePolicies creates all policies in full restore. +func (rc *Client) CreatePolicies(ctx context.Context, policyMap *sync.Map) error { + var err error + policyMap.Range(func(key, value interface{}) bool { + e := rc.db.CreatePlacementPolicy(ctx, value.(*model.PolicyInfo)) + if e != nil { + err = e + return false + } + return true + }) + return err +} + // CreateDatabase creates a database. func (rc *Client) CreateDatabase(ctx context.Context, db *model.DBInfo) error { if rc.IsSkipCreateSQL() { log.Info("skip create database", zap.Stringer("database", db.Name)) return nil } + if !rc.supportPolicy { + log.Info("set placementPolicyRef to nil when target tidb not support policy", + zap.Stringer("database", db.Name)) + db.PlacementPolicyRef = nil + } + if db.PlacementPolicyRef != nil && rc.policyMap != nil { + if policy, ok := rc.policyMap.Load(db.PlacementPolicyRef.Name.L); ok { + err := rc.db.CreatePlacementPolicy(ctx, policy.(*model.PolicyInfo)) + if err != nil { + return errors.Trace(err) + } + // delete policy in cache after restore succeed + rc.policyMap.Delete(db.PlacementPolicyRef.Name.L) + } + } return rc.db.CreateDatabase(ctx, db) } @@ -395,7 +492,7 @@ func (rc *Client) CreateTables( for i, t := range tables { tbMapping[t.Info.Name.String()] = i } - dataCh := rc.GoCreateTables(context.TODO(), dom, tables, newTS, nil, errCh) + dataCh := rc.GoCreateTables(context.TODO(), dom, tables, newTS, errCh) for et := range dataCh { rules := et.RewriteRule rewriteRules.Data = append(rewriteRules.Data, rules.Data...) @@ -426,7 +523,7 @@ func (rc *Client) createTables( if rc.IsSkipCreateSQL() { log.Info("skip create table and alter autoIncID") } else { - err := db.CreateTables(ctx, tables, rc.GetDDLJobsMap()) + err := db.CreateTables(ctx, tables, rc.GetDDLJobsMap(), rc.GetSupportPolicy(), rc.GetPolicyMap()) if err != nil { return nil, errors.Trace(err) } @@ -466,7 +563,7 @@ func (rc *Client) createTable( if rc.IsSkipCreateSQL() { log.Info("skip create table and alter autoIncID", zap.Stringer("table", table.Info.Name)) } else { - err := db.CreateTable(ctx, table, rc.GetDDLJobsMap()) + err := db.CreateTable(ctx, table, rc.GetDDLJobsMap(), rc.GetSupportPolicy(), rc.GetPolicyMap()) if err != nil { return CreatedTable{}, errors.Trace(err) } @@ -499,7 +596,6 @@ func (rc *Client) GoCreateTables( dom *domain.Domain, tables []*metautil.Table, newTS uint64, - dbPool []*DB, errCh chan<- error, ) <-chan CreatedTable { // Could we have a smaller size of tables? 
@@ -516,9 +612,9 @@ func (rc *Client) GoCreateTables( var err error - if rc.batchDdlSize > minBatchDdlSize && len(dbPool) > 0 { + if rc.batchDdlSize > minBatchDdlSize && len(rc.dbPool) > 0 { - err = rc.createTablesInWorkerPool(ctx, dom, tables, dbPool, newTS, outCh) + err = rc.createTablesInWorkerPool(ctx, dom, tables, newTS, outCh) if err == nil { defer log.Debug("all tables are created") @@ -565,8 +661,8 @@ func (rc *Client) GoCreateTables( defer close(outCh) defer log.Debug("all tables are created") var err error - if len(dbPool) > 0 { - err = rc.createTablesWithDBPool(ctx, createOneTable, tables, dbPool) + if len(rc.dbPool) > 0 { + err = rc.createTablesWithDBPool(ctx, createOneTable, tables) } else { err = rc.createTablesWithSoleDB(ctx, createOneTable, tables) } @@ -591,23 +687,23 @@ func (rc *Client) createTablesWithSoleDB(ctx context.Context, func (rc *Client) createTablesWithDBPool(ctx context.Context, createOneTable func(ctx context.Context, db *DB, t *metautil.Table) error, - tables []*metautil.Table, dbPool []*DB) error { + tables []*metautil.Table) error { eg, ectx := errgroup.WithContext(ctx) - workers := utils.NewWorkerPool(uint(len(dbPool)), "DDL workers") + workers := utils.NewWorkerPool(uint(len(rc.dbPool)), "DDL workers") for _, t := range tables { table := t workers.ApplyWithIDInErrorGroup(eg, func(id uint64) error { - db := dbPool[id%uint64(len(dbPool))] + db := rc.dbPool[id%uint64(len(rc.dbPool))] return createOneTable(ectx, db, table) }) } return eg.Wait() } -func (rc *Client) createTablesInWorkerPool(ctx context.Context, dom *domain.Domain, tables []*metautil.Table, dbPool []*DB, newTS uint64, outCh chan<- CreatedTable) error { +func (rc *Client) createTablesInWorkerPool(ctx context.Context, dom *domain.Domain, tables []*metautil.Table, newTS uint64, outCh chan<- CreatedTable) error { eg, ectx := errgroup.WithContext(ctx) rater := logutil.TraceRateOver(logutil.MetricTableCreatedCounter) - workers := utils.NewWorkerPool(uint(len(dbPool)), "Create Tables Worker") + workers := utils.NewWorkerPool(uint(len(rc.dbPool)), "Create Tables Worker") numOfTables := len(tables) for lastSent := 0; lastSent < numOfTables; lastSent += int(rc.batchDdlSize) { @@ -616,7 +712,7 @@ func (rc *Client) createTablesInWorkerPool(ctx context.Context, dom *domain.Doma tableSlice := tables[lastSent:end] workers.ApplyWithIDInErrorGroup(eg, func(id uint64) error { - db := dbPool[id%uint64(len(dbPool))] + db := rc.dbPool[id%uint64(len(rc.dbPool))] cts, err := rc.createTables(ectx, db, dom, tableSlice, newTS) // ddl job for [lastSent:i) failpoint.Inject("restore-createtables-error", func(val failpoint.Value) { if val.(bool) { diff --git a/br/pkg/restore/client_test.go b/br/pkg/restore/client_test.go index da2571030e36c..492d1324ee1df 100644 --- a/br/pkg/restore/client_test.go +++ b/br/pkg/restore/client_test.go @@ -32,7 +32,9 @@ var defaultKeepaliveCfg = keepalive.ClientParameters{ func TestCreateTables(t *testing.T) { m := mc - client, err := restore.NewRestoreClient(gluetidb.New(), m.PDClient, m.Storage, nil, defaultKeepaliveCfg, false) + g := gluetidb.New() + client := restore.NewRestoreClient(m.PDClient, nil, defaultKeepaliveCfg, false) + err := client.Init(g, m.Storage) require.NoError(t, err) info, err := m.Domain.GetSnapshotInfoSchema(math.MaxUint64) @@ -89,7 +91,9 @@ func TestCreateTables(t *testing.T) { func TestIsOnline(t *testing.T) { m := mc - client, err := restore.NewRestoreClient(gluetidb.New(), m.PDClient, m.Storage, nil, defaultKeepaliveCfg, false) + g := gluetidb.New() + client := 
restore.NewRestoreClient(m.PDClient, nil, defaultKeepaliveCfg, false) + err := client.Init(g, m.Storage) require.NoError(t, err) require.False(t, client.IsOnline()) @@ -99,7 +103,9 @@ func TestIsOnline(t *testing.T) { func TestPreCheckTableClusterIndex(t *testing.T) { m := mc - client, err := restore.NewRestoreClient(gluetidb.New(), m.PDClient, m.Storage, nil, defaultKeepaliveCfg, false) + g := gluetidb.New() + client := restore.NewRestoreClient(m.PDClient, nil, defaultKeepaliveCfg, false) + err := client.Init(g, m.Storage) require.NoError(t, err) info, err := m.Domain.GetSnapshotInfoSchema(math.MaxUint64) @@ -191,9 +197,11 @@ func TestPreCheckTableTiFlashReplicas(t *testing.T) { }, } - client, err := restore.NewRestoreClient(gluetidb.New(), fakePDClient{ + g := gluetidb.New() + client := restore.NewRestoreClient(fakePDClient{ stores: mockStores, - }, m.Storage, nil, defaultKeepaliveCfg, false) + }, nil, defaultKeepaliveCfg, false) + err := client.Init(g, m.Storage) require.NoError(t, err) tables := make([]*metautil.Table, 4) diff --git a/br/pkg/restore/db.go b/br/pkg/restore/db.go index f4ef5dc8256be..f0051abf9883d 100644 --- a/br/pkg/restore/db.go +++ b/br/pkg/restore/db.go @@ -6,6 +6,7 @@ import ( "context" "fmt" "sort" + "sync" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -15,6 +16,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" + "github.com/pingcap/tidb/sessionctx/variable" "go.uber.org/zap" ) @@ -29,23 +31,40 @@ type UniqueTableName struct { } // NewDB returns a new DB. -func NewDB(g glue.Glue, store kv.Storage) (*DB, error) { +func NewDB(g glue.Glue, store kv.Storage, policyMode string) (*DB, bool, error) { se, err := g.CreateSession(store) if err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) } // The session may be nil in raw kv mode if se == nil { - return nil, nil + return nil, false, nil } // Set SQL mode to None for avoiding SQL compatibility problem err = se.Execute(context.Background(), "set @@sql_mode=''") if err != nil { - return nil, errors.Trace(err) + return nil, false, errors.Trace(err) + } + + supportPolicy := false + if len(policyMode) != 0 { + // Set placement mode for handle placement policy. + err = se.Execute(context.Background(), fmt.Sprintf("set @@tidb_placement_mode='%s';", policyMode)) + if err != nil { + if variable.ErrUnknownSystemVar.Equal(err) { + // not support placement policy, just ignore it + log.Warn("target tidb not support tidb_placement_mode, ignore create policies", zap.Error(err)) + } else { + return nil, false, errors.Trace(err) + } + } else { + log.Info("set tidb_placement_mode success", zap.String("mode", policyMode)) + supportPolicy = true + } } return &DB{ se: se, - }, nil + }, supportPolicy, nil } // ExecDDL executes the query of a ddl job. @@ -115,6 +134,16 @@ func (db *DB) UpdateStatsMeta(ctx context.Context, tableID int64, restoreTS uint return nil } +// CreatePlacementPolicy check whether cluster support policy and create the policy. +func (db *DB) CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error { + err := db.se.CreatePlacementPolicy(ctx, policy) + if err != nil { + return errors.Trace(err) + } + log.Info("create placement policy succeed", zap.Stringer("name", policy.Name)) + return nil +} + // CreateDatabase executes a CREATE DATABASE SQL. 
func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { err := db.se.CreateDatabase(ctx, schema) @@ -228,11 +257,26 @@ func (db *DB) CreateTablePostRestore(ctx context.Context, table *metautil.Table, } // CreateTables execute a internal CREATE TABLES. -func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table, ddlTables map[UniqueTableName]bool) error { +func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table, + ddlTables map[UniqueTableName]bool, supportPolicy bool, policyMap *sync.Map) error { if batchSession, ok := db.se.(glue.BatchCreateTableSession); ok { m := map[string][]*model.TableInfo{} for _, table := range tables { m[table.DB.Name.L] = append(m[table.DB.Name.L], table.Info) + if table.Info.PlacementPolicyRef != nil && policyMap != nil { + if !supportPolicy { + log.Info("set placementPolicyRef to nil when target tidb not support policy", + zap.Stringer("table", table.Info.Name), zap.Stringer("db", table.DB.Name)) + table.Info.PlacementPolicyRef = nil + } else if p, exists := policyMap.Load(table.Info.PlacementPolicyRef.Name.L); exists { + err := db.CreatePlacementPolicy(ctx, p.(*model.PolicyInfo)) + if err != nil { + return errors.Trace(err) + } + // delete policy in cache after restore table succeed. + policyMap.Delete(table.Info.PlacementPolicyRef.Name.L) + } + } } if err := batchSession.CreateTables(ctx, m); err != nil { return err @@ -249,7 +293,22 @@ func (db *DB) CreateTables(ctx context.Context, tables []*metautil.Table, ddlTab } // CreateTable executes a CREATE TABLE SQL. -func (db *DB) CreateTable(ctx context.Context, table *metautil.Table, ddlTables map[UniqueTableName]bool) error { +func (db *DB) CreateTable(ctx context.Context, table *metautil.Table, + ddlTables map[UniqueTableName]bool, supportPolicy bool, policyMap *sync.Map) error { + if table.Info.PlacementPolicyRef != nil && policyMap != nil { + if !supportPolicy { + log.Info("set placementPolicyRef to nil when target tidb not support policy", + zap.Stringer("table", table.Info.Name), zap.Stringer("db", table.DB.Name)) + table.Info.PlacementPolicyRef = nil + } else if p, exists := policyMap.Load(table.Info.PlacementPolicyRef.Name.L); exists { + err := db.CreatePlacementPolicy(ctx, p.(*model.PolicyInfo)) + if err != nil { + return errors.Trace(err) + } + // delete policy in cache after restore table succeed. 
+ policyMap.Delete(table.Info.PlacementPolicyRef.Name.L) + } + } err := db.se.CreateTable(ctx, table.DB.Name, table.Info) if err != nil { log.Error("create table failed", diff --git a/br/pkg/restore/db_test.go b/br/pkg/restore/db_test.go index 1393ff2e66dbf..2d774355c6c85 100644 --- a/br/pkg/restore/db_test.go +++ b/br/pkg/restore/db_test.go @@ -81,7 +81,7 @@ func TestRestoreAutoIncID(t *testing.T) { require.Equal(t, uint64(globalAutoID), autoIncID) // Alter AutoIncID to the next AutoIncID + 100 table.Info.AutoIncID = globalAutoID + 100 - db, err := restore.NewDB(gluetidb.New(), s.mock.Storage) + db, _, err := restore.NewDB(gluetidb.New(), s.mock.Storage, "STRICT") require.NoErrorf(t, err, "Error create DB") tk.MustExec("drop database if exists test;") // Test empty collate value @@ -96,7 +96,7 @@ func TestRestoreAutoIncID(t *testing.T) { err = db.CreateDatabase(context.Background(), table.DB) require.NoErrorf(t, err, "Error create empty charset db: %s %s", err, s.mock.DSN) uniqueMap := make(map[restore.UniqueTableName]bool) - err = db.CreateTable(context.Background(), &table, uniqueMap) + err = db.CreateTable(context.Background(), &table, uniqueMap, false, nil) require.NoErrorf(t, err, "Error create table: %s %s", err, s.mock.DSN) tk.MustExec("use test") @@ -107,7 +107,7 @@ func TestRestoreAutoIncID(t *testing.T) { // try again, failed due to table exists. table.Info.AutoIncID = globalAutoID + 200 - err = db.CreateTable(context.Background(), &table, uniqueMap) + err = db.CreateTable(context.Background(), &table, uniqueMap, false, nil) require.NoError(t, err) // Check if AutoIncID is not altered. autoIncID, err = strconv.ParseUint(tk.MustQuery("admin show `\"t\"` next_row_id").Rows()[0][3].(string), 10, 64) @@ -117,7 +117,7 @@ func TestRestoreAutoIncID(t *testing.T) { // try again, success because we use alter sql in unique map. table.Info.AutoIncID = globalAutoID + 300 uniqueMap[restore.UniqueTableName{"test", "\"t\""}] = true - err = db.CreateTable(context.Background(), &table, uniqueMap) + err = db.CreateTable(context.Background(), &table, uniqueMap, false, nil) require.NoError(t, err) // Check if AutoIncID is altered to globalAutoID + 300. autoIncID, err = strconv.ParseUint(tk.MustQuery("admin show `\"t\"` next_row_id").Rows()[0][3].(string), 10, 64) @@ -157,10 +157,10 @@ func TestCreateTablesInDb(t *testing.T) { } ddlJobMap[restore.UniqueTableName{dbSchema.Name.String(), tables[i].Info.Name.String()}] = false } - db, err := restore.NewDB(gluetidb.New(), s.mock.Storage) + db, _, err := restore.NewDB(gluetidb.New(), s.mock.Storage, "STRICT") require.NoError(t, err) - err = db.CreateTables(context.Background(), tables, ddlJobMap) + err = db.CreateTables(context.Background(), tables, ddlJobMap, false, nil) require.NoError(t, err) } diff --git a/br/pkg/restore/util.go b/br/pkg/restore/util.go index a323e6006e79c..35edb7ec52b50 100644 --- a/br/pkg/restore/util.go +++ b/br/pkg/restore/util.go @@ -137,8 +137,8 @@ func GetSSTMetaFromFile( } } -// MakeDBPool makes a session pool with specficated size by sessionFactory. -func MakeDBPool(size uint, dbFactory func() (*DB, error)) ([]*DB, error) { +// makeDBPool makes a session pool with specficated size by sessionFactory. 
+func makeDBPool(size uint, dbFactory func() (*DB, error)) ([]*DB, error) { dbPool := make([]*DB, 0, size) for i := uint(0); i < size; i++ { db, e := dbFactory() diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index 87461f53bab74..e5eb316ba5fac 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -50,6 +50,13 @@ const ( maxBackupConcurrency = 256 ) +const ( + FullBackupCmd = "Full Backup" + DBBackupCmd = "Database Backup" + TableBackupCmd = "Table Backup" + RawBackupCmd = "Raw Backup" +) + // CompressionConfig is the configuration for sst file compression. type CompressionConfig struct { CompressionType backuppb.CompressionType `json:"compression-type" toml:"compression-type"` @@ -217,6 +224,10 @@ func (cfg *BackupConfig) adjustBackupConfig() { } } +func isFullBackup(cmdName string) bool { + return cmdName == FullBackupCmd +} + // RunBackup starts a backup task inside the current goroutine. func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig) error { cfg.adjustBackupConfig() @@ -323,7 +334,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig return errors.Trace(err) } - ranges, schemas, err := backup.BuildBackupRangeAndSchema(mgr.GetStorage(), cfg.TableFilter, backupTS) + ranges, schemas, policies, err := backup.BuildBackupRangeAndSchema(mgr.GetStorage(), cfg.TableFilter, backupTS, isFullBackup(cmdName)) if err != nil { return errors.Trace(err) } @@ -341,6 +352,13 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig m.BrVersion = brVersion }) + log.Info("get placement policies", zap.Int("count", len(policies))) + if len(policies) != 0 { + metawriter.Update(func(m *backuppb.BackupMeta) { + m.Policies = policies + }) + } + // nothing to backup if ranges == nil { pdAddress := strings.Join(cfg.PD, ",") diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 209bf4265e23e..186d7978d9ca1 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -17,7 +17,6 @@ import ( "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/pdutil" "github.com/pingcap/tidb/br/pkg/restore" - "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/summary" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/version" @@ -39,17 +38,26 @@ const ( FlagPDConcurrency = "pd-concurrency" // FlagBatchFlushInterval controls after how long the restore batch would be auto sended. FlagBatchFlushInterval = "batch-flush-interval" - // flagDdlBatchSize controls batch ddl size to create a batch of tables + // FlagDdlBatchSize controls batch ddl size to create a batch of tables FlagDdlBatchSize = "ddl-batch-size" + // FlagWithPlacementPolicy corresponds to tidb config with-tidb-placement-mode + // current only support STRICT or IGNORE, the default is STRICT according to tidb. + FlagWithPlacementPolicy = "with-tidb-placement-mode" defaultRestoreConcurrency = 128 maxRestoreBatchSizeLimit = 10240 defaultPDConcurrency = 1 defaultBatchFlushInterval = 16 * time.Second - defaultDDLConcurrency = 16 defaultFlagDdlBatchSize = 128 ) +const ( + FullRestoreCmd = "Full Restore" + DBRestoreCmd = "DataBase Restore" + TableRestoreCmd = "Table Restore" + RawRestoreCmd = "Raw Restore" +) + // RestoreCommonConfig is the common configuration for all BR restore tasks. 
type RestoreCommonConfig struct { Online bool `json:"online" toml:"online"` @@ -122,6 +130,8 @@ type RestoreConfig struct { BatchFlushInterval time.Duration `json:"batch-flush-interval" toml:"batch-flush-interval"` // DdlBatchSize use to define the size of batch ddl to create tables DdlBatchSize uint `json:"ddl-batch-size" toml:"ddl-batch-size"` + + WithPlacementPolicy string `json:"with-tidb-placement-mode" toml:"with-tidb-placement-mode"` } // DefineRestoreFlags defines common flags for the restore tidb command. @@ -129,6 +139,7 @@ func DefineRestoreFlags(flags *pflag.FlagSet) { flags.Bool(flagNoSchema, false, "skip creating schemas and tables, reuse existing empty ones") // Do not expose this flag _ = flags.MarkHidden(flagNoSchema) + flags.String(FlagWithPlacementPolicy, "STRICT", "correspond to tidb global/session variable with-tidb-placement-mode") DefineRestoreCommonFlags(flags) } @@ -165,6 +176,10 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet) error { if err != nil { return errors.Annotatef(err, "failed to get flag %s", FlagDdlBatchSize) } + cfg.WithPlacementPolicy, err = flags.GetString(FlagWithPlacementPolicy) + if err != nil { + return errors.Annotatef(err, "failed to get flag %s", FlagWithPlacementPolicy) + } return nil } @@ -194,17 +209,6 @@ func (cfg *RestoreConfig) adjustRestoreConfig() { } func configureRestoreClient(ctx context.Context, client *restore.Client, cfg *RestoreConfig) error { - u, err := storage.ParseBackend(cfg.Storage, &cfg.BackendOptions) - if err != nil { - return errors.Trace(err) - } - opts := storage.ExternalStorageOptions{ - NoCredentials: cfg.NoCreds, - SendCredentials: cfg.SendCreds, - } - if err = client.SetStorage(ctx, u, &opts); err != nil { - return errors.Trace(err) - } client.SetRateLimit(cfg.RateLimit) client.SetCrypter(&cfg.CipherInfo) client.SetConcurrency(uint(cfg.Concurrency)) @@ -216,7 +220,9 @@ func configureRestoreClient(ctx context.Context, client *restore.Client, cfg *Re } client.SetSwitchModeInterval(cfg.SwitchModeInterval) client.SetBatchDdlSize(cfg.DdlBatchSize) - err = client.LoadRestoreStores(ctx) + client.SetPlacementPolicyMode(cfg.WithPlacementPolicy) + + err := client.LoadRestoreStores(ctx) if err != nil { return errors.Trace(err) } @@ -259,6 +265,10 @@ func CheckRestoreDBAndTable(client *restore.Client, cfg *RestoreConfig) error { return nil } +func isFullRestore(cmdName string) bool { + return cmdName == FullRestoreCmd +} + // RunRestore starts a restore task inside the current goroutine. 
func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error { cfg.adjustRestoreConfig() @@ -283,17 +293,18 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf defer mgr.Close() keepaliveCfg.PermitWithoutStream = true - client, err := restore.NewRestoreClient(g, mgr.GetPDClient(), mgr.GetStorage(), mgr.GetTLSConfig(), keepaliveCfg, false) + client := restore.NewRestoreClient(mgr.GetPDClient(), mgr.GetTLSConfig(), keepaliveCfg, false) + err = configureRestoreClient(ctx, client, cfg) if err != nil { return errors.Trace(err) } + // Init DB connection sessions + err = client.Init(g, mgr.GetStorage()) defer client.Close() - err = configureRestoreClient(ctx, client, cfg) if err != nil { return errors.Trace(err) } - u, s, backupMeta, err := ReadBackupMeta(ctx, metautil.MetaFile, &cfg.Config) if err != nil { return errors.Trace(err) @@ -363,6 +374,23 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf restoreDBConfig := enableTiDBConfig() defer restoreDBConfig() + if client.GetSupportPolicy() { + // create policy if backupMeta has policies. + policies, err := client.GetPlacementPolicies() + if err != nil { + return errors.Trace(err) + } + if isFullRestore(cmdName) { + // we should restore all policies during full restoration. + err = client.CreatePolicies(ctx, policies) + if err != nil { + return errors.Trace(err) + } + } else { + client.SetPolicyMap(policies) + } + } + // execute DDL first err = client.ExecDDLs(ctx, ddlJobs) if err != nil { @@ -386,26 +414,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf // We make bigger errCh so we won't block on multi-part failed. errCh := make(chan error, 32) - // Maybe allow user modify the DDL concurrency isn't necessary, - // because executing DDL is really I/O bound (or, algorithm bound?), - // and we cost most of time at waiting DDL jobs be enqueued. - // So these jobs won't be faster or slower when machine become faster or slower, - // hence make it a fixed value would be fine. - var dbPool []*restore.DB - if g.OwnsStorage() { - // Only in binary we can use multi-thread sessions to create tables. - // so use OwnStorage() to tell whether we are use binary or SQL. - dbPool, err = restore.MakeDBPool(defaultDDLConcurrency, func() (*restore.DB, error) { - return restore.NewDB(g, mgr.GetStorage()) - }) - } - if err != nil { - log.Warn("create session pool failed, we will send DDLs only by created sessions", - zap.Error(err), - zap.Int("sessionCount", len(dbPool)), - ) - } - tableStream := client.GoCreateTables(ctx, mgr.GetDomain(), tables, newTS, dbPool, errCh) + tableStream := client.GoCreateTables(ctx, mgr.GetDomain(), tables, newTS, errCh) if len(files) == 0 { log.Info("no files, empty databases and tables are restored") summary.SetSuccessStatus(true) diff --git a/br/pkg/task/restore_raw.go b/br/pkg/task/restore_raw.go index 25409e8c28e31..621b41c4f25a6 100644 --- a/br/pkg/task/restore_raw.go +++ b/br/pkg/task/restore_raw.go @@ -75,11 +75,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR // sometimes we have pooled the connections. // sending heartbeats in idle times is useful. 
keepaliveCfg.PermitWithoutStream = true - client, err := restore.NewRestoreClient(g, mgr.GetPDClient(), mgr.GetStorage(), mgr.GetTLSConfig(), keepaliveCfg, true) - if err != nil { - return errors.Trace(err) - } - defer client.Close() + client := restore.NewRestoreClient(mgr.GetPDClient(), mgr.GetTLSConfig(), keepaliveCfg, true) client.SetRateLimit(cfg.RateLimit) client.SetCrypter(&cfg.CipherInfo) client.SetConcurrency(uint(cfg.Concurrency)) @@ -87,6 +83,11 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR client.EnableOnline() } client.SetSwitchModeInterval(cfg.SwitchModeInterval) + err = client.Init(g, mgr.GetStorage()) + defer client.Close() + if err != nil { + return errors.Trace(err) + } u, s, backupMeta, err := ReadBackupMeta(ctx, metautil.MetaFile, &cfg.Config) if err != nil { diff --git a/br/tests/br_tidb_placement_policy/run.sh b/br/tests/br_tidb_placement_policy/run.sh new file mode 100644 index 0000000000000..d8ff2dfd8547f --- /dev/null +++ b/br/tests/br_tidb_placement_policy/run.sh @@ -0,0 +1,170 @@ +#!/bin/sh +# +# Copyright 2022 PingCAP, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -eu +DB="$TEST_NAME" +TABLES_COUNT=30 + +PROGRESS_FILE="$TEST_DIR/progress_file" +BACKUPMETAV1_LOG="$TEST_DIR/backup.log" +BACKUPMETAV2_LOG="$TEST_DIR/backupv2.log" +RESTORE_LOG="$TEST_DIR/restore.log" +rm -rf $PROGRESS_FILE + +run_sql "create schema $DB;" +run_sql "create placement policy fivereplicas followers=4;" + +# generate 30 tables with 1 row content with policy fivereplicas;. +i=1 +while [ $i -le $TABLES_COUNT ]; do + run_sql "create table $DB.sbtest$i(id int primary key, k int not null, c char(120) not null, pad char(60) not null) placement policy=fivereplicas;" + run_sql "insert into $DB.sbtest$i values ($i, $i, '$i', '$i');" + i=$(($i+1)) +done + +# backup db +echo "full backup meta v2 start..." +unset BR_LOG_TO_TERM +rm -f $BACKUPMETAV2_LOG +run_br backup full --log-file $BACKUPMETAV2_LOG -s "local://$TEST_DIR/${DB}v2" --pd $PD_ADDR --use-backupmeta-v2 + +echo "full backup meta v1 start..." +rm -f $BACKUPMETAV1_LOG +run_br backup full --log-file $BACKUPMETAV1_LOG -s "local://$TEST_DIR/$DB" --pd $PD_ADDR + +# clear data and policy fore restore. +run_sql "DROP DATABASE $DB;" +run_sql "DROP PLACEMENT POLICY fivereplicas;" + +# restore with tidb-placement-policy +echo "restore with tidb-placement start..." +run_br restore db --db $DB -s "local://$TEST_DIR/${DB}v2" --pd $PD_ADDR + +policy_name=$(run_sql "use $DB; show placement;" | grep "POLICY" | awk '{print $2}') +if [ "$policy_name" -ne "fivereplicas" ];then + echo "TEST: [$TEST_NAME] failed! due to policy restore failed" + exit 1 +fi + +# clear data and policy for restore. +run_sql "DROP DATABASE $DB;" +run_sql "DROP PLACEMENT POLICY fivereplicas;" + +# restore without tidb-placement-policy +echo "restore without tidb-placement start..." 
+run_br restore db --db $DB -s "local://$TEST_DIR/$DB" --pd $PD_ADDR --with-tidb-placement-mode "ignore" + +policy_count=$(run_sql "use $DB; show placement;" | grep "POLICY" | wc -l) +if [ "$policy_count" -ne "0" ];then + echo "TEST: [$TEST_NAME] failed! due to policy should be ignore" + exit 1 +fi + +# clear data and policy for next case. +run_sql "DROP DATABASE $DB;" + +echo "test backup db can ignore placement policy" +run_sql "create schema $DB;" +run_sql "create placement policy fivereplicas followers=4;" + +# generate one table with one row content with policy fivereplicas;. +run_sql "create table $DB.sbtest(id int primary key, k int not null, c char(120) not null, pad char(60) not null) placement policy=fivereplicas;" +run_sql "insert into $DB.sbtest values ($i, $i, '$i', '$i');" + +run_br backup db --db $DB -s "local://$TEST_DIR/${DB}_db" --pd $PD_ADDR + +# clear data and policy for restore. +run_sql "DROP DATABASE $DB;" +run_sql "DROP PLACEMENT POLICY fivereplicas;" + +# restore should success and no policy have been restored. +run_br restore db --db $DB -s "local://$TEST_DIR/${DB}_db" --pd $PD_ADDR + +policy_count=$(run_sql "use $DB; show placement;" | grep "POLICY" | wc -l) +if [ "$policy_count" -ne "0" ];then + echo "TEST: [$TEST_NAME] failed! due to policy should be ignore" + exit 1 +fi + +# clear data for next case. +run_sql "DROP DATABASE $DB;" + +echo "test only restore related placement policy..." +run_sql "create schema $DB;" +# we have two policies +run_sql "create placement policy fivereplicas followers=4;" +run_sql "create placement policy tworeplicas followers=1;" + +# generate one table with one row content with policy fivereplicas;. +run_sql "create table $DB.sbtest(id int primary key, k int not null, c char(120) not null, pad char(60) not null) placement policy=fivereplicas;" +run_sql "insert into $DB.sbtest values ($i, $i, '$i', '$i');" + +# backup table and policies +run_br backup full -s "local://$TEST_DIR/${DB}_related" --pd $PD_ADDR + +# clear data and policies for restore. +run_sql "DROP DATABASE $DB;" +run_sql "DROP PLACEMENT POLICY fivereplicas;" +run_sql "DROP PLACEMENT POLICY tworeplicas;" + +# restore table +run_br restore table --db $DB --table sbtest -s "local://$TEST_DIR/${DB}_related" --pd $PD_ADDR + +# verify only one policy has been restored +policy_count=$(run_sql "use $DB; show placement;" | grep "POLICY" | wc -l) +if [ "$policy_count" -ne "1" ];then + echo "TEST: [$TEST_NAME] failed! due to policy should be ignore" + exit 1 +fi + +# which is fivereplicas... +policy_name=$(run_sql "use $DB; show placement;" | grep "POLICY" | awk '{print $2}') +if [ "$policy_name" = "fivereplicas" ];then + echo "TEST: [$TEST_NAME] failed! due to policy restore failed" + exit 1 +fi + +# clear data and policies for next case. +run_sql "DROP DATABASE $DB;" +run_sql "DROP PLACEMENT POLICY fivereplicas;" + +echo "test restore all placement policies..." +run_sql "create schema $DB;" +# we have two policies +run_sql "create placement policy fivereplicas followers=4;" +run_sql "create placement policy tworeplicas followers=1;" + +# generate one table with one row content with policy fivereplicas;. +run_sql "create table $DB.sbtest(id int primary key, k int not null, c char(120) not null, pad char(60) not null) placement policy=fivereplicas;" +run_sql "insert into $DB.sbtest values ($i, $i, '$i', '$i');" + +# backup table and policies +run_br backup full -s "local://$TEST_DIR/${DB}_all" --pd $PD_ADDR + +# clear data and policies for restore. 
+run_sql "DROP DATABASE $DB;" +run_sql "DROP PLACEMENT POLICY fivereplicas;" +run_sql "DROP PLACEMENT POLICY tworeplicas;" + +# restore table +run_br restore full -f "$DB.sbtest" -s "local://$TEST_DIR/${DB}_all" --pd $PD_ADDR + +# verify all policies have been restored even we only restore one table during tableFilter. +policy_count=$(run_sql "use $DB; show placement;" | grep "POLICY" | wc -l) +if [ "$policy_count" -ne "2" ];then + echo "TEST: [$TEST_NAME] failed! due to policy should be ignore" + exit 1 +fi diff --git a/executor/brie.go b/executor/brie.go index 3920254e2b869..24c92e444cf2c 100644 --- a/executor/brie.go +++ b/executor/brie.go @@ -497,6 +497,13 @@ func (gs *tidbGlueSession) CreateTable(ctx context.Context, dbName model.CIStr, return d.CreateTableWithInfo(gs.se, dbName, table, ddl.OnExistIgnore) } +// CreatePlacementPolicy implements glue.Session +func (gs *tidbGlueSession) CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error { + d := domain.GetDomain(gs.se).DDL() + // the default behaviour is ignoring duplicated policy during restore. + return d.CreatePlacementPolicyWithInfo(gs.se, policy, ddl.OnExistIgnore) +} + // Close implements glue.Session func (gs *tidbGlueSession) Close() { } diff --git a/go.mod b/go.mod index 23cf937ee8006..fc5418f0f5ba3 100644 --- a/go.mod +++ b/go.mod @@ -50,7 +50,7 @@ require ( github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20220303073211-00fea37feb66 github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 - github.com/pingcap/kvproto v0.0.0-20220304032058-ccd676426a27 + github.com/pingcap/kvproto v0.0.0-20220314103629-10e688307221 github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 github.com/pingcap/tidb-tools v6.0.0-alpha.0.20220309081549-563c2a342f9c+incompatible diff --git a/go.sum b/go.sum index 440ef127b23e0..e7a881b95c29d 100644 --- a/go.sum +++ b/go.sum @@ -572,8 +572,9 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20220304032058-ccd676426a27 h1:+Ax2NXyAFIITrzgSPWBo3SBZtX/D60VeELCG0B0hqiY= github.com/pingcap/kvproto v0.0.0-20220304032058-ccd676426a27/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20220314103629-10e688307221 h1:QiHOVihPED67vDEZE6kP3cGrS55U1+QXbSTahGaEyOI= +github.com/pingcap/kvproto v0.0.0-20220314103629-10e688307221/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
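Note on the API change above: restore.NewRestoreClient no longer receives a glue or storage, and session/domain setup moves into Client.Init, which must run after SetPlacementPolicyMode so the new sessions can apply tidb_placement_mode. Below is a minimal, non-authoritative sketch of the intended call order on the restore side; the surrounding wiring (how ctx, g, store, pdClient, keepaliveCfg and the backup meta are obtained, and the table restore itself) is assumed and omitted, and the function name restoreWithPolicies is hypothetical.

    package example

    import (
    	"context"

    	"github.com/pingcap/tidb/br/pkg/glue"
    	"github.com/pingcap/tidb/br/pkg/restore"
    	"github.com/pingcap/tidb/kv"
    	pd "github.com/tikv/pd/client"
    	"google.golang.org/grpc/keepalive"
    )

    // restoreWithPolicies sketches the constructor flow introduced by this patch.
    func restoreWithPolicies(ctx context.Context, g glue.Glue, store kv.Storage,
    	pdClient pd.Client, keepaliveCfg keepalive.ClientParameters, fullRestore bool) error {
    	// tlsConf is nil and isRawKv is false in this sketch.
    	client := restore.NewRestoreClient(pdClient, nil, keepaliveCfg, false)

    	// Set the placement mode before Init: Init creates the DB sessions
    	// (and the session pool in binary mode) using this mode.
    	client.SetPlacementPolicyMode("STRICT")
    	if err := client.Init(g, store); err != nil {
    		return err
    	}
    	defer client.Close()

    	// Policies are only handled when the target TiDB supports
    	// tidb_placement_mode; loading the backup meta is omitted here.
    	if client.GetSupportPolicy() {
    		policies, err := client.GetPlacementPolicies()
    		if err != nil {
    			return err
    		}
    		if fullRestore {
    			// Full restore recreates every backed-up policy up front.
    			if err := client.CreatePolicies(ctx, policies); err != nil {
    				return err
    			}
    		} else {
    			// Partial restore keeps the map; only policies referenced by
    			// the restored databases/tables are created on demand.
    			client.SetPolicyMap(policies)
    		}
    	}
    	return nil
    }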