diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel
index 02ab85b2124f..88345cf4dbbf 100644
--- a/pkg/ccl/backupccl/BUILD.bazel
+++ b/pkg/ccl/backupccl/BUILD.bazel
@@ -224,6 +224,7 @@ go_test(
         "//pkg/kv/kvserver/kvserverbase",
         "//pkg/kv/kvserver/protectedts",
         "//pkg/kv/kvserver/protectedts/ptpb",
+        "//pkg/kv/kvserver/protectedts/ptutil",
         "//pkg/roachpb",
         "//pkg/scheduledjobs",
         "//pkg/security",
diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go
index f7f3a1d2edd4..061ed75a5b45 100644
--- a/pkg/ccl/backupccl/backup_test.go
+++ b/pkg/ccl/backupccl/backup_test.go
@@ -63,6 +63,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptutil"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/security"
 	"github.com/cockroachdb/cockroach/pkg/security/username"
@@ -9338,6 +9339,118 @@ func TestExcludeDataFromBackupDoesNotHoldupGC(t *testing.T) {
 	})
 }
 
+// TestProtectRestoreSpans ensures that a protected timestamp is issued before
+// the restore flow begins on the correct target and is released when the
+// restore ends.
+func TestProtectRestoreSpans(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	numAccounts := 100
+	params := base.TestClusterArgs{
+		ServerArgs: base.TestServerArgs{
+			Knobs: base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()},
+		},
+	}
+	tc, sqlDB, tempDir, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode,
+		numAccounts,
+		InitManualReplication, params)
+	_, emptyDB, cleanupEmptyCluster := backupRestoreTestSetupEmpty(t, singleNode, tempDir,
+		InitManualReplication, params)
+	defer cleanupEmptyCluster()
+	defer cleanupFn()
+
+	ctx := context.Background()
+	if !tc.StartedDefaultTestTenant() {
+		_, err := tc.Servers[0].StartTenant(ctx, base.TestTenantArgs{
+			TenantID: roachpb.MustMakeTenantID(10),
+		})
+		require.NoError(t, err)
+	}
+	sqlDB.Exec(t, `BACKUP INTO $1`, localFoo)
+
+	for _, subtest := range []struct {
+		name        string
+		restoreStmt string
+	}{
+		{
+			name:        "tenant",
+			restoreStmt: `RESTORE TENANT 10 FROM LATEST IN $1 WITH detached, tenant = '20'`,
+		},
+		{
+			name:        "database",
+			restoreStmt: `RESTORE DATABASE data FROM LATEST IN $1 WITH detached, new_db_name=data2`,
+		},
+		{
+			name:        "table",
+			restoreStmt: `RESTORE TABLE bank FROM LATEST IN $1 WITH detached, into_db='defaultdb'`,
+		},
+		{
+			name:        "cluster",
+			restoreStmt: `RESTORE FROM LATEST IN $1 WITH detached`,
+		},
+	} {
+		if tc.StartedDefaultTestTenant() && subtest.name == "tenant" {
+			// Cannot run a restore of a tenant within a tenant.
+			continue
+		}
+		t.Run(subtest.name, func(t *testing.T) {
+			// Select the target cluster through a local runner instead of
+			// reassigning the captured sqlDB, so later subtests are unaffected.
+			db := sqlDB
+			if subtest.name == "cluster" {
+				// Use the empty cluster for cluster restore.
+				db = emptyDB
+				db.Exec(t, "USE system")
+			}
+			// Begin a restore and assert that a PTS record with the correct
+			// target was persisted.
+			db.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'restore.before_flow'`)
+			var jobID jobspb.JobID
+			db.QueryRow(t, subtest.restoreStmt, localFoo).Scan(&jobID)
+			jobutils.WaitForJobToPause(t, db, jobID)
+
+			restoreDetails := jobutils.GetJobPayload(t, db, jobID).GetRestore()
+			require.NotNil(t, restoreDetails)
+			require.NotNil(t, restoreDetails.ProtectedTimestampRecord)
+
+			target := ptutil.GetPTSTarget(t, db, restoreDetails.ProtectedTimestampRecord)
+			switch subtest.name {
+			case "cluster":
+				// The target cluster object doesn't have any info,
+				// so just assert that the right type was instantiated.
+				require.NotNil(t, target.GetCluster())
+			case "tenant":
+				targetIDs := target.GetTenants()
+				require.NotNil(t, targetIDs)
+				require.NotEmpty(t, targetIDs.IDs)
+				require.Equal(t, roachpb.TenantID{InternalValue: 20}, targetIDs.IDs[0])
+			case "database":
+				targetIDs := target.GetSchemaObjects()
+				require.NotNil(t, targetIDs)
+				require.NotEmpty(t, targetIDs.IDs)
+				require.NotEmpty(t, restoreDetails.DatabaseDescs)
+				require.Equal(t, restoreDetails.DatabaseDescs[0].GetID(), targetIDs.IDs[0])
+			case "table":
+				targetIDs := target.GetSchemaObjects()
+				require.NotNil(t, targetIDs)
+				require.NotEmpty(t, targetIDs.IDs)
+				require.NotEmpty(t, restoreDetails.TableDescs)
+				require.Equal(t, restoreDetails.TableDescs[0].GetID(), targetIDs.IDs[0])
+			}
+			// Finish the restore and ensure the PTS record was removed.
+			db.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = ''`)
+			db.Exec(t, `RESUME JOB $1`, jobID)
+			jobutils.WaitForJobToSucceed(t, db, jobID)
+
+			var count int
+			db.QueryRow(t, `SELECT count(*) FROM system.protected_ts_records WHERE id = $1`,
+				restoreDetails.ProtectedTimestampRecord).Scan(&count)
+			require.Equal(t, 0, count)
+		})
+	}
+}
+
 // TestBackupRestoreSystemUsers tests RESTORE SYSTEM USERS feature which allows user to
 // restore users from a backup into current cluster and regrant roles.
func TestBackupRestoreSystemUsers(t *testing.T) {
diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go
index 33225caa4af0..527cf961fadc 100644
--- a/pkg/ccl/backupccl/restore_job.go
+++ b/pkg/ccl/backupccl/restore_job.go
@@ -25,8 +25,10 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/joberror"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/security/username"
 	"github.com/cockroachdb/cockroach/pkg/server/telemetry"
@@ -1355,6 +1357,57 @@ func createImportingDescriptors(
 	return dataToPreRestore, preValidation, trackedRestore, nil
 }
 
+// protectRestoreSpans writes a protected timestamp record covering the keys
+// this restore targets. It is a no-op when the job details already carry a
+// record ID from an earlier call.
+func protectRestoreSpans(
+	ctx context.Context,
+	execCfg *sql.ExecutorConfig,
+	job *jobs.Job,
+	details jobspb.RestoreDetails,
+	tenantRekeys []execinfrapb.TenantRekey,
+) (jobsprotectedts.Cleaner, error) {
+	if details.ProtectedTimestampRecord != nil {
+		// Already protected by an earlier invocation; nothing to write.
+		return nil, nil
+	}
+	var target *ptpb.Target
+	switch {
+	case details.DescriptorCoverage == tree.AllDescriptors:
+		// Cluster restore: protect the whole key space.
+		target = ptpb.MakeClusterTarget()
+	case len(details.Tenants) > 0:
+		// Tenant restore: protect the key span of every restored tenant.
+		ids := make([]roachpb.TenantID, 0, len(tenantRekeys))
+		for _, rekey := range tenantRekeys {
+			// The system tenant rekey only carries metadata for the restore
+			// processors; the host tenant's keyspace needs no protection.
+			// https://github.com/cockroachdb/cockroach/pull/73647
+			if rekey.OldID != roachpb.SystemTenantID {
+				ids = append(ids, rekey.NewID)
+			}
+		}
+		target = ptpb.MakeTenantsTarget(ids)
+	case len(details.DatabaseDescs) > 0:
+		// Database restore: protect whole databases.
+		ids := make([]descpb.ID, 0, len(details.DatabaseDescs))
+		for _, db := range details.DatabaseDescs {
+			ids = append(ids, db.GetID())
+		}
+		target = ptpb.MakeSchemaObjectsTarget(ids)
+	default:
+		// Otherwise, protect the individual tables.
+		ids := make([]descpb.ID, 0, len(details.TableDescs))
+		for _, tbl := range details.TableDescs {
+			ids = append(ids, tbl.GetID())
+		}
+		target = ptpb.MakeSchemaObjectsTarget(ids)
+	}
+
+	now := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
+	return execCfg.ProtectedTimestampManager.Protect(ctx, job, target, now)
+}
+
 // remapPublicSchemas is used to create a descriptor backed public schema
 // for databases that have virtual public schemas.
 // The rewrite map is updated with the new public schema id.
@@ -1542,6 +1595,17 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro mainData.addTenant(from, to) } + _, err = protectRestoreSpans(ctx, p.ExecCfg(), r.job, details, mainData.tenantRekeys) + if err != nil { + return err + } + // the restore details now have pts + details = r.job.Details().(jobspb.RestoreDetails) + if err := p.ExecCfg().JobRegistry.CheckPausepoint( + "restore.before_flow"); err != nil { + return err + } + numNodes, err := clusterNodeCount(p.ExecCfg().Gossip) if err != nil { if !build.IsRelease() && p.ExecCfg().Codec.ForSystemTenant() { @@ -1552,6 +1616,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro } var resTotal roachpb.RowCount + if !preData.isEmpty() { res, err := restoreWithRetry( ctx, @@ -1690,7 +1755,9 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro return err } } - + if err := r.execCfg.ProtectedTimestampManager.Unprotect(ctx, r.job); err != nil { + log.Errorf(ctx, "failed to release protected timestamp: %v", err) + } r.notifyStatsRefresherOfNewTables() r.restoreStats = resTotal @@ -2214,6 +2281,10 @@ func (r *restoreResumer) OnFailOrCancel( telemetry.CountBucketed("restore.duration-sec.failed", int64(timeutil.Since(timeutil.FromUnixMicros(r.job.Payload().StartedMicros)).Seconds())) + if err := r.execCfg.ProtectedTimestampManager.Unprotect(ctx, r.job); err != nil { + return err + } + details := r.job.Details().(jobspb.RestoreDetails) logJobCompletion(ctx, restoreJobEventType, r.job.ID(), false, jobErr) diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index 57eb23fd566d..f7fc9ffeb8d5 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -338,22 +338,31 @@ message RestoreDetails { repeated string uris = 3 [(gogoproto.customname) = "URIs"]; repeated BackupLocalityInfo backup_locality_info = 7 [(gogoproto.nullable) = false]; - // We keep track of the descriptors that we're creating as part of the - 
// restore. + // DatabaseDescs contain the database descriptors for the whole databases we're restoring, + // remapped to their new IDs. repeated sqlbase.DatabaseDescriptor database_descs = 16; + + // TableDescs contain the table descriptors for the whole tables we're restoring, + // remapped to their new IDs. repeated sqlbase.TableDescriptor table_descs = 5; - // TypeDescs contains the type descriptors written as part of this restore. - // Note that it does not include type descriptors existing in the cluster - // that backed up types are remapped to. + + // TypeDescs contains the type descriptors written as part of this restore, + // remapped with their new IDs. Note that it does not include type descriptors + // existing in the cluster that backed up types are remapped to. repeated sqlbase.TypeDescriptor type_descs = 14; - // SchemaDescs contains schema descriptors written as part of this restore. - // Like TypeDescs, it does not include existing schema descriptors in the - // cluster that backed up schemas are remapped to. + + // SchemaDescs contains schema descriptors written as part of this restore, + // remapped with their new IDs. Like TypeDescs, it does not include existing + // schema descriptors in the cluster that backed up schemas are remapped to. repeated sqlbase.SchemaDescriptor schema_descs = 15; + // FunctionDescs contains function descriptors written as part of this - // restore. + // restore, remapped with their new IDs. repeated sqlbase.FunctionDescriptor function_descs = 27; reserved 13; + + // Tenants contain info on each tenant to restore. Note this field contains the backed up + // tenant id. repeated sqlbase.TenantInfoWithUsage tenants = 21 [(gogoproto.nullable) = false]; string override_db = 6 [(gogoproto.customname) = "OverrideDB"]; @@ -421,7 +430,14 @@ message RestoreDetails { bool VerifyData = 26; - // NEXT ID: 28. + // ProtectedTimestampRecord is the ID of the protected timestamp record + // corresponding to this job. 
+ bytes protected_timestamp_record = 28 [ + (gogoproto.customname) = "ProtectedTimestampRecord", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID" + ]; + + // NEXT ID: 29. } diff --git a/pkg/jobs/jobsprotectedts/jobs_protected_ts_manager.go b/pkg/jobs/jobsprotectedts/jobs_protected_ts_manager.go index b700014817f5..4f34819c2a1a 100644 --- a/pkg/jobs/jobsprotectedts/jobs_protected_ts_manager.go +++ b/pkg/jobs/jobsprotectedts/jobs_protected_ts_manager.go @@ -50,6 +50,9 @@ type Cleaner func(ctx context.Context) error func setProtectedTSOnJob(details jobspb.Details, u *uuid.UUID) jobspb.Details { switch v := details.(type) { + case jobspb.RestoreDetails: + v.ProtectedTimestampRecord = u + return v case jobspb.NewSchemaChangeDetails: v.ProtectedTimestampRecord = u return v @@ -63,6 +66,8 @@ func setProtectedTSOnJob(details jobspb.Details, u *uuid.UUID) jobspb.Details { func getProtectedTSOnJob(details jobspb.Details) *uuid.UUID { switch v := details.(type) { + case jobspb.RestoreDetails: + return v.ProtectedTimestampRecord case jobspb.NewSchemaChangeDetails: return v.ProtectedTimestampRecord case jobspb.SchemaChangeDetails: diff --git a/pkg/kv/kvserver/protectedts/ptutil/BUILD.bazel b/pkg/kv/kvserver/protectedts/ptutil/BUILD.bazel index 855c11e39a64..808dde77050f 100644 --- a/pkg/kv/kvserver/protectedts/ptutil/BUILD.bazel +++ b/pkg/kv/kvserver/protectedts/ptutil/BUILD.bazel @@ -7,12 +7,16 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptutil", visibility = ["//visibility:public"], deps = [ + "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/roachpb", "//pkg/spanconfig", "//pkg/spanconfig/spanconfigptsreader", "//pkg/testutils", "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", "//pkg/util/hlc", + "//pkg/util/protoutil", + "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", ], ) diff --git a/pkg/kv/kvserver/protectedts/ptutil/testutils.go 
b/pkg/kv/kvserver/protectedts/ptutil/testutils.go
index 5d4a2c8e4078..b663d49b7d95 100644
--- a/pkg/kv/kvserver/protectedts/ptutil/testutils.go
+++ b/pkg/kv/kvserver/protectedts/ptutil/testutils.go
@@ -14,12 +14,16 @@ import (
 	"context"
 	"testing"
 
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/spanconfig"
 	"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 )
 
@@ -60,3 +64,16 @@ func TestingVerifyProtectionTimestampExistsOnSpans(
 	})
 	return nil
 }
+
+// GetPTSTarget loads the target of the protected timestamp record with the
+// given ID from system.protected_ts_records, failing the test on error.
+func GetPTSTarget(t *testing.T, db *sqlutils.SQLRunner, ptsID *uuid.UUID) *ptpb.Target {
+	t.Helper()
+	ret := &ptpb.Target{}
+	var buf []byte
+	db.QueryRow(t, `SELECT target FROM system.protected_ts_records WHERE id = $1`, ptsID).Scan(&buf)
+	if err := protoutil.Unmarshal(buf, ret); err != nil {
+		t.Fatal(err)
+	}
+	return ret
+}
diff --git a/pkg/testutils/jobutils/jobs_verification.go b/pkg/testutils/jobutils/jobs_verification.go
index 2e1ca2f26b62..73381be8cfa2 100644
--- a/pkg/testutils/jobutils/jobs_verification.go
+++ b/pkg/testutils/jobutils/jobs_verification.go
@@ -262,3 +262,15 @@ func GetJobProgress(t *testing.T, db *sqlutils.SQLRunner, jobID jobspb.JobID) *j
 	}
 	return ret
 }
+
+// GetJobPayload loads the Payload message associated with the job.
+func GetJobPayload(t *testing.T, db *sqlutils.SQLRunner, jobID jobspb.JobID) *jobspb.Payload {
+	t.Helper()
+	ret := &jobspb.Payload{}
+	var buf []byte
+	db.QueryRow(t, `SELECT payload FROM system.jobs WHERE id = $1`, jobID).Scan(&buf)
+	if err := protoutil.Unmarshal(buf, ret); err != nil {
+		t.Fatal(err)
+	}
+	return ret
+}