Skip to content

Commit

Permalink
backupccl: issue protected timestamps on restore spans
Browse files Browse the repository at this point in the history
Fixes cockroachdb#91148

Release note: None
  • Loading branch information
msbutler committed Nov 16, 2022
1 parent 507721e commit a57ac1b
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 17 deletions.
61 changes: 60 additions & 1 deletion pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/joberror"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
Expand Down Expand Up @@ -1354,6 +1356,46 @@ func createImportingDescriptors(
return dataToPreRestore, preValidation, trackedRestore, nil
}

// protectRestoreSpans issues a protected timestamp record, owned by the given
// restore job, over the keyspace the restore is about to write into.
//
// The protection target is chosen from the restore details, in priority order:
//   - a full cluster restore protects the whole cluster;
//   - a tenant restore protects the restored (new) tenant IDs;
//   - a database restore protects the restored database descriptors;
//   - otherwise the restored table descriptors are protected.
//
// If the job details already carry a protected timestamp record (because a
// previous invocation, e.g. before a job resume, already wrote one), this is a
// no-op and a nil Cleaner is returned. Otherwise it returns whatever Cleaner
// and error the ProtectedTimestampManager produces.
func protectRestoreSpans(
	ctx context.Context,
	execCfg *sql.ExecutorConfig,
	jobID jobspb.JobID,
	details jobspb.RestoreDetails,
	tenantRekeys []execinfrapb.TenantRekey,
) (jobsprotectedts.Cleaner, error) {
	if details.ProtectedTimestampRecord != nil {
		// A protected timestamp has already been set. No need to write a new one.
		return nil, nil
	}

	var target *ptpb.Target
	switch {
	case details.DescriptorCoverage == tree.AllDescriptors:
		target = ptpb.MakeClusterTarget()
	case len(tenantRekeys) > 0:
		// Protect the tenants under the IDs they are being restored into.
		tenantIDs := make([]roachpb.TenantID, 0, len(tenantRekeys))
		for _, tenant := range tenantRekeys {
			tenantIDs = append(tenantIDs, tenant.NewID)
		}
		target = ptpb.MakeTenantsTarget(tenantIDs)
	case len(details.DatabaseDescs) > 0:
		databaseIDs := make([]descpb.ID, 0, len(details.DatabaseDescs))
		for i := range details.DatabaseDescs {
			databaseIDs = append(databaseIDs, details.DatabaseDescs[i].GetID())
		}
		target = ptpb.MakeSchemaObjectsTarget(databaseIDs)
	default:
		tableIDs := make([]descpb.ID, 0, len(details.TableDescs))
		for i := range details.TableDescs {
			tableIDs = append(tableIDs, details.TableDescs[i].GetID())
		}
		target = ptpb.MakeSchemaObjectsTarget(tableIDs)
	}

	// Manager.Protect treats an empty readAsOf as "not a historical query" and
	// returns without installing a record, so passing hlc.Timestamp{} here would
	// silently skip the protection this function exists to provide. Use the
	// smallest non-empty timestamp instead.
	// NOTE(review): a minimal wall time is assumed to be the desired GC floor
	// for the restoring spans (i.e. keep every revision) — confirm.
	protectedTime := hlc.Timestamp{WallTime: 1}
	return execCfg.ProtectedTimestampManager.Protect(ctx, jobID, target, protectedTime)
}

// remapPublicSchemas is used to create a descriptor backed public schema
// for databases that have virtual public schemas.
// The rewrite map is updated with the new public schema id.
Expand Down Expand Up @@ -1541,6 +1583,16 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
mainData.addTenant(from, to)
}

_, err = protectRestoreSpans(ctx, p.ExecCfg(), r.job.ID(), details, mainData.tenantRekeys)
if err != nil {
return err
}

if err := p.ExecCfg().JobRegistry.CheckPausepoint(
"restore.before_flow"); err != nil {
return err
}

numNodes, err := clusterNodeCount(p.ExecCfg().Gossip)
if err != nil {
if !build.IsRelease() && p.ExecCfg().Codec.ForSystemTenant() {
Expand All @@ -1551,6 +1603,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
}

var resTotal roachpb.RowCount

if !preData.isEmpty() {
res, err := restoreWithRetry(
ctx,
Expand Down Expand Up @@ -1689,7 +1742,9 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
return err
}
}

if err := r.execCfg.ProtectedTimestampManager.Unprotect(ctx, r.job.ID()); err != nil {
return err
}
r.notifyStatsRefresherOfNewTables()

r.restoreStats = resTotal
Expand Down Expand Up @@ -2212,6 +2267,10 @@ func (r *restoreResumer) OnFailOrCancel(
telemetry.CountBucketed("restore.duration-sec.failed",
int64(timeutil.Since(timeutil.FromUnixMicros(r.job.Payload().StartedMicros)).Seconds()))

if err := r.execCfg.ProtectedTimestampManager.Unprotect(ctx, r.job.ID()); err != nil {
return err
}

details := r.job.Details().(jobspb.RestoreDetails)
logJobCompletion(ctx, restoreJobEventType, r.job.ID(), false, jobErr)

Expand Down
36 changes: 26 additions & 10 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -311,22 +311,31 @@ message RestoreDetails {
repeated string uris = 3 [(gogoproto.customname) = "URIs"];
repeated BackupLocalityInfo backup_locality_info = 7 [(gogoproto.nullable) = false];

// We keep track of the descriptors that we're creating as part of the
// restore.
// DatabaseDescs contain the database descriptors for the whole databases we're restoring,
// remapped to their new IDs.
repeated sqlbase.DatabaseDescriptor database_descs = 16;

// TableDescs contain the table descriptors for the whole tables we're restoring,
// remapped to their new IDs.
repeated sqlbase.TableDescriptor table_descs = 5;
// TypeDescs contains the type descriptors written as part of this restore.
// Note that it does not include type descriptors existing in the cluster
// that backed up types are remapped to.

// TypeDescs contains the type descriptors written as part of this restore,
// remapped with their new IDs. Note that it does not include type descriptors
// existing in the cluster that backed up types are remapped to.
repeated sqlbase.TypeDescriptor type_descs = 14;
// SchemaDescs contains schema descriptors written as part of this restore.
// Like TypeDescs, it does not include existing schema descriptors in the
// cluster that backed up schemas are remapped to.

// SchemaDescs contains schema descriptors written as part of this restore,
// remapped with their new IDs. Like TypeDescs, it does not include existing
// schema descriptors in the cluster that backed up schemas are remapped to.
repeated sqlbase.SchemaDescriptor schema_descs = 15;

// FunctionDescs contains function descriptors written as part of this
// restore.
// restore, remapped with their new IDs.
repeated sqlbase.FunctionDescriptor function_descs = 27;
reserved 13;

// Tenants contain info on each tenant to restore. Note this field contains the backed up
// tenant id.
repeated sqlbase.TenantInfoWithUsage tenants = 21 [(gogoproto.nullable) = false];

string override_db = 6 [(gogoproto.customname) = "OverrideDB"];
Expand Down Expand Up @@ -394,7 +403,14 @@ message RestoreDetails {

bool VerifyData = 26;

// NEXT ID: 28.
// ProtectedTimestampRecord is the ID of the protected timestamp record
// corresponding to this job.
bytes protected_timestamp_record = 28 [
(gogoproto.customname) = "ProtectedTimestampRecord",
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"
];

// NEXT ID: 29.
}


Expand Down
14 changes: 8 additions & 6 deletions pkg/jobs/jobsprotectedts/jobs_protected_ts_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ type Cleaner func(ctx context.Context) error

func setProtectedTSOnJob(details jobspb.Details, u *uuid.UUID) jobspb.Details {
switch v := details.(type) {
case jobspb.RestoreDetails:
v.ProtectedTimestampRecord = u
return v
case jobspb.NewSchemaChangeDetails:
v.ProtectedTimestampRecord = u
return v
Expand All @@ -63,6 +66,8 @@ func setProtectedTSOnJob(details jobspb.Details, u *uuid.UUID) jobspb.Details {

func getProtectedTSOnJob(details jobspb.Details) *uuid.UUID {
switch v := details.(type) {
case jobspb.RestoreDetails:
return v.ProtectedTimestampRecord
case jobspb.NewSchemaChangeDetails:
return v.ProtectedTimestampRecord
case jobspb.SchemaChangeDetails:
Expand Down Expand Up @@ -127,7 +132,8 @@ func (p *Manager) TryToProtectBeforeGC(

select {
case <-time.After(waitBeforeProtectedTS):
unprotectCallback, err = p.Protect(ctx, jobID, tableDesc, readAsOf)
target := ptpb.MakeSchemaObjectsTarget(descpb.IDs{tableDesc.GetID()})
unprotectCallback, err = p.Protect(ctx, jobID, target, readAsOf)
if err != nil {
return err
}
Expand Down Expand Up @@ -158,10 +164,7 @@ func (p *Manager) TryToProtectBeforeGC(
// a new timestamp. Returns a Cleaner function to remove the protected timestamp,
// if one was installed.
func (p *Manager) Protect(
ctx context.Context,
jobID jobspb.JobID,
tableDesc catalog.TableDescriptor,
readAsOf hlc.Timestamp,
ctx context.Context, jobID jobspb.JobID, target *ptpb.Target, readAsOf hlc.Timestamp,
) (Cleaner, error) {
// If we are not running a historical query, nothing to do here.
if readAsOf.IsEmpty() {
Expand All @@ -187,7 +190,6 @@ func (p *Manager) Protect(
md.Payload.Details = jobspb.WrapPayloadDetails(details)
ju.UpdatePayload(md.Payload)

target := ptpb.MakeSchemaObjectsTarget(descpb.IDs{tableDesc.GetID()})
rec := MakeRecord(*protectedtsID,
int64(jobID), readAsOf, nil, Jobs, target)
return p.protectedTSProvider.Protect(ctx, txn, rec)
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/schemachanger/scexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb",
"//pkg/security/username",
"//pkg/settings/cluster",
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/schemachanger/scexec/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -408,6 +409,17 @@ type ProtectedTimestampManager interface {
ctx context.Context, jobID jobspb.JobID, tableDesc catalog.TableDescriptor, readAsOf hlc.Timestamp,
) jobsprotectedts.Cleaner

// Protect adds a protected timestamp record for a historical transaction for
// a specific target immediately. If an existing record is found, it will be
// updated with a new timestamp. Returns a Cleaner function to remove the
// protected timestamp, if one was installed.
Protect(
ctx context.Context,
jobID jobspb.JobID,
target *ptpb.Target,
readAsOf hlc.Timestamp,
) (jobsprotectedts.Cleaner, error)

// Unprotect removes the protected timestamp based solely on a job ID; it is
// mainly intended for last-resort cleanup.
// Note: This should only be used for job cleanup if the job is not currently
// executing.
Expand Down

0 comments on commit a57ac1b

Please sign in to comment.