diff --git a/br/pkg/backup/BUILD.bazel b/br/pkg/backup/BUILD.bazel
index 251b2c3b75385..760e37c44d26b 100644
--- a/br/pkg/backup/BUILD.bazel
+++ b/br/pkg/backup/BUILD.bazel
@@ -67,7 +67,7 @@ go_test(
     embed = [":backup"],
     flaky = True,
     race = "on",
-    shard_count = 10,
+    shard_count = 11,
    deps = [
        "//br/pkg/conn",
        "//br/pkg/gluetidb/mock",
diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go
index 012984cf39057..606f6cf4d67be 100644
--- a/br/pkg/backup/client.go
+++ b/br/pkg/backup/client.go
@@ -8,6 +8,7 @@ import (
 	"encoding/base64"
 	"encoding/json"
 	"reflect"
+	"sort"
 	"strings"
 	"time"

@@ -1000,8 +1001,37 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S
 	if err != nil {
 		return errors.Trace(err)
 	}
+
+	// determine which jobs need to be appended into `allJobs` and whether the scan can stop early
+	appendJobsFn := func(jobs []*model.Job) ([]*model.Job, bool) {
+		appendJobs := make([]*model.Job, 0, len(jobs))
+		for _, job := range jobs {
+			if skipUnsupportedDDLJob(job) {
+				continue
+			}
+			if job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion <= lastSchemaVersion {
+				// exit early to stop the unnecessary scan
+				return appendJobs, true
+			}
+
+			if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
+				(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion && job.BinlogInfo.SchemaVersion <= backupSchemaVersion) {
+				if job.BinlogInfo.DBInfo != nil {
+					// ignore all placement policy info during incremental backup for now.
+					job.BinlogInfo.DBInfo.PlacementPolicyRef = nil
+				}
+				if job.BinlogInfo.TableInfo != nil {
+					// ignore all placement policy info during incremental backup for now.
+					job.BinlogInfo.TableInfo.ClearPlacement()
+				}
+				appendJobs = append(appendJobs, job)
+			}
+		}
+		return appendJobs, false
+	}
+
 	newestMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(version.Ver)))
-	allJobs := make([]*model.Job, 0)
+	var allJobs []*model.Job
 	err = g.UseOneShotSession(store, !needDomain, func(se glue.Session) error {
 		allJobs, err = ddl.GetAllDDLJobs(se.GetSessionCtx())
 		if err != nil {
@@ -1014,41 +1044,49 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S
 		return errors.Trace(err)
 	}

-	historyJobs, err := ddl.GetAllHistoryDDLJobs(newestMeta)
+	// keep only the current jobs that fall in the backup range
+	allJobs, _ = appendJobsFn(allJobs)
+
+	historyJobsIter, err := ddl.GetLastHistoryDDLJobsIterator(newestMeta)
 	if err != nil {
 		return errors.Trace(err)
 	}
-	log.Debug("get history jobs", zap.Int("jobs", len(historyJobs)))
-	allJobs = append(allJobs, historyJobs...)

-	count := 0
+	count := len(allJobs)
+
+	cacheJobs := make([]*model.Job, 0, ddl.DefNumHistoryJobs)
+	for {
+		cacheJobs, err = historyJobsIter.GetLastJobs(ddl.DefNumHistoryJobs, cacheJobs)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		if len(cacheJobs) == 0 {
+			// no more jobs
+			break
+		}
+		jobs, finished := appendJobsFn(cacheJobs)
+		count += len(jobs)
+		allJobs = append(allJobs, jobs...)
+		if finished {
+			// no more jobs between [LastTS, ts]
+			break
+		}
+	}
+	log.Debug("get completed jobs", zap.Int("jobs", count))
+	// sort by job id in ascending order
+	sort.Slice(allJobs, func(i, j int) bool {
+		return allJobs[i].ID < allJobs[j].ID
+	})
 	for _, job := range allJobs {
-		if skipUnsupportedDDLJob(job) {
-			continue
+		jobBytes, err := json.Marshal(job)
+		if err != nil {
+			return errors.Trace(err)
 		}
-
-		if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
-			(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion && job.BinlogInfo.SchemaVersion <= backupSchemaVersion) {
-			if job.BinlogInfo.DBInfo != nil {
-				// ignore all placement policy info during incremental backup for now.
-				job.BinlogInfo.DBInfo.PlacementPolicyRef = nil
-			}
-			if job.BinlogInfo.TableInfo != nil {
-				// ignore all placement policy info during incremental backup for now.
-				job.BinlogInfo.TableInfo.ClearPlacement()
-			}
-			jobBytes, err := json.Marshal(job)
-			if err != nil {
-				return errors.Trace(err)
-			}
-			err = metaWriter.Send(jobBytes, metautil.AppendDDL)
-			if err != nil {
-				return errors.Trace(err)
-			}
-			count++
+		err = metaWriter.Send(jobBytes, metautil.AppendDDL)
+		if err != nil {
+			return errors.Trace(err)
 		}
 	}
-	log.Debug("get completed jobs", zap.Int("jobs", count))
 	return nil
 }

diff --git a/br/pkg/backup/client_test.go b/br/pkg/backup/client_test.go
index db6051e84b0c2..55ddcf2e64a74 100644
--- a/br/pkg/backup/client_test.go
+++ b/br/pkg/backup/client_test.go
@@ -188,6 +188,65 @@ func TestGetTS(t *testing.T) {
 	require.Equal(t, backupts, ts)
 }

+func TestGetHistoryDDLJobs(t *testing.T) {
+	s := createBackupSuite(t)
+
+	tk := testkit.NewTestKit(t, s.cluster.Storage)
+	lastTS1, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
+	require.NoErrorf(t, err, "Error getting last ts: %s", err)
+	tk.MustExec("CREATE DATABASE IF NOT EXISTS test_db;")
+	tk.MustExec("CREATE TABLE IF NOT EXISTS test_db.test_table (c1 INT);")
+	lastTS2, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
+	require.NoErrorf(t, err, "Error getting last ts: %s", err)
+	tk.MustExec("RENAME TABLE test_db.test_table to test_db.test_table1;")
+	tk.MustExec("DROP TABLE test_db.test_table1;")
+	tk.MustExec("DROP DATABASE test_db;")
+	tk.MustExec("CREATE DATABASE test_db;")
+	tk.MustExec("USE test_db;")
+	tk.MustExec("CREATE TABLE test_table1 (c2 CHAR(255));")
+	tk.MustExec("RENAME TABLE test_table1 to test_table;")
+	tk.MustExec("RENAME TABLE test_table to test_table2;")
+	tk.MustExec("RENAME TABLE test_table2 to test_table;")
+	lastTS3, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
+	require.NoErrorf(t, err, "Error getting last ts: %s", err)
+	tk.MustExec("TRUNCATE TABLE test_table;")
+	ts, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
+	require.NoErrorf(t, err, "Error getting last ts: %s", err)
+
+	checkFn := func(lastTS uint64, ts uint64, jobsCount int) {
+		cipher := backuppb.CipherInfo{CipherType: encryptionpb.EncryptionMethod_PLAINTEXT}
+		metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher)
+		ctx := context.Background()
+		metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
+		s.mockGlue.SetSession(tk.Session())
+		err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.cluster.Storage, lastTS, ts, false)
+		require.NoErrorf(t, err, "Error getting ddl jobs: %s", err)
+		err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
+		require.NoError(t, err, "Flush failed", err)
+		err = metaWriter.FlushBackupMeta(ctx)
+		require.NoError(t, err, "Final flush of backup meta failed", err)
+
+		metaBytes, err := s.storage.ReadFile(ctx, metautil.MetaFile)
+		require.NoError(t, err)
+		mockMeta := &backuppb.BackupMeta{}
+		err = proto.Unmarshal(metaBytes, mockMeta)
+		require.NoError(t, err)
+		// check the DDL jobs recorded in the backup meta
+		metaReader := metautil.NewMetaReader(mockMeta, s.storage, &cipher)
+		allDDLJobsBytes, err := metaReader.ReadDDLs(ctx)
+		require.NoError(t, err)
+		var allDDLJobs []*model.Job
+		err = json.Unmarshal(allDDLJobsBytes, &allDDLJobs)
+		require.NoError(t, err)
+		require.Len(t, allDDLJobs, jobsCount)
+	}
+
+	checkFn(lastTS1, ts, 11)
+	checkFn(lastTS2, ts, 9)
+	checkFn(lastTS1, lastTS2, 2)
+	checkFn(lastTS3, ts, 1)
+}
+
 func TestSkipUnsupportedDDLJob(t *testing.T) {
 	s := createBackupSuite(t)
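
For readers unfamiliar with the history-job iterator, the following standalone sketch illustrates the scan pattern this patch introduces: walk history DDL jobs from newest to oldest in fixed-size batches, keep only jobs whose schema version falls in (lastSchemaVersion, backupSchemaVersion], stop as soon as a batch reaches a job at or below lastSchemaVersion, then sort the result by job ID ascending. It is my own illustration, not code from this patch or the ddl package; the job struct, jobIterator interface, sliceIter, and collectJobs are hypothetical stand-ins for model.Job, ddl.GetLastHistoryDDLJobsIterator, and the filtering closure in WriteBackupDDLJobs.

// Sketch only: simplified stand-ins for the BR/ddl types, under the
// assumptions stated above.
package main

import (
	"fmt"
	"sort"
)

type job struct {
	ID            int64
	SchemaVersion int64
}

// jobIterator yields jobs in descending schema-version order, batch by batch.
type jobIterator interface {
	GetLastJobs(num int, buf []job) ([]job, error)
}

// sliceIter is a toy iterator over an in-memory slice sorted newest first.
type sliceIter struct {
	jobs []job
	pos  int
}

func (it *sliceIter) GetLastJobs(num int, buf []job) ([]job, error) {
	buf = buf[:0] // reuse the caller's buffer, as the real iterator does
	for len(buf) < num && it.pos < len(it.jobs) {
		buf = append(buf, it.jobs[it.pos])
		it.pos++
	}
	return buf, nil
}

// collectJobs mirrors the early-exit loop: scan batches newest to oldest,
// keep jobs in (lastVer, backupVer], and stop once a job is already covered
// by the previous backup (SchemaVersion <= lastVer).
func collectJobs(it jobIterator, lastVer, backupVer int64, batchSize int) ([]job, error) {
	var all []job
	buf := make([]job, 0, batchSize)
	for {
		var err error
		buf, err = it.GetLastJobs(batchSize, buf)
		if err != nil {
			return nil, err
		}
		if len(buf) == 0 {
			break // no more history jobs
		}
		finished := false
		for _, j := range buf {
			if j.SchemaVersion <= lastVer {
				finished = true // everything older is out of range, stop scanning
				break
			}
			if j.SchemaVersion <= backupVer {
				all = append(all, j)
			}
		}
		if finished {
			break
		}
	}
	// Restore ascending job-ID order for replay, as the patch does with sort.Slice.
	sort.Slice(all, func(i, k int) bool { return all[i].ID < all[k].ID })
	return all, nil
}

func main() {
	it := &sliceIter{jobs: []job{
		{ID: 5, SchemaVersion: 50},
		{ID: 4, SchemaVersion: 40},
		{ID: 3, SchemaVersion: 30},
		{ID: 2, SchemaVersion: 20},
	}}
	got, _ := collectJobs(it, 25, 45, 2)
	fmt.Println(got) // jobs with schema version in (25, 45]: IDs 3 and 4
}

The early exit is what makes the incremental case cheap: once the iterator reaches a job already covered by the previous backup, the remaining (potentially much older) history never needs to be decoded.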