Skip to content

Commit

Permalink
spanconfigsqltranslator,jobsprotectedts: add `ignore_if_excluded_from…
Browse files Browse the repository at this point in the history
…_backup` to SpanConfig

This change is a follow-up to #75451, which taught ExportRequests
to no-op on ranges marked as `exclude_data_from_backup`. This change
adds a field to the ProtectionPolicy in the SpanConfig. The field is set to
true if the ProtectionPolicy can be ignored when the span it applies to
has been marked to be excluded from backups, i.e. the applied SpanConfig
has `exclude_data_from_backup = true`.

This field is currently only set to true when a protected timestamp record
has been written by a backup job. This ensures that ProtectionPolicies
written by non-backup users (CDC, streaming) on spans marked as
`exclude_data_from_backup` are still respected when making GC decisions on
the span. The work to teach the GC queue about this field will be done in
a final follow-up.

Informs: #73536

Release note: None
  • Loading branch information
adityamaru committed Feb 20, 2022
1 parent 822130a commit 6655925
Show file tree
Hide file tree
Showing 14 changed files with 214 additions and 31 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ go_test(
"//pkg/ccl/partitionccl",
"//pkg/ccl/utilccl",
"//pkg/config/zonepb",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprotectedts",
"//pkg/kv",
"//pkg/kv/kvserver/protectedts",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster"
Expand Down Expand Up @@ -208,9 +211,37 @@ func TestDataDriven(t *testing.T) {
var protectTS int
d.ScanArgs(t, "record-id", &recordID)
d.ScanArgs(t, "ts", &protectTS)
target := spanconfigtestutils.ParseProtectionTarget(t, d.Input)

jobID := tenant.JobsRegistry().MakeJobID()
if d.HasArg("mk-job-with-type") {
var mkJobWithType string
d.ScanArgs(t, "mk-job-with-type", &mkJobWithType)
var rec jobs.Record
switch mkJobWithType {
case "backup":
rec = jobs.Record{
Description: "backup",
Username: security.RootUserName(),
Details: jobspb.BackupDetails{},
Progress: jobspb.BackupProgress{},
}
default:
rec = jobs.Record{
Description: "cdc",
Username: security.RootUserName(),
Details: jobspb.ChangefeedDetails{},
Progress: jobspb.ChangefeedProgress{},
}
}
require.NoError(t, tenant.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
if _, err = tenant.JobsRegistry().CreateJobWithTxn(ctx, rec, jobID, txn); err != nil {
return err
}
return nil
}))
}
target := spanconfigtestutils.ParseProtectionTarget(t, d.Input)

require.NoError(t, tenant.ExecCfg().DB.Txn(ctx,
func(ctx context.Context, txn *kv.Txn) (err error) {
require.Len(t, recordID, 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,32 @@ translate database=db table=t2
----
/Table/10{7-8} range default

# Write a protection record as a backup.
protect record-id=1 ts=1 mk-job-with-type=backup
descs 104
----

# Write another protection record as a non-backup user.
protect record-id=2 ts=2 mk-job-with-type=cdc
descs 104
----

# Translate to ensure that the ProtectionPolicy is set with
# `ignore_if_excluded_from_backup` for the record written by the backup only.
translate database=db
----
/Table/10{6-7} protection_policies=[{ts: 1,ignore_if_excluded_from_backup: true} {ts: 2}] exclude_data_from_backup=true
/Table/10{7-8} protection_policies=[{ts: 1,ignore_if_excluded_from_backup: true} {ts: 2}]

# Alter table t1 to unmark its data ephemeral.
exec-sql
ALTER TABLE db.t1 SET (exclude_data_from_backup = false);
----

release record-id=1
----

translate database=db
----
/Table/10{6-7} range default
/Table/10{7-8} range default
/Table/10{6-7} protection_policies=[{ts: 2}]
/Table/10{7-8} protection_policies=[{ts: 2}]
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ descs 106

translate database=db
----
/Table/10{6-7} num_replicas=7 num_voters=5 pts=[1]
/Table/10{6-7} num_replicas=7 num_voters=5 protection_policies=[{ts: 1}]
/Table/10{7-8} num_replicas=7

# Write a protected timestamp on db, so we should see it on both t1 and t2.
Expand All @@ -38,17 +38,17 @@ descs 104

translate database=db
----
/Table/10{6-7} num_replicas=7 num_voters=5 pts=[1 2]
/Table/10{7-8} num_replicas=7 pts=[2]
/Table/10{6-7} num_replicas=7 num_voters=5 protection_policies=[{ts: 1} {ts: 2}]
/Table/10{7-8} num_replicas=7 protection_policies=[{ts: 2}]

# Release the protected timestamp on table t1
release record-id=1
----

translate database=db
----
/Table/10{6-7} num_replicas=7 num_voters=5 pts=[2]
/Table/10{7-8} num_replicas=7 pts=[2]
/Table/10{6-7} num_replicas=7 num_voters=5 protection_policies=[{ts: 2}]
/Table/10{7-8} num_replicas=7 protection_policies=[{ts: 2}]

# Release the protected timestamp on database db
release record-id=2
Expand All @@ -71,7 +71,7 @@ descs 106

translate database=db
----
/Table/106{-/2} num_replicas=7 num_voters=5 pts=[3]
/Table/106/{2-3} ttl_seconds=1 num_replicas=7 num_voters=5 pts=[3]
/Table/10{6/3-7} num_replicas=7 num_voters=5 pts=[3]
/Table/106{-/2} num_replicas=7 num_voters=5 protection_policies=[{ts: 3}]
/Table/106/{2-3} ttl_seconds=1 num_replicas=7 num_voters=5 protection_policies=[{ts: 3}]
/Table/10{6/3-7} num_replicas=7 num_voters=5 protection_policies=[{ts: 3}]
/Table/10{7-8} num_replicas=7
32 changes: 31 additions & 1 deletion pkg/jobs/jobsprotectedts/jobs_protected_ts.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,38 @@ func GetMetaType(metaType MetaType) string {
return metaTypes[metaType]
}

// IsRecordWrittenByJobType reports whether the protected timestamp record
// described by meta was written by a job of type jobType. It can be used to
// filter records by the type of job that wrote them.
//
// Only metaType == Jobs is inspected: meta is decoded into a job ID and the
// job is loaded through jr using txn. For any other metaType the result is
// (false, nil). A job that can no longer be found is also reported as
// (false, nil) rather than as an error; decoding failures and any other load
// error are returned to the caller.
func IsRecordWrittenByJobType(
	ctx context.Context,
	jr *jobs.Registry,
	txn *kv.Txn,
	meta []byte,
	metaType MetaType,
	jobType jobspb.Type,
) (bool, error) {
	// Records whose metadata does not encode a job ID cannot be attributed to
	// a job of any type.
	if metaType != Jobs {
		return false, nil
	}

	id, err := decodeID(meta)
	if err != nil {
		return false, err
	}

	job, err := jr.LoadJobWithTxn(ctx, jobspb.JobID(id), txn)
	switch {
	case jobs.HasJobNotFoundError(err):
		// The job is gone, so the record cannot be matched to jobType.
		return false, nil
	case err != nil:
		return false, err
	}

	// Keep the payload in a local so Type can be called on an addressable
	// value.
	payload := job.Payload()
	return payload.Type() == jobType, nil
}

// MakeStatusFunc returns a function which determines whether the job or
// schedule implied with this value of meta should be removed by the reconciler.
// schedule implied with this value of meta should be removed by the ptreconciler.
func MakeStatusFunc(
jr *jobs.Registry, ie sqlutil.InternalExecutor, metaType MetaType,
) ptreconcile.StatusFunc {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/protectedts/protectedts.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ type Storage interface {
// passed txn remains safe for future use.
Release(context.Context, *kv.Txn, uuid.UUID) error

// GetMetadata retreives the metadata with the provided Txn.
// GetMetadata retrieves the metadata with the provided Txn.
GetMetadata(context.Context, *kv.Txn) (ptpb.Metadata, error)

// GetState retreives the entire state of protectedts.Storage with the
// GetState retrieves the entire state of protectedts.Storage with the
// provided Txn.
GetState(context.Context, *kv.Txn) (ptpb.State, error)

Expand Down
12 changes: 12 additions & 0 deletions pkg/roachpb/span_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,18 @@ func (c ConstraintsConjunction) String() string {
return sb.String()
}

// String implements the fmt.Stringer interface. The policy is rendered as
// "{ts: <wall time>}"; when IgnoreIfExcludedFromBackup is set,
// ",ignore_if_excluded_from_backup: true" is appended before the closing
// brace. Only the wall time of the protected timestamp is printed.
func (p ProtectionPolicy) String() string {
	var sb strings.Builder
	// Format the wall time as-is instead of converting it to int first, so the
	// value is not truncated on platforms where int is 32 bits wide. Writing
	// straight into the builder also avoids an intermediate string.
	fmt.Fprintf(&sb, "{ts: %d", p.ProtectedTimestamp.WallTime)
	if p.IgnoreIfExcludedFromBackup {
		// The flag is only printed when true, so its value is a constant here.
		sb.WriteString(",ignore_if_excluded_from_backup: true")
	}
	sb.WriteByte('}')
	return sb.String()
}

// TestingDefaultSpanConfig exports the default span config for testing purposes.
func TestingDefaultSpanConfig() SpanConfig {
return SpanConfig{
Expand Down
11 changes: 11 additions & 0 deletions pkg/roachpb/span_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,22 @@ message GCPolicy {
// applies over a given span.
message ProtectionPolicy {
// Code generation options. The generated Go String method is turned off
// (goproto_stringer = false) because ProtectionPolicy has a hand-written
// String method in span_config.go.
option (gogoproto.equal) = true;
option (gogoproto.goproto_stringer) = false;
option (gogoproto.populate) = true;

// ProtectedTimestamp is a timestamp above which GC should not run, regardless
// of the GC TTL.
util.hlc.Timestamp protected_timestamp = 1 [(gogoproto.nullable) = false];

// IgnoreIfExcludedFromBackup is set to true if the ProtectionPolicy can be
// ignored when the span it applies to has been marked to be excluded from
// backups, i.e. the applied SpanConfig has `exclude_data_from_backup = true`.
//
// This field is currently only set to true when a protected timestamp record
// has been written by a backup job. This ensures that ProtectionPolicies
// written by non-backup users (CDC, streaming) on spans marked as
// `exclude_data_from_backup` are still respected when making GC decisions on
// the span.
bool ignore_if_excluded_from_backup = 2;
}

// Constraint constrains the stores that a replica can be stored on. It
Expand Down
12 changes: 12 additions & 0 deletions pkg/spanconfig/spanconfigsqltranslator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/config/zonepb",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprotectedts",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver/protectedts/ptpb",
Expand All @@ -28,16 +31,25 @@ go_library(
go_test(
name = "spanconfigsqltranslator_test",
srcs = [
"main_test.go",
"protectedts_state_reader_test.go",
"sqltranslator_test.go",
],
data = glob(["testdata/**"]),
embed = [":spanconfigsqltranslator"],
deps = [
"//pkg/base",
"//pkg/jobs",
"//pkg/jobs/jobsprotectedts",
"//pkg/kv",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql/catalog/descpb",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
31 changes: 31 additions & 0 deletions pkg/spanconfig/spanconfigsqltranslator/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package spanconfigsqltranslator_test

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

// TestMain is the entry point for this package's tests. It registers the
// embedded security test assets and the test server and test cluster
// factories, runs the tests, and exits the process with the resulting code.
func TestMain(m *testing.M) {
	// All registration has to happen before m.Run so that every test in the
	// package observes the same loader and factories.
	security.SetAssetLoader(securitytest.EmbeddedAssets)
	serverutils.InitTestServerFactory(server.TestServerFactory)
	serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)

	exitCode := m.Run()
	os.Exit(exitCode)
}

//go:generate ../../util/leaktest/add-leaktest.sh *_test.go
38 changes: 27 additions & 11 deletions pkg/spanconfig/spanconfigsqltranslator/protectedts_state_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ package spanconfigsqltranslator
import (
"context"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/errors"
)

// protectedTimestampStateReader provides a target specific view of the
Expand All @@ -31,15 +36,15 @@ type protectedTimestampStateReader struct {
// protected timestamp records given the supplied ptpb.State. The ptpb.State is
// the transactional state of the `system.protected_ts_records` table.
func newProtectedTimestampStateReader(
_ context.Context, ptsState ptpb.State,
) *protectedTimestampStateReader {
ctx context.Context, jr *jobs.Registry, txn *kv.Txn, ptsState ptpb.State,
) (*protectedTimestampStateReader, error) {
reader := &protectedTimestampStateReader{
schemaObjectProtections: make(map[descpb.ID][]roachpb.ProtectionPolicy),
tenantProtections: make([]tenantProtectedTimestamps, 0),
clusterProtections: make([]roachpb.ProtectionPolicy, 0),
}
reader.loadProtectedTimestampRecords(ptsState)
return reader
err := reader.loadProtectedTimestampRecords(ctx, jr, txn, ptsState)
return reader, err
}

// getProtectionPoliciesForCluster returns all the protected timestamps that
Expand Down Expand Up @@ -70,22 +75,32 @@ func (p *protectedTimestampStateReader) getProtectionPoliciesForSchemaObject(
return p.schemaObjectProtections[descID]
}

func (p *protectedTimestampStateReader) loadProtectedTimestampRecords(ptsState ptpb.State) {
func (p *protectedTimestampStateReader) loadProtectedTimestampRecords(
ctx context.Context, jr *jobs.Registry, txn *kv.Txn, ptsState ptpb.State,
) error {
tenantProtections := make(map[roachpb.TenantID][]roachpb.ProtectionPolicy)
for _, record := range ptsState.Records {
// TODO(adityamaru): When schedules start writing protected timestamp
// records extend this logic to also include those records.
isBackupProtectedTimestampRecord, err := jobsprotectedts.IsRecordWrittenByJobType(ctx, jr, txn,
record.Meta, jobsprotectedts.Jobs, jobspb.TypeBackup)
if err != nil {
return errors.Wrap(err, "failed to check if record was written by a BACKUP")
}
protectionPolicy := roachpb.ProtectionPolicy{
ProtectedTimestamp: record.Timestamp,
IgnoreIfExcludedFromBackup: isBackupProtectedTimestampRecord,
}
switch t := record.Target.GetUnion().(type) {
case *ptpb.Target_Cluster:
p.clusterProtections = append(p.clusterProtections,
roachpb.ProtectionPolicy{ProtectedTimestamp: record.Timestamp})
p.clusterProtections = append(p.clusterProtections, protectionPolicy)
case *ptpb.Target_Tenants:
for _, tenID := range t.Tenants.IDs {
tenantProtections[tenID] = append(tenantProtections[tenID],
roachpb.ProtectionPolicy{ProtectedTimestamp: record.Timestamp})
tenantProtections[tenID] = append(tenantProtections[tenID], protectionPolicy)
}
case *ptpb.Target_SchemaObjects:
for _, descID := range t.SchemaObjects.IDs {
p.schemaObjectProtections[descID] = append(p.schemaObjectProtections[descID],
roachpb.ProtectionPolicy{ProtectedTimestamp: record.Timestamp})
p.schemaObjectProtections[descID] = append(p.schemaObjectProtections[descID], protectionPolicy)
}
}
}
Expand All @@ -94,4 +109,5 @@ func (p *protectedTimestampStateReader) loadProtectedTimestampRecords(ptsState p
p.tenantProtections = append(p.tenantProtections,
tenantProtectedTimestamps{tenantID: tenID, protections: tenantProtections})
}
return nil
}
Loading

0 comments on commit 6655925

Please sign in to comment.