Skip to content

Commit

Permalink
{backup,changefeed,streaming}ccl: start populating pts target
Browse files Browse the repository at this point in the history
This change touches all jobs that create a protected timestamp
record before calling `Protect`. Previously, the created record
would contain the spans that the record was going to protect.

With this change, the record will also populate the `Target`
field on `ptpb.Record` with the object it is going to protect.
The `Target` field is a proto message defined in `ptpb.Target`
and can be one of:

- Cluster
- Tenants
- Schema object (database or table)

For backups, this target field is determined based on the targets
passed in by the user via the `BACKUP <target>` query.

For changefeeds, this target is the group of tables on which the
changefeed is being started + `system.descriptors` table.

For the streaming job, this target is the tenant that is being
streamed.

This change does not touch any test files that create
a raw `ptpb.Record` for testing purposes. That will come in a follow
up PR where we actually teach `Protect` to validate and make use
of this `Target` field. A test for how backup chooses its target
will also come in a follow up PR once Protect actually writes
the encoded protobuf target field to the underlying system table.

Informs: #73727

Release note: None
  • Loading branch information
adityamaru committed Jan 6, 2022
1 parent ae23190 commit 2528c38
Show file tree
Hide file tree
Showing 13 changed files with 181 additions and 67 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ go_library(
"//pkg/kv/kvserver/batcheval",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb:with-mocks",
"//pkg/scheduledjobs",
"//pkg/security",
Expand Down
55 changes: 46 additions & 9 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/security"
Expand Down Expand Up @@ -858,8 +859,7 @@ func backupPlanHook(
jobID := p.ExecCfg().JobRegistry.MakeJobID()

if err := protectTimestampForBackup(
ctx, p, plannerTxn, jobID, backupManifest.Spans,
backupManifest.StartTime, endTime, backupDetails,
ctx, p, plannerTxn, jobID, backupManifest, backupDetails,
); err != nil {
return err
}
Expand Down Expand Up @@ -1204,25 +1204,62 @@ func makeNewEncryptionOptions(
return encryptionOptions, encryptionInfo, nil
}

func getProtectedTimestampTargetForBackup(backupManifest BackupManifest) *ptpb.Target {
if backupManifest.DescriptorCoverage == tree.AllDescriptors {
return ptpb.MakeClusterTarget()
}

if len(backupManifest.Tenants) > 0 {
tenantID := make([]roachpb.TenantID, 0)
for _, tenant := range backupManifest.Tenants {
tenantID = append(tenantID, roachpb.MakeTenantID(tenant.ID))
}
return ptpb.MakeTenantsTarget(tenantID)
}

// ResolvedCompleteDBs contains all the "complete" databases being backed up.
//
// This includes explicit `BACKUP DATABASE` targets as well as expansions as a
// result of `BACKUP TABLE db.*`. In both cases we want to write a protected
// timestamp record that covers the entire database.
if len(backupManifest.CompleteDbs) > 0 {
return ptpb.MakeSchemaObjectsTarget(backupManifest.CompleteDbs)
}

// At this point we are dealing with a `BACKUP TABLE`, so we write a protected
// timestamp record on each table being backed up.
tableIDs := make(descpb.IDs, 0)
for _, desc := range backupManifest.Descriptors {
t, _, _, _ := descpb.FromDescriptorWithMVCCTimestamp(&desc, hlc.Timestamp{})
if t != nil {
tableIDs = append(tableIDs, t.GetID())
}
}
return ptpb.MakeSchemaObjectsTarget(tableIDs)
}

func protectTimestampForBackup(
ctx context.Context,
p sql.PlanHookState,
txn *kv.Txn,
jobID jobspb.JobID,
spans []roachpb.Span,
startTime, endTime hlc.Timestamp,
backupManifest BackupManifest,
backupDetails jobspb.BackupDetails,
) error {
if backupDetails.ProtectedTimestampRecord == nil {
return nil
}
if len(spans) > 0 {
tsToProtect := endTime
if !startTime.IsEmpty() {
tsToProtect = startTime
if len(backupManifest.Spans) > 0 {
tsToProtect := backupManifest.EndTime
if !backupManifest.StartTime.IsEmpty() {
tsToProtect = backupManifest.StartTime
}

// Resolve the target that the PTS record will protect as part of this
// backup.
target := getProtectedTimestampTargetForBackup(backupManifest)
rec := jobsprotectedts.MakeRecord(*backupDetails.ProtectedTimestampRecord, int64(jobID),
tsToProtect, spans, jobsprotectedts.Jobs)
tsToProtect, backupManifest.Spans, jobsprotectedts.Jobs, target)
err := p.ExecCfg().ProtectedTimestampProvider.Protect(ctx, txn, rec)
if err != nil {
return err
Expand Down
36 changes: 17 additions & 19 deletions pkg/ccl/backupccl/schedule_pts_chaining.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
roachpb "github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand Down Expand Up @@ -145,22 +146,22 @@ func manageFullBackupPTSChaining(
return err
}

// Resolve the spans that need to be protected on this execution of the
// Resolve the target that needs to be protected on this execution of the
// scheduled backup.
spansToProtect, err := getSpansProtectedByBackup(ctx, backupDetails, txn, exec)
targetToProtect, deprecatedSpansToProtect, err := getTargetProtectedByBackup(ctx, backupDetails, txn, exec)
if err != nil {
return errors.Wrap(err, "getting spans to protect")
}

// Protect the spans after the EndTime of the current backup. We do not need
// Protect the target after the EndTime of the current backup. We do not need
// to verify this new record as we have a record written by the backup during
// planning, already protecting these spans after EndTime.
// planning, already protecting this target after EndTime.
//
// Since this record will be stored on the incremental schedule, we use the
// inc schedule ID as the records' Meta. This ensures that even if the full
// schedule is dropped, the reconciliation job will not release the pts
// record stored on the inc schedule, and the chaining will continue.
ptsRecord, err := protectTimestampRecordForSchedule(ctx, spansToProtect,
ptsRecord, err := protectTimestampRecordForSchedule(ctx, targetToProtect, deprecatedSpansToProtect,
backupDetails.EndTime, incSj.ScheduleID(), exec, txn)
if err != nil {
return errors.Wrap(err, "protect and verify pts record for schedule")
Expand Down Expand Up @@ -213,36 +214,33 @@ func manageIncrementalBackupPTSChaining(
return err
}

func getSpansProtectedByBackup(
func getTargetProtectedByBackup(
ctx context.Context, backupDetails jobspb.BackupDetails, txn *kv.Txn, exec *sql.ExecutorConfig,
) ([]roachpb.Span, error) {
) (target *ptpb.Target, deprecatedSpans []roachpb.Span, err error) {
if backupDetails.ProtectedTimestampRecord == nil {
return nil, nil
return nil, nil, nil
}

ptsRecord, err := exec.ProtectedTimestampProvider.GetRecord(ctx, txn,
*backupDetails.ProtectedTimestampRecord)
if err != nil {
return nil, err
return nil, nil, err
}

return ptsRecord.DeprecatedSpans, nil
return ptsRecord.Target, ptsRecord.DeprecatedSpans, nil
}

func protectTimestampRecordForSchedule(
ctx context.Context,
spansToProtect []roachpb.Span,
targetToProtect *ptpb.Target,
deprecatedSpansToProtect roachpb.Spans,
tsToProtect hlc.Timestamp,
scheduleID int64,
exec *sql.ExecutorConfig,
txn *kv.Txn,
) (uuid.UUID, error) {
var protectedtsID uuid.UUID
if len(spansToProtect) == 0 {
return protectedtsID, nil
}
protectedtsID = uuid.MakeV4()
rec := jobsprotectedts.MakeRecord(protectedtsID, scheduleID, tsToProtect, spansToProtect,
jobsprotectedts.Schedules)
protectedtsID := uuid.MakeV4()
rec := jobsprotectedts.MakeRecord(protectedtsID, scheduleID, tsToProtect, deprecatedSpansToProtect,
jobsprotectedts.Schedules, targetToProtect)
return protectedtsID, exec.ProtectedTimestampProvider.Protect(ctx, txn, rec)
}
20 changes: 18 additions & 2 deletions pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
Expand Down Expand Up @@ -64,12 +66,26 @@ func createProtectedTimestampRecord(
progress.ProtectedTimestampRecord = uuid.MakeV4()
log.VEventf(ctx, 2, "creating protected timestamp %v at %v",
progress.ProtectedTimestampRecord, resolved)
spansToProtect := makeSpansToProtect(codec, targets)
deprecatedSpansToProtect := makeSpansToProtect(codec, targets)
targetToProtect := makeTargetToProtect(targets)
rec := jobsprotectedts.MakeRecord(
progress.ProtectedTimestampRecord, int64(jobID), resolved, spansToProtect, jobsprotectedts.Jobs)
progress.ProtectedTimestampRecord, int64(jobID), resolved, deprecatedSpansToProtect,
jobsprotectedts.Jobs, targetToProtect)
return pts.Protect(ctx, txn, rec)
}

func makeTargetToProtect(targets jobspb.ChangefeedTargets) *ptpb.Target {
// NB: We add 1 because we're also going to protect system.descriptors.
// We protect system.descriptors because a changefeed needs all of the history
// of table descriptors to version data.
tablesToProtect := make(descpb.IDs, 0, len(targets)+1)
for t := range targets {
tablesToProtect = append(tablesToProtect, t)
}
tablesToProtect = append(tablesToProtect, keys.DescriptorTableID)
return ptpb.MakeSchemaObjectsTarget(tablesToProtect)
}

func makeSpansToProtect(codec keys.SQLCodec, targets jobspb.ChangefeedTargets) []roachpb.Span {
// NB: We add 1 because we're also going to protect system.descriptors.
// We protect system.descriptors because a changefeed needs all of the history
Expand Down
7 changes: 3 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -415,16 +414,16 @@ func changefeedPlanHook(
{

var protectedTimestampID uuid.UUID
var spansToProtect []roachpb.Span
var ptr *ptpb.Record

shouldProtectTimestamp := initialScanFromOptions(details.Opts) && p.ExecCfg().Codec.ForSystemTenant()
if shouldProtectTimestamp {
protectedTimestampID = uuid.MakeV4()
spansToProtect = makeSpansToProtect(p.ExecCfg().Codec, details.Targets)
deprecatedSpansToProtect := makeSpansToProtect(p.ExecCfg().Codec, details.Targets)
targetToProtect := makeTargetToProtect(details.Targets)
progress.GetChangefeed().ProtectedTimestampRecord = protectedTimestampID
ptr = jobsprotectedts.MakeRecord(protectedTimestampID, int64(jobID), statementTime,
spansToProtect, jobsprotectedts.Jobs)
deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect)
}

jr := jobs.Record{
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamproducer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"//pkg/kv",
"//pkg/kv/kvclient/rangefeed:with-mocks",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/server/telemetry",
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/streamingccl/streamproducer/producer_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,11 @@ func TestStreamReplicationProducerJob(t *testing.T) {
}
runJobWithProtectedTimestamp := func(ptsID uuid.UUID, ts hlc.Timestamp, jr jobs.Record) error {
return source.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
deprecatedTenantSpan := roachpb.Spans{*makeTenantSpan(30)}
tenantTarget := ptpb.MakeTenantsTarget([]roachpb.TenantID{roachpb.MakeTenantID(30)})
if err := ptp.Protect(ctx, txn,
jobsprotectedts.MakeRecord(ptsID, int64(jr.JobID), ts,
[]roachpb.Span{*makeTenantSpan(30)}, jobsprotectedts.Jobs)); err != nil {
deprecatedTenantSpan, jobsprotectedts.Jobs, tenantTarget)); err != nil {
return err
}
_, err := registry.CreateAdoptableJobWithTxn(ctx, jr, jr.JobID, txn)
Expand Down
7 changes: 6 additions & 1 deletion pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -129,8 +130,12 @@ func startReplicationStreamJob(
statementTime := hlc.Timestamp{
WallTime: evalCtx.GetStmtTimestamp().UnixNano(),
}

deprecatedSpansToProtect := roachpb.Spans{*makeTenantSpan(tenantID)}
targetToProtect := ptpb.MakeTenantsTarget([]roachpb.TenantID{roachpb.MakeTenantID(tenantID)})

pts := jobsprotectedts.MakeRecord(ptsID, int64(jr.JobID), statementTime,
[]roachpb.Span{*makeTenantSpan(tenantID)}, jobsprotectedts.Jobs)
deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect)

if err := ptp.Protect(evalCtx.Ctx(), txn, pts); err != nil {
return streaming.InvalidStreamID, err
Expand Down
9 changes: 7 additions & 2 deletions pkg/jobs/jobsprotectedts/jobs_protected_ts.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,25 @@ func MakeStatusFunc(

// MakeRecord makes a protected timestamp record to protect a timestamp on
// behalf of this job.
//
// TODO(adityamaru): In 22.2 stop passing `deprecatedSpans` since PTS records
// will stop protecting key spans.
func MakeRecord(
recordID uuid.UUID,
metaID int64,
tsToProtect hlc.Timestamp,
spans []roachpb.Span,
deprecatedSpans []roachpb.Span,
metaType MetaType,
target *ptpb.Target,
) *ptpb.Record {
return &ptpb.Record{
ID: recordID.GetBytesMut(),
Timestamp: tsToProtect,
Mode: ptpb.PROTECT_AFTER,
MetaType: metaTypes[metaType],
Meta: encodeID(metaID),
DeprecatedSpans: spans,
DeprecatedSpans: deprecatedSpans,
Target: target,
}
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/jobs/jobsprotectedts/jobs_protected_ts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,10 @@ func TestJobsProtectedTimestamp(t *testing.T) {
if j, err = jr.CreateJobWithTxn(ctx, mkJobRec(), jobID, txn); err != nil {
return err
}
deprecatedSpansToProtect := roachpb.Spans{{Key: keys.MinKey, EndKey: keys.MaxKey}}
targetToProtect := ptpb.MakeClusterTarget()
rec = jobsprotectedts.MakeRecord(uuid.MakeV4(), int64(jobID), ts,
[]roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, jobsprotectedts.Jobs)
deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect)
return ptp.Protect(ctx, txn, rec)
}))
return j, rec
Expand Down Expand Up @@ -163,8 +165,10 @@ func TestSchedulesProtectedTimestamp(t *testing.T) {
require.NoError(t, s0.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
sj = mkScheduledJobRec(scheduleLabel)
require.NoError(t, sj.Create(ctx, s0.InternalExecutor().(sqlutil.InternalExecutor), txn))
deprecatedSpansToProtect := roachpb.Spans{{Key: keys.MinKey, EndKey: keys.MaxKey}}
targetToProtect := ptpb.MakeClusterTarget()
rec = jobsprotectedts.MakeRecord(uuid.MakeV4(), sj.ScheduleID(), ts,
[]roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, jobsprotectedts.Schedules)
deprecatedSpansToProtect, jobsprotectedts.Schedules, targetToProtect)
return ptp.Protect(ctx, txn, rec)
}))
return sj, rec
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/protectedts/ptpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ go_proto_library(

go_library(
name = "ptpb",
srcs = ["protectedts.go"],
embed = [":ptpb_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb",
visibility = ["//visibility:public"],
deps = [
"//pkg/roachpb:with-mocks",
"//pkg/sql/catalog/descpb",
],
)
34 changes: 34 additions & 0 deletions pkg/kv/kvserver/protectedts/ptpb/protectedts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package ptpb

import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
)

// MakeClusterTarget returns a target, which when used in a Record, will
// protect the entire keyspace of the cluster.
func MakeClusterTarget() *Target {
return &Target{&Target_Cluster{Cluster: &Target_ClusterTarget{}}}
}

// MakeTenantsTarget returns a target, which when used in a Record, will
// protect the keyspace of all tenants in ids.
func MakeTenantsTarget(ids []roachpb.TenantID) *Target {
return &Target{&Target_Tenants{Tenants: &Target_TenantsTarget{IDs: ids}}}
}

// MakeSchemaObjectsTarget returns a target, which when used in a Record,
// will protect the keyspace of all schema objects (database/table).
func MakeSchemaObjectsTarget(ids descpb.IDs) *Target {
return &Target{&Target_SchemaObjects{SchemaObjects: &Target_SchemaObjectsTarget{IDs: ids}}}
}
Loading

0 comments on commit 2528c38

Please sign in to comment.