From a57ac1b3dc025520cf440c5547376925c0104af7 Mon Sep 17 00:00:00 2001
From: Michael Butler
Date: Wed, 2 Nov 2022 10:09:31 -0400
Subject: [PATCH] backupccl: issue protected timestamps on restore spans

Fixes #91148

Release note: None
---
 pkg/ccl/backupccl/restore_job.go             | 61 ++++++++++++++++++-
 pkg/jobs/jobspb/jobs.proto                   | 36 ++++++++---
 .../jobs_protected_ts_manager.go             | 14 +++--
 pkg/sql/schemachanger/scexec/BUILD.bazel     |  1 +
 pkg/sql/schemachanger/scexec/dependencies.go | 12 ++++
 5 files changed, 107 insertions(+), 17 deletions(-)

diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go
index 3d5bd0c897bc..78568b8f6291 100644
--- a/pkg/ccl/backupccl/restore_job.go
+++ b/pkg/ccl/backupccl/restore_job.go
@@ -26,8 +26,10 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/jobs"
 	"github.com/cockroachdb/cockroach/pkg/jobs/joberror"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+	"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/security/username"
 	"github.com/cockroachdb/cockroach/pkg/server/telemetry"
@@ -1354,6 +1356,46 @@ func createImportingDescriptors(
 	return dataToPreRestore, preValidation, trackedRestore, nil
 }
 
+// protectRestoreSpans issues a protected timestamp over the span we seek to
+// restore and returns the Cleaner from Protect. If the job details already
+// hold a pts record, due to a previous call of this function, this noops.
+func protectRestoreSpans(
+	ctx context.Context,
+	execCfg *sql.ExecutorConfig,
+	jobID jobspb.JobID,
+	details jobspb.RestoreDetails,
+	tenantRekeys []execinfrapb.TenantRekey,
+) (jobsprotectedts.Cleaner, error) {
+	if details.ProtectedTimestampRecord != nil {
+		return nil, nil
+	}
+	var target *ptpb.Target
+	switch {
+	case details.DescriptorCoverage == tree.AllDescriptors:
+		target = ptpb.MakeClusterTarget()
+	case len(tenantRekeys) > 0:
+		tenantIDs := make([]roachpb.TenantID, 0, len(tenantRekeys))
+		for _, tenant := range tenantRekeys {
+			tenantIDs = append(tenantIDs, tenant.NewID)
+		}
+		target = ptpb.MakeTenantsTarget(tenantIDs)
+	case len(details.DatabaseDescs) > 0:
+		databaseIDs := make([]descpb.ID, 0, len(details.DatabaseDescs))
+		for i := range details.DatabaseDescs {
+			databaseIDs = append(databaseIDs, details.DatabaseDescs[i].GetID())
+		}
+		target = ptpb.MakeSchemaObjectsTarget(databaseIDs)
+	default:
+		tableIDs := make([]descpb.ID, 0, len(details.TableDescs))
+		for i := range details.TableDescs {
+			tableIDs = append(tableIDs, details.TableDescs[i].GetID())
+		}
+		target = ptpb.MakeSchemaObjectsTarget(tableIDs)
+	}
+	// Protect treats an empty timestamp as nothing to do, so pass a non-zero one.
+	return execCfg.ProtectedTimestampManager.Protect(ctx, jobID, target, hlc.Timestamp{WallTime: 1})
+}
+
 // remapPublicSchemas is used to create a descriptor backed public schema
 // for databases that have virtual public schemas.
 // The rewrite map is updated with the new public schema id.
@@ -1541,6 +1583,16 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro mainData.addTenant(from, to) } + _, err = protectRestoreSpans(ctx, p.ExecCfg(), r.job.ID(), details, mainData.tenantRekeys) + if err != nil { + return err + } + + if err := p.ExecCfg().JobRegistry.CheckPausepoint( + "restore.before_flow"); err != nil { + return err + } + numNodes, err := clusterNodeCount(p.ExecCfg().Gossip) if err != nil { if !build.IsRelease() && p.ExecCfg().Codec.ForSystemTenant() { @@ -1551,6 +1603,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro } var resTotal roachpb.RowCount + if !preData.isEmpty() { res, err := restoreWithRetry( ctx, @@ -1689,7 +1742,9 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro return err } } - + if err := r.execCfg.ProtectedTimestampManager.Unprotect(ctx, r.job.ID()); err != nil { + return err + } r.notifyStatsRefresherOfNewTables() r.restoreStats = resTotal @@ -2212,6 +2267,10 @@ func (r *restoreResumer) OnFailOrCancel( telemetry.CountBucketed("restore.duration-sec.failed", int64(timeutil.Since(timeutil.FromUnixMicros(r.job.Payload().StartedMicros)).Seconds())) + if err := r.execCfg.ProtectedTimestampManager.Unprotect(ctx, r.job.ID()); err != nil { + return err + } + details := r.job.Details().(jobspb.RestoreDetails) logJobCompletion(ctx, restoreJobEventType, r.job.ID(), false, jobErr) diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index cba1fe344dc9..867234251bd5 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -311,22 +311,31 @@ message RestoreDetails { repeated string uris = 3 [(gogoproto.customname) = "URIs"]; repeated BackupLocalityInfo backup_locality_info = 7 [(gogoproto.nullable) = false]; - // We keep track of the descriptors that we're creating as part of the - // restore. 
+ // DatabaseDescs contain the database descriptors for the whole databases we're restoring, + // remapped to their new IDs. repeated sqlbase.DatabaseDescriptor database_descs = 16; + + // TableDescs contain the table descriptors for the whole tables we're restoring, + // remapped to their new IDs. repeated sqlbase.TableDescriptor table_descs = 5; - // TypeDescs contains the type descriptors written as part of this restore. - // Note that it does not include type descriptors existing in the cluster - // that backed up types are remapped to. + + // TypeDescs contains the type descriptors written as part of this restore, + // remapped with their new IDs. Note that it does not include type descriptors + // existing in the cluster that backed up types are remapped to. repeated sqlbase.TypeDescriptor type_descs = 14; - // SchemaDescs contains schema descriptors written as part of this restore. - // Like TypeDescs, it does not include existing schema descriptors in the - // cluster that backed up schemas are remapped to. + + // SchemaDescs contains schema descriptors written as part of this restore, + // remapped with their new IDs. Like TypeDescs, it does not include existing + // schema descriptors in the cluster that backed up schemas are remapped to. repeated sqlbase.SchemaDescriptor schema_descs = 15; + // FunctionDescs contains function descriptors written as part of this - // restore. + // restore, remapped with their new IDs. repeated sqlbase.FunctionDescriptor function_descs = 27; reserved 13; + + // Tenants contain info on each tenant to restore. Note this field contains the backed up + // tenant id. repeated sqlbase.TenantInfoWithUsage tenants = 21 [(gogoproto.nullable) = false]; string override_db = 6 [(gogoproto.customname) = "OverrideDB"]; @@ -394,7 +403,14 @@ message RestoreDetails { bool VerifyData = 26; - // NEXT ID: 28. + // ProtectedTimestampRecord is the ID of the protected timestamp record + // corresponding to this job. 
+ bytes protected_timestamp_record = 28 [ + (gogoproto.customname) = "ProtectedTimestampRecord", + (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID" + ]; + + // NEXT ID: 29. } diff --git a/pkg/jobs/jobsprotectedts/jobs_protected_ts_manager.go b/pkg/jobs/jobsprotectedts/jobs_protected_ts_manager.go index 42889f0e5b97..4a8ce1ac6f3f 100644 --- a/pkg/jobs/jobsprotectedts/jobs_protected_ts_manager.go +++ b/pkg/jobs/jobsprotectedts/jobs_protected_ts_manager.go @@ -50,6 +50,9 @@ type Cleaner func(ctx context.Context) error func setProtectedTSOnJob(details jobspb.Details, u *uuid.UUID) jobspb.Details { switch v := details.(type) { + case jobspb.RestoreDetails: + v.ProtectedTimestampRecord = u + return v case jobspb.NewSchemaChangeDetails: v.ProtectedTimestampRecord = u return v @@ -63,6 +66,8 @@ func setProtectedTSOnJob(details jobspb.Details, u *uuid.UUID) jobspb.Details { func getProtectedTSOnJob(details jobspb.Details) *uuid.UUID { switch v := details.(type) { + case jobspb.RestoreDetails: + return v.ProtectedTimestampRecord case jobspb.NewSchemaChangeDetails: return v.ProtectedTimestampRecord case jobspb.SchemaChangeDetails: @@ -127,7 +132,8 @@ func (p *Manager) TryToProtectBeforeGC( select { case <-time.After(waitBeforeProtectedTS): - unprotectCallback, err = p.Protect(ctx, jobID, tableDesc, readAsOf) + target := ptpb.MakeSchemaObjectsTarget(descpb.IDs{tableDesc.GetID()}) + unprotectCallback, err = p.Protect(ctx, jobID, target, readAsOf) if err != nil { return err } @@ -158,10 +164,7 @@ func (p *Manager) TryToProtectBeforeGC( // a new timestamp. Returns a Cleaner function to remove the protected timestamp, // if one was installed. func (p *Manager) Protect( - ctx context.Context, - jobID jobspb.JobID, - tableDesc catalog.TableDescriptor, - readAsOf hlc.Timestamp, + ctx context.Context, jobID jobspb.JobID, target *ptpb.Target, readAsOf hlc.Timestamp, ) (Cleaner, error) { // If we are not running a historical query, nothing to do here. 
if readAsOf.IsEmpty() { @@ -187,7 +190,6 @@ func (p *Manager) Protect( md.Payload.Details = jobspb.WrapPayloadDetails(details) ju.UpdatePayload(md.Payload) - target := ptpb.MakeSchemaObjectsTarget(descpb.IDs{tableDesc.GetID()}) rec := MakeRecord(*protectedtsID, int64(jobID), readAsOf, nil, Jobs, target) return p.protectedTSProvider.Protect(ctx, txn, rec) diff --git a/pkg/sql/schemachanger/scexec/BUILD.bazel b/pkg/sql/schemachanger/scexec/BUILD.bazel index 67d7017e304f..0c524a526910 100644 --- a/pkg/sql/schemachanger/scexec/BUILD.bazel +++ b/pkg/sql/schemachanger/scexec/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "//pkg/jobs/jobspb", "//pkg/jobs/jobsprotectedts", "//pkg/keys", + "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/roachpb", "//pkg/security/username", "//pkg/settings/cluster", diff --git a/pkg/sql/schemachanger/scexec/dependencies.go b/pkg/sql/schemachanger/scexec/dependencies.go index 73439ea21266..ec26ea92f7e9 100644 --- a/pkg/sql/schemachanger/scexec/dependencies.go +++ b/pkg/sql/schemachanger/scexec/dependencies.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -408,6 +409,17 @@ type ProtectedTimestampManager interface { ctx context.Context, jobID jobspb.JobID, tableDesc catalog.TableDescriptor, readAsOf hlc.Timestamp, ) jobsprotectedts.Cleaner + // Protect adds a protected timestamp record for a historical transaction for + // a specific target immediately. If an existing record is found, it will be + // updated with a new timestamp. Returns a Cleaner function to remove the + // protected timestamp, if one was installed. 
+ Protect( + ctx context.Context, + jobID jobspb.JobID, + target *ptpb.Target, + readAsOf hlc.Timestamp, + ) (jobsprotectedts.Cleaner, error) + // Unprotect unprotects just based on a job ID, mainly for last resort cleanup. // Note: This should only be used for job cleanup if its not currently, // executing.