Skip to content

Commit

Permalink
Merge #83676 #84886
Browse files Browse the repository at this point in the history
83676: kvserver: add Time Bound Iteration to DeleteRange r=erikgrinaker a=msbutler

Previously, a kv client could only pass an AOST timestamp to a DelRange
request. Now, the user can pass a lower bound timestamp, causing
the kvserver to leverage time bound iteration while issuing delete requests.

Specifically, the server uses an MVCCIncrementalIterator to iterate over the
target span at the client provided time bounds, track a continuous run of keys
in that time bound, and flush the run via point and MVCC range tombstones
depending on the size of the run.

In a future pr, this operation will replace the use of RevertRange during IMPORT
INTO rollbacks to make them MVCC compatible.

Informs #70428

Release note: none

84886: backupccl: do not backup or restore system.jobs r=dt a=dt

Previously system.jobs was backed up by cluster backups and restored by cluster restore,
albeit with all but schema change jobs moved into a reverting state. However job records
almost always include specific tables' IDs or IDs of other objects in their persisted state
which means a restored job is only valid, even just for cancellation, if all of the objects
that it could reference still have the same IDs after they're restored.

At one point, this was true in cluster restore, but is becoming increasingly difficult to
maintain as the portions of the ID space that are reserved changes. The only real reason to
restore jobs, given that most were cancelled anyway, was so that schema changes mentioned in
the descriptors of restored tables would be completed or rolled back -- absent a job, the
mutation would otherwise just remain incomplete indefinitely. However during database or
table RESTORE, jobs are not restored but rather the required schema change job is synthesized
from the information in the table descriptor. This approach can be used during cluster restore
as well instead of restoring the jobs table, eliminating the majority of ID-mentioning fields
in restored system tables, and, in particular, those that are burried deep in binary payloads
rather than easy to find columns (ala zones).

Release note (sql change): Cluster BACKUP and RESTORE no longer includes job records, which previously
were usually only restored in a cancelling state with the exception of schema changes which were restored
to their initial running state. Instead any schema change jobs required for restored tables are
recreated after restoring the tables.

Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: David Taylor <[email protected]>
  • Loading branch information
3 people committed Jul 25, 2022
3 parents 69e5187 + aead1ca + 11718f3 commit 4ab8292
Show file tree
Hide file tree
Showing 10 changed files with 1,021 additions and 337 deletions.
61 changes: 0 additions & 61 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,6 @@ func TestFullClusterBackup(t *testing.T) {
}
}

// The claim_session_id field in jobs is a uuid and so needs to be excluded
// when comparing jobs pre/post restore. The span config reconciliation job
// too is something we exclude; because it's a singleton job, when restored
// into another cluster it self-terminates.
const jobsQuery = `
SELECT id, status, created, payload, progress, created_by_type, created_by_id, claim_instance_id
FROM system.jobs
WHERE id NOT IN
(
SELECT job_id FROM [SHOW AUTOMATIC JOBS]
WHERE job_type = 'AUTO SPAN CONFIG RECONCILIATION'
)
`
// Pause SQL Stats compaction job to ensure the test is deterministic.
sqlDB.Exec(t, `PAUSE SCHEDULES SELECT id FROM [SHOW SCHEDULES FOR SQL STATISTICS]`)

Expand Down Expand Up @@ -151,7 +138,6 @@ CREATE TABLE data2.foo (a int);
// should appear in the restore.
// This job will eventually fail since it will run from a new cluster.
sqlDB.Exec(t, `BACKUP data.bank TO 'nodelocal://0/throwawayjob'`)
preBackupJobs := sqlDB.QueryStr(t, jobsQuery)
// Populate system.settings.
sqlDB.Exec(t, `SET CLUSTER SETTING kv.bulk_io_write.concurrent_addsstable_requests = 5`)
sqlDB.Exec(t, `INSERT INTO system.ui (key, value, "lastUpdated") VALUES ($1, $2, now())`, "some_key", "some_val")
Expand Down Expand Up @@ -323,27 +309,6 @@ CREATE TABLE data2.foo (a int);
sqlDBRestore.CheckQueryResults(t, grantCheck, sqlDB.QueryStr(t, grantCheck))
})

t.Run("ensure that jobs are restored", func(t *testing.T) {
// Ensure that the jobs in the RESTORE cluster is a superset of the jobs
// that were in the BACKUP cluster (before the full cluster BACKUP job was
// run). There may be more jobs now because the restore can run jobs of
// its own.
newJobsStr := sqlDBRestore.QueryStr(t, jobsQuery)
newJobs := make(map[string][]string)

for _, newJob := range newJobsStr {
// The first element of the slice is the job id.
newJobs[newJob[0]] = newJob
}
for _, oldJob := range preBackupJobs {
newJob, ok := newJobs[oldJob[0]]
if !ok {
t.Errorf("Expected to find job %+v in RESTORE cluster, but not found", oldJob)
}
require.Equal(t, oldJob, newJob)
}
})

t.Run("zone_configs", func(t *testing.T) {
// The restored zones should be a superset of the zones in the backed up
// cluster.
Expand Down Expand Up @@ -676,7 +641,6 @@ func TestClusterRestoreFailCleanup(t *testing.T) {
{"comments"},
{"database_role_settings"},
{"external_connections"},
{"jobs"},
{"locations"},
{"role_members"},
{"role_options"},
Expand Down Expand Up @@ -768,7 +732,6 @@ func TestClusterRestoreFailCleanup(t *testing.T) {
{"comments"},
{"database_role_settings"},
{"external_connections"},
{"jobs"},
{"locations"},
{"role_members"},
{"role_options"},
Expand Down Expand Up @@ -1043,30 +1006,6 @@ func TestReintroduceOfflineSpans(t *testing.T) {
expectedCount = srcDB.QueryStr(t, checkQuery)
destDB.CheckQueryResults(t, `SELECT count(*) FROM restoredb.bank@new_idx`, expectedCount)
})

t.Run("restore-canceled", func(t *testing.T) {
args := base.TestClusterArgs{ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()}},
}
_, destDB, cleanupDst := backupRestoreTestSetupEmpty(t, singleNode, tempDir, InitManualReplication, args)
defer cleanupDst()

destDB.Exec(t, `RESTORE FROM $1 AS OF SYSTEM TIME `+tsMidRestore, clusterBackupLoc)

// Wait for the cluster restore job to finish, as well as the restored RESTORE TABLE
// job to cancel.
destDB.CheckQueryResultsRetry(t, `
SELECT description, status FROM [SHOW JOBS]
WHERE job_type = 'RESTORE' AND status NOT IN ('succeeded', 'canceled')`,
[][]string{},
)
// The cluster restore should succeed, but the table restore should have failed.
destDB.CheckQueryResults(t,
`SELECT status, count(*) FROM [SHOW JOBS] WHERE job_type = 'RESTORE' GROUP BY status ORDER BY status`,
[][]string{{"canceled", "1"}, {"succeeded", "1"}})

destDB.ExpectErr(t, `relation "restoredb.bank" does not exist`, `SELECT count(*) FROM restoredb.bank`)
})
}

// TestClusterRevisionDoesNotBackupOptOutSystemTables is a regression test for a
Expand Down
19 changes: 8 additions & 11 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1825,18 +1825,15 @@ func (r *restoreResumer) publishDescriptors(
mutTable.RowLevelTTL.ScheduleID = j.ScheduleID()
}
newTables = append(newTables, mutTable.TableDesc())
// For cluster restores, all the jobs are restored directly from the jobs
// table, so there is no need to re-create ongoing schema change jobs,
// otherwise we'll create duplicate jobs.
if details.DescriptorCoverage != tree.AllDescriptors || len(badIndexes) > 0 {
// Convert any mutations that were in progress on the table descriptor
// when the backup was taken, and convert them to schema change jobs.
if err := createSchemaChangeJobsFromMutations(ctx,
r.execCfg.JobRegistry, r.execCfg.Codec, txn, r.job.Payload().UsernameProto.Decode(), mutTable,
); err != nil {
return err
}

// Convert any mutations that were in progress on the table descriptor
// when the backup was taken, and convert them to schema change jobs.
if err := createSchemaChangeJobsFromMutations(ctx,
r.execCfg.JobRegistry, r.execCfg.Codec, txn, r.job.Payload().UsernameProto.Decode(), mutTable,
); err != nil {
return err
}

}
// For all of the newly created types, make type schema change jobs for any
// type descriptors that were backed up in the middle of a type schema change.
Expand Down
68 changes: 13 additions & 55 deletions pkg/ccl/backupccl/restore_mid_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestRestoreMidSchemaChange(t *testing.T) {
for _, backupDir := range backupDirs {
fullBackupDir, err := filepath.Abs(filepath.Join(fullClusterVersionDir, backupDir.Name()))
require.NoError(t, err)
t.Run(backupDir.Name(), restoreMidSchemaChange(fullBackupDir, backupDir.Name(), isClusterRestore, blockLocation == "after"))
t.Run(backupDir.Name(), restoreMidSchemaChange(fullBackupDir, backupDir.Name(), isClusterRestore))
}
})
}
Expand All @@ -107,52 +107,25 @@ func TestRestoreMidSchemaChange(t *testing.T) {

// expectedSCJobCount returns the expected number of schema change jobs
// we expect to find.
func expectedSCJobCount(scName string, isClusterRestore, after bool) int {
func expectedSCJobCount(scName string) int {
// The number of schema change under test. These will be the ones that are
// synthesized in database restore.
var expNumSCJobs int
var numBackgroundSCJobs int

// Some test cases may have more than 1 background schema change job.
switch scName {
case "midmany":
numBackgroundSCJobs = 1 // the create table
// This test runs 3 schema changes on a single table.
expNumSCJobs = 3
case "midmultitable":
numBackgroundSCJobs = 2 // this test creates 2 tables
expNumSCJobs = 2 // this test perform a schema change for each table
expNumSCJobs = 2 // this test perform a schema change for each table
case "midprimarykeyswap":
// Create table + alter column is done in the prep stage of this test.
numBackgroundSCJobs = 2
// PK change + PK cleanup
expNumSCJobs = 2
if isClusterRestore && after {
expNumSCJobs = 1
}
case "midprimarykeyswapcleanup":
// This test performs an ALTER COLUMN, and the original ALTER PRIMARY
// KEY that is being cleaned up.
numBackgroundSCJobs = 3
expNumSCJobs = 1
default:
// Most test cases only have 1 schema change under test.
expNumSCJobs = 1
// Most test cases have just a CREATE TABLE job that created the table
// under test.
numBackgroundSCJobs = 1
}

// We drop defaultdb and postgres for full cluster restores
numBackgroundDropDatabaseSCJobs := 2
// Since we're doing a cluster restore, we need to account for all of
// the schema change jobs that existed in the backup.
if isClusterRestore {
expNumSCJobs += numBackgroundSCJobs + numBackgroundDropDatabaseSCJobs

// If we're performing a cluster restore, we also need to include the drop
// crdb_temp_system job.
expNumSCJobs++
}

return expNumSCJobs
Expand Down Expand Up @@ -189,32 +162,16 @@ func getTablesInTest(scName string) (tableNames []string) {
return
}

func verifyMidSchemaChange(
t *testing.T, scName string, kvDB *kv.DB, sqlDB *sqlutils.SQLRunner, isClusterRestore, after bool,
) {
func verifyMidSchemaChange(t *testing.T, scName string, kvDB *kv.DB, sqlDB *sqlutils.SQLRunner) {
tables := getTablesInTest(scName)

// Check that we are left with the expected number of schema change jobs.
expNumSchemaChangeJobs := expectedSCJobCount(scName, isClusterRestore, after)
schemaChangeJobs := sqlDB.QueryStr(t, "SELECT description FROM crdb_internal.jobs WHERE job_type = 'SCHEMA CHANGE'")
require.Equal(t, expNumSchemaChangeJobs, len(schemaChangeJobs),
"Expected %d schema change jobs but found %v", expNumSchemaChangeJobs, schemaChangeJobs)
if isClusterRestore {
// Cluster restores should be restoring the exact job entries that were
// backed up, and therefore should not create jobs that contains "RESTORING"
// in the description.
schemaChangeJobs := sqlDB.QueryStr(t,
"SELECT description FROM crdb_internal.jobs WHERE job_type = 'SCHEMA CHANGE' AND description NOT LIKE '%RESTORING%'")
require.Equal(t, expNumSchemaChangeJobs, len(schemaChangeJobs),
"Expected %d schema change jobs but found %v", expNumSchemaChangeJobs, schemaChangeJobs)
} else {
// Non-cluster restores should create jobs with "RESTORE" in the job
// description.
schemaChangeJobs := sqlDB.QueryStr(t,
"SELECT description FROM crdb_internal.jobs WHERE job_type = 'SCHEMA CHANGE' AND description LIKE '%RESTORING%'")
require.Equal(t, expNumSchemaChangeJobs, len(schemaChangeJobs),
"Expected %d schema change jobs but found %v", expNumSchemaChangeJobs, schemaChangeJobs)
}
expNumSchemaChangeJobs := expectedSCJobCount(scName)

synthesizedSchemaChangeJobs := sqlDB.QueryStr(t,
"SELECT description FROM crdb_internal.jobs WHERE job_type = 'SCHEMA CHANGE' AND description LIKE '%RESTORING%'")
require.Equal(t, expNumSchemaChangeJobs, len(synthesizedSchemaChangeJobs),
"Expected %d schema change jobs but found %v", expNumSchemaChangeJobs, synthesizedSchemaChangeJobs)

for _, tableName := range tables {
validateTable(t, kvDB, sqlDB, "defaultdb", tableName)
Expand All @@ -226,7 +183,7 @@ func verifyMidSchemaChange(
}

func restoreMidSchemaChange(
backupDir, schemaChangeName string, isClusterRestore bool, after bool,
backupDir, schemaChangeName string, isClusterRestore bool,
) func(t *testing.T) {
return func(t *testing.T) {
ctx := context.Background()
Expand Down Expand Up @@ -265,6 +222,7 @@ func restoreMidSchemaChange(
// Wait for all jobs to terminate. Some may fail since we don't restore
// adding spans.
sqlDB.CheckQueryResultsRetry(t, "SELECT * FROM crdb_internal.jobs WHERE job_type = 'SCHEMA CHANGE' AND NOT (status = 'succeeded' OR status = 'failed')", [][]string{})
verifyMidSchemaChange(t, schemaChangeName, kvDB, sqlDB, isClusterRestore, after)
verifyMidSchemaChange(t, schemaChangeName, kvDB, sqlDB)
sqlDB.CheckQueryResultsRetry(t, "SELECT * from crdb_internal.invalid_objects", [][]string{})
}
}
101 changes: 1 addition & 100 deletions pkg/ccl/backupccl/system_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,14 @@ import (
"context"
fmt "fmt"
"math"
"strings"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
descpb "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -155,98 +150,6 @@ func queryTableRowCount(
return int64(*count), nil
}

// jobsMigrationFunc resets the progress on schema change jobs, and marks all
// other jobs as reverting.
func jobsMigrationFunc(
ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, tempTableName string,
) (err error) {
executor := execCfg.InternalExecutor

const statesToRevert = `('` + string(jobs.StatusRunning) + `', ` +
`'` + string(jobs.StatusPauseRequested) + `', ` +
`'` + string(jobs.StatusPaused) + `')`

jobsToRevert := make([]int64, 0)
query := `SELECT id, payload FROM ` + tempTableName + ` WHERE status IN ` + statesToRevert
it, err := executor.QueryIteratorEx(
ctx, "restore-fetching-job-payloads", txn,
sessiondata.InternalExecutorOverride{User: username.RootUserName()},
query)
if err != nil {
return errors.Wrap(err, "fetching job payloads")
}
defer func() {
closeErr := it.Close()
if err == nil {
err = closeErr
}
}()
for {
ok, err := it.Next(ctx)
if !ok {
if err != nil {
return err
}
break
}

r := it.Cur()
id, payloadBytes := r[0], r[1]
rawJobID, ok := id.(*tree.DInt)
if !ok {
return errors.Errorf("job: failed to read job id as DInt (was %T)", id)
}
jobID := int64(*rawJobID)

payload, err := jobs.UnmarshalPayload(payloadBytes)
if err != nil {
return errors.Wrap(err, "failed to unmarshal job to restore")
}
if payload.Type() == jobspb.TypeImport || payload.Type() == jobspb.TypeRestore {
jobsToRevert = append(jobsToRevert, jobID)
}
}

// Update the status for other jobs.
var updateStatusQuery strings.Builder
fmt.Fprintf(&updateStatusQuery, "UPDATE %s SET status = $1 WHERE id IN ", tempTableName)
fmt.Fprint(&updateStatusQuery, "(")
for i, job := range jobsToRevert {
if i > 0 {
fmt.Fprint(&updateStatusQuery, ", ")
}
fmt.Fprintf(&updateStatusQuery, "'%d'", job)
}
fmt.Fprint(&updateStatusQuery, ")")

if _, err := executor.Exec(ctx, "updating-job-status", txn, updateStatusQuery.String(), jobs.StatusCancelRequested); err != nil {
return errors.Wrap(err, "updating restored jobs as reverting")
}

return nil
}

// When restoring the jobs table we don't want to remove existing jobs, since
// that includes the restore that we're running.
func jobsRestoreFunc(
ctx context.Context,
execCfg *sql.ExecutorConfig,
txn *kv.Txn,
systemTableName, tempTableName string,
) error {
executor := execCfg.InternalExecutor

// When restoring jobs, don't clear the existing table.

restoreQuery := fmt.Sprintf("INSERT INTO system.%s (SELECT * FROM %s) ON CONFLICT DO NOTHING;",
systemTableName, tempTableName)
opName := systemTableName + "-data-insert"
if _, err := executor.Exec(ctx, opName, txn, restoreQuery); err != nil {
return errors.Wrapf(err, "inserting data to system.%s", systemTableName)
}
return nil
}

// When restoring the settings table, we want to make sure to not override the
// version.
func settingsRestoreFunc(
Expand Down Expand Up @@ -314,9 +217,7 @@ var systemTableBackupConfiguration = map[string]systemBackupConfiguration{
shouldIncludeInClusterBackup: optInToClusterBackup,
},
systemschema.JobsTable.GetName(): {
shouldIncludeInClusterBackup: optInToClusterBackup,
migrationFunc: jobsMigrationFunc,
customRestoreFunc: jobsRestoreFunc,
shouldIncludeInClusterBackup: optOutOfClusterBackup,
},
systemschema.ScheduledJobsTable.GetName(): {
shouldIncludeInClusterBackup: optInToClusterBackup,
Expand Down
Loading

0 comments on commit 4ab8292

Please sign in to comment.