protectedts: flip testing knob to enable multi-tenant PTS
This change switches the `EnableProtectedTimestampForMultiTenant`
testing knob to `DisableProtectedTimestampForMultiTenant`. As a result,
all tests now run with the ptpb.Target- and spanconfig-backed
protectedts infrastructure by default; tests that still need the
deprecated spans-based records must opt out explicitly.

Informs: cockroachdb#73727

Release note: None

Release justification: non-production code changes
adityamaru committed Mar 11, 2022
1 parent 5755663 commit 8956ac3
Showing 27 changed files with 352 additions and 259 deletions.
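
With the default flipped, a test that still depends on the deprecated spans-based records must opt out explicitly. A minimal sketch of the opt-out, mirroring the `deprecated-spans-limit` subtest in `pkg/ccl/backupccl/backup_test.go` below (scaffolding such as `t` and `ctx` is assumed from the enclosing test):

    params := base.TestClusterArgs{}
    params.ServerArgs.Knobs.ProtectedTS = &protectedts.TestingKnobs{
        // Opt back into the deprecated spans-backed protectedts records.
        DisableProtectedTimestampForMultiTenant: true,
    }
    tc := testcluster.StartTestCluster(t, 1, params)
    defer tc.Stopper().Stop(ctx)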
31 changes: 26 additions & 5 deletions pkg/ccl/backupccl/backup_test.go
@@ -52,6 +52,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -6436,6 +6437,9 @@ func TestProtectedTimestampsDuringBackup(t *testing.T) {
conn := tc.ServerConn(0)
runner := sqlutils.MakeSQLRunner(conn)
runner.Exec(t, "CREATE TABLE foo (k INT PRIMARY KEY, v BYTES)")
runner.Exec(t, "SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms';")
runner.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'") // speeds up the test

close(allowRequest)

for _, testrun := range []struct {
@@ -6465,7 +6469,6 @@ func TestProtectedTimestampsDuringBackup(t *testing.T) {
baseBackupURI := "nodelocal://0/foo" + testrun.name
testrun.runBackup(t, fmt.Sprintf(`BACKUP TABLE FOO TO '%s'`, baseBackupURI), runner) // create a base backup.
allowRequest = make(chan struct{})
runner.Exec(t, "SET CLUSTER SETTING kv.protectedts.poll_interval = '100ms';")
runner.Exec(t, "ALTER TABLE foo CONFIGURE ZONE USING gc.ttlseconds = 1;")
rRand, _ := randutil.NewTestRand()
writeGarbage := func(from, to int) {
@@ -6924,8 +6927,6 @@ func TestRestoreErrorPropagates(t *testing.T) {

// TestProtectedTimestampsFailDueToLimits ensures that when creating a protected
// timestamp record fails, we return the correct error.
//
// TODO(adityamaru): Remove in 22.2 once no records protect spans.
func TestProtectedTimestampsFailDueToLimits(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -6941,12 +6942,32 @@ func TestProtectedTimestampsFailDueToLimits(t *testing.T) {
runner := sqlutils.MakeSQLRunner(db)
runner.Exec(t, "CREATE TABLE foo (k INT PRIMARY KEY, v BYTES)")
runner.Exec(t, "CREATE TABLE bar (k INT PRIMARY KEY, v BYTES)")
runner.Exec(t, "SET CLUSTER SETTING kv.protectedts.max_spans = 1")
runner.Exec(t, "SET CLUSTER SETTING kv.protectedts.max_bytes = 1")

// Creating the protected timestamp record should fail because the record's
// size exceeds the configured byte limit. Ensure that we get the appropriate error.
_, err := db.Exec(`BACKUP TABLE foo, bar TO 'nodelocal://0/foo'`)
require.EqualError(t, err, "pq: protectedts: limit exceeded: 0+2 > 1 spans")
require.EqualError(t, err, "pq: protectedts: limit exceeded: 0+30 > 1 bytes")

// TODO(adityamaru): Remove in 22.2 once no records protect spans.
t.Run("deprecated-spans-limit", func(t *testing.T) {
params := base.TestClusterArgs{}
params.ServerArgs.ExternalIODir = dir
params.ServerArgs.Knobs.ProtectedTS = &protectedts.TestingKnobs{
DisableProtectedTimestampForMultiTenant: true}
tc := testcluster.StartTestCluster(t, 1, params)
defer tc.Stopper().Stop(ctx)
db := tc.ServerConn(0)
runner := sqlutils.MakeSQLRunner(db)
runner.Exec(t, "CREATE TABLE foo (k INT PRIMARY KEY, v BYTES)")
runner.Exec(t, "CREATE TABLE bar (k INT PRIMARY KEY, v BYTES)")
runner.Exec(t, "SET CLUSTER SETTING kv.protectedts.max_spans = 1")

// Creating the protected timestamp record should fail because there are too
// many spans. Ensure that we get the appropriate error.
_, err := db.Exec(`BACKUP TABLE foo, bar TO 'nodelocal://0/foo'`)
require.EqualError(t, err, "pq: protectedts: limit exceeded: 0+2 > 1 spans")
})
}

func TestPaginatedBackupTenant(t *testing.T) {
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -182,6 +182,8 @@ go_test(
"//pkg/server/status",
"//pkg/server/telemetry",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigptsreader",
"//pkg/sql/catalog",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/descbuilder",
144 changes: 85 additions & 59 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -54,6 +54,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
@@ -3936,42 +3938,38 @@ func TestChangefeedUpdateProtectedTimestamp(t *testing.T) {
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
ctx := context.Background()
ptsInterval := 50 * time.Millisecond
changefeedbase.ProtectTimestampInterval.Override(
context.Background(), &f.Server().ClusterSettings().SV, ptsInterval)

sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, "SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms';")
sqlDB.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'") // speeds up the test
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '20ms'`)
defer closeFeed(t, foo)

fooDesc := desctestutils.TestingGetPublicTableDescriptor(
f.Server().DB(), keys.SystemSQLCodec, "d", "foo")
tableSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec)
ptsProvider := f.Server().DistSQLServer().(*distsql.ServerImpl).ServerConfig.ProtectedTimestampProvider

var tableID int
sqlDB.QueryRow(t, `SELECT table_id FROM crdb_internal.tables `+
`WHERE name = 'foo' AND database_name = current_database()`).
Scan(&tableID)

getTablePtsRecord := func() *ptpb.Record {
var r *ptpb.Record
require.NoError(t, ptsProvider.Refresh(context.Background(), f.Server().Clock().Now()))
ptsProvider.Iterate(context.Background(), tableSpan.Key, tableSpan.EndKey, func(record *ptpb.Record) (wantMore bool) {
r = record
return false
})

expectedKeys := map[string]struct{}{
string(keys.SystemSQLCodec.TablePrefix(uint32(tableID))): {},
string(keys.SystemSQLCodec.TablePrefix(keys.DescriptorTableID)): {},
}
require.Equal(t, len(r.DeprecatedSpans), len(expectedKeys))
for _, s := range r.DeprecatedSpans {
require.Contains(t, expectedKeys, string(s.Key))
}
return r
// Get the protection policies that will be written on the span configs
// emitted by the tenant's reconciliation job.
getProtectionPolicies := func(ctx context.Context, txn *kv.Txn) []roachpb.ProtectionPolicy {
ptsState, err := ptsProvider.GetState(ctx, txn)
require.NoError(t, err)
ptsStateReader := spanconfig.NewProtectedTimestampStateReader(ctx, ptsState)
protections := ptsStateReader.GetProtectionPoliciesForSchemaObject(fooDesc.GetID())
require.Len(t, protections, 1)
descProtection := ptsStateReader.GetProtectionPoliciesForSchemaObject(keys.DescriptorTableID)
require.Len(t, descProtection, 1)
return append(protections, descProtection...)
}

// Wait and return the next resolved timestamp after the wait time
@@ -3990,8 +3988,26 @@ func TestChangefeedUpdateProtectedTimestamp(t *testing.T) {
// Progress the changefeed and allow time for a pts record to be laid down
nextResolved := waitAndDrainResolved(100 * time.Millisecond)
time.Sleep(2 * ptsInterval)
rec := getTablePtsRecord()
require.LessOrEqual(t, nextResolved.GoTime().UnixNano(), rec.Timestamp.GoTime().UnixNano())

var pp []roachpb.ProtectionPolicy
// Get the protection policies that have been written by the changefeed job.
require.NoError(t, f.Server().DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
pp = getProtectionPolicies(ctx, txn)
return nil
}))
// We expect to see two protection policies corresponding to the record
// protecting the user table and the descriptor table.
for i := range pp {
require.LessOrEqual(t, nextResolved.GoTime().UnixNano(),
pp[i].ProtectedTimestamp.GoTime().UnixNano())
}

// TODO(CDC): Given the frequency with which the test is writing pts
// records it is hard to enforce a strict equality check on the records
// written and the state reconciled in KV. Maybe there is a way to "pause"
// the resolved timestamp so that we can allow KV state to be reconciled
up to a point, and then match the protection policies persisted in KV with
// the policies in `pp` above.
}
}

@@ -4048,43 +4064,43 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
}
return nil
})
mkGetPtsRec = func(t *testing.T, ptp protectedts.Provider, clock *hlc.Clock) func() *ptpb.Record {
return func() (r *ptpb.Record) {
t.Helper()
require.NoError(t, ptp.Refresh(ctx, clock.Now()))
ptp.Iterate(ctx, userSpan.Key, userSpan.EndKey, func(record *ptpb.Record) (wantMore bool) {
r = record
return false
})
return r
mkGetProtections = func(t *testing.T, ptp protectedts.Provider,
srv serverutils.TestServerInterface, ptsReader spanconfig.ProtectedTSReader) func() []hlc.Timestamp {
return func() (r []hlc.Timestamp) {
require.NoError(t,
spanconfigptsreader.TestingRefreshPTSState(ctx, t, ptsReader, srv.Clock().Now()))
protections, _, err := ptsReader.GetProtectionTimestamps(ctx, keys.EverythingSpan)
require.NoError(t, err)
return protections
}
}
mkCheckRecord = func(t *testing.T, tableID int) func(r *ptpb.Record) error {
expectedKeys := map[string]struct{}{
mkCheckProtection = func(t *testing.T, tableID int) func(protections []hlc.Timestamp) error {
expectedSpansWithProtections := map[string]struct{}{
string(keys.SystemSQLCodec.TablePrefix(uint32(tableID))): {},
string(keys.SystemSQLCodec.TablePrefix(keys.DescriptorTableID)): {},
}
return func(ptr *ptpb.Record) error {
if ptr == nil {
return errors.Errorf("expected protected timestamp")
}
require.Equal(t, len(ptr.DeprecatedSpans), len(expectedKeys), ptr.DeprecatedSpans, expectedKeys)
for _, s := range ptr.DeprecatedSpans {
require.Contains(t, expectedKeys, string(s.Key))
}
return func(protections []hlc.Timestamp) error {
require.Equal(t, len(expectedSpansWithProtections), len(protections))
return nil
}
}
checkNoRecord = func(ptr *ptpb.Record) error {
if ptr != nil {
return errors.Errorf("expected protected timestamp to not exist, found %v", ptr)
checkProtection = func(protections []hlc.Timestamp) error {
if len(protections) == 0 {
return errors.New("expected protected timestamp to exist")
}
return nil
}
checkNoProtection = func(protections []hlc.Timestamp) error {
if len(protections) != 0 {
return errors.Errorf("expected protected timestamp to not exist, found %v", protections)
}
return nil
}
mkWaitForRecordCond = func(t *testing.T, getRecord func() *ptpb.Record, check func(record *ptpb.Record) error) func() {
mkWaitForProtectionCond = func(t *testing.T, getProtection func() []hlc.Timestamp,
check func(protection []hlc.Timestamp) error) func() {
return func() {
t.Helper()
testutils.SucceedsSoon(t, func() error { return check(getRecord()) })
testutils.SucceedsSoon(t, func() error { return check(getProtection()) })
}
}
)
@@ -4093,6 +4109,8 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
defer close(done)
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms';`)
sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms';`)
sqlDB.Exec(t, `ALTER RANGE default CONFIGURE ZONE USING gc.ttlseconds = 100`)
sqlDB.Exec(t, `ALTER RANGE system CONFIGURE ZONE USING gc.ttlseconds = 100`)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
@@ -4107,19 +4125,27 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
context.Background(), &f.Server().ClusterSettings().SV, 100*time.Millisecond)

ptp := f.Server().DistSQLServer().(*distsql.ServerImpl).ServerConfig.ProtectedTimestampProvider
getPtsRec := mkGetPtsRec(t, ptp, f.Server().Clock())
waitForRecord := mkWaitForRecordCond(t, getPtsRec, mkCheckRecord(t, tableID))
waitForNoRecord := mkWaitForRecordCond(t, getPtsRec, checkNoRecord)
store, err := f.Server().GetStores().(*kvserver.Stores).GetStore(f.Server().GetFirstStoreID())
require.NoError(t, err)
ptsReader := store.GetStoreConfig().ProtectedTimestampReader
getPtsRec := mkGetProtections(t, ptp, f.Server(), ptsReader)
waitForProtection := mkWaitForProtectionCond(t, getPtsRec, checkProtection)
waitForTableAndDescriptorProtection := mkWaitForProtectionCond(t, getPtsRec, mkCheckProtection(t, tableID))
waitForNoProtection := mkWaitForProtectionCond(t, getPtsRec, checkNoProtection)
waitForBlocked := requestBlockedScan()
waitForRecordAdvanced := func(ts hlc.Timestamp) {
check := func(ptr *ptpb.Record) error {
if ptr != nil && !ptr.Timestamp.LessEq(ts) {
return nil
waitForProtectionAdvanced := func(ts hlc.Timestamp) {
check := func(protections []hlc.Timestamp) error {
if len(protections) != 0 {
for _, p := range protections {
if p.LessEq(ts) {
return errors.Errorf("expected protected timestamp to exceed %v, found %v", ts, p)
}
}
}
return errors.Errorf("expected protected timestamp to exceed %v, found %v", ts, ptr.Timestamp)
return nil
}

mkWaitForRecordCond(t, getPtsRec, check)()
mkWaitForProtectionCond(t, getPtsRec, check)()
}

foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved`)
@@ -4128,7 +4154,7 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
// Ensure that there's a protected timestamp on startup that goes
// away after the initial scan.
unblock := waitForBlocked()
require.NotNil(t, getPtsRec())
waitForProtection()
unblock()
assertPayloads(t, foo, []string{
`foo: [1]->{"after": {"a": 1, "b": "a"}}`,
@@ -4138,7 +4164,7 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
`foo: [8]->{"after": {"a": 8, "b": "e"}}`,
})
resolved, _ := expectResolvedTimestamp(t, foo)
waitForRecordAdvanced(resolved)
waitForProtectionAdvanced(resolved)
}

{
Expand All @@ -4147,7 +4173,7 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
waitForBlocked = requestBlockedScan()
sqlDB.Exec(t, `ALTER TABLE foo ADD COLUMN c INT NOT NULL DEFAULT 1`)
unblock := waitForBlocked()
waitForRecord()
waitForTableAndDescriptorProtection()
unblock()
assertPayloads(t, foo, []string{
`foo: [1]->{"after": {"a": 1, "b": "a", "c": 1}}`,
@@ -4157,9 +4183,9 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
`foo: [8]->{"after": {"a": 8, "b": "e", "c": 1}}`,
})
resolved, _ := expectResolvedTimestamp(t, foo)
waitForRecordAdvanced(resolved)
waitForProtectionAdvanced(resolved)
}

{
Expand All @@ -4166,9 +4192,9 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
waitForBlocked = requestBlockedScan()
sqlDB.Exec(t, `ALTER TABLE foo ADD COLUMN d INT NOT NULL DEFAULT 2`)
_ = waitForBlocked()
waitForRecord()
waitForTableAndDescriptorProtection()
sqlDB.Exec(t, `CANCEL JOB $1`, foo.(cdctest.EnterpriseTestFeed).JobID())
waitForNoRecord()
waitForNoProtection()
}
}, feedTestNoTenants, withArgsFn(func(args *base.TestServerArgs) {
storeKnobs := &kvserver.StoreTestingKnobs{}
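The recurring change in this file replaces direct iteration over ptpb.Records with the spanconfig-backed protected-timestamp reader. Condensed, the new verification pattern looks roughly like this (a sketch reusing identifiers from the diff above, with `srv` standing for the test server and `ts` for the resolved timestamp being compared against):

    // Catch the reader up to the current clock, then fetch every protection
    // timestamp that applies anywhere in the keyspace.
    require.NoError(t,
        spanconfigptsreader.TestingRefreshPTSState(ctx, t, ptsReader, srv.Clock().Now()))
    protections, _, err := ptsReader.GetProtectionTimestamps(ctx, keys.EverythingSpan)
    require.NoError(t, err)
    // A protection exists iff at least one timestamp comes back; it has
    // "advanced past ts" iff every returned timestamp exceeds ts.
    for _, p := range protections {
        require.True(t, ts.Less(p))
    }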
13 changes: 1 addition & 12 deletions pkg/ccl/jobsccl/jobsprotectedtsccl/jobs_protected_ts_test.go
@@ -138,9 +138,6 @@ func TestJobsProtectedTimestamp(t *testing.T) {
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
ProtectedTS: &protectedts.TestingKnobs{
EnableProtectedTimestampForMultiTenant: true,
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
},
},
@@ -259,15 +256,7 @@ func TestSchedulesProtectedTimestamp(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
ProtectedTS: &protectedts.TestingKnobs{
EnableProtectedTimestampForMultiTenant: true,
},
},
},
})
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)

// Now I want to create some artifacts that should get reconciled away and
@@ -110,7 +110,7 @@ func TestDataDriven(t *testing.T) {
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '20ms'`)
}

spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs, nil /* ptsKnobs */)
spanConfigTestCluster := spanconfigtestcluster.NewHandle(t, tc, scKnobs)
defer spanConfigTestCluster.Cleanup()

kvSubscriber := tc.Server(0).SpanConfigKVSubscriber().(spanconfig.KVSubscriber)
1 change: 0 additions & 1 deletion pkg/ccl/spanconfigccl/spanconfigreconcilerccl/BUILD.bazel
@@ -13,7 +13,6 @@ go_test(
"//pkg/ccl/partitionccl",
"//pkg/ccl/utilccl",
"//pkg/jobs",
"//pkg/kv/kvserver/protectedts",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",