From b3151e721a112a8bb455717e7eb9a8023de79720 Mon Sep 17 00:00:00 2001 From: Paul Bardea Date: Thu, 15 Apr 2021 21:58:44 +0000 Subject: [PATCH] backupccl: cluster restore import and restore jobs as canceled IMPORT and RESTORE may write non-transactionally, so their writes cannot be trusted to be included in every backup. As such, they should be restored in a reverting state to attempt to undo any of their untrusted writes. Release note (bug fix): IMPORT and RESTORE jobs are now restored as reverting so that they cleanup after themselves. Previously, some of the writes of the jobs while they were running may have been missed by backup. --- .../full_cluster_backup_restore_test.go | 54 ++++++-- pkg/ccl/backupccl/restore_job.go | 116 ++++++++++++++++++ 2 files changed, 157 insertions(+), 13 deletions(-) diff --git a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go index e0a34e2bd234..4e5e436f1ed7 100644 --- a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go +++ b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go @@ -793,9 +793,7 @@ func TestReintroduceOfflineSpans(t *testing.T) { const numAccounts = 1000 ctx, _, srcDB, tempDir, cleanupSrc := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitNone, params) - _, _, destDB, cleanupDst := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitNone, base.TestClusterArgs{}) defer cleanupSrc() - defer cleanupDst() dbBackupLoc := "nodelocal://0/my_db_backup" clusterBackupLoc := "nodelocal://0/my_cluster_backup" @@ -819,7 +817,10 @@ func TestReintroduceOfflineSpans(t *testing.T) { <-dbRestoreStarted srcDB.Exec(t, `BACKUP TO $1 WITH revision_history`, clusterBackupLoc) - // All the restore to finish. This will issue AddSSTable requests at a + var tsMidRestore string + srcDB.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&tsMidRestore) + + // Allow the restore to finish. This will issue AddSSTable requests at a // timestamp that is before the last incremental we just took. close(blockDBRestore) @@ -836,16 +837,43 @@ func TestReintroduceOfflineSpans(t *testing.T) { srcDB.Exec(t, `BACKUP TO $1 WITH revision_history`, clusterBackupLoc) - // Restore the incremental backup chain that has missing writes. - destDB.Exec(t, `RESTORE FROM $1 AS OF SYSTEM TIME `+tsBefore, clusterBackupLoc) + t.Run("spans-reintroduced", func(t *testing.T) { + _, _, destDB, cleanupDst := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitNone, base.TestClusterArgs{}) + defer cleanupDst() + + // Restore the incremental backup chain that has missing writes. + destDB.Exec(t, `RESTORE FROM $1 AS OF SYSTEM TIME `+tsBefore, clusterBackupLoc) + + // Assert that the restored database has the same number of rows in both the + // source and destination cluster. + checkQuery := `SELECT count(*) FROM restoredb.bank AS OF SYSTEM TIME ` + tsBefore + expectedCount := srcDB.QueryStr(t, checkQuery) + destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank`, expectedCount) + + checkQuery = `SELECT count(*) FROM restoredb.bank@new_idx AS OF SYSTEM TIME ` + tsBefore + expectedCount = srcDB.QueryStr(t, checkQuery) + destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank@new_idx`, expectedCount) + }) + + t.Run("restore-canceled", func(t *testing.T) { + defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)() + _, _, destDB, cleanupDst := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitNone, base.TestClusterArgs{}) + defer cleanupDst() - // Assert that the restored database has the same number - // of rows in both the source and destination cluster. - checkQuery := `SELECT count(*) FROM restoredb.bank AS OF SYSTEM TIME ` + tsBefore - expectedCount := srcDB.QueryStr(t, checkQuery) - destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank`, expectedCount) + destDB.Exec(t, `RESTORE FROM $1 AS OF SYSTEM TIME `+tsMidRestore, clusterBackupLoc) - checkQuery = `SELECT count(*) FROM restoredb.bank@new_idx AS OF SYSTEM TIME ` + tsBefore - expectedCount = srcDB.QueryStr(t, checkQuery) - destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank@new_idx`, expectedCount) + // Wait for the cluster restore job to finish, as well as the restored RESTORE TABLE + // job to revert. + destDB.CheckQueryResultsRetry(t, ` + SELECT description, status FROM [SHOW JOBS] + WHERE job_type = 'RESTORE' AND status NOT IN ('succeeded', 'canceled')`, + [][]string{}, + ) + // The cluster restore should succeed, but the table restore should have failed. + destDB.CheckQueryResults(t, + `SELECT status, count(*) FROM [SHOW JOBS] WHERE job_type = 'RESTORE' GROUP BY status ORDER BY status`, + [][]string{{"canceled", "1"}, {"succeeded", "1"}}) + + destDB.ExpectErr(t, `relation "restoredb.bank" does not exist`, `SELECT count(*) FROM restoredb.bank`) + }) } diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index d81722234a98..3e52d8f6b602 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -13,6 +13,7 @@ import ( "context" "fmt" "math" + "strings" "github.com/cockroachdb/cockroach/pkg/build" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" @@ -528,6 +529,10 @@ func restore( if len(spans) == 0 { return emptyRowCount, nil } + details := job.Details().(jobspb.RestoreDetails) + if alreadyMigrated := checkForMigratedData(details); alreadyMigrated { + return emptyRowCount, nil + } mu := struct { syncutil.Mutex @@ -1922,6 +1927,81 @@ func getRestoringPrivileges( return updatedPrivileges, nil } +func getImportAndRestoreJobs( + ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, stagingTableName string, +) ([]int64, error) { + pageSize := 100 + allJobs := make([]int64, 0) + var maxID int64 + for { + var done bool + var err error + var pageJobIDs []int64 + done, maxID, pageJobIDs, err = getImportAndRestoreJobsPage(ctx, ie, stagingTableName, txn, maxID, pageSize) + if err != nil { + return nil, err + } + allJobs = append(allJobs, pageJobIDs...) + if done { + break + } + } + return allJobs, nil +} + +func getImportAndRestoreJobsPage( + ctx context.Context, + ie *sql.InternalExecutor, + stagingTableName string, + txn *kv.Txn, + minID int64, + pageSize int, +) (done bool, maxID int64, jobIDs []int64, _ error) { + stmt := fmt.Sprintf("SELECT id, payload FROM %s "+ + "WHERE id > $1 AND status IN ($2, $3, $4)"+ + "ORDER BY id "+ // the ordering is important as we keep track of the maximum ID we've seen + "LIMIT $5", stagingTableName) + rows, err := ie.Query(ctx, "fetch-import-and-restore-jobs", nil /* txn */, stmt, minID, jobs.StatusRunning, jobs.StatusPaused, jobs.StatusPauseRequested, pageSize) + if err != nil { + return false, 0, nil, errors.Wrapf(err, "failed to fetch import and restore jobs from %s", stagingTableName) + } + + if len(rows) == 0 { + return true, 0, nil, nil + } + // Track the highest ID we encounter, so it can serve as the bottom of the + // next page. + maxID = int64(*(rows[len(rows)-1][0].(*tree.DInt))) + // If we got as many rows as we asked for, there might be more. + morePages := len(rows) == pageSize + + for _, row := range rows { + id, payloadBytes := row[0], row[1] + rawJobID, ok := id.(*tree.DInt) + if !ok { + return false, 0, nil, errors.New("could not parse jobID") + } + jobID := int64(*rawJobID) + payload, err := jobs.UnmarshalPayload(payloadBytes) + if err != nil { + return false, 0, nil, err + } + if payload.Type() == jobspb.TypeImport || payload.Type() == jobspb.TypeRestore { + jobIDs = append(jobIDs, jobID) + } + } + + return !morePages, maxID, jobIDs, nil +} + +// checkForMigratedData checks to see if any of the system tables have already +// been restored. If they have, then we have already restored all of the data +// to the cluster. We should not restore the data again, since we would be +// shadowing potentially migrated system table data. +func checkForMigratedData(details jobspb.RestoreDetails) bool { + return len(details.SystemTablesRestored) > 0 +} + // restoreSystemTables atomically replaces the contents of the system tables // with the data from the restored system tables. func (r *restoreResumer) restoreSystemTables( @@ -1955,6 +2035,25 @@ func (r *restoreResumer) restoreSystemTables( if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { txn.SetDebugName("system-restore-txn") stmtDebugName := fmt.Sprintf("restore-system-systemTable-%s", systemTableName) + // Perform any migrations of the data that are required. + switch systemTableName { + case systemschema.JobsTable.Name: + stagingTableName := restoreTempSystemDB + "." + systemTableName + jobsToRevert, err := getImportAndRestoreJobs(ctx, executor, txn, stagingTableName) + if err != nil { + return errors.Wrap(err, "failed to fetch IMPORT and RESTORE jobs") + } + + var updateStatusQuery strings.Builder + fmt.Fprintf(&updateStatusQuery, "UPDATE %s SET status = $1 WHERE id IN ", stagingTableName) + writeJobsList(jobsToRevert, &updateStatusQuery) + + _, err = executor.Exec(ctx, stmtDebugName+"-status-update", txn, updateStatusQuery.String(), jobs.StatusCancelRequested) + if err != nil { + return errors.Wrapf(err, "updating status for IMPORT and RESTORE jobs") + } + } + // Don't clear the jobs table as to not delete the jobs that are performing // the restore. if systemTableName == systemschema.SettingsTable.Name { @@ -2011,6 +2110,23 @@ func (r *restoreResumer) restoreSystemTables( return nil } +// writeJobsList writes to the given string builder the provided jobIDs to the +// given string builder as a SQL list. +func writeJobsList(jobs []int64, q *strings.Builder) { + if q == nil { + return + } + + fmt.Fprint(q, "(") + for i, job := range jobs { + if i > 0 { + fmt.Fprint(q, ", ") + } + fmt.Fprintf(q, "'%d'", job) + } + fmt.Fprint(q, ")") +} + var _ jobs.Resumer = &restoreResumer{} func init() {