diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 82e5e8319019..b8ed45ffe783 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -174,4 +174,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 21.2-48 set the active cluster version in the format '.' +version version 21.2-50 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 055c598e7107..89219468e517 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -181,6 +181,6 @@ trace.jaeger.agentstringthe address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as :. If no port is specified, 6381 will be used. trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -versionversion21.2-48set the active cluster version in the format '.' +versionversion21.2-50set the active cluster version in the format '.' diff --git a/pkg/ccl/spanconfigccl/spanconfigcomparedccl/datadriven_test.go b/pkg/ccl/spanconfigccl/spanconfigcomparedccl/datadriven_test.go index f05e6b4c7bbb..ca11df504192 100644 --- a/pkg/ccl/spanconfigccl/spanconfigcomparedccl/datadriven_test.go +++ b/pkg/ccl/spanconfigccl/spanconfigcomparedccl/datadriven_test.go @@ -109,7 +109,7 @@ func TestDataDriven(t *testing.T) { tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`) } - spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs) + spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs, nil /* ptsKnobs */) defer spanConfigTestCluster.Cleanup() kvSubscriber := tc.Server(0).SpanConfigKVSubscriber().(spanconfig.KVSubscriber) diff --git a/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/datadriven_test.go b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/datadriven_test.go index 10632465338d..a73c0b343f5f 100644 --- a/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/datadriven_test.go +++ b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/datadriven_test.go @@ -102,7 +102,7 @@ func TestDataDriven(t *testing.T) { tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`) } - spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs) + spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs, nil /* ptsKnobs */) defer spanConfigTestCluster.Cleanup() systemTenant := spanConfigTestCluster.InitializeTenant(ctx, roachpb.SystemTenantID) diff --git a/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/BUILD.bazel b/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/BUILD.bazel index d8adfb8da891..2b8c5650e7ca 100644 --- a/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/BUILD.bazel +++ b/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/BUILD.bazel @@ -13,6 +13,10 @@ go_test( "//pkg/ccl/partitionccl", "//pkg/ccl/utilccl", "//pkg/config/zonepb", + "//pkg/jobs/jobsprotectedts", + "//pkg/kv", + "//pkg/kv/kvserver/protectedts", + "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/roachpb:with-mocks", "//pkg/security", "//pkg/security/securitytest", @@ -26,9 +30,11 @@ go_test( "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", + "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/randutil", + "//pkg/util/uuid", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_stretchr_testify//require", ], diff --git a/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/datadriven_test.go b/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/datadriven_test.go index 705f8bf9affa..3bd06606034c 100644 --- a/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/datadriven_test.go +++ b/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/datadriven_test.go @@ -19,6 +19,10 @@ import ( _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl" "github.com/cockroachdb/cockroach/pkg/config/zonepb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils" @@ -28,8 +32,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/datadriven" "github.com/stretchr/testify/require" ) @@ -61,6 +67,15 @@ import ( // - "mark-table-public" [database=] [table=] // Marks the given table as public. // +// - "protect" [record-id=] [ts=] +// cluster OR +// tenants id1,id2... OR +// descs id1,id2... +// Creates and writes a protected timestamp record with id and ts with an +// appropriate ptpb.Target. +// +// - "release" [record-id=] +// Releases the protected timestamp record with id. func TestDataDriven(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -77,17 +92,21 @@ func TestDataDriven(t *testing.T) { // test cluster). ManagerDisableJobCreation: true, } + ptsKnobs := &protectedts.TestingKnobs{ + EnableProtectedTimestampForMultiTenant: true, + } datadriven.Walk(t, testutils.TestDataPath(t), func(t *testing.T, path string) { tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ - SpanConfig: scKnobs, + SpanConfig: scKnobs, + ProtectedTS: ptsKnobs, }, }, }) defer tc.Stopper().Stop(ctx) - spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs) + spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs, ptsKnobs) defer spanConfigTestCluster.Cleanup() var tenant *spanconfigtestcluster.Tenant @@ -98,6 +117,10 @@ func TestDataDriven(t *testing.T) { tenant = spanConfigTestCluster.InitializeTenant(ctx, roachpb.SystemTenantID) } + execCfg := tenant.ExecCfg() + ptp := tenant.ProtectedTimestampProvider() + jr := tenant.JobsRegistry() + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { case "exec-sql": @@ -183,6 +206,40 @@ func TestDataDriven(t *testing.T) { mutable.SetPublic() }) + case "protect": + mkRecordAndProtect := func(recordID string, ts hlc.Timestamp, target *ptpb.Target) { + jobID := jr.MakeJobID() + require.NoError(t, execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + require.Len(t, recordID, 1, "datadriven test only supports single digit record IDs") + recID, err := uuid.FromBytes([]byte(strings.Repeat(recordID, 16))) + require.NoError(t, err) + rec := jobsprotectedts.MakeRecord(recID, int64(jobID), ts, + nil /* deprecatedSpans */, jobsprotectedts.Jobs, target) + return ptp.Protect(ctx, txn, rec) + })) + } + + var recordID string + var protectTS int + d.ScanArgs(t, "record-id", &recordID) + d.ScanArgs(t, "ts", &protectTS) + target := spanconfigtestutils.ParseProtectionTarget(t, d.Input) + + mkRecordAndProtect(recordID, hlc.Timestamp{WallTime: int64(protectTS)}, target) + + case "release": + releaseRecord := func(recordID string) { + require.NoError(t, execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + require.Len(t, recordID, 1, "datadriven test only supports single digit record IDs") + recID, err := uuid.FromBytes([]byte(strings.Repeat(recordID, 16))) + require.NoError(t, err) + return ptp.Release(ctx, txn, recID) + })) + } + + var recordID string + d.ScanArgs(t, "record-id", &recordID) + releaseRecord(recordID) default: t.Fatalf("unknown command: %s", d.Cmd) } diff --git a/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/testdata/protectedts b/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/testdata/protectedts new file mode 100644 index 000000000000..563c0ba3a944 --- /dev/null +++ b/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/testdata/protectedts @@ -0,0 +1,76 @@ +# Create a database with some tables and write protected timestamps on the +# tables and database. Check that span configurations are as we expect. + +exec-sql +CREATE DATABASE db; +CREATE TABLE db.t1(id INT); +CREATE TABLE db.t2(); +---- + +# Schema object IDs +# db: 54 +# t1: 56 +# t2: 57 + +# Alter zone config fields on the database and one of the tables to ensure +# things are cascading. +exec-sql +ALTER DATABASE db CONFIGURE ZONE USING num_replicas=7; +ALTER TABLE db.t1 CONFIGURE ZONE USING num_voters=5; +---- + +# Write a protected timestamp on t1. +protect record-id=1 ts=1 +descs 56 +---- + +translate database=db +---- +/Table/5{6-7} num_replicas=7 num_voters=5 pts=[1] +/Table/5{7-8} num_replicas=7 + +# Write a protected timestamp on db, so we should see it on both t1 and t2. +protect record-id=2 ts=2 +descs 54 +---- + +translate database=db +---- +/Table/5{6-7} num_replicas=7 num_voters=5 pts=[1 2] +/Table/5{7-8} num_replicas=7 pts=[2] + +# Release the protected timestamp on table t1 +release record-id=1 +---- + +translate database=db +---- +/Table/5{6-7} num_replicas=7 num_voters=5 pts=[2] +/Table/5{7-8} num_replicas=7 pts=[2] + +# Release the protected timestamp on database db +release record-id=2 +---- + +translate database=db +---- +/Table/5{6-7} num_replicas=7 num_voters=5 +/Table/5{7-8} num_replicas=7 + +# Create an index on t1 to ensure that subzones also see protected timestamps. +exec-sql +CREATE INDEX idx ON db.t1(id); +ALTER INDEX db.t1@idx CONFIGURE ZONE USING gc.ttlseconds = 1; +---- + +protect record-id=3 ts=3 +descs 56 +---- + +translate database=db +---- +/Table/56{-/2} num_replicas=7 num_voters=5 pts=[3] +/Table/56/{2-3} ttl_seconds=1 num_replicas=7 num_voters=5 pts=[3] +/Table/5{6/3-7} num_replicas=7 num_voters=5 pts=[3] +/Table/5{7-8} num_replicas=7 + diff --git a/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/testdata/tenant/protectedts b/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/testdata/tenant/protectedts new file mode 100644 index 000000000000..172b0fb58d16 --- /dev/null +++ b/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/testdata/tenant/protectedts @@ -0,0 +1,75 @@ +# Create a database with some tables and write protected timestamps on the +# tables and database. Check that span configurations are as we expect. + +exec-sql +CREATE DATABASE db; +CREATE TABLE db.t1(id INT); +CREATE TABLE db.t2(); +---- + +# Schema object IDs +# db: 54 +# t1: 56 +# t2: 57 + +# Alter zone config fields on the database and one of the tables to ensure +# things are cascading. +exec-sql +ALTER DATABASE db CONFIGURE ZONE USING num_replicas=7; +ALTER TABLE db.t1 CONFIGURE ZONE USING num_voters=5; +---- + +# Write a protected timestamp on t1. +protect record-id=1 ts=1 +descs 56 +---- + +translate database=db +---- +/Tenant/10/Table/5{6-7} num_replicas=7 num_voters=5 pts=[1] +/Tenant/10/Table/5{7-8} num_replicas=7 + +# Write a protected timestamp on db, so we should see it on both t1 and t2. +protect record-id=2 ts=2 +descs 54 +---- + +translate database=db +---- +/Tenant/10/Table/5{6-7} num_replicas=7 num_voters=5 pts=[1 2] +/Tenant/10/Table/5{7-8} num_replicas=7 pts=[2] + +# Release the protected timestamp on table t1 +release record-id=1 +---- + +translate database=db +---- +/Tenant/10/Table/5{6-7} num_replicas=7 num_voters=5 pts=[2] +/Tenant/10/Table/5{7-8} num_replicas=7 pts=[2] + +# Release the protected timestamp on database db +release record-id=2 +---- + +translate database=db +---- +/Tenant/10/Table/5{6-7} num_replicas=7 num_voters=5 +/Tenant/10/Table/5{7-8} num_replicas=7 + +# Create an index on t1 to ensure that subzones also see protected timestamps. +exec-sql +CREATE INDEX idx ON db.t1(id); +ALTER INDEX db.t1@idx CONFIGURE ZONE USING gc.ttlseconds = 1; +---- + +protect record-id=3 ts=3 +descs 56 +---- + +translate database=db +---- +/Tenant/10/Table/56{-/2} num_replicas=7 num_voters=5 pts=[3] +/Tenant/10/Table/56/{2-3} ttl_seconds=1 num_replicas=7 num_voters=5 pts=[3] +/Tenant/10/Table/5{6/3-7} num_replicas=7 num_voters=5 pts=[3] +/Tenant/10/Table/5{7-8} num_replicas=7 diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 2e34b8aa41f5..1cc066ae2a2f 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -225,10 +225,6 @@ const ( SeedTenantSpanConfigs // PublicSchemasWithDescriptors backs public schemas with descriptors. PublicSchemasWithDescriptors - // AlterSystemProtectedTimestampAddColumn adds a target column to the - // system.protected_ts_records table that describes what is protected by the - // record. - AlterSystemProtectedTimestampAddColumn // EnsureSpanConfigReconciliation ensures that the host tenant has run its // reconciliation process at least once. EnsureSpanConfigReconciliation @@ -248,6 +244,13 @@ const ( // that correspond to range descriptor changes resulting from recovery // procedures. UnsafeLossOfQuorumRecoveryRangeLog + // AlterSystemProtectedTimestampAddColumn adds a target column to the + // system.protected_ts_records table that describes what is protected by the + // record. + AlterSystemProtectedTimestampAddColumn + // EnableProtectedTimestampsForTenant enables the use of protected timestamps + // in secondary tenants. + EnableProtectedTimestampsForTenant // ************************************************* // Step (1): Add new versions here. @@ -353,33 +356,37 @@ var versionsSingleton = keyedVersions{ Version: roachpb.Version{Major: 21, Minor: 2, Internal: 34}, }, { - Key: AlterSystemProtectedTimestampAddColumn, + Key: EnsureSpanConfigReconciliation, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 36}, }, { - Key: EnsureSpanConfigReconciliation, + Key: EnsureSpanConfigSubscription, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 38}, }, { - Key: EnsureSpanConfigSubscription, + Key: EnableSpanConfigStore, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 40}, }, { - Key: EnableSpanConfigStore, + Key: ScanWholeRows, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 42}, }, { - Key: ScanWholeRows, + Key: SCRAMAuthentication, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 44}, }, { - Key: SCRAMAuthentication, + Key: UnsafeLossOfQuorumRecoveryRangeLog, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 46}, }, { - Key: UnsafeLossOfQuorumRecoveryRangeLog, + Key: AlterSystemProtectedTimestampAddColumn, Version: roachpb.Version{Major: 21, Minor: 2, Internal: 48}, }, + { + Key: EnableProtectedTimestampsForTenant, + Version: roachpb.Version{Major: 21, Minor: 2, Internal: 50}, + }, // ************************************************* // Step (2): Add new versions here. diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index 107d7ac4868d..1f9bfd1df5b4 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -26,18 +26,19 @@ func _() { _ = x[PreSeedTenantSpanConfigs-15] _ = x[SeedTenantSpanConfigs-16] _ = x[PublicSchemasWithDescriptors-17] - _ = x[AlterSystemProtectedTimestampAddColumn-18] - _ = x[EnsureSpanConfigReconciliation-19] - _ = x[EnsureSpanConfigSubscription-20] - _ = x[EnableSpanConfigStore-21] - _ = x[ScanWholeRows-22] - _ = x[SCRAMAuthentication-23] - _ = x[UnsafeLossOfQuorumRecoveryRangeLog-24] + _ = x[EnsureSpanConfigReconciliation-18] + _ = x[EnsureSpanConfigSubscription-19] + _ = x[EnableSpanConfigStore-20] + _ = x[ScanWholeRows-21] + _ = x[SCRAMAuthentication-22] + _ = x[UnsafeLossOfQuorumRecoveryRangeLog-23] + _ = x[AlterSystemProtectedTimestampAddColumn-24] + _ = x[EnableProtectedTimestampsForTenant-25] } -const _Key_name = "V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColAlterSystemStmtDiagReqsMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsAlterSystemProtectedTimestampAddColumnEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLog" +const _Key_name = "V21_2Start22_1TargetBytesAvoidExcessAvoidDrainingNamesDrainingNamesMigrationTraceIDDoesntImplyStructuredRecordingAlterSystemTableStatisticsAddAvgSizeColAlterSystemStmtDiagReqsMVCCAddSSTableInsertPublicSchemaNamespaceEntryOnRestoreUnsplitRangesInAsyncGCJobsValidateGrantOptionPebbleFormatBlockPropertyCollectorProbeRequestSelectRPCsTakeTracingInfoInbandPreSeedTenantSpanConfigsSeedTenantSpanConfigsPublicSchemasWithDescriptorsEnsureSpanConfigReconciliationEnsureSpanConfigSubscriptionEnableSpanConfigStoreScanWholeRowsSCRAMAuthenticationUnsafeLossOfQuorumRecoveryRangeLogAlterSystemProtectedTimestampAddColumnEnableProtectedTimestampsForTenant" -var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 175, 189, 230, 256, 275, 309, 321, 352, 376, 397, 425, 463, 493, 521, 542, 555, 574, 608} +var _Key_index = [...]uint16{0, 5, 14, 36, 54, 76, 113, 152, 175, 189, 230, 256, 275, 309, 321, 352, 376, 397, 425, 455, 483, 504, 517, 536, 570, 608, 642} func (i Key) String() string { if i < 0 || i >= Key(len(_Key_index)-1) { diff --git a/pkg/roachpb/span_config.proto b/pkg/roachpb/span_config.proto index 762a01887fae..61e220b2de45 100644 --- a/pkg/roachpb/span_config.proto +++ b/pkg/roachpb/span_config.proto @@ -14,6 +14,7 @@ option go_package = "roachpb"; import "roachpb/data.proto"; import "gogoproto/gogo.proto"; +import "util/hlc/timestamp.proto"; // TODO(irfansharif): We could have the proto definitions in pkg/config/zonepb // use these messages instead of duplicating everything. @@ -140,6 +141,11 @@ message SpanConfig { // preferred option to least. The first preference that an existing replica of // a range matches will take priority for the lease. repeated LeasePreference lease_preferences = 9 [(gogoproto.nullable) = false]; + + // ProtectedTimestamps captures all the protected timestamp records that apply + // to the range. The timestamp values represent the timestamps after which + // data will be protected from GC, if the protection succeeds. + repeated util.hlc.Timestamp protected_timestamps = 10 [(gogoproto.nullable) = false]; } // SpanConfigEntry ties a span to its corresponding config. diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 168d49090f5e..8bba62ec42e8 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -460,7 +460,7 @@ func makeTenantSQLServerArgs( if err != nil { panic(err) } - protectedTSProvider = dummyProtectedTSProvider{pp} + protectedTSProvider = tenantProtectedTSProvider{Provider: pp, st: st} } recorder := status.NewMetricsRecorder(clock, nil, rpcContext, nil, st) diff --git a/pkg/server/testserver.go b/pkg/server/testserver.go index 0a142f883316..c5f6b6df966d 100644 --- a/pkg/server/testserver.go +++ b/pkg/server/testserver.go @@ -28,6 +28,7 @@ import ( "github.com/cenkalti/backoff" circuit "github.com/cockroachdb/circuitbreaker" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/config" "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/gossip" @@ -500,12 +501,18 @@ func (ts *TestServer) Start(ctx context.Context) error { return ts.Server.Start(ctx) } -type dummyProtectedTSProvider struct { +type tenantProtectedTSProvider struct { protectedts.Provider + st *cluster.Settings } -func (d dummyProtectedTSProvider) Protect(context.Context, *kv.Txn, *ptpb.Record) error { - return errors.New("fake protectedts.Provider") +func (d tenantProtectedTSProvider) Protect( + ctx context.Context, txn *kv.Txn, rec *ptpb.Record, +) error { + if d.st.Version.IsActive(ctx, clusterversion.EnableProtectedTimestampsForTenant) { + return d.Provider.Protect(ctx, txn, rec) + } + return errors.New("tenant cannot Protect timestamp until the clusterversion has been finalized") } // TestTenant is an in-memory instantiation of the SQL-only process created for diff --git a/pkg/spanconfig/spanconfigsqltranslator/protectedts_state_reader.go b/pkg/spanconfig/spanconfigsqltranslator/protectedts_state_reader.go index 8572354bcf70..43c17389d639 100644 --- a/pkg/spanconfig/spanconfigsqltranslator/protectedts_state_reader.go +++ b/pkg/spanconfig/spanconfigsqltranslator/protectedts_state_reader.go @@ -33,16 +33,14 @@ type protectedTimestampStateReader struct { // the transactional state of the `system.protected_ts_records` table. func newProtectedTimestampStateReader( _ context.Context, ptsState ptpb.State, -) (*protectedTimestampStateReader, error) { +) *protectedTimestampStateReader { reader := &protectedTimestampStateReader{ schemaObjectProtections: make(map[descpb.ID][]hlc.Timestamp), tenantProtections: make([]tenantProtectedTimestamps, 0), clusterProtections: make([]hlc.Timestamp, 0), } - if err := reader.loadProtectedTimestampRecords(ptsState); err != nil { - return nil, err - } - return reader, nil + reader.loadProtectedTimestampRecords(ptsState) + return reader } // GetProtectedTimestampsForCluster returns all the protected timestamps that @@ -73,7 +71,7 @@ func (p *protectedTimestampStateReader) GetProtectedTimestampsForSchemaObject( return p.schemaObjectProtections[descID] } -func (p *protectedTimestampStateReader) loadProtectedTimestampRecords(ptsState ptpb.State) error { +func (p *protectedTimestampStateReader) loadProtectedTimestampRecords(ptsState ptpb.State) { tenantProtections := make(map[roachpb.TenantID][]hlc.Timestamp) for _, record := range ptsState.Records { switch t := record.Target.GetUnion().(type) { @@ -94,5 +92,4 @@ func (p *protectedTimestampStateReader) loadProtectedTimestampRecords(ptsState p p.tenantProtections = append(p.tenantProtections, tenantProtectedTimestamps{tenantID: tenID, protections: tenantProtections}) } - return nil } diff --git a/pkg/spanconfig/spanconfigsqltranslator/protectedts_state_reader_test.go b/pkg/spanconfig/spanconfigsqltranslator/protectedts_state_reader_test.go index df608c9d9bfa..c30219162d7e 100644 --- a/pkg/spanconfig/spanconfigsqltranslator/protectedts_state_reader_test.go +++ b/pkg/spanconfig/spanconfigsqltranslator/protectedts_state_reader_test.go @@ -64,9 +64,7 @@ func TestProtectedTimestampStateReader(t *testing.T) { protectTenants(state, ts(5), []roachpb.TenantID{roachpb.MakeTenantID(2)}) protectTenants(state, ts(6), []roachpb.TenantID{roachpb.MakeTenantID(2)}) - ptsStateReader, err := newProtectedTimestampStateReader(context.Background(), *state) - require.NoError(t, err) - + ptsStateReader := newProtectedTimestampStateReader(context.Background(), *state) clusterTimestamps := ptsStateReader.GetProtectedTimestampsForCluster() require.Len(t, clusterTimestamps, 1) require.Equal(t, []hlc.Timestamp{ts(3)}, clusterTimestamps) diff --git a/pkg/spanconfig/spanconfigsqltranslator/sqltranslator.go b/pkg/spanconfig/spanconfigsqltranslator/sqltranslator.go index 392a7060361d..6584900c174e 100644 --- a/pkg/spanconfig/spanconfigsqltranslator/sqltranslator.go +++ b/pkg/spanconfig/spanconfigsqltranslator/sqltranslator.go @@ -69,6 +69,21 @@ func (s *SQLTranslator) Translate( // attempts. entries = entries[:0] + // Construct an in-memory view of the system.protected_ts_records table to + // populate the protected timestamp field on the emitted span configs. + // + // TODO(adityamaru): This does a full table scan of the + // `system.protected_ts_records` table. While this is not assumed to be very + // expensive given the limited number of concurrent users of the protected + // timestamp subsystem, and the internal limits to limit the size of this + // table, there is scope for improvement in the future. One option could be + // a rangefeed-backed materialized view of the system table. + ptsState, err := s.execCfg.ProtectedTimestampProvider.GetState(ctx, txn) + if err != nil { + return errors.Wrap(err, "failed to get protected timestamp state") + } + ptsStateReader := newProtectedTimestampStateReader(ctx, ptsState) + // For every ID we want to translate, first expand it to descendant leaf // IDs that have span configurations associated for them. We also // de-duplicate leaf IDs to not generate redundant entries. @@ -103,7 +118,7 @@ func (s *SQLTranslator) Translate( // For every unique leaf ID, generate span configurations. for _, leafID := range leafIDs { - translatedEntries, err := s.generateSpanConfigurations(ctx, leafID, txn, descsCol) + translatedEntries, err := s.generateSpanConfigurations(ctx, leafID, txn, descsCol, ptsStateReader) if err != nil { return err } @@ -134,7 +149,11 @@ var descLookupFlags = tree.CommonLookupFlags{ // ID. The ID must belong to an object that has a span configuration associated // with it, i.e, it should either belong to a table or a named zone. func (s *SQLTranslator) generateSpanConfigurations( - ctx context.Context, id descpb.ID, txn *kv.Txn, descsCol *descs.Collection, + ctx context.Context, + id descpb.ID, + txn *kv.Txn, + descsCol *descs.Collection, + ptsStateReader *protectedTimestampStateReader, ) (entries []roachpb.SpanConfigEntry, err error) { if zonepb.IsNamedZoneID(id) { return s.generateSpanConfigurationsForNamedZone(ctx, txn, id) @@ -158,7 +177,7 @@ func (s *SQLTranslator) generateSpanConfigurations( ) } - return s.generateSpanConfigurationsForTable(ctx, txn, desc) + return s.generateSpanConfigurationsForTable(ctx, txn, desc, ptsStateReader) } // generateSpanConfigurationsForNamedZone expects an ID corresponding to a named @@ -222,11 +241,55 @@ func (s *SQLTranslator) generateSpanConfigurationsForNamedZone( return entries, nil } +// setProtectedTimestampsForTable aggregates the protected timestamps that apply +// to the table and sets the field in the table's generated span configs. +// This includes protected timestamps that apply to the table, and its parent +// database. +// +// This method mutates the passed in `entries`. +func setProtectedTimestampsForTable( + entries *[]roachpb.SpanConfigEntry, + desc catalog.Descriptor, + ptsStateReader *protectedTimestampStateReader, +) { + protectedTimestamps := make([]hlc.Timestamp, 0) + // Get protections that apply directly to the table. + protectedTimestamps = append(protectedTimestamps, + ptsStateReader.GetProtectedTimestampsForSchemaObject(desc.GetID())...) + + // Get protections that apply to the database. + protectedTimestamps = append(protectedTimestamps, + ptsStateReader.GetProtectedTimestampsForSchemaObject(desc.GetParentID())...) + + if len(protectedTimestamps) == 0 { + return + } + + for i := range *entries { + (*entries)[i].Config.ProtectedTimestamps = protectedTimestamps + } +} + +// hydrateSpanConfigurationsForTable hydrates fields in a table's span +// configurations that are not derived from the table's zone configuration. +// +// This method mutates the passed in `entries`. +func hydrateSpanConfigurationsForTable( + entries *[]roachpb.SpanConfigEntry, + desc catalog.Descriptor, + ptsStateReader *protectedTimestampStateReader, +) { + setProtectedTimestampsForTable(entries, desc, ptsStateReader) +} + // generateSpanConfigurationsForTable generates the span configurations // corresponding to the given tableID. It uses a transactional view of // system.zones and system.descriptors to do so. func (s *SQLTranslator) generateSpanConfigurationsForTable( - ctx context.Context, txn *kv.Txn, desc catalog.Descriptor, + ctx context.Context, + txn *kv.Txn, + desc catalog.Descriptor, + ptsStateReader *protectedTimestampStateReader, ) ([]roachpb.SpanConfigEntry, error) { if desc.DescriptorType() != catalog.Table { return nil, errors.AssertionFailedf( @@ -279,6 +342,7 @@ func (s *SQLTranslator) generateSpanConfigurationsForTable( }) } + hydrateSpanConfigurationsForTable(&entries, desc, ptsStateReader) return entries, nil // TODO(irfansharif): There's an attack vector here that we haven't @@ -363,6 +427,7 @@ func (s *SQLTranslator) generateSpanConfigurationsForTable( }, ) } + hydrateSpanConfigurationsForTable(&entries, desc, ptsStateReader) return entries, nil } diff --git a/pkg/spanconfig/spanconfigtestutils/BUILD.bazel b/pkg/spanconfig/spanconfigtestutils/BUILD.bazel index 4a439f7753a5..aae898a950fb 100644 --- a/pkg/spanconfig/spanconfigtestutils/BUILD.bazel +++ b/pkg/spanconfig/spanconfigtestutils/BUILD.bazel @@ -10,8 +10,10 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/kv", + "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/roachpb:with-mocks", "//pkg/spanconfig", + "//pkg/sql/catalog/descpb", "//pkg/util/syncutil", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_stretchr_testify//require", diff --git a/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/BUILD.bazel b/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/BUILD.bazel index 2f0b2744a245..5bae11ed4f52 100644 --- a/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/BUILD.bazel +++ b/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/BUILD.bazel @@ -10,7 +10,9 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/base", + "//pkg/jobs", "//pkg/kv", + "//pkg/kv/kvserver/protectedts", "//pkg/roachpb:with-mocks", "//pkg/security", "//pkg/spanconfig", @@ -20,6 +22,7 @@ go_library( "//pkg/sql/catalog", "//pkg/sql/catalog/descs", "//pkg/sql/catalog/tabledesc", + "//pkg/sql/distsql", "//pkg/sql/sem/tree", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", diff --git a/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/cluster.go b/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/cluster.go index bca31c7371f4..45d2dedc632b 100644 --- a/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/cluster.go +++ b/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/cluster.go @@ -17,6 +17,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/spanconfig" @@ -32,21 +33,26 @@ import ( // cluster while providing convenient, scoped access to each tenant's specific // span config primitives. It's not safe for concurrent use. type Handle struct { - t *testing.T - tc *testcluster.TestCluster - ts map[roachpb.TenantID]*Tenant - scKnobs *spanconfig.TestingKnobs + t *testing.T + tc *testcluster.TestCluster + ts map[roachpb.TenantID]*Tenant + scKnobs *spanconfig.TestingKnobs + ptsKnobs *protectedts.TestingKnobs } // NewHandle returns a new Handle. func NewHandle( - t *testing.T, tc *testcluster.TestCluster, scKnobs *spanconfig.TestingKnobs, + t *testing.T, + tc *testcluster.TestCluster, + scKnobs *spanconfig.TestingKnobs, + ptsKnobs *protectedts.TestingKnobs, ) *Handle { return &Handle{ - t: t, - tc: tc, - ts: make(map[roachpb.TenantID]*Tenant), - scKnobs: scKnobs, + t: t, + tc: tc, + ts: make(map[roachpb.TenantID]*Tenant), + scKnobs: scKnobs, + ptsKnobs: ptsKnobs, } } @@ -63,7 +69,8 @@ func (h *Handle) InitializeTenant(ctx context.Context, tenID roachpb.TenantID) * tenantArgs := base.TestTenantArgs{ TenantID: tenID, TestingKnobs: base.TestingKnobs{ - SpanConfig: h.scKnobs, + SpanConfig: h.scKnobs, + ProtectedTS: h.ptsKnobs, }, } var err error diff --git a/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/tenant_state.go b/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/tenant_state.go index dc7f7b933c16..7385bf53acf9 100644 --- a/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/tenant_state.go +++ b/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/tenant_state.go @@ -15,7 +15,9 @@ import ( gosql "database/sql" "testing" + "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigreconciler" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils" @@ -23,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/distsql" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -48,6 +51,22 @@ type Tenant struct { } } +// ExecCfg returns a handle to the tenant's ExecutorConfig. +func (s *Tenant) ExecCfg() sql.ExecutorConfig { + return s.ExecutorConfig().(sql.ExecutorConfig) +} + +// ProtectedTimestampProvider returns a handle to the tenant's protected +// timestamp provider. +func (s *Tenant) ProtectedTimestampProvider() protectedts.Provider { + return s.DistSQLServer().(*distsql.ServerImpl).ServerConfig.ProtectedTimestampProvider +} + +// JobsRegistry returns a handle to the tenant's job registry. +func (s *Tenant) JobsRegistry() *jobs.Registry { + return s.JobRegistry().(*jobs.Registry) +} + // Exec is a wrapper around gosql.Exec that kills the test on error. It records // the execution timestamp for subsequent use. func (s *Tenant) Exec(query string, args ...interface{}) { diff --git a/pkg/spanconfig/spanconfigtestutils/utils.go b/pkg/spanconfig/spanconfigtestutils/utils.go index 4afe6d7c5a2a..bbfaa3a2fddb 100644 --- a/pkg/spanconfig/spanconfigtestutils/utils.go +++ b/pkg/spanconfig/spanconfigtestutils/utils.go @@ -15,11 +15,15 @@ import ( "fmt" "reflect" "regexp" + "sort" + "strconv" "strings" "testing" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/datadriven" "github.com/stretchr/testify/require" ) @@ -236,6 +240,16 @@ func PrintSpanConfigDiffedAgainstDefaults(conf roachpb.SpanConfig) string { if !reflect.DeepEqual(conf.LeasePreferences, defaultConf.LeasePreferences) { diffs = append(diffs, fmt.Sprintf("lease_preferences=%v", conf.VoterConstraints)) } + if !reflect.DeepEqual(conf.ProtectedTimestamps, defaultConf.ProtectedTimestamps) { + sort.Slice(conf.ProtectedTimestamps, func(i, j int) bool { + return conf.ProtectedTimestamps[i].Less(conf.ProtectedTimestamps[j]) + }) + timestamps := make([]string, 0, len(conf.ProtectedTimestamps)) + for _, pts := range conf.ProtectedTimestamps { + timestamps = append(timestamps, strconv.Itoa(int(pts.WallTime))) + } + diffs = append(diffs, fmt.Sprintf("pts=[%s]", strings.Join(timestamps, " "))) + } return strings.Join(diffs, " ") } @@ -324,3 +338,43 @@ func GetSplitPoints(ctx context.Context, t *testing.T, reader spanconfig.StoreRe return splitPoints } + +// ParseProtectionTarget returns a ptpb.Target based on the input. This target +// could either refer to a Cluster, list of Tenants or SchemaObjects. +func ParseProtectionTarget(t *testing.T, input string) *ptpb.Target { + line := strings.Split(input, "\n") + if len(line) != 1 { + t.Fatal("only one target must be specified per protectedts operation") + } + target := line[0] + + const clusterPrefix, tenantPrefix, schemaObjectPrefix = "cluster", "tenants", "descs" + switch { + case strings.HasPrefix(target, clusterPrefix): + return ptpb.MakeClusterTarget() + case strings.HasPrefix(target, tenantPrefix): + target = strings.TrimPrefix(target, target[:len(tenantPrefix)+1]) + tenantIDs := strings.Split(target, ",") + ids := make([]roachpb.TenantID, 0, len(tenantIDs)) + for _, tenID := range tenantIDs { + id, err := strconv.Atoi(tenID) + require.NoError(t, err) + ids = append(ids, roachpb.MakeTenantID(uint64(id))) + } + return ptpb.MakeTenantsTarget(ids) + case strings.HasPrefix(target, schemaObjectPrefix): + target = strings.TrimPrefix(target, target[:len(schemaObjectPrefix)+1]) + schemaObjectIDs := strings.Split(target, ",") + ids := make([]descpb.ID, 0, len(schemaObjectIDs)) + for _, tenID := range schemaObjectIDs { + id, err := strconv.Atoi(tenID) + require.NoError(t, err) + ids = append(ids, descpb.ID(id)) + } + return ptpb.MakeSchemaObjectsTarget(ids) + default: + t.Fatalf("malformed line %q, expected to find prefix %q, %q or %q", target, tenantPrefix, + schemaObjectPrefix, clusterPrefix) + } + return nil +} diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal_tenant b/pkg/sql/logictest/testdata/logic_test/crdb_internal_tenant index 41d3eea1c0b9..14cb6a190bd1 100644 --- a/pkg/sql/logictest/testdata/logic_test/crdb_internal_tenant +++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal_tenant @@ -11,6 +11,8 @@ SELECT node_id, name FROM crdb_internal.leases ORDER BY name 0 eventlog 0 jobs 0 locations +0 protected_ts_meta +0 protected_ts_records 0 role_members 0 role_options 0 scheduled_jobs