backupccl: cluster restore import and restore jobs as canceled
IMPORT and RESTORE may write non-transactionally, so their writes cannot
be trusted to be included in every backup. As such, they should be
restored in a reverting state to attempt to undo any of their untrusted
writes.

Release note (bug fix): IMPORT and RESTORE jobs are now restored as
reverting so that they clean up after themselves. Previously, some of the
writes performed by these jobs while they were running may have been
missed by the backup.
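For context, a hedged sketch (not part of this commit) of how an operator could spot-check the new behavior after a full cluster restore: any IMPORT or RESTORE jobs that were in flight when the backup was taken should surface as cancel-requested, reverting, or canceled rather than running. The driver choice and connection string are assumptions.

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the PostgreSQL wire protocol.
)

func main() {
	// Assumed connection string; adjust host, port, and certs for your cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// List the restored IMPORT/RESTORE jobs and their current status.
	rows, err := db.Query(
		`SELECT job_id, job_type, status FROM [SHOW JOBS] WHERE job_type IN ('IMPORT', 'RESTORE')`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()
	for rows.Next() {
		var id int64
		var typ, status string
		if err := rows.Scan(&id, &typ, &status); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("job %d (%s): %s\n", id, typ, status)
	}
}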
pbardea committed Apr 16, 2021
1 parent 45fa9ef commit b3151e7
Showing 2 changed files with 157 additions and 13 deletions.
54 changes: 41 additions & 13 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
@@ -793,9 +793,7 @@ func TestReintroduceOfflineSpans(t *testing.T) {

const numAccounts = 1000
ctx, _, srcDB, tempDir, cleanupSrc := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitNone, params)
defer cleanupSrc()

dbBackupLoc := "nodelocal://0/my_db_backup"
clusterBackupLoc := "nodelocal://0/my_cluster_backup"
@@ -819,7 +817,10 @@ func TestReintroduceOfflineSpans(t *testing.T) {
<-dbRestoreStarted
srcDB.Exec(t, `BACKUP TO $1 WITH revision_history`, clusterBackupLoc)

var tsMidRestore string
srcDB.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&tsMidRestore)

// Allow the restore to finish. This will issue AddSSTable requests at a
// timestamp that is before the last incremental we just took.
close(blockDBRestore)

@@ -836,16 +837,43 @@ func TestReintroduceOfflineSpans(t *testing.T) {

srcDB.Exec(t, `BACKUP TO $1 WITH revision_history`, clusterBackupLoc)

t.Run("spans-reintroduced", func(t *testing.T) {
_, _, destDB, cleanupDst := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitNone, base.TestClusterArgs{})
defer cleanupDst()

// Restore the incremental backup chain that has missing writes.
destDB.Exec(t, `RESTORE FROM $1 AS OF SYSTEM TIME `+tsBefore, clusterBackupLoc)

// Assert that the restored database has the same number of rows in both the
// source and destination cluster.
checkQuery := `SELECT count(*) FROM restoredb.bank AS OF SYSTEM TIME ` + tsBefore
expectedCount := srcDB.QueryStr(t, checkQuery)
destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank`, expectedCount)

checkQuery = `SELECT count(*) FROM restoredb.bank@new_idx AS OF SYSTEM TIME ` + tsBefore
expectedCount = srcDB.QueryStr(t, checkQuery)
destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank@new_idx`, expectedCount)
})

t.Run("restore-canceled", func(t *testing.T) {
defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()
_, _, destDB, cleanupDst := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitNone, base.TestClusterArgs{})
defer cleanupDst()

destDB.Exec(t, `RESTORE FROM $1 AS OF SYSTEM TIME `+tsMidRestore, clusterBackupLoc)

// Wait for the cluster restore job to finish, as well as the restored RESTORE TABLE
// job to revert.
destDB.CheckQueryResultsRetry(t, `
SELECT description, status FROM [SHOW JOBS]
WHERE job_type = 'RESTORE' AND status NOT IN ('succeeded', 'canceled')`,
[][]string{},
)
// The cluster restore should succeed, but the table restore should have been canceled.
destDB.CheckQueryResults(t,
`SELECT status, count(*) FROM [SHOW JOBS] WHERE job_type = 'RESTORE' GROUP BY status ORDER BY status`,
[][]string{{"canceled", "1"}, {"succeeded", "1"}})

destDB.ExpectErr(t, `relation "restoredb.bank" does not exist`, `SELECT count(*) FROM restoredb.bank`)
})
}
116 changes: 116 additions & 0 deletions pkg/ccl/backupccl/restore_job.go
@@ -13,6 +13,7 @@ import (
"context"
"fmt"
"math"
"strings"

"github.com/cockroachdb/cockroach/pkg/build"
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
@@ -528,6 +529,10 @@ func restore(
if len(spans) == 0 {
return emptyRowCount, nil
}
details := job.Details().(jobspb.RestoreDetails)
if alreadyMigrated := checkForMigratedData(details); alreadyMigrated {
return emptyRowCount, nil
}

mu := struct {
syncutil.Mutex
@@ -1922,6 +1927,81 @@ func getRestoringPrivileges(
return updatedPrivileges, nil
}

func getImportAndRestoreJobs(
ctx context.Context, ie *sql.InternalExecutor, txn *kv.Txn, stagingTableName string,
) ([]int64, error) {
pageSize := 100
allJobs := make([]int64, 0)
var maxID int64
for {
var done bool
var err error
var pageJobIDs []int64
done, maxID, pageJobIDs, err = getImportAndRestoreJobsPage(ctx, ie, stagingTableName, txn, maxID, pageSize)
if err != nil {
return nil, err
}
allJobs = append(allJobs, pageJobIDs...)
if done {
break
}
}
return allJobs, nil
}

func getImportAndRestoreJobsPage(
ctx context.Context,
ie *sql.InternalExecutor,
stagingTableName string,
txn *kv.Txn,
minID int64,
pageSize int,
) (done bool, maxID int64, jobIDs []int64, _ error) {
stmt := fmt.Sprintf("SELECT id, payload FROM %s "+
"WHERE id > $1 AND status IN ($2, $3, $4)"+
"ORDER BY id "+ // the ordering is important as we keep track of the maximum ID we've seen
"LIMIT $5", stagingTableName)
rows, err := ie.Query(ctx, "fetch-import-and-restore-jobs", txn, stmt, minID, jobs.StatusRunning, jobs.StatusPaused, jobs.StatusPauseRequested, pageSize)
if err != nil {
return false, 0, nil, errors.Wrapf(err, "failed to fetch import and restore jobs from %s", stagingTableName)
}

if len(rows) == 0 {
return true, 0, nil, nil
}
// Track the highest ID we encounter, so it can serve as the bottom of the
// next page.
maxID = int64(*(rows[len(rows)-1][0].(*tree.DInt)))
// If we got as many rows as we asked for, there might be more.
morePages := len(rows) == pageSize

for _, row := range rows {
id, payloadBytes := row[0], row[1]
rawJobID, ok := id.(*tree.DInt)
if !ok {
return false, 0, nil, errors.New("could not parse jobID")
}
jobID := int64(*rawJobID)
payload, err := jobs.UnmarshalPayload(payloadBytes)
if err != nil {
return false, 0, nil, err
}
if payload.Type() == jobspb.TypeImport || payload.Type() == jobspb.TypeRestore {
jobIDs = append(jobIDs, jobID)
}
}

return !morePages, maxID, jobIDs, nil
}

// checkForMigratedData checks to see if any of the system tables have already
// been restored. If they have, then we have already restored all of the data
// to the cluster. We should not restore the data again, since we would be
// shadowing potentially migrated system table data.
func checkForMigratedData(details jobspb.RestoreDetails) bool {
return len(details.SystemTablesRestored) > 0
}

// restoreSystemTables atomically replaces the contents of the system tables
// with the data from the restored system tables.
func (r *restoreResumer) restoreSystemTables(
@@ -1955,6 +2035,25 @@ func (r *restoreResumer) restoreSystemTables(
if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetDebugName("system-restore-txn")
stmtDebugName := fmt.Sprintf("restore-system-systemTable-%s", systemTableName)
// Perform any migrations of the data that are required.
switch systemTableName {
case systemschema.JobsTable.Name:
stagingTableName := restoreTempSystemDB + "." + systemTableName
jobsToRevert, err := getImportAndRestoreJobs(ctx, executor, txn, stagingTableName)
if err != nil {
return errors.Wrap(err, "failed to fetch IMPORT and RESTORE jobs")
}

// Only issue the status update if there are jobs to revert; an empty
// IN () list is not valid SQL.
if len(jobsToRevert) > 0 {
var updateStatusQuery strings.Builder
fmt.Fprintf(&updateStatusQuery, "UPDATE %s SET status = $1 WHERE id IN ", stagingTableName)
writeJobsList(jobsToRevert, &updateStatusQuery)

_, err = executor.Exec(ctx, stmtDebugName+"-status-update", txn, updateStatusQuery.String(), jobs.StatusCancelRequested)
if err != nil {
return errors.Wrapf(err, "updating status for IMPORT and RESTORE jobs")
}
}
}

// Don't clear the jobs table so as not to delete the jobs that are performing
// the restore.
if systemTableName == systemschema.SettingsTable.Name {
@@ -2011,6 +2110,23 @@
return nil
}

// writeJobsList writes the provided job IDs to the given string builder as a
// SQL list.
func writeJobsList(jobs []int64, q *strings.Builder) {
if q == nil {
return
}

fmt.Fprint(q, "(")
for i, job := range jobs {
if i > 0 {
fmt.Fprint(q, ", ")
}
fmt.Fprintf(q, "'%d'", job)
}
fmt.Fprint(q, ")")
}
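
For illustration only (not part of restore_job.go), a minimal standalone sketch of the statement that writeJobsList helps compose for the status update above; the job IDs are made up, and "crdb_temp_system.jobs" stands in for the staging table name built from restoreTempSystemDB in the real code.

package main

import (
	"fmt"
	"strings"
)

// writeJobsList, copied from the diff above, renders job IDs as a
// parenthesized, quoted SQL list.
func writeJobsList(jobs []int64, q *strings.Builder) {
	if q == nil {
		return
	}
	fmt.Fprint(q, "(")
	for i, job := range jobs {
		if i > 0 {
			fmt.Fprint(q, ", ")
		}
		fmt.Fprintf(q, "'%d'", job)
	}
	fmt.Fprint(q, ")")
}

func main() {
	var q strings.Builder
	fmt.Fprint(&q, "UPDATE crdb_temp_system.jobs SET status = $1 WHERE id IN ")
	writeJobsList([]int64{650001, 650002}, &q)
	fmt.Println(q.String())
	// Prints: UPDATE crdb_temp_system.jobs SET status = $1 WHERE id IN ('650001', '650002')
	// with jobs.StatusCancelRequested bound to $1 by the caller.
}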

var _ jobs.Resumer = &restoreResumer{}

func init() {
