Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

{backup,changefeed,streaming}ccl: start populating pts target #74248

Merged
merged 1 commit into from
Jan 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ go_library(
"//pkg/kv/kvserver/batcheval",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb:with-mocks",
"//pkg/scheduledjobs",
"//pkg/security",
Expand Down
55 changes: 46 additions & 9 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/security"
Expand Down Expand Up @@ -858,8 +859,7 @@ func backupPlanHook(
jobID := p.ExecCfg().JobRegistry.MakeJobID()

if err := protectTimestampForBackup(
ctx, p, plannerTxn, jobID, backupManifest.Spans,
backupManifest.StartTime, endTime, backupDetails,
ctx, p, plannerTxn, jobID, backupManifest, backupDetails,
); err != nil {
return err
}
Expand Down Expand Up @@ -1204,25 +1204,62 @@ func makeNewEncryptionOptions(
return encryptionOptions, encryptionInfo, nil
}

func getProtectedTimestampTargetForBackup(backupManifest BackupManifest) *ptpb.Target {
if backupManifest.DescriptorCoverage == tree.AllDescriptors {
return ptpb.MakeClusterTarget()
}

if len(backupManifest.Tenants) > 0 {
tenantID := make([]roachpb.TenantID, 0)
for _, tenant := range backupManifest.Tenants {
tenantID = append(tenantID, roachpb.MakeTenantID(tenant.ID))
}
return ptpb.MakeTenantsTarget(tenantID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently BACKUP syntax doesn't allow backing up table 245 and tenant 123, but that is a valid backup job. It looks like however that is not possible with the ptpb.Target? i.e. tenants target and schema object targets are mutually exclusive?

Not blocking, since it's unreachable with current syntax, just wondering.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not possible today yeah, we could in the future switch the Record to hold a repeated Target instead of a single target to accommodate for this.

}

// ResolvedCompleteDBs contains all the "complete" databases being backed up.
//
// This includes explicit `BACKUP DATABASE` targets as well as expansions as a
// result of `BACKUP TABLE db.*`. In both cases we want to write a protected
// timestamp record that covers the entire database.
if len(backupManifest.CompleteDbs) > 0 {
return ptpb.MakeSchemaObjectsTarget(backupManifest.CompleteDbs)
}

// At this point we are dealing with a `BACKUP TABLE`, so we write a protected
// timestamp record on each table being backed up.
tableIDs := make(descpb.IDs, 0)
for _, desc := range backupManifest.Descriptors {
t, _, _, _ := descpb.FromDescriptorWithMVCCTimestamp(&desc, hlc.Timestamp{})
if t != nil {
tableIDs = append(tableIDs, t.GetID())
}
}
return ptpb.MakeSchemaObjectsTarget(tableIDs)
}

func protectTimestampForBackup(
ctx context.Context,
p sql.PlanHookState,
txn *kv.Txn,
jobID jobspb.JobID,
spans []roachpb.Span,
startTime, endTime hlc.Timestamp,
backupManifest BackupManifest,
backupDetails jobspb.BackupDetails,
) error {
if backupDetails.ProtectedTimestampRecord == nil {
return nil
}
if len(spans) > 0 {
tsToProtect := endTime
if !startTime.IsEmpty() {
tsToProtect = startTime
if len(backupManifest.Spans) > 0 {
tsToProtect := backupManifest.EndTime
if !backupManifest.StartTime.IsEmpty() {
tsToProtect = backupManifest.StartTime
}

// Resolve the target that the PTS record will protect as part of this
// backup.
target := getProtectedTimestampTargetForBackup(backupManifest)
rec := jobsprotectedts.MakeRecord(*backupDetails.ProtectedTimestampRecord, int64(jobID),
tsToProtect, spans, jobsprotectedts.Jobs)
tsToProtect, backupManifest.Spans, jobsprotectedts.Jobs, target)
err := p.ExecCfg().ProtectedTimestampProvider.Protect(ctx, txn, rec)
if err != nil {
return err
Expand Down
36 changes: 17 additions & 19 deletions pkg/ccl/backupccl/schedule_pts_chaining.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
roachpb "github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql"
Expand Down Expand Up @@ -145,22 +146,22 @@ func manageFullBackupPTSChaining(
return err
}

// Resolve the spans that need to be protected on this execution of the
// Resolve the target that needs to be protected on this execution of the
// scheduled backup.
spansToProtect, err := getSpansProtectedByBackup(ctx, backupDetails, txn, exec)
targetToProtect, deprecatedSpansToProtect, err := getTargetProtectedByBackup(ctx, backupDetails, txn, exec)
if err != nil {
return errors.Wrap(err, "getting spans to protect")
}

// Protect the spans after the EndTime of the current backup. We do not need
// Protect the target after the EndTime of the current backup. We do not need
// to verify this new record as we have a record written by the backup during
// planning, already protecting these spans after EndTime.
// planning, already protecting this target after EndTime.
//
// Since this record will be stored on the incremental schedule, we use the
// inc schedule ID as the records' Meta. This ensures that even if the full
// schedule is dropped, the reconciliation job will not release the pts
// record stored on the inc schedule, and the chaining will continue.
ptsRecord, err := protectTimestampRecordForSchedule(ctx, spansToProtect,
ptsRecord, err := protectTimestampRecordForSchedule(ctx, targetToProtect, deprecatedSpansToProtect,
backupDetails.EndTime, incSj.ScheduleID(), exec, txn)
if err != nil {
return errors.Wrap(err, "protect and verify pts record for schedule")
Expand Down Expand Up @@ -213,36 +214,33 @@ func manageIncrementalBackupPTSChaining(
return err
}

func getSpansProtectedByBackup(
func getTargetProtectedByBackup(
ctx context.Context, backupDetails jobspb.BackupDetails, txn *kv.Txn, exec *sql.ExecutorConfig,
) ([]roachpb.Span, error) {
) (target *ptpb.Target, deprecatedSpans []roachpb.Span, err error) {
if backupDetails.ProtectedTimestampRecord == nil {
return nil, nil
return nil, nil, nil
}

ptsRecord, err := exec.ProtectedTimestampProvider.GetRecord(ctx, txn,
*backupDetails.ProtectedTimestampRecord)
if err != nil {
return nil, err
return nil, nil, err
}

return ptsRecord.DeprecatedSpans, nil
return ptsRecord.Target, ptsRecord.DeprecatedSpans, nil
}

func protectTimestampRecordForSchedule(
ctx context.Context,
spansToProtect []roachpb.Span,
targetToProtect *ptpb.Target,
deprecatedSpansToProtect roachpb.Spans,
tsToProtect hlc.Timestamp,
scheduleID int64,
exec *sql.ExecutorConfig,
txn *kv.Txn,
) (uuid.UUID, error) {
var protectedtsID uuid.UUID
if len(spansToProtect) == 0 {
return protectedtsID, nil
}
protectedtsID = uuid.MakeV4()
rec := jobsprotectedts.MakeRecord(protectedtsID, scheduleID, tsToProtect, spansToProtect,
jobsprotectedts.Schedules)
protectedtsID := uuid.MakeV4()
rec := jobsprotectedts.MakeRecord(protectedtsID, scheduleID, tsToProtect, deprecatedSpansToProtect,
jobsprotectedts.Schedules, targetToProtect)
return protectedtsID, exec.ProtectedTimestampProvider.Protect(ctx, txn, rec)
}
20 changes: 18 additions & 2 deletions pkg/ccl/changefeedccl/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
Expand Down Expand Up @@ -64,12 +66,26 @@ func createProtectedTimestampRecord(
progress.ProtectedTimestampRecord = uuid.MakeV4()
log.VEventf(ctx, 2, "creating protected timestamp %v at %v",
progress.ProtectedTimestampRecord, resolved)
spansToProtect := makeSpansToProtect(codec, targets)
deprecatedSpansToProtect := makeSpansToProtect(codec, targets)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need both targets and spans?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Until we run the migration in #74281 we want to continue protecting spans so that jobs started in a mixed version state do not fail. Once the migration is complete, the Protect method in the protected ts Storage interface will stop writing these passed in spans to the underlying system.pts_records table (this will come as a follow up PR). So all jobs started after the migration might pass in a record with the spans field set, but this will not be persisted.

The idea is that GC for 22.1 will continue to respect both spans protected by the old subsystem, and targets protected by the new subsystem since this simplifies the migration in a mixed version state. In 22.2 with some elbow grease, we should be able to stop populating the spans field in the record entirely.

targetToProtect := makeTargetToProtect(targets)
rec := jobsprotectedts.MakeRecord(
progress.ProtectedTimestampRecord, int64(jobID), resolved, spansToProtect, jobsprotectedts.Jobs)
progress.ProtectedTimestampRecord, int64(jobID), resolved, deprecatedSpansToProtect,
jobsprotectedts.Jobs, targetToProtect)
return pts.Protect(ctx, txn, rec)
}

func makeTargetToProtect(targets jobspb.ChangefeedTargets) *ptpb.Target {
// NB: We add 1 because we're also going to protect system.descriptors.
// We protect system.descriptors because a changefeed needs all of the history
// of table descriptors to version data.
tablesToProtect := make(descpb.IDs, 0, len(targets)+1)
for t := range targets {
tablesToProtect = append(tablesToProtect, t)
}
tablesToProtect = append(tablesToProtect, keys.DescriptorTableID)
return ptpb.MakeSchemaObjectsTarget(tablesToProtect)
}

func makeSpansToProtect(codec keys.SQLCodec, targets jobspb.ChangefeedTargets) []roachpb.Span {
// NB: We add 1 because we're also going to protect system.descriptors.
// We protect system.descriptors because a changefeed needs all of the history
Expand Down
7 changes: 3 additions & 4 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -415,16 +414,16 @@ func changefeedPlanHook(
{

var protectedTimestampID uuid.UUID
var spansToProtect []roachpb.Span
var ptr *ptpb.Record

shouldProtectTimestamp := initialScanFromOptions(details.Opts) && p.ExecCfg().Codec.ForSystemTenant()
if shouldProtectTimestamp {
protectedTimestampID = uuid.MakeV4()
spansToProtect = makeSpansToProtect(p.ExecCfg().Codec, details.Targets)
deprecatedSpansToProtect := makeSpansToProtect(p.ExecCfg().Codec, details.Targets)
targetToProtect := makeTargetToProtect(details.Targets)
progress.GetChangefeed().ProtectedTimestampRecord = protectedTimestampID
ptr = jobsprotectedts.MakeRecord(protectedTimestampID, int64(jobID), statementTime,
spansToProtect, jobsprotectedts.Jobs)
deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect)
}

jr := jobs.Record{
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamproducer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"//pkg/kv",
"//pkg/kv/kvclient/rangefeed:with-mocks",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/server/telemetry",
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/streamingccl/streamproducer/producer_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,11 @@ func TestStreamReplicationProducerJob(t *testing.T) {
}
runJobWithProtectedTimestamp := func(ptsID uuid.UUID, ts hlc.Timestamp, jr jobs.Record) error {
return source.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
deprecatedTenantSpan := roachpb.Spans{*makeTenantSpan(30)}
tenantTarget := ptpb.MakeTenantsTarget([]roachpb.TenantID{roachpb.MakeTenantID(30)})
if err := ptp.Protect(ctx, txn,
jobsprotectedts.MakeRecord(ptsID, int64(jr.JobID), ts,
[]roachpb.Span{*makeTenantSpan(30)}, jobsprotectedts.Jobs)); err != nil {
deprecatedTenantSpan, jobsprotectedts.Jobs, tenantTarget)); err != nil {
return err
}
_, err := registry.CreateAdoptableJobWithTxn(ctx, jr, jr.JobID, txn)
Expand Down
7 changes: 6 additions & 1 deletion pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -129,8 +130,12 @@ func startReplicationStreamJob(
statementTime := hlc.Timestamp{
WallTime: evalCtx.GetStmtTimestamp().UnixNano(),
}

deprecatedSpansToProtect := roachpb.Spans{*makeTenantSpan(tenantID)}
targetToProtect := ptpb.MakeTenantsTarget([]roachpb.TenantID{roachpb.MakeTenantID(tenantID)})

pts := jobsprotectedts.MakeRecord(ptsID, int64(jr.JobID), statementTime,
[]roachpb.Span{*makeTenantSpan(tenantID)}, jobsprotectedts.Jobs)
deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect)

if err := ptp.Protect(evalCtx.Ctx(), txn, pts); err != nil {
return streaming.InvalidStreamID, err
Expand Down
9 changes: 7 additions & 2 deletions pkg/jobs/jobsprotectedts/jobs_protected_ts.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,25 @@ func MakeStatusFunc(

// MakeRecord makes a protected timestamp record to protect a timestamp on
// behalf of this job.
//
// TODO(adityamaru): In 22.2 stop passing `deprecatedSpans` since PTS records
// will stop protecting key spans.
func MakeRecord(
recordID uuid.UUID,
metaID int64,
tsToProtect hlc.Timestamp,
spans []roachpb.Span,
deprecatedSpans []roachpb.Span,
metaType MetaType,
target *ptpb.Target,
) *ptpb.Record {
return &ptpb.Record{
ID: recordID.GetBytesMut(),
Timestamp: tsToProtect,
Mode: ptpb.PROTECT_AFTER,
MetaType: metaTypes[metaType],
Meta: encodeID(metaID),
DeprecatedSpans: spans,
DeprecatedSpans: deprecatedSpans,
Target: target,
}
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/jobs/jobsprotectedts/jobs_protected_ts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,10 @@ func TestJobsProtectedTimestamp(t *testing.T) {
if j, err = jr.CreateJobWithTxn(ctx, mkJobRec(), jobID, txn); err != nil {
return err
}
deprecatedSpansToProtect := roachpb.Spans{{Key: keys.MinKey, EndKey: keys.MaxKey}}
targetToProtect := ptpb.MakeClusterTarget()
rec = jobsprotectedts.MakeRecord(uuid.MakeV4(), int64(jobID), ts,
[]roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, jobsprotectedts.Jobs)
deprecatedSpansToProtect, jobsprotectedts.Jobs, targetToProtect)
return ptp.Protect(ctx, txn, rec)
}))
return j, rec
Expand Down Expand Up @@ -163,8 +165,10 @@ func TestSchedulesProtectedTimestamp(t *testing.T) {
require.NoError(t, s0.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
sj = mkScheduledJobRec(scheduleLabel)
require.NoError(t, sj.Create(ctx, s0.InternalExecutor().(sqlutil.InternalExecutor), txn))
deprecatedSpansToProtect := roachpb.Spans{{Key: keys.MinKey, EndKey: keys.MaxKey}}
targetToProtect := ptpb.MakeClusterTarget()
rec = jobsprotectedts.MakeRecord(uuid.MakeV4(), sj.ScheduleID(), ts,
[]roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}, jobsprotectedts.Schedules)
deprecatedSpansToProtect, jobsprotectedts.Schedules, targetToProtect)
return ptp.Protect(ctx, txn, rec)
}))
return sj, rec
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/protectedts/ptpb/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ go_proto_library(

go_library(
name = "ptpb",
srcs = ["protectedts.go"],
embed = [":ptpb_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb",
visibility = ["//visibility:public"],
deps = [
"//pkg/roachpb:with-mocks",
"//pkg/sql/catalog/descpb",
],
)
34 changes: 34 additions & 0 deletions pkg/kv/kvserver/protectedts/ptpb/protectedts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package ptpb

import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
)

// MakeClusterTarget returns a target, which when used in a Record, will
// protect the entire keyspace of the cluster.
func MakeClusterTarget() *Target {
return &Target{&Target_Cluster{Cluster: &Target_ClusterTarget{}}}
}

// MakeTenantsTarget returns a target, which when used in a Record, will
// protect the keyspace of all tenants in ids.
func MakeTenantsTarget(ids []roachpb.TenantID) *Target {
return &Target{&Target_Tenants{Tenants: &Target_TenantsTarget{IDs: ids}}}
}

// MakeSchemaObjectsTarget returns a target, which when used in a Record,
// will protect the keyspace of all schema objects (database/table).
func MakeSchemaObjectsTarget(ids descpb.IDs) *Target {
return &Target{&Target_SchemaObjects{SchemaObjects: &Target_SchemaObjectsTarget{IDs: ids}}}
}
Loading