diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 767a78d3e1fa..71524f3dc1c1 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -52,6 +52,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -6436,6 +6437,9 @@ func TestProtectedTimestampsDuringBackup(t *testing.T) { conn := tc.ServerConn(0) runner := sqlutils.MakeSQLRunner(conn) runner.Exec(t, "CREATE TABLE foo (k INT PRIMARY KEY, v BYTES)") + runner.Exec(t, "SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms';") + runner.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'") // speeds up the test + close(allowRequest) for _, testrun := range []struct { @@ -6465,7 +6469,6 @@ func TestProtectedTimestampsDuringBackup(t *testing.T) { baseBackupURI := "nodelocal://0/foo" + testrun.name testrun.runBackup(t, fmt.Sprintf(`BACKUP TABLE FOO TO '%s'`, baseBackupURI), runner) // create a base backup. allowRequest = make(chan struct{}) - runner.Exec(t, "SET CLUSTER SETTING kv.protectedts.poll_interval = '100ms';") runner.Exec(t, "ALTER TABLE foo CONFIGURE ZONE USING gc.ttlseconds = 1;") rRand, _ := randutil.NewTestRand() writeGarbage := func(from, to int) { @@ -6924,8 +6927,6 @@ func TestRestoreErrorPropagates(t *testing.T) { // TestProtectedTimestampsFailDueToLimits ensures that when creating a protected // timestamp record fails, we return the correct error. -// -// TODO(adityamaru): Remove in 22.2 once no records protect spans. func TestProtectedTimestampsFailDueToLimits(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -6941,12 +6942,32 @@ func TestProtectedTimestampsFailDueToLimits(t *testing.T) { runner := sqlutils.MakeSQLRunner(db) runner.Exec(t, "CREATE TABLE foo (k INT PRIMARY KEY, v BYTES)") runner.Exec(t, "CREATE TABLE bar (k INT PRIMARY KEY, v BYTES)") - runner.Exec(t, "SET CLUSTER SETTING kv.protectedts.max_spans = 1") + runner.Exec(t, "SET CLUSTER SETTING kv.protectedts.max_bytes = 1") // Creating the protected timestamp record should fail because there are too // many spans. Ensure that we get the appropriate error. _, err := db.Exec(`BACKUP TABLE foo, bar TO 'nodelocal://0/foo'`) - require.EqualError(t, err, "pq: protectedts: limit exceeded: 0+2 > 1 spans") + require.EqualError(t, err, "pq: protectedts: limit exceeded: 0+30 > 1 bytes") + + // TODO(adityamaru): Remove in 22.2 once no records protect spans. + t.Run("deprecated-spans-limit", func(t *testing.T) { + params := base.TestClusterArgs{} + params.ServerArgs.ExternalIODir = dir + params.ServerArgs.Knobs.ProtectedTS = &protectedts.TestingKnobs{ + DisableProtectedTimestampForMultiTenant: true} + tc := testcluster.StartTestCluster(t, 1, params) + defer tc.Stopper().Stop(ctx) + db := tc.ServerConn(0) + runner := sqlutils.MakeSQLRunner(db) + runner.Exec(t, "CREATE TABLE foo (k INT PRIMARY KEY, v BYTES)") + runner.Exec(t, "CREATE TABLE bar (k INT PRIMARY KEY, v BYTES)") + runner.Exec(t, "SET CLUSTER SETTING kv.protectedts.max_spans = 1") + + // Creating the protected timestamp record should fail because there are too + // many spans. Ensure that we get the appropriate error. 
+ _, err := db.Exec(`BACKUP TABLE foo, bar TO 'nodelocal://0/foo'`) + require.EqualError(t, err, "pq: protectedts: limit exceeded: 0+2 > 1 spans") + }) } func TestPaginatedBackupTenant(t *testing.T) { diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 43da7d3ba702..23d8328c5a5b 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -182,6 +182,8 @@ go_test( "//pkg/server/status", "//pkg/server/telemetry", "//pkg/settings/cluster", + "//pkg/spanconfig", + "//pkg/spanconfig/spanconfigptsreader", "//pkg/sql/catalog", "//pkg/sql/catalog/bootstrap", "//pkg/sql/catalog/descbuilder", diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index e64b007e6a88..8c528b4b25fa 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -54,6 +54,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" @@ -3936,18 +3938,20 @@ func TestChangefeedUpdateProtectedTimestamp(t *testing.T) { defer log.Scope(t).Close(t) testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { + ctx := context.Background() ptsInterval := 50 * time.Millisecond changefeedbase.ProtectTimestampInterval.Override( context.Background(), &f.Server().ClusterSettings().SV, ptsInterval) sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, "SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms';") + sqlDB.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'") // speeds up the test sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '20ms'`) defer closeFeed(t, foo) fooDesc := desctestutils.TestingGetPublicTableDescriptor( f.Server().DB(), keys.SystemSQLCodec, "d", "foo") - tableSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) ptsProvider := f.Server().DistSQLServer().(*distsql.ServerImpl).ServerConfig.ProtectedTimestampProvider var tableID int @@ -3955,23 +3959,17 @@ func TestChangefeedUpdateProtectedTimestamp(t *testing.T) { `WHERE name = 'foo' AND database_name = current_database()`). Scan(&tableID) - getTablePtsRecord := func() *ptpb.Record { - var r *ptpb.Record - require.NoError(t, ptsProvider.Refresh(context.Background(), f.Server().Clock().Now())) - ptsProvider.Iterate(context.Background(), tableSpan.Key, tableSpan.EndKey, func(record *ptpb.Record) (wantMore bool) { - r = record - return false - }) - - expectedKeys := map[string]struct{}{ - string(keys.SystemSQLCodec.TablePrefix(uint32(tableID))): {}, - string(keys.SystemSQLCodec.TablePrefix(keys.DescriptorTableID)): {}, - } - require.Equal(t, len(r.DeprecatedSpans), len(expectedKeys)) - for _, s := range r.DeprecatedSpans { - require.Contains(t, expectedKeys, string(s.Key)) - } - return r + // Get the protection policies that will be written on the span configs + // emitted by the tenant's reconciliation job. 
+		getProtectionPolicies := func(ctx context.Context, txn *kv.Txn) []roachpb.ProtectionPolicy {
+			ptsState, err := ptsProvider.GetState(ctx, txn)
+			require.NoError(t, err)
+			ptsStateReader := spanconfig.NewProtectedTimestampStateReader(ctx, ptsState)
+			protections := ptsStateReader.GetProtectionPoliciesForSchemaObject(fooDesc.GetID())
+			require.Len(t, protections, 1)
+			descProtection := ptsStateReader.GetProtectionPoliciesForSchemaObject(keys.DescriptorTableID)
+			require.Len(t, descProtection, 1)
+			return append(protections, descProtection...)
+		}
 
 		// Wait and return the next resolved timestamp after the wait time
@@ -3990,8 +3988,26 @@ func TestChangefeedUpdateProtectedTimestamp(t *testing.T) {
 			// Progress the changefeed and allow time for a pts record to be laid down
 			nextResolved := waitAndDrainResolved(100 * time.Millisecond)
 			time.Sleep(2 * ptsInterval)
-			rec := getTablePtsRecord()
-			require.LessOrEqual(t, nextResolved.GoTime().UnixNano(), rec.Timestamp.GoTime().UnixNano())
+
+			var pp []roachpb.ProtectionPolicy
+			// Get the protection policies that have been written by the changefeed job.
+			require.NoError(t, f.Server().DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+				pp = getProtectionPolicies(ctx, txn)
+				return nil
+			}))
+			// We expect to see two protection policies corresponding to the record
+			// protecting the user table and the descriptor table.
+			for i := range pp {
+				require.LessOrEqual(t, nextResolved.GoTime().UnixNano(),
+					pp[i].ProtectedTimestamp.GoTime().UnixNano())
+			}
+
+			// TODO(CDC): Given the frequency with which the test is writing pts
+			// records it is hard to enforce a strict equality check on the records
+			// written and the state reconciled in KV. Maybe there is a way to "pause"
+			// the resolved timestamp so that we can allow KV state to be reconciled
+			// up to a point, and then match the protection policies persisted in KV with
+			// the policies in `pp` above.
} } @@ -4048,43 +4064,43 @@ func TestChangefeedProtectedTimestamps(t *testing.T) { } return nil }) - mkGetPtsRec = func(t *testing.T, ptp protectedts.Provider, clock *hlc.Clock) func() *ptpb.Record { - return func() (r *ptpb.Record) { - t.Helper() - require.NoError(t, ptp.Refresh(ctx, clock.Now())) - ptp.Iterate(ctx, userSpan.Key, userSpan.EndKey, func(record *ptpb.Record) (wantMore bool) { - r = record - return false - }) - return r + mkGetProtections = func(t *testing.T, ptp protectedts.Provider, + srv serverutils.TestServerInterface, ptsReader spanconfig.ProtectedTSReader) func() []hlc.Timestamp { + return func() (r []hlc.Timestamp) { + require.NoError(t, + spanconfigptsreader.TestingRefreshPTSState(ctx, t, ptsReader, srv.Clock().Now())) + protections, _, err := ptsReader.GetProtectionTimestamps(ctx, keys.EverythingSpan) + require.NoError(t, err) + return protections } } - mkCheckRecord = func(t *testing.T, tableID int) func(r *ptpb.Record) error { - expectedKeys := map[string]struct{}{ + mkCheckProtection = func(t *testing.T, tableID int) func(protections []hlc.Timestamp) error { + expectedSpansWithProtections := map[string]struct{}{ string(keys.SystemSQLCodec.TablePrefix(uint32(tableID))): {}, string(keys.SystemSQLCodec.TablePrefix(keys.DescriptorTableID)): {}, } - return func(ptr *ptpb.Record) error { - if ptr == nil { - return errors.Errorf("expected protected timestamp") - } - require.Equal(t, len(ptr.DeprecatedSpans), len(expectedKeys), ptr.DeprecatedSpans, expectedKeys) - for _, s := range ptr.DeprecatedSpans { - require.Contains(t, expectedKeys, string(s.Key)) - } + return func(protections []hlc.Timestamp) error { + require.Equal(t, len(expectedSpansWithProtections), len(protections)) return nil } } - checkNoRecord = func(ptr *ptpb.Record) error { - if ptr != nil { - return errors.Errorf("expected protected timestamp to not exist, found %v", ptr) + checkProtection = func(protections []hlc.Timestamp) error { + if len(protections) == 0 { + return errors.New("expected protected timestamp to exist") + } + return nil + } + checkNoProtection = func(protections []hlc.Timestamp) error { + if len(protections) != 0 { + return errors.Errorf("expected protected timestamp to not exist, found %v", protections) } return nil } - mkWaitForRecordCond = func(t *testing.T, getRecord func() *ptpb.Record, check func(record *ptpb.Record) error) func() { + mkWaitForProtectionCond = func(t *testing.T, getProtection func() []hlc.Timestamp, + check func(protection []hlc.Timestamp) error) func() { return func() { t.Helper() - testutils.SucceedsSoon(t, func() error { return check(getRecord()) }) + testutils.SucceedsSoon(t, func() error { return check(getProtection()) }) } } ) @@ -4093,6 +4109,8 @@ func TestChangefeedProtectedTimestamps(t *testing.T) { func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { defer close(done) sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms';`) + sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms';`) sqlDB.Exec(t, `ALTER RANGE default CONFIGURE ZONE USING gc.ttlseconds = 100`) sqlDB.Exec(t, `ALTER RANGE system CONFIGURE ZONE USING gc.ttlseconds = 100`) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) @@ -4107,19 +4125,27 @@ func TestChangefeedProtectedTimestamps(t *testing.T) { context.Background(), &f.Server().ClusterSettings().SV, 100*time.Millisecond) ptp := f.Server().DistSQLServer().(*distsql.ServerImpl).ServerConfig.ProtectedTimestampProvider - getPtsRec := 
mkGetPtsRec(t, ptp, f.Server().Clock()) - waitForRecord := mkWaitForRecordCond(t, getPtsRec, mkCheckRecord(t, tableID)) - waitForNoRecord := mkWaitForRecordCond(t, getPtsRec, checkNoRecord) + store, err := f.Server().GetStores().(*kvserver.Stores).GetStore(f.Server().GetFirstStoreID()) + require.NoError(t, err) + ptsReader := store.GetStoreConfig().ProtectedTimestampReader + getPtsRec := mkGetProtections(t, ptp, f.Server(), ptsReader) + waitForProtection := mkWaitForProtectionCond(t, getPtsRec, checkProtection) + waitForTableAndDescriptorProtection := mkWaitForProtectionCond(t, getPtsRec, mkCheckProtection(t, tableID)) + waitForNoProtection := mkWaitForProtectionCond(t, getPtsRec, checkNoProtection) waitForBlocked := requestBlockedScan() - waitForRecordAdvanced := func(ts hlc.Timestamp) { - check := func(ptr *ptpb.Record) error { - if ptr != nil && !ptr.Timestamp.LessEq(ts) { - return nil + waitForProtectionAdvanced := func(ts hlc.Timestamp) { + check := func(protections []hlc.Timestamp) error { + if len(protections) != 0 { + for _, p := range protections { + if p.LessEq(ts) { + return errors.Errorf("expected protected timestamp to exceed %v, found %v", ts, p) + } + } } - return errors.Errorf("expected protected timestamp to exceed %v, found %v", ts, ptr.Timestamp) + return nil } - mkWaitForRecordCond(t, getPtsRec, check)() + mkWaitForProtectionCond(t, getPtsRec, check)() } foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved`) @@ -4128,7 +4154,7 @@ func TestChangefeedProtectedTimestamps(t *testing.T) { // Ensure that there's a protected timestamp on startup that goes // away after the initial scan. unblock := waitForBlocked() - require.NotNil(t, getPtsRec()) + waitForProtection() unblock() assertPayloads(t, foo, []string{ `foo: [1]->{"after": {"a": 1, "b": "a"}}`, @@ -4138,7 +4164,7 @@ func TestChangefeedProtectedTimestamps(t *testing.T) { `foo: [8]->{"after": {"a": 8, "b": "e"}}`, }) resolved, _ := expectResolvedTimestamp(t, foo) - waitForRecordAdvanced(resolved) + waitForProtectionAdvanced(resolved) } { @@ -4147,7 +4173,7 @@ func TestChangefeedProtectedTimestamps(t *testing.T) { waitForBlocked = requestBlockedScan() sqlDB.Exec(t, `ALTER TABLE foo ADD COLUMN c INT NOT NULL DEFAULT 1`) unblock := waitForBlocked() - waitForRecord() + waitForTableAndDescriptorProtection() unblock() assertPayloads(t, foo, []string{ `foo: [1]->{"after": {"a": 1, "b": "a", "c": 1}}`, @@ -4157,7 +4183,7 @@ func TestChangefeedProtectedTimestamps(t *testing.T) { `foo: [8]->{"after": {"a": 8, "b": "e", "c": 1}}`, }) resolved, _ := expectResolvedTimestamp(t, foo) - waitForRecordAdvanced(resolved) + waitForProtectionAdvanced(resolved) } { @@ -4166,9 +4192,9 @@ func TestChangefeedProtectedTimestamps(t *testing.T) { waitForBlocked = requestBlockedScan() sqlDB.Exec(t, `ALTER TABLE foo ADD COLUMN d INT NOT NULL DEFAULT 2`) _ = waitForBlocked() - waitForRecord() + waitForTableAndDescriptorProtection() sqlDB.Exec(t, `CANCEL JOB $1`, foo.(cdctest.EnterpriseTestFeed).JobID()) - waitForNoRecord() + waitForNoProtection() } }, feedTestNoTenants, withArgsFn(func(args *base.TestServerArgs) { storeKnobs := &kvserver.StoreTestingKnobs{} diff --git a/pkg/ccl/jobsccl/jobsprotectedtsccl/jobs_protected_ts_test.go b/pkg/ccl/jobsccl/jobsprotectedtsccl/jobs_protected_ts_test.go index 4d71ebf1147a..9e7c43d4c71f 100644 --- a/pkg/ccl/jobsccl/jobsprotectedtsccl/jobs_protected_ts_test.go +++ b/pkg/ccl/jobsccl/jobsprotectedtsccl/jobs_protected_ts_test.go @@ -138,9 +138,6 @@ func TestJobsProtectedTimestamp(t *testing.T) { tc := 
testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ - ProtectedTS: &protectedts.TestingKnobs{ - EnableProtectedTimestampForMultiTenant: true, - }, JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), }, }, @@ -259,15 +256,7 @@ func TestSchedulesProtectedTimestamp(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{ - Knobs: base.TestingKnobs{ - ProtectedTS: &protectedts.TestingKnobs{ - EnableProtectedTimestampForMultiTenant: true, - }, - }, - }, - }) + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) defer tc.Stopper().Stop(ctx) // Now I want to create some artifacts that should get reconciled away and diff --git a/pkg/ccl/spanconfigccl/spanconfigcomparedccl/datadriven_test.go b/pkg/ccl/spanconfigccl/spanconfigcomparedccl/datadriven_test.go index 20b83423b274..61714ac106ad 100644 --- a/pkg/ccl/spanconfigccl/spanconfigcomparedccl/datadriven_test.go +++ b/pkg/ccl/spanconfigccl/spanconfigcomparedccl/datadriven_test.go @@ -110,7 +110,7 @@ func TestDataDriven(t *testing.T) { tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '20ms'`) } - spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs, nil /* ptsKnobs */) + spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs) defer spanConfigTestCluster.Cleanup() kvSubscriber := tc.Server(0).SpanConfigKVSubscriber().(spanconfig.KVSubscriber) diff --git a/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/BUILD.bazel b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/BUILD.bazel index 840eb2a12674..9923ad6f5f49 100644 --- a/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/BUILD.bazel +++ b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/BUILD.bazel @@ -13,7 +13,6 @@ go_test( "//pkg/ccl/partitionccl", "//pkg/ccl/utilccl", "//pkg/jobs", - "//pkg/kv/kvserver/protectedts", "//pkg/roachpb", "//pkg/security", "//pkg/security/securitytest", diff --git a/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/datadriven_test.go b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/datadriven_test.go index d0671ba0ddc2..d62877556522 100644 --- a/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/datadriven_test.go +++ b/pkg/ccl/spanconfigccl/spanconfigreconcilerccl/datadriven_test.go @@ -19,7 +19,6 @@ import ( _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl" "github.com/cockroachdb/cockroach/pkg/jobs" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils" @@ -96,13 +95,11 @@ func TestDataDriven(t *testing.T) { // Checkpoint noops frequently; speeds this test up. 
SQLWatcherCheckpointNoopsEveryDurationOverride: 100 * time.Millisecond, } - ptsKnobs := &protectedts.TestingKnobs{EnableProtectedTimestampForMultiTenant: true} tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), // speeds up test SpanConfig: scKnobs, - ProtectedTS: ptsKnobs, }, }, }) @@ -114,7 +111,7 @@ func TestDataDriven(t *testing.T) { tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`) } - spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs, ptsKnobs) + spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs) defer spanConfigTestCluster.Cleanup() systemTenant := spanConfigTestCluster.InitializeTenant(ctx, roachpb.SystemTenantID) diff --git a/pkg/ccl/spanconfigccl/spanconfigsplitterccl/datadriven_test.go b/pkg/ccl/spanconfigccl/spanconfigsplitterccl/datadriven_test.go index 748dec6dc355..23d32e8186f4 100644 --- a/pkg/ccl/spanconfigccl/spanconfigsplitterccl/datadriven_test.go +++ b/pkg/ccl/spanconfigccl/spanconfigsplitterccl/datadriven_test.go @@ -70,7 +70,7 @@ func TestDataDriven(t *testing.T) { }) defer tc.Stopper().Stop(ctx) - spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs, nil) + spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs) defer spanConfigTestCluster.Cleanup() var tenant *spanconfigtestcluster.Tenant diff --git a/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/BUILD.bazel b/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/BUILD.bazel index 87dd237f5200..f40864bf1aeb 100644 --- a/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/BUILD.bazel +++ b/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/BUILD.bazel @@ -13,7 +13,6 @@ go_test( "//pkg/ccl/partitionccl", "//pkg/ccl/utilccl", "//pkg/config/zonepb", - "//pkg/kv/kvserver/protectedts", "//pkg/roachpb", "//pkg/security", "//pkg/security/securitytest", diff --git a/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/datadriven_test.go b/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/datadriven_test.go index d8c65cbeda58..6e74524baeb4 100644 --- a/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/datadriven_test.go +++ b/pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl/datadriven_test.go @@ -19,7 +19,6 @@ import ( _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl" "github.com/cockroachdb/cockroach/pkg/config/zonepb" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigtestutils" @@ -87,23 +86,17 @@ func TestDataDriven(t *testing.T) { // test cluster). ManagerDisableJobCreation: true, } - // TODO(adityamaru): Delete once the value for this knob defaults to true - // prior to release-22.1. 
- ptsKnobs := &protectedts.TestingKnobs{ - EnableProtectedTimestampForMultiTenant: true, - } datadriven.Walk(t, testutils.TestDataPath(t), func(t *testing.T, path string) { tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ - SpanConfig: scKnobs, - ProtectedTS: ptsKnobs, + SpanConfig: scKnobs, }, }, }) defer tc.Stopper().Stop(ctx) - spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs, ptsKnobs) + spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs) defer spanConfigTestCluster.Cleanup() var tenant *spanconfigtestcluster.Tenant diff --git a/pkg/ccl/spanconfigccl/spanconfigsqlwatcherccl/BUILD.bazel b/pkg/ccl/spanconfigccl/spanconfigsqlwatcherccl/BUILD.bazel index 19e82075ac23..d89dc6dc1aa1 100644 --- a/pkg/ccl/spanconfigccl/spanconfigsqlwatcherccl/BUILD.bazel +++ b/pkg/ccl/spanconfigccl/spanconfigsqlwatcherccl/BUILD.bazel @@ -16,7 +16,6 @@ go_test( "//pkg/jobs", "//pkg/keys", "//pkg/kv/kvclient/rangefeed", - "//pkg/kv/kvserver/protectedts", "//pkg/roachpb", "//pkg/security", "//pkg/security/securitytest", diff --git a/pkg/ccl/spanconfigccl/spanconfigsqlwatcherccl/sqlwatcher_test.go b/pkg/ccl/spanconfigccl/spanconfigsqlwatcherccl/sqlwatcher_test.go index e3c1959bb843..adeb62e5fc9c 100644 --- a/pkg/ccl/spanconfigccl/spanconfigsqlwatcherccl/sqlwatcher_test.go +++ b/pkg/ccl/spanconfigccl/spanconfigsqlwatcherccl/sqlwatcher_test.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqlwatcher" @@ -66,9 +65,6 @@ func TestSQLWatcherReactsToUpdates(t *testing.T) { ManagerDisableJobCreation: true, // disable the automatic job creation. }, JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), // speed up schema changes. 
- ProtectedTS: &protectedts.TestingKnobs{ - EnableProtectedTimestampForMultiTenant: true, - }, }, }, }) diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 8e8c388fb3ad..5eb0fabeb90b 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -317,6 +317,7 @@ go_test( "//pkg/kv", "//pkg/kv/kvclient", "//pkg/kv/kvclient/kvcoord", + "//pkg/kv/kvclient/rangefeed/rangefeedcache", "//pkg/kv/kvserver/abortspan", "//pkg/kv/kvserver/apply", "//pkg/kv/kvserver/batcheval", diff --git a/pkg/kv/kvserver/client_protectedts_test.go b/pkg/kv/kvserver/client_protectedts_test.go index 74eb5378c892..ca6295b1e2b8 100644 --- a/pkg/kv/kvserver/client_protectedts_test.go +++ b/pkg/kv/kvserver/client_protectedts_test.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" @@ -124,6 +125,13 @@ func TestProtectedTimestamps(t *testing.T) { return startKey } + getTableID := func() descpb.ID { + var tableID descpb.ID + require.NoError(t, + conn.QueryRow(`SELECT id FROM system.namespace WHERE name = 'foo'`).Scan(&tableID)) + return tableID + } + getStoreAndReplica := func() (*kvserver.Store, *kvserver.Replica) { startKey := getTableStartKey() // Okay great now we have a key and can go find replicas and stores and what not. @@ -176,17 +184,11 @@ func TestProtectedTimestamps(t *testing.T) { pts := ptstorage.New(s0.ClusterSettings(), s0.InternalExecutor().(*sql.InternalExecutor), nil /* knobs */) ptsWithDB := ptstorage.WithDatabase(pts, s0.DB()) - startKey := getTableStartKey() ptsRec := ptpb.Record{ ID: uuid.MakeV4().GetBytes(), Timestamp: s0.Clock().Now(), Mode: ptpb.PROTECT_AFTER, - DeprecatedSpans: []roachpb.Span{ - { - Key: startKey, - EndKey: startKey.PrefixEnd(), - }, - }, + Target: ptpb.MakeSchemaObjectsTarget([]descpb.ID{getTableID()}), } require.NoError(t, ptsWithDB.Protect(ctx, nil /* txn */, &ptsRec)) upsertUntilBackpressure() diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 681c1ba18854..6b392ec95431 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedcache" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" @@ -41,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/kvclientutils" @@ -3452,6 +3454,10 @@ func TestTransferLeaseBlocksWrites(t *testing.T) { // TestStrictGCEnforcement ensures that strict GC enforcement is respected and // furthermore is responsive to changes in protected timestamps and in changes // to the zone 
configs. +// +// TODO(adityamaru,arulajmani): Once the protectedts.Cache goes away this test +// will probably need a rewrite since it is tightly coupled with the freshness +// of the Cache. func TestStrictGCEnforcement(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -3464,8 +3470,27 @@ func TestStrictGCEnforcement(t *testing.T) { } ctx := context.Background() + var mu struct { + syncutil.Mutex + blockOnTimestampUpdate func() + } tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SpanConfig: &spanconfig.TestingKnobs{ + KVSubscriberRangeFeedKnobs: &rangefeedcache.TestingKnobs{ + OnTimestampAdvance: func(timestamp hlc.Timestamp) { + mu.Lock() + defer mu.Unlock() + if mu.blockOnTimestampUpdate != nil { + mu.blockOnTimestampUpdate() + } + }, + }, + }, + }, + }, }) defer tc.Stopper().Stop(ctx) @@ -3483,11 +3508,12 @@ func TestStrictGCEnforcement(t *testing.T) { tenSecondsAgo hlc.Timestamp // written in setup tableKey = keys.SystemSQLCodec.TablePrefix(tableID) tableSpan = roachpb.Span{Key: tableKey, EndKey: tableKey.PrefixEnd()} + tableTarget = ptpb.MakeSchemaObjectsTarget([]descpb.ID{descpb.ID(tableID)}) mkRecord = func() ptpb.Record { return ptpb.Record{ - ID: uuid.MakeV4().GetBytes(), - Timestamp: tenSecondsAgo.Add(-10*time.Second.Nanoseconds(), 0), - DeprecatedSpans: []roachpb.Span{tableSpan}, + ID: uuid.MakeV4().GetBytes(), + Timestamp: tenSecondsAgo.Add(-10*time.Second.Nanoseconds(), 0), + Target: tableTarget, } } mkStaleTxn = func() *kv.Txn { @@ -3512,24 +3538,6 @@ func TestStrictGCEnforcement(t *testing.T) { t.Helper() require.NoError(t, performScan()) } - // Make sure the cache has been updated. Once it has then we know it won't - // be for minutes. It should read on startup. 
- waitForCacheAfter = func(t *testing.T, min hlc.Timestamp) { - t.Helper() - testutils.SucceedsSoon(t, func() error { - for i := 0; i < tc.NumServers(); i++ { - ptsReader := tc.GetFirstStoreFromServer(t, 0).GetStoreConfig().ProtectedTimestampReader - _, asOf, err := ptsReader.GetProtectionTimestamps(ctx, tableSpan) - if err != nil { - return err - } - if asOf.Less(min) { - return errors.Errorf("not yet read") - } - } - return nil - }) - } setGCTTL = func(t *testing.T, object string, exp int) { t.Helper() testutils.SucceedsSoon(t, func() error { @@ -3586,16 +3594,23 @@ func TestStrictGCEnforcement(t *testing.T) { require.NoError(t, err) } } - refreshCacheAndUpdatePTSState = func(t *testing.T, nodeID roachpb.NodeID) { + waitForProtectionAndReadProtectedTimestamps = func(t *testing.T, nodeID roachpb.NodeID, + protectionTimestamp hlc.Timestamp, span roachpb.Span) { for i := 0; i < tc.NumServers(); i++ { if tc.Server(i).NodeID() != nodeID { continue } - ptp := tc.Server(i).ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider - require.NoError(t, ptp.Refresh(ctx, tc.Server(i).Clock().Now())) + ptsReader := tc.GetFirstStoreFromServer(t, i).GetStoreConfig().ProtectedTimestampReader _, r := getFirstStoreReplica(t, tc.Server(i), tableKey) - err := r.ReadProtectedTimestamps(ctx) - require.NoError(t, err) + testutils.SucceedsSoon(t, func() error { + if err := verifyProtectionTimestampExistsOnSpans(ctx, t, tc, ptsReader, protectionTimestamp, + []roachpb.Span{span}); err != nil { + return errors.Newf("protection timestamp %s does not exist on span %s", + protectionTimestamp, span) + } + return nil + }) + require.NoError(t, r.ReadProtectedTimestamps(ctx)) } } ) @@ -3610,8 +3625,8 @@ func TestStrictGCEnforcement(t *testing.T) { require.NoError(t, err) setTableGCTTL(t, 1) - waitForCacheAfter(t, hlc.Timestamp{}) + sqlDB.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '10 ms'") defer sqlDB.Exec(t, `SET CLUSTER SETTING kv.gc_ttl.strict_enforcement.enabled = DEFAULT`) setStrictGC(t, true) tenSecondsAgo = tc.Server(0).Clock().Now().Add(-10*time.Second.Nanoseconds(), 0) @@ -3640,22 +3655,41 @@ func TestStrictGCEnforcement(t *testing.T) { require.NoError(t, err) }) t.Run("protected timestamps are respected", func(t *testing.T) { - waitForCacheAfter(t, hlc.Timestamp{}) + // Block the KVSubscriber rangefeed from progressing. + blockKVSubscriberCh := make(chan struct{}) + var isBlocked bool + mu.Lock() + mu.blockOnTimestampUpdate = func() { + isBlocked = true + <-blockKVSubscriberCh + } + mu.Unlock() + + // Ensure that the KVSubscriber has been blocked. + testutils.SucceedsSoon(t, func() error { + if !isBlocked { + return errors.New("kvsubscriber not blocked yet") + } + return nil + }) + ptp := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider assertScanRejected(t) - // Create a protected timestamp, don't verify it, make sure it's not - // respected. + // Create a protected timestamp, and make sure it's not respected since the + // KVSubscriber is blocked. rec := mkRecord() require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return ptp.Protect(ctx, txn, &rec) })) assertScanRejected(t) + // Unblock the KVSubscriber and wait for the PTS record to reach KV. 
+ close(blockKVSubscriberCh) desc, err := tc.LookupRange(tableKey) require.NoError(t, err) target, err := tc.FindRangeLeaseHolder(desc, nil) require.NoError(t, err) - refreshCacheAndUpdatePTSState(t, target.NodeID) + waitForProtectionAndReadProtectedTimestamps(t, target.NodeID, rec.Timestamp, tableSpan) assertScanOk(t) // Transfer the lease and demonstrate that the query succeeds because we're diff --git a/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel b/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel index 1a46390e209c..f5e266ff2d34 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel +++ b/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel @@ -41,6 +41,7 @@ go_test( "//pkg/security", "//pkg/security/securitytest", "//pkg/server", + "//pkg/sql", "//pkg/sql/catalog/systemschema", "//pkg/sql/sqlutil", "//pkg/testutils", diff --git a/pkg/kv/kvserver/protectedts/ptcache/cache_test.go b/pkg/kv/kvserver/protectedts/ptcache/cache_test.go index 37a5dd43ad1b..4bd86edfe757 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/cache_test.go +++ b/pkg/kv/kvserver/protectedts/ptcache/cache_test.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptstorage" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -44,11 +45,20 @@ import ( func TestCacheBasic(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + ProtectedTS: &protectedts.TestingKnobs{ + DisableProtectedTimestampForMultiTenant: true, + }, + }, + }, + }) defer tc.Stopper().Stop(ctx) s := tc.Server(0) - p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor), - nil /* knobs */), s.DB()) + p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(), + s.InternalExecutor().(sqlutil.InternalExecutor), + &protectedts.TestingKnobs{DisableProtectedTimestampForMultiTenant: true}), s.DB()) // Set the poll interval to be very short. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Microsecond) @@ -105,19 +115,23 @@ func TestRefresh(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() st := &scanTracker{} + ptsKnobs := &protectedts.TestingKnobs{ + DisableProtectedTimestampForMultiTenant: true, + } tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ TestingRequestFilter: st.requestFilter, }, + ProtectedTS: ptsKnobs, }, }, }) defer tc.Stopper().Stop(ctx) s := tc.Server(0) p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */), s.DB()) + s.InternalExecutor().(sqlutil.InternalExecutor), ptsKnobs), s.DB()) // Set the poll interval to be very long. 
protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) @@ -222,9 +236,17 @@ func TestStart(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() setup := func() (*testcluster.TestCluster, *ptcache.Cache) { - tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + ProtectedTS: &protectedts.TestingKnobs{ + DisableProtectedTimestampForMultiTenant: true, + }, + }, + }, + }) s := tc.Server(0) - p := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */) + p := s.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider // Set the poll interval to be very long. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) c := ptcache.New(ptcache.Config{ @@ -257,7 +279,8 @@ func TestQueryRecord(t *testing.T) { defer tc.Stopper().Stop(ctx) s := tc.Server(0) p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */), s.DB()) + s.InternalExecutor().(sqlutil.InternalExecutor), + &protectedts.TestingKnobs{DisableProtectedTimestampForMultiTenant: true}), s.DB()) // Set the poll interval to be very long. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) c := ptcache.New(ptcache.Config{ @@ -314,7 +337,8 @@ func TestIterate(t *testing.T) { defer tc.Stopper().Stop(ctx) s := tc.Server(0) p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */), s.DB()) + s.InternalExecutor().(sqlutil.InternalExecutor), + &protectedts.TestingKnobs{DisableProtectedTimestampForMultiTenant: true}), s.DB()) // Set the poll interval to be very long. protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) @@ -375,7 +399,15 @@ func (recs *records) sorted() []*ptpb.Record { func TestGetProtectionTimestamps(t *testing.T) { ctx := context.Background() - tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + ProtectedTS: &protectedts.TestingKnobs{ + DisableProtectedTimestampForMultiTenant: true, + }, + }, + }, + }) defer tc.Stopper().Stop(ctx) // Set the poll interval to be very long. s := tc.Server(0) @@ -451,7 +483,8 @@ func TestGetProtectionTimestamps(t *testing.T) { } { t.Run(testCase.name, func(t *testing.T) { p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */), s.DB()) + s.InternalExecutor().(sqlutil.InternalExecutor), + &protectedts.TestingKnobs{DisableProtectedTimestampForMultiTenant: true}), s.DB()) c := ptcache.New(ptcache.Config{ Settings: s.ClusterSettings(), @@ -475,7 +508,8 @@ func TestSettingChangedLeadsToFetch(t *testing.T) { defer tc.Stopper().Stop(ctx) s := tc.Server(0) p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(), - s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */), s.DB()) + s.InternalExecutor().(sqlutil.InternalExecutor), + &protectedts.TestingKnobs{DisableProtectedTimestampForMultiTenant: true}), s.DB()) // Set the poll interval to be very long. 
protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) diff --git a/pkg/kv/kvserver/protectedts/ptreconcile/reconciler_test.go b/pkg/kv/kvserver/protectedts/ptreconcile/reconciler_test.go index f273002cf477..ca24342b7a52 100644 --- a/pkg/kv/kvserver/protectedts/ptreconcile/reconciler_test.go +++ b/pkg/kv/kvserver/protectedts/ptreconcile/reconciler_test.go @@ -37,75 +37,85 @@ func TestReconciler(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) - defer tc.Stopper().Stop(ctx) - - // Now I want to create some artifacts that should get reconciled away and - // then make sure that they do and others which should not do not. - s0 := tc.Server(0) - ptp := s0.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider - - settings := cluster.MakeTestingClusterSettings() - const testTaskType = "foo" - var state = struct { - mu syncutil.Mutex - toRemove map[string]struct{} - }{} - state.toRemove = map[string]struct{}{} - r := ptreconcile.New(settings, s0.DB(), ptp, - ptreconcile.StatusFuncs{ - testTaskType: func( - ctx context.Context, txn *kv.Txn, meta []byte, - ) (shouldRemove bool, err error) { - state.mu.Lock() - defer state.mu.Unlock() - _, shouldRemove = state.toRemove[string(meta)] - return shouldRemove, nil + testutils.RunTrueAndFalse(t, "reconciler", func(t *testing.T, withDeprecatedSpans bool) { + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + ProtectedTS: &protectedts.TestingKnobs{DisableProtectedTimestampForMultiTenant: withDeprecatedSpans}, + }, }, }) - require.NoError(t, r.StartReconciler(ctx, s0.Stopper())) - recMeta := "a" - rec1 := ptpb.Record{ - ID: uuid.MakeV4().GetBytes(), - Timestamp: s0.Clock().Now(), - Mode: ptpb.PROTECT_AFTER, - MetaType: testTaskType, - Meta: []byte(recMeta), - DeprecatedSpans: []roachpb.Span{ - {Key: keys.MinKey, EndKey: keys.MaxKey}, - }, - } - require.NoError(t, s0.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - return ptp.Protect(ctx, txn, &rec1) - })) + defer tc.Stopper().Stop(ctx) - t.Run("update settings", func(t *testing.T) { - ptreconcile.ReconcileInterval.Override(ctx, &settings.SV, time.Millisecond) - testutils.SucceedsSoon(t, func() error { - require.Equal(t, int64(0), r.Metrics().RecordsRemoved.Count()) - require.Equal(t, int64(0), r.Metrics().ReconciliationErrors.Count()) - if processed := r.Metrics().RecordsProcessed.Count(); processed < 1 { - return errors.Errorf("expected processed to be at least 1, got %d", processed) - } - return nil + // Now I want to create some artifacts that should get reconciled away and + // then make sure that they do and others which should not do not. 
+ s0 := tc.Server(0) + ptp := s0.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider + + settings := cluster.MakeTestingClusterSettings() + const testTaskType = "foo" + var state = struct { + mu syncutil.Mutex + toRemove map[string]struct{} + }{} + state.toRemove = map[string]struct{}{} + r := ptreconcile.New(settings, s0.DB(), ptp, + ptreconcile.StatusFuncs{ + testTaskType: func( + ctx context.Context, txn *kv.Txn, meta []byte, + ) (shouldRemove bool, err error) { + state.mu.Lock() + defer state.mu.Unlock() + _, shouldRemove = state.toRemove[string(meta)] + return shouldRemove, nil + }, + }) + require.NoError(t, r.StartReconciler(ctx, s0.Stopper())) + recMeta := "a" + rec1 := ptpb.Record{ + ID: uuid.MakeV4().GetBytes(), + Timestamp: s0.Clock().Now(), + Mode: ptpb.PROTECT_AFTER, + MetaType: testTaskType, + Meta: []byte(recMeta), + } + if withDeprecatedSpans { + rec1.DeprecatedSpans = []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}} + } else { + rec1.Target = ptpb.MakeClusterTarget() + } + require.NoError(t, s0.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + return ptp.Protect(ctx, txn, &rec1) + })) + + t.Run("update settings", func(t *testing.T) { + ptreconcile.ReconcileInterval.Override(ctx, &settings.SV, time.Millisecond) + testutils.SucceedsSoon(t, func() error { + require.Equal(t, int64(0), r.Metrics().RecordsRemoved.Count()) + require.Equal(t, int64(0), r.Metrics().ReconciliationErrors.Count()) + if processed := r.Metrics().RecordsProcessed.Count(); processed < 1 { + return errors.Errorf("expected processed to be at least 1, got %d", processed) + } + return nil + }) }) - }) - t.Run("reconcile", func(t *testing.T) { - state.mu.Lock() - state.toRemove[recMeta] = struct{}{} - state.mu.Unlock() + t.Run("reconcile", func(t *testing.T) { + state.mu.Lock() + state.toRemove[recMeta] = struct{}{} + state.mu.Unlock() - ptreconcile.ReconcileInterval.Override(ctx, &settings.SV, time.Millisecond) - testutils.SucceedsSoon(t, func() error { - require.Equal(t, int64(0), r.Metrics().ReconciliationErrors.Count()) - if removed := r.Metrics().RecordsRemoved.Count(); removed != 1 { - return errors.Errorf("expected processed to be 1, got %d", removed) - } - return nil + ptreconcile.ReconcileInterval.Override(ctx, &settings.SV, time.Millisecond) + testutils.SucceedsSoon(t, func() error { + require.Equal(t, int64(0), r.Metrics().ReconciliationErrors.Count()) + if removed := r.Metrics().RecordsRemoved.Count(); removed != 1 { + return errors.Errorf("expected processed to be 1, got %d", removed) + } + return nil + }) + require.Regexp(t, protectedts.ErrNotExists, s0.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + _, err := ptp.GetRecord(ctx, txn, rec1.ID.GetUUID()) + return err + })) }) - require.Regexp(t, protectedts.ErrNotExists, s0.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - _, err := ptp.GetRecord(ctx, txn, rec1.ID.GetUUID()) - return err - })) }) } diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage.go b/pkg/kv/kvserver/protectedts/ptstorage/storage.go index 42f7b5f5ff57..a908a8faed5a 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage.go @@ -52,11 +52,12 @@ type storage struct { var _ protectedts.Storage = (*storage)(nil) +// TODO(adityamaru): Delete in 22.2. 
func useDeprecatedProtectedTSStorage( ctx context.Context, st *cluster.Settings, knobs *protectedts.TestingKnobs, ) bool { return !st.Version.IsActive(ctx, clusterversion.AlterSystemProtectedTimestampAddColumn) || - !knobs.EnableProtectedTimestampForMultiTenant + knobs.DisableProtectedTimestampForMultiTenant } // New creates a new Storage. diff --git a/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go b/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go index bcfd954d4a40..b808de2a20f3 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/storage_test.go @@ -322,7 +322,7 @@ type testContext struct { db *kv.DB // If set to false, the test will be run with - // `EnableProtectedTimestampForMultiTenant` set to true, thereby testing the + // `DisableProtectedTimestampForMultiTenant` set to true, thereby testing the // "new" protected timestamp logic that runs on targets instead of spans. runWithDeprecatedSpans bool @@ -473,8 +473,8 @@ func (test testCase) run(t *testing.T) { var params base.TestServerArgs ptsKnobs := &protectedts.TestingKnobs{} - if !test.runWithDeprecatedSpans { - ptsKnobs.EnableProtectedTimestampForMultiTenant = true + if test.runWithDeprecatedSpans { + ptsKnobs.DisableProtectedTimestampForMultiTenant = true params.Knobs.ProtectedTS = ptsKnobs } tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params}) @@ -665,15 +665,15 @@ func TestCorruptData(t *testing.T) { tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ - SpanConfig: &spanconfig.TestingKnobs{ManagerDisableJobCreation: true}, + SpanConfig: &spanconfig.TestingKnobs{ManagerDisableJobCreation: true}, + ProtectedTS: &protectedts.TestingKnobs{DisableProtectedTimestampForMultiTenant: true}, }, }, }) defer tc.Stopper().Stop(ctx) s := tc.Server(0) - pts := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(*sql.InternalExecutor), - nil /* knobs */) + pts := s.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider tCtx := &testContext{runWithDeprecatedSpans: true} runCorruptDataTest(tCtx, s, tc, pts) @@ -684,14 +684,12 @@ func TestCorruptData(t *testing.T) { defer scope.Close(t) params, _ := tests.CreateTestServerParams() - ptsKnobs := &protectedts.TestingKnobs{EnableProtectedTimestampForMultiTenant: true} - params.Knobs.ProtectedTS = ptsKnobs params.Knobs.SpanConfig = &spanconfig.TestingKnobs{ManagerDisableJobCreation: true} tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params}) defer tc.Stopper().Stop(ctx) s := tc.Server(0) - pts := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(*sql.InternalExecutor), ptsKnobs) + pts := s.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider runCorruptDataTest(&testContext{}, s, tc, pts) }) t.Run("corrupt hlc timestamp", func(t *testing.T) { @@ -700,14 +698,12 @@ func TestCorruptData(t *testing.T) { defer scope.Close(t) params, _ := tests.CreateTestServerParams() - ptsKnobs := &protectedts.TestingKnobs{EnableProtectedTimestampForMultiTenant: true} - params.Knobs.ProtectedTS = ptsKnobs params.Knobs.SpanConfig = &spanconfig.TestingKnobs{ManagerDisableJobCreation: true} tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params}) defer tc.Stopper().Stop(ctx) s := tc.Server(0) - pts := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(*sql.InternalExecutor), ptsKnobs) + pts := s.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider rec 
:= newRecord(&testContext{}, s.Clock().Now(), "foo", []byte("bar"), tableTarget(42), tableSpan(42)) require.NoError(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { @@ -756,8 +752,6 @@ func TestCorruptData(t *testing.T) { func TestErrorsFromSQL(t *testing.T) { ctx := context.Background() params, _ := tests.CreateTestServerParams() - ptsKnobs := &protectedts.TestingKnobs{EnableProtectedTimestampForMultiTenant: true} - params.Knobs.ProtectedTS = ptsKnobs tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: params}) defer tc.Stopper().Stop(ctx) @@ -765,7 +759,7 @@ func TestErrorsFromSQL(t *testing.T) { s := tc.Server(0) ie := s.InternalExecutor().(sqlutil.InternalExecutor) wrappedIE := &wrappedInternalExecutor{wrapped: ie} - pts := ptstorage.New(s.ClusterSettings(), wrappedIE, ptsKnobs) + pts := ptstorage.New(s.ClusterSettings(), wrappedIE, &protectedts.TestingKnobs{}) wrappedIE.setErrFunc(func(string) error { return errors.New("boom") diff --git a/pkg/kv/kvserver/protectedts/ptstorage/validate_test.go b/pkg/kv/kvserver/protectedts/ptstorage/validate_test.go index 97f20918969d..dba048afedcc 100644 --- a/pkg/kv/kvserver/protectedts/ptstorage/validate_test.go +++ b/pkg/kv/kvserver/protectedts/ptstorage/validate_test.go @@ -79,7 +79,7 @@ func TestValidateRecordForProtect(t *testing.T) { t.Run(strconv.Itoa(i), func(t *testing.T) { st := cluster.MakeTestingClusterSettings() require.Equal(t, validateRecordForProtect(context.Background(), tc.r, st, - &protectedts.TestingKnobs{EnableProtectedTimestampForMultiTenant: true}), tc.err) + &protectedts.TestingKnobs{}), tc.err) }) // Test that prior to the `AlterSystemProtectedTimestampAddColumn` migration @@ -94,7 +94,7 @@ func TestValidateRecordForProtect(t *testing.T) { } st := cluster.MakeTestingClusterSettings() require.Equal(t, validateRecordForProtect(context.Background(), r, st, - &protectedts.TestingKnobs{}), errEmptySpans) + &protectedts.TestingKnobs{DisableProtectedTimestampForMultiTenant: true}), errEmptySpans) }) } } diff --git a/pkg/kv/kvserver/protectedts/testing_knobs.go b/pkg/kv/kvserver/protectedts/testing_knobs.go index 5a57770f930b..8c3b1f1cb023 100644 --- a/pkg/kv/kvserver/protectedts/testing_knobs.go +++ b/pkg/kv/kvserver/protectedts/testing_knobs.go @@ -15,12 +15,12 @@ import "github.com/cockroachdb/cockroach/pkg/base" // TestingKnobs provide fine-grained control over the various span config // components for testing. type TestingKnobs struct { - // EnableProtectedTimestampForMultiTenant when set to true, runs the protected - // timestamp subsystem that depends on span configuration reconciliation. + // DisableProtectedTimestampForMultiTenant when set to true, runs the + // deprecated protected timestamp subsystem that does not work in a + // multi-tenant environment. // - // TODO(adityamaru,arulajmani): Default this to true, or flip it to - // `DisableProtectedTimestampForMultiTenant` prior to release 22.1. - EnableProtectedTimestampForMultiTenant bool + // TODO(adityamaru): Delete in 22.2. + DisableProtectedTimestampForMultiTenant bool } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. 
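For reference, a minimal sketch (not part of this patch) of how a test writes a record against the new target-based subsystem versus the deprecated span-based one gated by the renamed knob. The helper name protectExample is hypothetical; the APIs mirror those exercised in the tests above (ptpb.MakeClusterTarget, Record.DeprecatedSpans, protectedts.Provider.Protect).

package example

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql"
	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
	"github.com/stretchr/testify/require"
)

// protectExample writes a single protected timestamp record. When
// useDeprecatedSpans is true it addresses raw spans, which is only honored on
// clusters running with DisableProtectedTimestampForMultiTenant; otherwise it
// addresses a target, which is what the span-config-backed subsystem expects.
func protectExample(
	ctx context.Context, t *testing.T, s serverutils.TestServerInterface, useDeprecatedSpans bool,
) {
	ptp := s.ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider
	rec := ptpb.Record{
		ID:        uuid.MakeV4().GetBytes(),
		Timestamp: s.Clock().Now(),
		Mode:      ptpb.PROTECT_AFTER,
	}
	if useDeprecatedSpans {
		// Deprecated path: protect an explicit keyspace.
		rec.DeprecatedSpans = []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}
	} else {
		// New path: protect a target (here, the whole cluster); schema objects
		// can be targeted with ptpb.MakeSchemaObjectsTarget instead.
		rec.Target = ptpb.MakeClusterTarget()
	}
	require.NoError(t, s.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
		return ptp.Protect(ctx, txn, &rec)
	}))
}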
diff --git a/pkg/spanconfig/spanconfigptsreader/BUILD.bazel b/pkg/spanconfig/spanconfigptsreader/BUILD.bazel index c211793c9f7b..2e5ebc44e02a 100644 --- a/pkg/spanconfig/spanconfigptsreader/BUILD.bazel +++ b/pkg/spanconfig/spanconfigptsreader/BUILD.bazel @@ -6,12 +6,14 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader", visibility = ["//visibility:public"], deps = [ + "//pkg/keys", "//pkg/kv/kvserver/protectedts", "//pkg/roachpb", "//pkg/spanconfig", "//pkg/testutils", "//pkg/util/hlc", "@com_github_cockroachdb_errors//:errors", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/spanconfig/spanconfigptsreader/adapter.go b/pkg/spanconfig/spanconfigptsreader/adapter.go index b6a853cb5566..80129e30493e 100644 --- a/pkg/spanconfig/spanconfigptsreader/adapter.go +++ b/pkg/spanconfig/spanconfigptsreader/adapter.go @@ -14,12 +14,14 @@ import ( "context" "testing" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" ) // adapter implements the spanconfig.ProtectedTSReader interface and is intended @@ -87,8 +89,12 @@ func TestingRefreshPTSState( if !ok { return errors.AssertionFailedf("could not convert protectedTSReader to adapter") } + // First refresh the cache past asOf. + require.NoError(t, a.cache.Refresh(ctx, asOf)) + + // Now ensure the KVSubscriber is fresh enough. testutils.SucceedsSoon(t, func() error { - _, fresh, err := a.GetProtectionTimestamps(ctx, roachpb.Span{}) + _, fresh, err := a.GetProtectionTimestamps(ctx, keys.EverythingSpan) if err != nil { return err } @@ -97,5 +103,5 @@ func TestingRefreshPTSState( } return nil }) - return a.cache.Refresh(ctx, asOf) + return nil } diff --git a/pkg/spanconfig/spanconfigsqlwatcher/BUILD.bazel b/pkg/spanconfig/spanconfigsqlwatcher/BUILD.bazel index a242d0180fc2..772abbb478e0 100644 --- a/pkg/spanconfig/spanconfigsqlwatcher/BUILD.bazel +++ b/pkg/spanconfig/spanconfigsqlwatcher/BUILD.bazel @@ -53,7 +53,6 @@ go_test( "//pkg/jobs/jobsprotectedts", "//pkg/keys", "//pkg/kv", - "//pkg/kv/kvserver/protectedts", "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/roachpb", "//pkg/security", diff --git a/pkg/spanconfig/spanconfigsqlwatcher/protectedtsdecoder_test.go b/pkg/spanconfig/spanconfigsqlwatcher/protectedtsdecoder_test.go index 8ddc8fedc380..692eefddf154 100644 --- a/pkg/spanconfig/spanconfigsqlwatcher/protectedtsdecoder_test.go +++ b/pkg/spanconfig/spanconfigsqlwatcher/protectedtsdecoder_test.go @@ -19,7 +19,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigsqlwatcher" @@ -37,12 +36,7 @@ func TestProtectedTimestampDecoder(t *testing.T) { defer leaktest.AfterTest(t)() ctx := context.Background() - tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{ - Knobs: base.TestingKnobs{ - ProtectedTS: &protectedts.TestingKnobs{EnableProtectedTimestampForMultiTenant: true}}, - }, - }) + tc := 
testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) defer tc.Stopper().Stop(ctx) s0 := tc.Server(0) diff --git a/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/cluster.go b/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/cluster.go index 45d2dedc632b..3969707d7534 100644 --- a/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/cluster.go +++ b/pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster/cluster.go @@ -17,7 +17,6 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/spanconfig" @@ -33,26 +32,21 @@ import ( // cluster while providing convenient, scoped access to each tenant's specific // span config primitives. It's not safe for concurrent use. type Handle struct { - t *testing.T - tc *testcluster.TestCluster - ts map[roachpb.TenantID]*Tenant - scKnobs *spanconfig.TestingKnobs - ptsKnobs *protectedts.TestingKnobs + t *testing.T + tc *testcluster.TestCluster + ts map[roachpb.TenantID]*Tenant + scKnobs *spanconfig.TestingKnobs } // NewHandle returns a new Handle. func NewHandle( - t *testing.T, - tc *testcluster.TestCluster, - scKnobs *spanconfig.TestingKnobs, - ptsKnobs *protectedts.TestingKnobs, + t *testing.T, tc *testcluster.TestCluster, scKnobs *spanconfig.TestingKnobs, ) *Handle { return &Handle{ - t: t, - tc: tc, - ts: make(map[roachpb.TenantID]*Tenant), - scKnobs: scKnobs, - ptsKnobs: ptsKnobs, + t: t, + tc: tc, + ts: make(map[roachpb.TenantID]*Tenant), + scKnobs: scKnobs, } }
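As a companion sketch (also not part of this patch), this is the read path the updated tests rely on: protections now reach KV through span config reconciliation, so tests fetch them from the store's ProtectedTSReader rather than refreshing the old ptcache directly. The helper name readProtectionsExample is hypothetical; the calls mirror those added to changefeed_test.go and client_replica_test.go.

package example

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/stretchr/testify/require"
)

// readProtectionsExample returns the protection timestamps visible to the
// first store of the given test server.
func readProtectionsExample(
	ctx context.Context, t *testing.T, srv serverutils.TestServerInterface,
) []hlc.Timestamp {
	store, err := srv.GetStores().(*kvserver.Stores).GetStore(srv.GetFirstStoreID())
	require.NoError(t, err)
	ptsReader := store.GetStoreConfig().ProtectedTimestampReader

	// The KVSubscriber applies reconciled span configs asynchronously, so make
	// sure the reader has caught up to "now" before asserting on its contents.
	require.NoError(t,
		spanconfigptsreader.TestingRefreshPTSState(ctx, t, ptsReader, srv.Clock().Now()))

	protections, _, err := ptsReader.GetProtectionTimestamps(ctx, keys.EverythingSpan)
	require.NoError(t, err)
	return protections
}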