diff --git a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go
index e0a34e2bd234..4e5e436f1ed7 100644
--- a/pkg/ccl/backupccl/full_cluster_backup_restore_test.go
+++ b/pkg/ccl/backupccl/full_cluster_backup_restore_test.go
@@ -793,9 +793,7 @@ func TestReintroduceOfflineSpans(t *testing.T) {
 	const numAccounts = 1000
 	ctx, _, srcDB, tempDir, cleanupSrc := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitNone, params)
-	_, _, destDB, cleanupDst := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitNone, base.TestClusterArgs{})
 	defer cleanupSrc()
-	defer cleanupDst()
 
 	dbBackupLoc := "nodelocal://0/my_db_backup"
 	clusterBackupLoc := "nodelocal://0/my_cluster_backup"
@@ -819,7 +817,10 @@ func TestReintroduceOfflineSpans(t *testing.T) {
 	<-dbRestoreStarted
 	srcDB.Exec(t, `BACKUP TO $1 WITH revision_history`, clusterBackupLoc)
 
-	// All the restore to finish. This will issue AddSSTable requests at a
+	var tsMidRestore string
+	srcDB.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&tsMidRestore)
+
+	// Allow the restore to finish. This will issue AddSSTable requests at a
 	// timestamp that is before the last incremental we just took.
 	close(blockDBRestore)
 
@@ -836,16 +837,43 @@ func TestReintroduceOfflineSpans(t *testing.T) {
 
 	srcDB.Exec(t, `BACKUP TO $1 WITH revision_history`, clusterBackupLoc)
 
-	// Restore the incremental backup chain that has missing writes.
-	destDB.Exec(t, `RESTORE FROM $1 AS OF SYSTEM TIME `+tsBefore, clusterBackupLoc)
+	t.Run("spans-reintroduced", func(t *testing.T) {
+		_, _, destDB, cleanupDst := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitNone, base.TestClusterArgs{})
+		defer cleanupDst()
+
+		// Restore the incremental backup chain that has missing writes.
+		destDB.Exec(t, `RESTORE FROM $1 AS OF SYSTEM TIME `+tsBefore, clusterBackupLoc)
+
+		// Assert that the restored database has the same number of rows in both the
+		// source and destination cluster.
+		checkQuery := `SELECT count(*) FROM restoredb.bank AS OF SYSTEM TIME ` + tsBefore
+		expectedCount := srcDB.QueryStr(t, checkQuery)
+		destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank`, expectedCount)
+
+		checkQuery = `SELECT count(*) FROM restoredb.bank@new_idx AS OF SYSTEM TIME ` + tsBefore
+		expectedCount = srcDB.QueryStr(t, checkQuery)
+		destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank@new_idx`, expectedCount)
+	})
+
+	t.Run("restore-canceled", func(t *testing.T) {
+		defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()
+		_, _, destDB, cleanupDst := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitNone, base.TestClusterArgs{})
+		defer cleanupDst()
 
-	// Assert that the restored database has the same number
-	// of rows in both the source and destination cluster.
-	checkQuery := `SELECT count(*) FROM restoredb.bank AS OF SYSTEM TIME ` + tsBefore
-	expectedCount := srcDB.QueryStr(t, checkQuery)
-	destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank`, expectedCount)
+		destDB.Exec(t, `RESTORE FROM $1 AS OF SYSTEM TIME `+tsMidRestore, clusterBackupLoc)
 
-	checkQuery = `SELECT count(*) FROM restoredb.bank@new_idx AS OF SYSTEM TIME ` + tsBefore
-	expectedCount = srcDB.QueryStr(t, checkQuery)
-	destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank@new_idx`, expectedCount)
+		// Wait for the cluster restore job to finish, as well as the restored RESTORE TABLE
+		// job to revert.
+		destDB.CheckQueryResultsRetry(t, `
+			SELECT description, status FROM [SHOW JOBS]
+			WHERE job_type = 'RESTORE' AND status NOT IN ('succeeded', 'canceled')`,
+			[][]string{},
+		)
+		// The cluster restore should succeed, but the table restore should be canceled.
+		destDB.CheckQueryResults(t,
+			`SELECT status, count(*) FROM [SHOW JOBS] WHERE job_type = 'RESTORE' GROUP BY status ORDER BY status`,
+			[][]string{{"canceled", "1"}, {"succeeded", "1"}})
+
+		destDB.ExpectErr(t, `relation "restoredb.bank" does not exist`, `SELECT count(*) FROM restoredb.bank`)
+	})
 }
diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go
index d81722234a98..3e52d8f6b602 100644
--- a/pkg/ccl/backupccl/restore_job.go
+++ b/pkg/ccl/backupccl/restore_job.go
@@ -13,6 +13,7 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"strings"
 
 	"github.com/cockroachdb/cockroach/pkg/build"
 	"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
@@ -528,6 +529,10 @@ func restore(
 	if len(spans) == 0 {
 		return emptyRowCount, nil
 	}
+	details := job.Details().(jobspb.RestoreDetails)
+	if alreadyMigrated := checkForMigratedData(details); alreadyMigrated {
+		return emptyRowCount, nil
+	}
 
 	mu := struct {
 		syncutil.Mutex
@@ -1922,6 +1927,81 @@ func getRestoringPrivileges(
 	return updatedPrivileges, nil
 }
 
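+// getImportAndRestoreJobs pages through the staged copy of the system jobs
+// table and returns the IDs of all running, paused, and pause-requested
+// IMPORT and RESTORE jobs.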
+func getImportAndRestoreJobs(
+	ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, stagingTableName string,
+) ([]int64, error) {
+	pageSize := 100
+	allJobs := make([]int64, 0)
+	var maxID int64
+	for {
+		var done bool
+		var err error
+		var pageJobIDs []int64
+		done, maxID, pageJobIDs, err = getImportAndRestoreJobsPage(ctx, ie, stagingTableName, txn, maxID, pageSize)
+		if err != nil {
+			return nil, err
+		}
+		allJobs = append(allJobs, pageJobIDs...)
+		if done {
+			break
+		}
+	}
+	return allJobs, nil
+}
+
+func getImportAndRestoreJobsPage(
+	ctx context.Context,
+	ie *sql.InternalExecutor,
+	stagingTableName string,
+	txn *kv.Txn,
+	minID int64,
+	pageSize int,
+) (done bool, maxID int64, jobIDs []int64, _ error) {
+	stmt := fmt.Sprintf("SELECT id, payload FROM %s "+
+		"WHERE id > $1 AND status IN ($2, $3, $4) "+
+		"ORDER BY id "+ // the ordering is important as we keep track of the maximum ID we've seen
+		"LIMIT $5", stagingTableName)
+	rows, err := ie.Query(ctx, "fetch-import-and-restore-jobs", txn, stmt,
+		minID, jobs.StatusRunning, jobs.StatusPaused, jobs.StatusPauseRequested, pageSize)
+	if err != nil {
+		return false, 0, nil, errors.Wrapf(err, "failed to fetch import and restore jobs from %s", stagingTableName)
+	}
+
+	if len(rows) == 0 {
+		return true, 0, nil, nil
+	}
+	// Track the highest ID we encounter, so it can serve as the bottom of the
+	// next page.
+	maxID = int64(*(rows[len(rows)-1][0].(*tree.DInt)))
+	// If we got as many rows as we asked for, there might be more.
+	morePages := len(rows) == pageSize
+
+	for _, row := range rows {
+		id, payloadBytes := row[0], row[1]
+		rawJobID, ok := id.(*tree.DInt)
+		if !ok {
+			return false, 0, nil, errors.New("could not parse jobID")
+		}
+		jobID := int64(*rawJobID)
+		payload, err := jobs.UnmarshalPayload(payloadBytes)
+		if err != nil {
+			return false, 0, nil, err
+		}
+		if payload.Type() == jobspb.TypeImport || payload.Type() == jobspb.TypeRestore {
+			jobIDs = append(jobIDs, jobID)
+		}
+	}
+
+	return !morePages, maxID, jobIDs, nil
+}
+
+// checkForMigratedData checks whether any of the system tables have already
+// been restored. If they have, then all of the data has already been restored
+// to the cluster, and it should not be restored again, since doing so would
+// shadow potentially migrated system table data.
+func checkForMigratedData(details jobspb.RestoreDetails) bool {
+	return len(details.SystemTablesRestored) > 0
+}
+
 // restoreSystemTables atomically replaces the contents of the system tables
 // with the data from the restored system tables.
 func (r *restoreResumer) restoreSystemTables(
@@ -1955,6 +2035,25 @@
 		if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
 			txn.SetDebugName("system-restore-txn")
 			stmtDebugName := fmt.Sprintf("restore-system-systemTable-%s", systemTableName)
+			// Perform any migrations of the data that are required.
+			switch systemTableName {
+			case systemschema.JobsTable.Name:
+				stagingTableName := restoreTempSystemDB + "." + systemTableName
+				jobsToRevert, err := getImportAndRestoreJobs(ctx, executor, txn, stagingTableName)
+				if err != nil {
+					return errors.Wrap(err, "failed to fetch IMPORT and RESTORE jobs")
+				}
+
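+				// These jobs were copied from the backed-up cluster's jobs table,
+				// and any data they were still writing is at best partially
+				// present in this backup. They cannot safely resume on the
+				// restored cluster, so mark them as cancel-requested here; once
+				// this staging table replaces the real jobs table below, the job
+				// registry will revert them.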
+				// An empty IN list is a syntax error, so skip the update when
+				// there are no jobs to revert.
+				if len(jobsToRevert) > 0 {
+					var updateStatusQuery strings.Builder
+					fmt.Fprintf(&updateStatusQuery, "UPDATE %s SET status = $1 WHERE id IN ", stagingTableName)
+					writeJobsList(jobsToRevert, &updateStatusQuery)
+
+					_, err = executor.Exec(ctx, stmtDebugName+"-status-update", txn,
+						updateStatusQuery.String(), jobs.StatusCancelRequested)
+					if err != nil {
+						return errors.Wrap(err, "updating status for IMPORT and RESTORE jobs")
+					}
+				}
+			}
+
 			// Don't clear the jobs table as to not delete the jobs that are performing
 			// the restore.
 			if systemTableName == systemschema.SettingsTable.Name {
@@ -2011,6 +2110,23 @@
 	return nil
 }
 
+// writeJobsList writes the provided jobIDs to the given string builder as a
+// parenthesized SQL list, e.g. writeJobsList([]int64{1, 2}, &b) appends
+// "('1', '2')".
+func writeJobsList(jobs []int64, q *strings.Builder) {
+	if q == nil {
+		return
+	}
+
+	fmt.Fprint(q, "(")
+	for i, job := range jobs {
+		if i > 0 {
+			fmt.Fprint(q, ", ")
+		}
+		fmt.Fprintf(q, "'%d'", job)
+	}
+	fmt.Fprint(q, ")")
+}
+
 var _ jobs.Resumer = &restoreResumer{}
 
 func init() {