Skip to content

Commit

Permalink
backupccl: protect restore spans
Browse files Browse the repository at this point in the history
Fixes cockroachdb#91148

Release note: None
  • Loading branch information
msbutler committed Dec 5, 2022
1 parent 591995a commit 8ec0e60
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 11 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ go_test(
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/kv/kvserver/protectedts/ptutil",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/security",
Expand Down
98 changes: 98 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptutil"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/security/username"
Expand Down Expand Up @@ -9338,6 +9339,103 @@ func TestExcludeDataFromBackupDoesNotHoldupGC(t *testing.T) {
})
}

// TestProtectRestoreSpans ensures that a protected timestamp is issued before
// the restore flow begins on the correct target and is released when the
// restore ends.
//
// Each subtest starts a detached restore with the 'restore.before_flow'
// pausepoint set, inspects the protected timestamp record referenced by the
// paused job's payload, then resumes the job and checks that the record is
// gone once the job succeeds.
func TestProtectRestoreSpans(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	numAccounts := 100
	params := base.TestClusterArgs{
		ServerArgs: base.TestServerArgs{
			Knobs: base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()},
		},
	}
	tc, sqlDB, tempDir, cleanupFn := backupRestoreTestSetupWithParams(t, singleNode,
		numAccounts,
		InitManualReplication, params)
	_, emptyDB, cleanupEmptyCluster := backupRestoreTestSetupEmpty(t, singleNode, tempDir,
		InitManualReplication, params)
	defer cleanupEmptyCluster()
	defer cleanupFn()

	ctx := context.Background()
	if !tc.StartedDefaultTestTenant() {
		_, err := tc.Servers[0].StartTenant(ctx, base.TestTenantArgs{TenantID: roachpb.
			MustMakeTenantID(10)})
		require.NoError(t, err)
	}
	sqlDB.Exec(t, `BACKUP INTO $1`, localFoo)

	for _, subtest := range []struct {
		name        string
		restoreStmt string
	}{
		{
			name:        "tenant",
			restoreStmt: `RESTORE TENANT 10 FROM LATEST IN $1 WITH detached, tenant = '20'`,
		},
		{
			name:        "database",
			restoreStmt: `RESTORE DATABASE data FROM LATEST IN $1 WITH detached, new_db_name=data2`,
		},
		{
			name:        "table",
			restoreStmt: `RESTORE TABLE bank FROM LATEST IN $1 WITH detached, into_db='defaultdb'`,
		},
		{
			name:        "cluster",
			restoreStmt: `RESTORE FROM LATEST IN $1 WITH detached`,
		},
	} {
		if tc.StartedDefaultTestTenant() && subtest.name == "tenant" {
			// Cannot run a restore of a tenant within a tenant
			continue
		}
		t.Run(subtest.name, func(t *testing.T) {
			// Pick the connection for this subtest locally instead of
			// reassigning the shared sqlDB, so that the choice made for one
			// subtest can never leak into another.
			db := sqlDB
			if subtest.name == "cluster" {
				// Use the empty cluster for cluster restore
				db = emptyDB
				db.Exec(t, "USE system")
			}
			// Begin a Restore and assert that PTS with the correct target was persisted
			db.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'restore.before_flow'`)
			var jobID jobspb.JobID
			db.QueryRow(t, subtest.restoreStmt, localFoo).Scan(&jobID)
			jobutils.WaitForJobToPause(t, db, jobID)

			restoreDetails := jobutils.GetJobPayload(t, db, jobID).GetRestore()
			require.NotNil(t, restoreDetails.ProtectedTimestampRecord)

			target := ptutil.GetPTSTarget(t, db, restoreDetails.ProtectedTimestampRecord)
			switch subtest.name {
			case "cluster":
				// The target cluster object doesn't have any info,
				// so just assert that the right type was instantiated.
				require.NotNil(t, target.GetCluster())
			case "tenant":
				targetIDs := target.GetTenants()
				// Fail with a readable assertion rather than a panic if the
				// record carries a different target type or no IDs.
				require.NotNil(t, targetIDs)
				require.NotEmpty(t, targetIDs.IDs)
				require.Equal(t, roachpb.TenantID{InternalValue: 20}, targetIDs.IDs[0])
			case "database":
				targetIDs := target.GetSchemaObjects()
				require.NotNil(t, targetIDs)
				require.NotEmpty(t, targetIDs.IDs)
				require.NotEmpty(t, restoreDetails.DatabaseDescs)
				require.Equal(t, restoreDetails.DatabaseDescs[0].GetID(), targetIDs.IDs[0])
			case "table":
				targetIDs := target.GetSchemaObjects()
				require.NotNil(t, targetIDs)
				require.NotEmpty(t, targetIDs.IDs)
				require.NotEmpty(t, restoreDetails.TableDescs)
				require.Equal(t, restoreDetails.TableDescs[0].GetID(), targetIDs.IDs[0])
			}
			// Finish the restore and ensure the PTS record was removed
			db.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = ''`)
			db.Exec(t, `RESUME JOB $1`, jobID)
			jobutils.WaitForJobToSucceed(t, db, jobID)

			var count int
			db.QueryRow(t, `SELECT count(*) FROM system.protected_ts_records WHERE id = $1`,
				restoreDetails.ProtectedTimestampRecord).Scan(&count)
			require.Equal(t, 0, count)
		})
	}
}

// TestBackupRestoreSystemUsers tests RESTORE SYSTEM USERS feature which allows user to
// restore users from a backup into current cluster and regrant roles.
func TestBackupRestoreSystemUsers(t *testing.T) {
Expand Down
73 changes: 72 additions & 1 deletion pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/joberror"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
Expand Down Expand Up @@ -1355,6 +1357,57 @@ func createImportingDescriptors(
return dataToPreRestore, preValidation, trackedRestore, nil
}

// protectRestoreSpans asks the protected timestamp manager to protect the
// key space this restore targets, at the current wall time. When the job
// details already reference a protected timestamp record (for example from an
// earlier call of this function), it does nothing and returns a nil Cleaner.
//
// The protection target is chosen from the restore details:
//   - full cluster restore: the whole cluster;
//   - tenant restore: the new ID of every non-system tenant rekey;
//   - database restore: the IDs of details.DatabaseDescs;
//   - otherwise: the IDs of details.TableDescs.
//
// The Cleaner and error are returned unchanged from
// ProtectedTimestampManager.Protect.
func protectRestoreSpans(
	ctx context.Context,
	execCfg *sql.ExecutorConfig,
	job *jobs.Job,
	details jobspb.RestoreDetails,
	tenantRekeys []execinfrapb.TenantRekey,
) (jobsprotectedts.Cleaner, error) {
	// Nothing to do if a record was already written for this job.
	if details.ProtectedTimestampRecord != nil {
		return nil, nil
	}

	var target *ptpb.Target
	if details.DescriptorCoverage == tree.AllDescriptors {
		// Cluster restore: cover the entire key space.
		target = ptpb.MakeClusterTarget()
	} else if len(details.Tenants) > 0 {
		// Tenant restore: cover each restored tenant's key span.
		restoredTenants := make([]roachpb.TenantID, 0, len(tenantRekeys))
		for _, rekey := range tenantRekeys {
			// The system tenant rekey only serves as metadata for the restore
			// processors during tenant restores, so the host tenant's key
			// space is left unprotected.
			// https://github.com/cockroachdb/cockroach/pull/73647
			if rekey.OldID != roachpb.SystemTenantID {
				restoredTenants = append(restoredTenants, rekey.NewID)
			}
		}
		target = ptpb.MakeTenantsTarget(restoredTenants)
	} else if numDBs := len(details.DatabaseDescs); numDBs > 0 {
		// Database restore: cover the databases as a whole.
		ids := make([]descpb.ID, numDBs)
		for i := range details.DatabaseDescs {
			ids[i] = details.DatabaseDescs[i].GetID()
		}
		target = ptpb.MakeSchemaObjectsTarget(ids)
	} else {
		// Anything else: cover the individual tables being restored.
		ids := make([]descpb.ID, len(details.TableDescs))
		for i := range details.TableDescs {
			ids[i] = details.TableDescs[i].GetID()
		}
		target = ptpb.MakeSchemaObjectsTarget(ids)
	}

	nowNanos := timeutil.Now().UnixNano()
	return execCfg.ProtectedTimestampManager.Protect(
		ctx, job, target, hlc.Timestamp{WallTime: nowNanos},
	)
}

// remapPublicSchemas is used to create a descriptor backed public schema
// for databases that have virtual public schemas.
// The rewrite map is updated with the new public schema id.
Expand Down Expand Up @@ -1542,6 +1595,17 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
mainData.addTenant(from, to)
}

_, err = protectRestoreSpans(ctx, p.ExecCfg(), r.job, details, mainData.tenantRekeys)
if err != nil {
return err
}
// Reload the details: they now include the protected timestamp record.
details = r.job.Details().(jobspb.RestoreDetails)
if err := p.ExecCfg().JobRegistry.CheckPausepoint(
"restore.before_flow"); err != nil {
return err
}

numNodes, err := clusterNodeCount(p.ExecCfg().Gossip)
if err != nil {
if !build.IsRelease() && p.ExecCfg().Codec.ForSystemTenant() {
Expand All @@ -1552,6 +1616,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
}

var resTotal roachpb.RowCount

if !preData.isEmpty() {
res, err := restoreWithRetry(
ctx,
Expand Down Expand Up @@ -1690,7 +1755,9 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
return err
}
}

if err := r.execCfg.ProtectedTimestampManager.Unprotect(ctx, r.job); err != nil {
log.Errorf(ctx, "failed to release protected timestamp: %v", err)
}
r.notifyStatsRefresherOfNewTables()

r.restoreStats = resTotal
Expand Down Expand Up @@ -2214,6 +2281,10 @@ func (r *restoreResumer) OnFailOrCancel(
telemetry.CountBucketed("restore.duration-sec.failed",
int64(timeutil.Since(timeutil.FromUnixMicros(r.job.Payload().StartedMicros)).Seconds()))

if err := r.execCfg.ProtectedTimestampManager.Unprotect(ctx, r.job); err != nil {
return err
}

details := r.job.Details().(jobspb.RestoreDetails)
logJobCompletion(ctx, restoreJobEventType, r.job.ID(), false, jobErr)

Expand Down
36 changes: 26 additions & 10 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -338,22 +338,31 @@ message RestoreDetails {
repeated string uris = 3 [(gogoproto.customname) = "URIs"];
repeated BackupLocalityInfo backup_locality_info = 7 [(gogoproto.nullable) = false];

// We keep track of the descriptors that we're creating as part of the
// restore.
// DatabaseDescs contain the database descriptors for the whole databases we're restoring,
// remapped to their new IDs.
repeated sqlbase.DatabaseDescriptor database_descs = 16;

// TableDescs contain the table descriptors for the whole tables we're restoring,
// remapped to their new IDs.
repeated sqlbase.TableDescriptor table_descs = 5;
// TypeDescs contains the type descriptors written as part of this restore.
// Note that it does not include type descriptors existing in the cluster
// that backed up types are remapped to.

// TypeDescs contains the type descriptors written as part of this restore,
// remapped with their new IDs. Note that it does not include type descriptors
// existing in the cluster that backed up types are remapped to.
repeated sqlbase.TypeDescriptor type_descs = 14;
// SchemaDescs contains schema descriptors written as part of this restore.
// Like TypeDescs, it does not include existing schema descriptors in the
// cluster that backed up schemas are remapped to.

// SchemaDescs contains schema descriptors written as part of this restore,
// remapped with their new IDs. Like TypeDescs, it does not include existing
// schema descriptors in the cluster that backed up schemas are remapped to.
repeated sqlbase.SchemaDescriptor schema_descs = 15;

// FunctionDescs contains function descriptors written as part of this
// restore.
// restore, remapped with their new IDs.
repeated sqlbase.FunctionDescriptor function_descs = 27;
reserved 13;

// Tenants contains info on each tenant to restore. Note that this field
// contains the backed-up tenant ID.
repeated sqlbase.TenantInfoWithUsage tenants = 21 [(gogoproto.nullable) = false];

string override_db = 6 [(gogoproto.customname) = "OverrideDB"];
Expand Down Expand Up @@ -421,7 +430,14 @@ message RestoreDetails {

bool VerifyData = 26;

// NEXT ID: 28.
// ProtectedTimestampRecord is the ID of the protected timestamp record
// corresponding to this job.
bytes protected_timestamp_record = 28 [
(gogoproto.customname) = "ProtectedTimestampRecord",
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"
];

// NEXT ID: 29.
}


Expand Down
5 changes: 5 additions & 0 deletions pkg/jobs/jobsprotectedts/jobs_protected_ts_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ type Cleaner func(ctx context.Context) error

func setProtectedTSOnJob(details jobspb.Details, u *uuid.UUID) jobspb.Details {
switch v := details.(type) {
case jobspb.RestoreDetails:
v.ProtectedTimestampRecord = u
return v
case jobspb.NewSchemaChangeDetails:
v.ProtectedTimestampRecord = u
return v
Expand All @@ -63,6 +66,8 @@ func setProtectedTSOnJob(details jobspb.Details, u *uuid.UUID) jobspb.Details {

func getProtectedTSOnJob(details jobspb.Details) *uuid.UUID {
switch v := details.(type) {
case jobspb.RestoreDetails:
return v.ProtectedTimestampRecord
case jobspb.NewSchemaChangeDetails:
return v.ProtectedTimestampRecord
case jobspb.SchemaChangeDetails:
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/protectedts/ptutil/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptutil",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigptsreader",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/util/hlc",
"//pkg/util/protoutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
],
)
Expand Down
14 changes: 14 additions & 0 deletions pkg/kv/kvserver/protectedts/ptutil/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -60,3 +64,13 @@ func TestingVerifyProtectionTimestampExistsOnSpans(
})
return nil
}

// GetPTSTarget loads the target of the protected timestamp record with the
// given ID from system.protected_ts_records and decodes it into a
// ptpb.Target. The test is failed if the stored bytes cannot be unmarshaled.
func GetPTSTarget(t *testing.T, db *sqlutils.SQLRunner, ptsID *uuid.UUID) *ptpb.Target {
	// Attribute failures to the calling test rather than to this helper.
	t.Helper()
	ret := &ptpb.Target{}
	var buf []byte
	db.QueryRow(t, `SELECT target FROM system.protected_ts_records WHERE id = $1`, ptsID).Scan(&buf)
	if err := protoutil.Unmarshal(buf, ret); err != nil {
		t.Fatal(err)
	}
	return ret
}
11 changes: 11 additions & 0 deletions pkg/testutils/jobutils/jobs_verification.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,14 @@ func GetJobProgress(t *testing.T, db *sqlutils.SQLRunner, jobID jobspb.JobID) *j
}
return ret
}

// GetJobPayload fetches the payload column of the system.jobs row for jobID
// and decodes it into a jobspb.Payload. The test is failed if the stored
// bytes cannot be unmarshaled.
func GetJobPayload(t *testing.T, db *sqlutils.SQLRunner, jobID jobspb.JobID) *jobspb.Payload {
	var raw []byte
	row := db.QueryRow(t, `SELECT payload FROM system.jobs WHERE id = $1`, jobID)
	row.Scan(&raw)

	payload := new(jobspb.Payload)
	if err := protoutil.Unmarshal(raw, payload); err != nil {
		t.Fatal(err)
	}
	return payload
}

0 comments on commit 8ec0e60

Please sign in to comment.