backupccl: reset restored jobs during cluster restore
Previously, jobs were restored without modification during cluster
restore. Due to a recently discovered bug where backup may miss
non-transactional writes written to offline spans by these jobs, their
progress may no longer be accurate on the restored cluster.

IMPORT and RESTORE jobs perform non-transactional writes that may be
missed. When a cluster RESTORE brings back these OFFLINE tables, it will
also bring back their associated jobs. To ensure the underlying data in
these tables is correct, the jobs are now set to a reverting state so
that they can clean up after themselves.

Affected in-progress schema change jobs will fail upon validation.

Release note (bug fix): Fixed a bug where restored jobs could assume
progress that was not captured in the backup. Such jobs are now canceled
during cluster restore.
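
To make the behavior concrete, here is a minimal, self-contained sketch of the reset decision (illustrative stand-in types only, not this commit's code; the real logic lives in jobsMigrationFunc in pkg/ccl/backupccl/system_schema.go below):

package main

import "fmt"

// Hypothetical stand-ins for the job types and statuses involved; the real
// values live in pkg/jobs and pkg/jobs/jobspb.
type jobType string
type jobStatus string

const (
  typeImport       jobType = "IMPORT"
  typeRestore      jobType = "RESTORE"
  typeSchemaChange jobType = "SCHEMA CHANGE"

  statusRunning         jobStatus = "running"
  statusPaused          jobStatus = "paused"
  statusCancelRequested jobStatus = "cancel-requested"
)

// resetStatus models the migration: in-flight IMPORT and RESTORE jobs are
// moved to cancel-requested so they revert writes the backup may have missed;
// other jobs keep their status (affected schema changes instead fail later,
// at validation).
func resetStatus(t jobType, s jobStatus) jobStatus {
  inFlight := s == statusRunning || s == statusPaused
  if inFlight && (t == typeImport || t == typeRestore) {
    return statusCancelRequested
  }
  return s
}

func main() {
  fmt.Println(resetStatus(typeImport, statusRunning))       // cancel-requested
  fmt.Println(resetStatus(typeSchemaChange, statusRunning)) // running
}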
pbardea committed Apr 19, 2021
1 parent cb3e9ce commit 1c882a0
Showing 6 changed files with 448 additions and 285 deletions.
59 changes: 43 additions & 16 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
@@ -630,10 +630,9 @@ func TestClusterRestoreFailCleanup(t *testing.T) {
     }
   }

-  // The initial restore will fail, and restart.
+  // The initial restore will return an error, and restart.
   sqlDBRestore.ExpectErr(t, `injected error: restarting in background`, `RESTORE FROM $1`, LocalFoo)
-  // Expect the job to succeed. If the job fails, it's likely due to
-  // attempting to restore the same system table data twice.
+  // Expect the restore to succeed.
   sqlDBRestore.CheckQueryResultsRetry(t,
     `SELECT count(*) FROM [SHOW JOBS] WHERE job_type = 'RESTORE' AND status = 'succeeded'`,
     [][]string{{"1"}})
@@ -842,9 +841,7 @@ func TestReintroduceOfflineSpans(t *testing.T) {

   const numAccounts = 1000
   ctx, _, srcDB, tempDir, cleanupSrc := backupRestoreTestSetupWithParams(t, singleNode, numAccounts, InitManualReplication, params)
-  _, _, destDB, cleanupDst := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitManualReplication, base.TestClusterArgs{})
   defer cleanupSrc()
-  defer cleanupDst()

   dbBackupLoc := "nodelocal://0/my_db_backup"
   clusterBackupLoc := "nodelocal://0/my_cluster_backup"
@@ -868,7 +865,10 @@ func TestReintroduceOfflineSpans(t *testing.T) {
   <-dbRestoreStarted
   srcDB.Exec(t, `BACKUP TO $1 WITH revision_history`, clusterBackupLoc)

-  // All the restore to finish. This will issue AddSSTable requests at a
+  var tsMidRestore string
+  srcDB.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&tsMidRestore)
+
+  // Allow the restore to finish. This will issue AddSSTable requests at a
   // timestamp that is before the last incremental we just took.
   close(blockDBRestore)

@@ -885,16 +885,43 @@ func TestReintroduceOfflineSpans(t *testing.T) {

   srcDB.Exec(t, `BACKUP TO $1 WITH revision_history`, clusterBackupLoc)

-  // Restore the incremental backup chain that has missing writes.
-  destDB.Exec(t, `RESTORE FROM $1 AS OF SYSTEM TIME `+tsBefore, clusterBackupLoc)
+  t.Run("spans-reintroduced", func(t *testing.T) {
+    _, _, destDB, cleanupDst := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitManualReplication, base.TestClusterArgs{})
+    defer cleanupDst()
+
+    // Restore the incremental backup chain that has missing writes.
+    destDB.Exec(t, `RESTORE FROM $1 AS OF SYSTEM TIME `+tsBefore, clusterBackupLoc)
+
+    // Assert that the restored database has the same number of rows in both the
+    // source and destination cluster.
+    checkQuery := `SELECT count(*) FROM restoredb.bank AS OF SYSTEM TIME ` + tsBefore
+    expectedCount := srcDB.QueryStr(t, checkQuery)
+    destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank`, expectedCount)
+
+    checkQuery = `SELECT count(*) FROM restoredb.bank@new_idx AS OF SYSTEM TIME ` + tsBefore
+    expectedCount = srcDB.QueryStr(t, checkQuery)
+    destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank@new_idx`, expectedCount)
+  })
+
+  t.Run("restore-canceled", func(t *testing.T) {
+    defer jobs.TestingSetAdoptAndCancelIntervals(100*time.Millisecond, 100*time.Millisecond)()
+    _, _, destDB, cleanupDst := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitManualReplication, base.TestClusterArgs{})
+    defer cleanupDst()
+
-  // Assert that the restored database has the same number
-  // of rows in both the source and destination cluster.
-  checkQuery := `SELECT count(*) FROM restoredb.bank AS OF SYSTEM TIME ` + tsBefore
-  expectedCount := srcDB.QueryStr(t, checkQuery)
-  destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank`, expectedCount)
+    destDB.Exec(t, `RESTORE FROM $1 AS OF SYSTEM TIME `+tsMidRestore, clusterBackupLoc)

-  checkQuery = `SELECT count(*) FROM restoredb.bank@new_idx AS OF SYSTEM TIME ` + tsBefore
-  expectedCount = srcDB.QueryStr(t, checkQuery)
-  destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank@new_idx`, expectedCount)
+    // Wait for the cluster restore job to finish, as well as the restored RESTORE TABLE
+    // job to cancel.
+    destDB.CheckQueryResultsRetry(t, `
+      SELECT description, status FROM [SHOW JOBS]
+      WHERE job_type = 'RESTORE' AND status NOT IN ('succeeded', 'canceled')`,
+      [][]string{},
+    )
+    // The cluster restore should succeed, but the table restore should have failed.
+    destDB.CheckQueryResults(t,
+      `SELECT status, count(*) FROM [SHOW JOBS] WHERE job_type = 'RESTORE' GROUP BY status ORDER BY status`,
+      [][]string{{"canceled", "1"}, {"succeeded", "1"}})
+
+    destDB.ExpectErr(t, `relation "restoredb.bank" does not exist`, `SELECT count(*) FROM restoredb.bank`)
+  })
 }
18 changes: 18 additions & 0 deletions pkg/ccl/backupccl/restoration_data.go
@@ -9,6 +9,7 @@
 package backupccl

 import (
+  "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
   "github.com/cockroachdb/cockroach/pkg/keys"
   "github.com/cockroachdb/cockroach/pkg/roachpb"
   "github.com/cockroachdb/cockroach/pkg/sql/catalog"
@@ -107,3 +108,20 @@ func (b *restorationDataBase) isEmpty() bool {

 // isMainBundle implements restorationData.
 func (restorationDataBase) isMainBundle() bool { return false }
+
+// checkForMigratedData checks whether any of the system tables in the set of
+// data to be restored have already been restored. If this is the case, it is
+// not safe to try to restore the data again, since the migration may have
+// written to the temporary system table.
+func checkForMigratedData(details jobspb.RestoreDetails, dataToRestore restorationData) bool {
+  for _, systemTable := range dataToRestore.getSystemTables() {
+    // We only need to check if _any_ of the system tables in this batch of
+    // data have been migrated. This is because the migration can only
+    // happen after all of the data in the batch has been restored.
+    if _, ok := details.SystemTablesMigrated[systemTable.GetName()]; ok {
+      return true
+    }
+  }
+
+  return false
+}
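
A minimal stand-alone model of the check above (hypothetical names; the real code consults jobspb.RestoreDetails.SystemTablesMigrated): because system tables are migrated only after their whole batch has been restored, finding any migrated table means the batch's data is already present.

// alreadyRestored models checkForMigratedData: if any system table in the
// batch is recorded as migrated, the batch's restore must have completed, so
// restoring it again would shadow the migrated keys.
func alreadyRestored(migrated map[string]bool, batch []string) bool {
  for _, name := range batch {
    if migrated[name] {
      return true
    }
  }
  return false
}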
49 changes: 41 additions & 8 deletions pkg/ccl/backupccl/restore_job.go
@@ -604,6 +604,14 @@ func restore(
     return emptyRowCount, nil
   }

+  // If we've already migrated some of the system tables we're about to
+  // restore, this implies that a previous attempt restored all of this data.
+  // We want to avoid restoring again since we'll be shadowing migrated keys.
+  details := job.Details().(jobspb.RestoreDetails)
+  if alreadyMigrated := checkForMigratedData(details, dataToRestore); alreadyMigrated {
+    return emptyRowCount, nil
+  }
+
   mu := struct {
     syncutil.Mutex
     highWaterMark int
@@ -1670,8 +1678,7 @@ func (r *restoreResumer) Resume(ctx context.Context, execCtx interface{}) error
     }
     // Reload the details as we may have updated the job.
     details = r.job.Details().(jobspb.RestoreDetails)
-  }
-  if details.DescriptorCoverage == tree.AllDescriptors {
+
     if err := r.cleanupTempSystemTables(ctx); err != nil {
       return err
     }
@@ -2425,6 +2432,10 @@ func (r *restoreResumer) restoreSystemTables(
   tables []catalog.TableDescriptor,
 ) error {
   tempSystemDBID := getTempSystemDBID(restoreDetails)
+  details := r.job.Details().(jobspb.RestoreDetails)
+  if details.SystemTablesMigrated == nil {
+    details.SystemTablesMigrated = make(map[string]bool)
+  }

   // Iterate through all the tables that we're restoring, and if it was restored
   // to the temporary system DB then copy it's data over to the real system
@@ -2434,14 +2445,36 @@ func (r *restoreResumer) restoreSystemTables(
       continue
     }
     systemTableName := table.GetName()
+    stagingTableName := restoreTempSystemDB + "." + systemTableName

+    config, ok := systemTableBackupConfiguration[systemTableName]
+    if !ok {
+      log.Warningf(ctx, "no configuration specified for table %s... skipping restoration",
+        systemTableName)
+    }
+
+    if config.migrationFunc != nil {
+      if details.SystemTablesMigrated[systemTableName] {
+        continue
+      }
+
+      if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+        if err := config.migrationFunc(ctx, r.execCfg, txn, stagingTableName); err != nil {
+          return err
+        }
+
+        // Keep track of which system tables we've migrated so that future job
+        // restarts don't try to import data over our migrated data. This would
+        // fail since the restored data would shadow the migrated keys.
+        details.SystemTablesMigrated[systemTableName] = true
+        return r.job.SetDetails(ctx, txn, details)
+      }); err != nil {
+        return err
+      }
+    }
+
     if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
       txn.SetDebugName("system-restore-txn")
-      config, ok := systemTableBackupConfiguration[systemTableName]
-      if !ok {
-        log.Warningf(ctx, "no configuration specified for table %s... skipping restoration",
-          systemTableName)
-      }

       restoreFunc := defaultSystemTableRestoreFunc
       if config.customRestoreFunc != nil {
@@ -2450,7 +2483,7 @@
       }

       log.Eventf(ctx, "restoring system table %s", systemTableName)
-      err := restoreFunc(ctx, r.execCfg, txn, systemTableName, restoreTempSystemDB+"."+systemTableName)
+      err := restoreFunc(ctx, r.execCfg, txn, systemTableName, stagingTableName)
       if err != nil {
         return errors.Wrapf(err, "restoring system table %s", systemTableName)
       }
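
The restoreSystemTables change above follows a checkpoint-in-the-same-transaction pattern: the migration and the record that it ran commit atomically via job.SetDetails, so a restarted job skips work that already committed. A minimal sketch of the pattern (hypothetical types, not crdb's kv.Txn API):

// migrateOnce models the idempotency pattern: the migration and the marker
// recording it are committed by the same atomic step, so a job restart either
// sees both effects or neither, and never migrates the same table twice.
func migrateOnce(done map[string]bool, table string, migrate func() error) error {
  if done[table] {
    return nil // a previous attempt already committed this migration
  }
  if err := migrate(); err != nil {
    return err
  }
  done[table] = true // in the real code: r.job.SetDetails in the same txn
  return nil
}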
82 changes: 82 additions & 0 deletions pkg/ccl/backupccl/system_schema.go
@@ -11,10 +11,16 @@ package backupccl
 import (
   "context"
   fmt "fmt"
+  "strings"

+  "github.com/cockroachdb/cockroach/pkg/jobs"
+  "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+  "github.com/cockroachdb/cockroach/pkg/kv"
+  "github.com/cockroachdb/cockroach/pkg/security"
   "github.com/cockroachdb/cockroach/pkg/sql"
   "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
   "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+  "github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
   "github.com/cockroachdb/cockroach/pkg/util/log"
   "github.com/cockroachdb/errors"
 )
@@ -55,6 +61,10 @@ type systemBackupConfiguration struct {
   // to support the restore (e.g. users that can run the restore, cluster settings
   // that control how the restore runs, etc...).
   restoreBeforeData bool
+  // migrationFunc performs the necessary migrations on the system table data in
+  // the crdb_temp staging table before it is loaded into the actual system
+  // table.
+  migrationFunc func(ctx context.Context, execCtx *sql.ExecutorConfig, txn *kv.Txn, tempTableName string) error
   // customRestoreFunc is responsible for restoring the data from a table that
   // holds the restore system table data into the given system table. If none
   // is provided then `defaultRestoreFunc` is used.
@@ -93,6 +103,77 @@ func defaultSystemTableRestoreFunc(

 // Custom restore functions for different system tables.

+// jobsMigrationFunc marks the in-flight IMPORT and RESTORE jobs in the
+// crdb_temp staging table as cancel-requested, so that they revert and clean
+// up after themselves on the restored cluster.
+func jobsMigrationFunc(
+  ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, tempTableName string,
+) (err error) {
+  executor := execCfg.InternalExecutor
+
+  const statesToRevert = `('` + string(jobs.StatusRunning) + `', ` +
+    `'` + string(jobs.StatusPauseRequested) + `', ` +
+    `'` + string(jobs.StatusPaused) + `')`
+
+  jobsToRevert := make([]int64, 0)
+  query := `SELECT id, payload FROM ` + tempTableName + ` WHERE status IN ` + statesToRevert
+  it, err := executor.QueryIteratorEx(
+    ctx, "restore-fetching-job-payloads", txn,
+    sessiondata.InternalExecutorOverride{User: security.RootUserName()},
+    query)
+  if err != nil {
+    return errors.Wrap(err, "fetching job payloads")
+  }
+  defer func() {
+    closeErr := it.Close()
+    if err == nil {
+      err = closeErr
+    }
+  }()
+  for {
+    ok, err := it.Next(ctx)
+    if !ok {
+      if err != nil {
+        return err
+      }
+      break
+    }
+
+    r := it.Cur()
+    id, payloadBytes := r[0], r[1]
+    rawJobID, ok := id.(*tree.DInt)
+    if !ok {
+      return errors.Errorf("job: failed to read job id as DInt (was %T)", id)
+    }
+    jobID := int64(*rawJobID)
+
+    payload, err := jobs.UnmarshalPayload(payloadBytes)
+    if err != nil {
+      return errors.Wrap(err, "failed to unmarshal job to restore")
+    }
+    if payload.Type() == jobspb.TypeImport || payload.Type() == jobspb.TypeRestore {
+      jobsToRevert = append(jobsToRevert, jobID)
+    }
+  }
+
+  if len(jobsToRevert) == 0 {
+    // Nothing to revert; an empty IN list would not be valid SQL.
+    return nil
+  }
+
+  // Mark the collected IMPORT and RESTORE jobs as cancel-requested.
+  var updateStatusQuery strings.Builder
+  fmt.Fprintf(&updateStatusQuery, "UPDATE %s SET status = $1 WHERE id IN ", tempTableName)
+  fmt.Fprint(&updateStatusQuery, "(")
+  for i, job := range jobsToRevert {
+    if i > 0 {
+      fmt.Fprint(&updateStatusQuery, ", ")
+    }
+    fmt.Fprintf(&updateStatusQuery, "'%d'", job)
+  }
+  fmt.Fprint(&updateStatusQuery, ")")
+
+  if _, err := executor.Exec(ctx, "updating-job-status", txn, updateStatusQuery.String(), jobs.StatusCancelRequested); err != nil {
+    return errors.Wrap(err, "updating restored jobs as reverting")
+  }
+
+  return nil
+}
+
 // When restoring the jobs table we don't want to remove existing jobs, since
 // that includes the restore that we're running.
 func jobsRestoreFunc(
@@ -178,6 +259,7 @@ var systemTableBackupConfiguration = map[string]systemBackupConfiguration{
   },
   systemschema.JobsTable.GetName(): {
     shouldIncludeInClusterBackup: optInToClusterBackup,
+    migrationFunc:                jobsMigrationFunc,
     customRestoreFunc:            jobsRestoreFunc,
   },
   systemschema.ScheduledJobsTable.GetName(): {
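
For reference, a small runnable snippet that rebuilds the UPDATE statement jobsMigrationFunc issues; the staging table name and job IDs here are illustrative, and the new status is bound separately as $1 (jobs.StatusCancelRequested):

package main

import (
  "fmt"
  "strings"
)

func main() {
  // Hypothetical staging table and job IDs, mirroring the builder loop above.
  tempTableName := "crdb_temp_system.jobs"
  jobsToRevert := []int64{101, 102}

  var q strings.Builder
  fmt.Fprintf(&q, "UPDATE %s SET status = $1 WHERE id IN (", tempTableName)
  for i, id := range jobsToRevert {
    if i > 0 {
      q.WriteString(", ")
    }
    fmt.Fprintf(&q, "'%d'", id)
  }
  q.WriteString(")")
  fmt.Println(q.String())
  // Output: UPDATE crdb_temp_system.jobs SET status = $1 WHERE id IN ('101', '102')
}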
(2 changed files not shown)
