Skip to content

Commit

Permalink
backup: use history iterator to scan ddl jobs (#54100)
Browse files Browse the repository at this point in the history
close #54139
  • Loading branch information
3pointer authored Jun 24, 2024
1 parent 1368bf7 commit eac8012
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 29 deletions.
2 changes: 1 addition & 1 deletion br/pkg/backup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ go_test(
embed = [":backup"],
flaky = True,
race = "on",
shard_count = 10,
shard_count = 11,
deps = [
"//br/pkg/conn",
"//br/pkg/gluetidb/mock",
Expand Down
94 changes: 66 additions & 28 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/base64"
"encoding/json"
"reflect"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -1000,8 +1001,37 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S
if err != nil {
return errors.Trace(err)
}

// determine whether the jobs need to be append into `allJobs`
appendJobsFn := func(jobs []*model.Job) ([]*model.Job, bool) {
appendJobs := make([]*model.Job, 0, len(jobs))
for _, job := range jobs {
if skipUnsupportedDDLJob(job) {
continue
}
if job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion <= lastSchemaVersion {
// early exits to stop unnecessary scan
return appendJobs, true
}

if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion && job.BinlogInfo.SchemaVersion <= backupSchemaVersion) {
if job.BinlogInfo.DBInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.DBInfo.PlacementPolicyRef = nil
}
if job.BinlogInfo.TableInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.TableInfo.ClearPlacement()
}
appendJobs = append(appendJobs, job)
}
}
return appendJobs, false
}

newestMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(version.Ver)))
allJobs := make([]*model.Job, 0)
var allJobs []*model.Job
err = g.UseOneShotSession(store, !needDomain, func(se glue.Session) error {
allJobs, err = ddl.GetAllDDLJobs(se.GetSessionCtx())
if err != nil {
Expand All @@ -1014,41 +1044,49 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S
return errors.Trace(err)
}

historyJobs, err := ddl.GetAllHistoryDDLJobs(newestMeta)
// filter out the jobs
allJobs, _ = appendJobsFn(allJobs)

historyJobsIter, err := ddl.GetLastHistoryDDLJobsIterator(newestMeta)
if err != nil {
return errors.Trace(err)
}
log.Debug("get history jobs", zap.Int("jobs", len(historyJobs)))
allJobs = append(allJobs, historyJobs...)

count := 0
count := len(allJobs)

cacheJobs := make([]*model.Job, 0, ddl.DefNumHistoryJobs)
for {
cacheJobs, err = historyJobsIter.GetLastJobs(ddl.DefNumHistoryJobs, cacheJobs)
if err != nil {
return errors.Trace(err)
}
if len(cacheJobs) == 0 {
// no more jobs
break
}
jobs, finished := appendJobsFn(cacheJobs)
count += len(jobs)
allJobs = append(allJobs, jobs...)
if finished {
// no more jobs between [LastTS, ts]
break
}
}
log.Debug("get complete jobs", zap.Int("jobs", count))
// sort by job id with ascend order
sort.Slice(allJobs, func(i, j int) bool {
return allJobs[i].ID < allJobs[j].ID
})
for _, job := range allJobs {
if skipUnsupportedDDLJob(job) {
continue
jobBytes, err := json.Marshal(job)
if err != nil {
return errors.Trace(err)
}

if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion && job.BinlogInfo.SchemaVersion <= backupSchemaVersion) {
if job.BinlogInfo.DBInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.DBInfo.PlacementPolicyRef = nil
}
if job.BinlogInfo.TableInfo != nil {
// ignore all placement policy info during incremental backup for now.
job.BinlogInfo.TableInfo.ClearPlacement()
}
jobBytes, err := json.Marshal(job)
if err != nil {
return errors.Trace(err)
}
err = metaWriter.Send(jobBytes, metautil.AppendDDL)
if err != nil {
return errors.Trace(err)
}
count++
err = metaWriter.Send(jobBytes, metautil.AppendDDL)
if err != nil {
return errors.Trace(err)
}
}
log.Debug("get completed jobs", zap.Int("jobs", count))
return nil
}

Expand Down
59 changes: 59 additions & 0 deletions br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,65 @@ func TestGetTS(t *testing.T) {
require.Equal(t, backupts, ts)
}

func TestGetHistoryDDLJobs(t *testing.T) {
s := createBackupSuite(t)

tk := testkit.NewTestKit(t, s.cluster.Storage)
lastTS1, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
require.NoErrorf(t, err, "Error get last ts: %s", err)
tk.MustExec("CREATE DATABASE IF NOT EXISTS test_db;")
tk.MustExec("CREATE TABLE IF NOT EXISTS test_db.test_table (c1 INT);")
lastTS2, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
require.NoErrorf(t, err, "Error get last ts: %s", err)
tk.MustExec("RENAME TABLE test_db.test_table to test_db.test_table1;")
tk.MustExec("DROP TABLE test_db.test_table1;")
tk.MustExec("DROP DATABASE test_db;")
tk.MustExec("CREATE DATABASE test_db;")
tk.MustExec("USE test_db;")
tk.MustExec("CREATE TABLE test_table1 (c2 CHAR(255));")
tk.MustExec("RENAME TABLE test_table1 to test_table;")
tk.MustExec("RENAME TABLE test_table to test_table2;")
tk.MustExec("RENAME TABLE test_table2 to test_table;")
lastTS3, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
require.NoErrorf(t, err, "Error get last ts: %s", err)
tk.MustExec("TRUNCATE TABLE test_table;")
ts, err := s.cluster.GetOracle().GetTimestamp(context.Background(), &oracle.Option{TxnScope: oracle.GlobalTxnScope})
require.NoErrorf(t, err, "Error get last ts: %s", err)

checkFn := func(lastTS uint64, ts uint64, jobsCount int) {
cipher := backuppb.CipherInfo{CipherType: encryptionpb.EncryptionMethod_PLAINTEXT}
metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher)
ctx := context.Background()
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
s.mockGlue.SetSession(tk.Session())
err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.cluster.Storage, lastTS, ts, false)
require.NoErrorf(t, err, "Error get ddl jobs: %s", err)
err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
require.NoError(t, err, "Flush failed", err)
err = metaWriter.FlushBackupMeta(ctx)
require.NoError(t, err, "Finally flush backup meta failed", err)

metaBytes, err := s.storage.ReadFile(ctx, metautil.MetaFile)
require.NoError(t, err)
mockMeta := &backuppb.BackupMeta{}
err = proto.Unmarshal(metaBytes, mockMeta)
require.NoError(t, err)
// check the schema version
metaReader := metautil.NewMetaReader(mockMeta, s.storage, &cipher)
allDDLJobsBytes, err := metaReader.ReadDDLs(ctx)
require.NoError(t, err)
var allDDLJobs []*model.Job
err = json.Unmarshal(allDDLJobsBytes, &allDDLJobs)
require.NoError(t, err)
require.Len(t, allDDLJobs, jobsCount)
}

checkFn(lastTS1, ts, 11)
checkFn(lastTS2, ts, 9)
checkFn(lastTS1, lastTS2, 2)
checkFn(lastTS3, ts, 1)
}

func TestSkipUnsupportedDDLJob(t *testing.T) {
s := createBackupSuite(t)

Expand Down

0 comments on commit eac8012

Please sign in to comment.