Skip to content

Commit

Permalink
backupccl: issue protected timestamps on restore spans
Browse files Browse the repository at this point in the history
Fixes cockroachdb#91148

Release note: None
  • Loading branch information
msbutler committed Nov 19, 2022
1 parent 9914814 commit 945a07b
Show file tree
Hide file tree
Showing 11 changed files with 240 additions and 17 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ go_test(
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/kv/kvserver/protectedts/ptutil",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/security",
Expand Down
38 changes: 38 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptutil"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/security/username"
Expand Down Expand Up @@ -9269,6 +9270,43 @@ func TestExcludeDataFromBackupDoesNotHoldupGC(t *testing.T) {
})
}

// TestProtectRestoreSpans ensures the a protected timestamp is issued before
// the restore flow begins and is released when the restore ends.
func TestProtectRestoreSpans(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

numAccounts := 100
_, sqlDB, _, cleanupFn := backupRestoreTestSetupWithParams(t, 3 /* nodes */, numAccounts,
InitManualReplication, base.TestClusterArgs{})
defer cleanupFn()

sqlDB.Exec(t, `BACKUP INTO $1`, localFoo)

// Begin a Restore and assert that PTS with the correct target was persisted
sqlDB.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = 'restore.before_flow'`)
var jobId jobspb.JobID
sqlDB.QueryRow(t, `RESTORE DATABASE data FROM LATEST IN $1 WITH detached, new_db_name=data2`, localFoo).Scan(&jobId)
jobutils.WaitForJobToPause(t, sqlDB, jobId)

restoreDetails := jobutils.GetJobPayload(t, sqlDB, jobId).GetRestore()
require.NotNil(t, restoreDetails.ProtectedTimestampRecord)

target := ptutil.GetPTSTarget(t, sqlDB, restoreDetails.ProtectedTimestampRecord)
targetIDs := target.GetSchemaObjects()
require.Equal(t, restoreDetails.DatabaseDescs[0].GetID(), targetIDs.IDs[0])

// Finish the restore and ensure the PTS record was removed
sqlDB.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints = ''`)
sqlDB.Exec(t, `RESUME JOB $1`, jobId)
jobutils.WaitForJobToSucceed(t, sqlDB, jobId)

var count int
sqlDB.QueryRow(t, `SELECT count(*) FROM system.protected_ts_records WHERE id = $1`,
restoreDetails.ProtectedTimestampRecord).Scan(&count)
require.Equal(t, 0, count)
}

// TestBackupRestoreSystemUsers tests RESTORE SYSTEM USERS feature which allows user to
// restore users from a backup into current cluster and regrant roles.
func TestBackupRestoreSystemUsers(t *testing.T) {
Expand Down
62 changes: 61 additions & 1 deletion pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/joberror"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
Expand Down Expand Up @@ -1354,6 +1356,47 @@ func createImportingDescriptors(
return dataToPreRestore, preValidation, trackedRestore, nil
}

// protectRestoreSpans issues a protected timestamp over the span we seek to
// restore. If a pts already exists in the job record, due to previous call of
// this function, this noops.
func protectRestoreSpans(
ctx context.Context,
execCfg *sql.ExecutorConfig,
jobID jobspb.JobID,
details jobspb.RestoreDetails,
tenantRekeys []execinfrapb.TenantRekey,
) (jobsprotectedts.Cleaner, error) {
if details.ProtectedTimestampRecord != nil {
// A protected time stamp has already been set. No need to write a new one.
return nil, nil
}
var target *ptpb.Target
switch {
case details.DescriptorCoverage == tree.AllDescriptors:
target = ptpb.MakeClusterTarget()
case len(tenantRekeys) > 0:
tenantIDs := make([]roachpb.TenantID, 0, len(tenantRekeys))
for _, tenant := range tenantRekeys {
tenantIDs = append(tenantIDs, tenant.NewID)
}
target = ptpb.MakeTenantsTarget(tenantIDs)
case len(details.DatabaseDescs) > 0:
databaseIDs := make([]descpb.ID, 0, len(details.DatabaseDescs))
for i := range details.DatabaseDescs {
databaseIDs = append(databaseIDs, details.DatabaseDescs[i].GetID())
}
target = ptpb.MakeSchemaObjectsTarget(databaseIDs)
default:
tableIDs := make([]descpb.ID, 0, len(details.TableDescs))
for i := range details.TableDescs {
tableIDs = append(tableIDs, details.TableDescs[i].GetID())
}
target = ptpb.MakeSchemaObjectsTarget(tableIDs)
}
protectedTime := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
return execCfg.ProtectedTimestampManager.Protect(ctx, jobID, target, protectedTime)
}

// remapPublicSchemas is used to create a descriptor backed public schema
// for databases that have virtual public schemas.
// The rewrite map is updated with the new public schema id.
Expand Down Expand Up @@ -1541,6 +1584,16 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
mainData.addTenant(from, to)
}

_, err = protectRestoreSpans(ctx, p.ExecCfg(), r.job.ID(), details, mainData.tenantRekeys)
if err != nil {
return err
}

if err := p.ExecCfg().JobRegistry.CheckPausepoint(
"restore.before_flow"); err != nil {
return err
}

numNodes, err := clusterNodeCount(p.ExecCfg().Gossip)
if err != nil {
if !build.IsRelease() && p.ExecCfg().Codec.ForSystemTenant() {
Expand All @@ -1551,6 +1604,7 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
}

var resTotal roachpb.RowCount

if !preData.isEmpty() {
res, err := restoreWithRetry(
ctx,
Expand Down Expand Up @@ -1689,7 +1743,9 @@ func (r *restoreResumer) doResume(ctx context.Context, execCtx interface{}) erro
return err
}
}

if err := r.execCfg.ProtectedTimestampManager.Unprotect(ctx, r.job.ID()); err != nil {
return err
}
r.notifyStatsRefresherOfNewTables()

r.restoreStats = resTotal
Expand Down Expand Up @@ -2212,6 +2268,10 @@ func (r *restoreResumer) OnFailOrCancel(
telemetry.CountBucketed("restore.duration-sec.failed",
int64(timeutil.Since(timeutil.FromUnixMicros(r.job.Payload().StartedMicros)).Seconds()))

if err := r.execCfg.ProtectedTimestampManager.Unprotect(ctx, r.job.ID()); err != nil {
return err
}

details := r.job.Details().(jobspb.RestoreDetails)
logJobCompletion(ctx, restoreJobEventType, r.job.ID(), false, jobErr)

Expand Down
64 changes: 64 additions & 0 deletions pkg/ccl/backupccl/testdata/backup-restore/restore-protect-spans
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@

# tickle the unprotected timestamp bug
new-server name=s1 disable-tenant
----

exec-sql
CREATE DATABASE d;
USE d;
CREATE TABLE foo (i INT PRIMARY KEY, s STRING);
INSERT INTO foo VALUES (1, 'x'),(2,'y');
----

exec-sql
BACKUP INTO 'nodelocal://1/full_cluster_backup/';
----


exec-sql
SET CLUSTER SETTING bulkio.ingest.flush_delay='20s';
----

exec-sql
SET CLUSTER SETTING spanconfig.reconciliation_job.check_interval='1s';
----

exec-sql
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms';
----

exec-sql
ALTER DATABASE defaultdb CONFIGURE ZONE USING gc.ttlseconds = 1;
----

# ensure the gc threshold forwards quickly
exec-sql
SET CLUSTER SETTING kv.protectedts.poll_interval = '1s'
----

exec-sql
SET CLUSTER SETTING jobs.debug.pausepoints = 'restore.before_flow';
----


restore expect-pausepoint tag=a
RESTORE TABLE foo FROM LATEST IN 'nodelocal://1/full_cluster_backup/' WITH into_db='defaultdb';
----
job paused at pausepoint

# ensure foo has adopted the ttl
sleep ms=5000
----

exec-sql
SET CLUSTER SETTING jobs.debug.pausepoints = '';
----

job resume=a
----

# If this test worked, the job should fail. but currently it succeeds.
job tag=a wait-for-state=failed
----


36 changes: 26 additions & 10 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -311,22 +311,31 @@ message RestoreDetails {
repeated string uris = 3 [(gogoproto.customname) = "URIs"];
repeated BackupLocalityInfo backup_locality_info = 7 [(gogoproto.nullable) = false];

// We keep track of the descriptors that we're creating as part of the
// restore.
// DatabaseDescs contain the database descriptors for the whole databases we're restoring,
// remapped to their new IDs.
repeated sqlbase.DatabaseDescriptor database_descs = 16;

// TableDescs contain the table descriptors for the whole tables we're restoring,
// remapped to their new IDs.
repeated sqlbase.TableDescriptor table_descs = 5;
// TypeDescs contains the type descriptors written as part of this restore.
// Note that it does not include type descriptors existing in the cluster
// that backed up types are remapped to.

// TypeDescs contains the type descriptors written as part of this restore,
// remapped with their new IDs. Note that it does not include type descriptors
// existing in the cluster that backed up types are remapped to.
repeated sqlbase.TypeDescriptor type_descs = 14;
// SchemaDescs contains schema descriptors written as part of this restore.
// Like TypeDescs, it does not include existing schema descriptors in the
// cluster that backed up schemas are remapped to.

// SchemaDescs contains schema descriptors written as part of this restore,
// remapped with their new IDs. Like TypeDescs, it does not include existing
// schema descriptors in the cluster that backed up schemas are remapped to.
repeated sqlbase.SchemaDescriptor schema_descs = 15;

// FunctionDescs contains function descriptors written as part of this
// restore.
// restore, remapped with their new IDs.
repeated sqlbase.FunctionDescriptor function_descs = 27;
reserved 13;

// Tenants contain info on each tenant to restore. Note this field contains the backed up
// tenant id.
repeated sqlbase.TenantInfoWithUsage tenants = 21 [(gogoproto.nullable) = false];

string override_db = 6 [(gogoproto.customname) = "OverrideDB"];
Expand Down Expand Up @@ -394,7 +403,14 @@ message RestoreDetails {

bool VerifyData = 26;

// NEXT ID: 28.
// ProtectedTimestampRecord is the ID of the protected timestamp record
// corresponding to this job.
bytes protected_timestamp_record = 28 [
(gogoproto.customname) = "ProtectedTimestampRecord",
(gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/util/uuid.UUID"
];

// NEXT ID: 29.
}


Expand Down
14 changes: 8 additions & 6 deletions pkg/jobs/jobsprotectedts/jobs_protected_ts_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ type Cleaner func(ctx context.Context) error

func setProtectedTSOnJob(details jobspb.Details, u *uuid.UUID) jobspb.Details {
switch v := details.(type) {
case jobspb.RestoreDetails:
v.ProtectedTimestampRecord = u
return v
case jobspb.NewSchemaChangeDetails:
v.ProtectedTimestampRecord = u
return v
Expand All @@ -63,6 +66,8 @@ func setProtectedTSOnJob(details jobspb.Details, u *uuid.UUID) jobspb.Details {

func getProtectedTSOnJob(details jobspb.Details) *uuid.UUID {
switch v := details.(type) {
case jobspb.RestoreDetails:
return v.ProtectedTimestampRecord
case jobspb.NewSchemaChangeDetails:
return v.ProtectedTimestampRecord
case jobspb.SchemaChangeDetails:
Expand Down Expand Up @@ -127,7 +132,8 @@ func (p *Manager) TryToProtectBeforeGC(

select {
case <-time.After(waitBeforeProtectedTS):
unprotectCallback, err = p.Protect(ctx, jobID, tableDesc, readAsOf)
target := ptpb.MakeSchemaObjectsTarget(descpb.IDs{tableDesc.GetID()})
unprotectCallback, err = p.Protect(ctx, jobID, target, readAsOf)
if err != nil {
return err
}
Expand Down Expand Up @@ -158,10 +164,7 @@ func (p *Manager) TryToProtectBeforeGC(
// a new timestamp. Returns a Cleaner function to remove the protected timestamp,
// if one was installed.
func (p *Manager) Protect(
ctx context.Context,
jobID jobspb.JobID,
tableDesc catalog.TableDescriptor,
readAsOf hlc.Timestamp,
ctx context.Context, jobID jobspb.JobID, target *ptpb.Target, readAsOf hlc.Timestamp,
) (Cleaner, error) {
// If we are not running a historical query, nothing to do here.
if readAsOf.IsEmpty() {
Expand All @@ -187,7 +190,6 @@ func (p *Manager) Protect(
md.Payload.Details = jobspb.WrapPayloadDetails(details)
ju.UpdatePayload(md.Payload)

target := ptpb.MakeSchemaObjectsTarget(descpb.IDs{tableDesc.GetID()})
rec := MakeRecord(*protectedtsID,
int64(jobID), readAsOf, nil, Jobs, target)
return p.protectedTSProvider.Protect(ctx, txn, rec)
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/protectedts/ptutil/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptutil",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigptsreader",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/util/hlc",
"//pkg/util/protoutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
],
)
Expand Down
14 changes: 14 additions & 0 deletions pkg/kv/kvserver/protectedts/ptutil/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -60,3 +64,13 @@ func TestingVerifyProtectionTimestampExistsOnSpans(
})
return nil
}

func GetPTSTarget(t *testing.T, db *sqlutils.SQLRunner, ptsID *uuid.UUID) *ptpb.Target {
ret := &ptpb.Target{}
var buf []byte
db.QueryRow(t, `SELECT target FROM system.protected_ts_records WHERE id = $1`, ptsID).Scan(&buf)
if err := protoutil.Unmarshal(buf, ret); err != nil {
t.Fatal(err)
}
return ret
}
1 change: 1 addition & 0 deletions pkg/sql/schemachanger/scexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb",
"//pkg/security/username",
"//pkg/settings/cluster",
Expand Down
Loading

0 comments on commit 945a07b

Please sign in to comment.