Skip to content

Commit

Permalink
protectedts: flip testing knob to enable multi-tenant PTS
Browse files Browse the repository at this point in the history
This change switches the `EnableProtectedTimestampForMultiTenant`
testing knob to `DisableProtectedTimestampForMultiTenant`. This means
that all tests will now run with the ptpb.Target and spanconfig backed
protectedts infrastructure by default.

Informs: #73727

Release note: None

Release justification: non-production code changes
  • Loading branch information
adityamaru committed Mar 14, 2022
1 parent d44874b commit 9da1051
Show file tree
Hide file tree
Showing 27 changed files with 418 additions and 280 deletions.
32 changes: 25 additions & 7 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6437,6 +6437,9 @@ func TestProtectedTimestampsDuringBackup(t *testing.T) {
conn := tc.ServerConn(0)
runner := sqlutils.MakeSQLRunner(conn)
runner.Exec(t, "CREATE TABLE foo (k INT PRIMARY KEY, v BYTES)")
runner.Exec(t, "SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms';")
runner.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'") // speeds up the test

close(allowRequest)

for _, testrun := range []struct {
Expand Down Expand Up @@ -6466,7 +6469,6 @@ func TestProtectedTimestampsDuringBackup(t *testing.T) {
baseBackupURI := "nodelocal://0/foo" + testrun.name
testrun.runBackup(t, fmt.Sprintf(`BACKUP TABLE FOO TO '%s'`, baseBackupURI), runner) // create a base backup.
allowRequest = make(chan struct{})
runner.Exec(t, "SET CLUSTER SETTING kv.protectedts.poll_interval = '100ms';")
runner.Exec(t, "ALTER TABLE foo CONFIGURE ZONE USING gc.ttlseconds = 1;")
rRand, _ := randutil.NewTestRand()
writeGarbage := func(from, to int) {
Expand Down Expand Up @@ -6925,8 +6927,6 @@ func TestRestoreErrorPropagates(t *testing.T) {

// TestProtectedTimestampsFailDueToLimits ensures that when creating a protected
// timestamp record fails, we return the correct error.
//
// TODO(adityamaru): Remove in 22.2 once no records protect spans.
func TestProtectedTimestampsFailDueToLimits(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand All @@ -6942,12 +6942,32 @@ func TestProtectedTimestampsFailDueToLimits(t *testing.T) {
runner := sqlutils.MakeSQLRunner(db)
runner.Exec(t, "CREATE TABLE foo (k INT PRIMARY KEY, v BYTES)")
runner.Exec(t, "CREATE TABLE bar (k INT PRIMARY KEY, v BYTES)")
runner.Exec(t, "SET CLUSTER SETTING kv.protectedts.max_spans = 1")
runner.Exec(t, "SET CLUSTER SETTING kv.protectedts.max_bytes = 1")

// Creating the protected timestamp record should fail because there are too
// many spans. Ensure that we get the appropriate error.
_, err := db.Exec(`BACKUP TABLE foo, bar TO 'nodelocal://0/foo'`)
require.EqualError(t, err, "pq: protectedts: limit exceeded: 0+2 > 1 spans")
require.EqualError(t, err, "pq: protectedts: limit exceeded: 0+30 > 1 bytes")

// TODO(adityamaru): Remove in 22.2 once no records protect spans.
t.Run("deprecated-spans-limit", func(t *testing.T) {
params := base.TestClusterArgs{}
params.ServerArgs.ExternalIODir = dir
params.ServerArgs.Knobs.ProtectedTS = &protectedts.TestingKnobs{
DisableProtectedTimestampForMultiTenant: true}
tc := testcluster.StartTestCluster(t, 1, params)
defer tc.Stopper().Stop(ctx)
db := tc.ServerConn(0)
runner := sqlutils.MakeSQLRunner(db)
runner.Exec(t, "CREATE TABLE foo (k INT PRIMARY KEY, v BYTES)")
runner.Exec(t, "CREATE TABLE bar (k INT PRIMARY KEY, v BYTES)")
runner.Exec(t, "SET CLUSTER SETTING kv.protectedts.max_spans = 1")

// Creating the protected timestamp record should fail because there are too
// many spans. Ensure that we get the appropriate error.
_, err := db.Exec(`BACKUP TABLE foo, bar TO 'nodelocal://0/foo'`)
require.EqualError(t, err, "pq: protectedts: limit exceeded: 0+2 > 1 spans")
})
}

func TestPaginatedBackupTenant(t *testing.T) {
Expand Down Expand Up @@ -9740,8 +9760,6 @@ func TestExcludeDataFromBackupDoesNotHoldupGC(t *testing.T) {
DisableGCQueue: true,
DisableLastProcessedCheck: true,
}
params.ServerArgs.Knobs.ProtectedTS = &protectedts.TestingKnobs{
EnableProtectedTimestampForMultiTenant: true}
params.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
tc := testcluster.StartTestCluster(t, 1, params)
defer tc.Stopper().Stop(ctx)
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ go_test(
"//pkg/server/status",
"//pkg/server/telemetry",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigptsreader",
"//pkg/sql/catalog",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/descbuilder",
Expand Down
197 changes: 126 additions & 71 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
Expand Down Expand Up @@ -3936,43 +3938,25 @@ func TestChangefeedUpdateProtectedTimestamp(t *testing.T) {
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
ctx := context.Background()
ptsInterval := 50 * time.Millisecond
changefeedbase.ProtectTimestampInterval.Override(
context.Background(), &f.Server().ClusterSettings().SV, ptsInterval)

sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, "SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms';")
sqlDB.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'") // speeds up the test
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '20ms'`)
defer closeFeed(t, foo)

fooDesc := desctestutils.TestingGetPublicTableDescriptor(
f.Server().DB(), keys.SystemSQLCodec, "d", "foo")
tableSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec)
ptsProvider := f.Server().DistSQLServer().(*distsql.ServerImpl).ServerConfig.ProtectedTimestampProvider

var tableID int
sqlDB.QueryRow(t, `SELECT table_id FROM crdb_internal.tables `+
`WHERE name = 'foo' AND database_name = current_database()`).
Scan(&tableID)

getTablePtsRecord := func() *ptpb.Record {
var r *ptpb.Record
require.NoError(t, ptsProvider.Refresh(context.Background(), f.Server().Clock().Now()))
ptsProvider.Iterate(context.Background(), tableSpan.Key, tableSpan.EndKey, func(record *ptpb.Record) (wantMore bool) {
r = record
return false
})

expectedKeys := map[string]struct{}{
string(keys.SystemSQLCodec.TablePrefix(uint32(tableID))): {},
string(keys.SystemSQLCodec.TablePrefix(keys.DescriptorTableID)): {},
}
require.Equal(t, len(r.DeprecatedSpans), len(expectedKeys))
for _, s := range r.DeprecatedSpans {
require.Contains(t, expectedKeys, string(s.Key))
}
return r
}
ptp := f.Server().DistSQLServer().(*distsql.ServerImpl).ServerConfig.ProtectedTimestampProvider
store, err := f.Server().GetStores().(*kvserver.Stores).GetStore(f.Server().GetFirstStoreID())
require.NoError(t, err)
ptsReader := store.GetStoreConfig().ProtectedTimestampReader

// Wait and return the next resolved timestamp after the wait time
waitAndDrainResolved := func(ts time.Duration) hlc.Timestamp {
Expand All @@ -3985,13 +3969,62 @@ func TestChangefeedUpdateProtectedTimestamp(t *testing.T) {
}
}

mkGetProtections := func(t *testing.T, ptp protectedts.Provider,
srv serverutils.TestServerInterface, ptsReader spanconfig.ProtectedTSReader,
span roachpb.Span) func() []hlc.Timestamp {
return func() (r []hlc.Timestamp) {
require.NoError(t,
spanconfigptsreader.TestingRefreshPTSState(ctx, t, ptsReader, srv.Clock().Now()))
protections, _, err := ptsReader.GetProtectionTimestamps(ctx, span)
require.NoError(t, err)
return protections
}
}

mkWaitForProtectionCond := func(t *testing.T, getProtection func() []hlc.Timestamp,
check func(protection []hlc.Timestamp) error) func() {
return func() {
t.Helper()
testutils.SucceedsSoon(t, func() error { return check(getProtection()) })
}
}

// Setup helpers on the system.descriptors table.
descriptorTableKey := keys.SystemSQLCodec.TablePrefix(keys.DescriptorTableID)
descriptorTableSpan := roachpb.Span{
Key: descriptorTableKey, EndKey: descriptorTableKey.PrefixEnd(),
}
getDescriptorTableProtection := mkGetProtections(t, ptp, f.Server(), ptsReader,
descriptorTableSpan)

// Setup helpers on the user table.
tableKey := keys.SystemSQLCodec.TablePrefix(uint32(fooDesc.GetID()))
tableSpan := roachpb.Span{
Key: tableKey, EndKey: tableKey.PrefixEnd(),
}
getTableProtection := mkGetProtections(t, ptp, f.Server(), ptsReader, tableSpan)
waitForProtectionAdvanced := func(ts hlc.Timestamp, getProtection func() []hlc.Timestamp) {
check := func(protections []hlc.Timestamp) error {
if len(protections) == 0 {
return errors.New("expected protection but found none")
}
for _, p := range protections {
if p.LessEq(ts) {
return errors.Errorf("expected protected timestamp to exceed %v, found %v", ts, p)
}
}
return nil
}

mkWaitForProtectionCond(t, getProtection, check)()
}

// Observe the protected timestamp advancing along with resolved timestamps
for i := 0; i < 5; i++ {
// Progress the changefeed and allow time for a pts record to be laid down
nextResolved := waitAndDrainResolved(100 * time.Millisecond)
time.Sleep(2 * ptsInterval)
rec := getTablePtsRecord()
require.LessOrEqual(t, nextResolved.GoTime().UnixNano(), rec.Timestamp.GoTime().UnixNano())
waitForProtectionAdvanced(nextResolved, getTableProtection)
waitForProtectionAdvanced(nextResolved, getDescriptorTableProtection)
}
}

Expand Down Expand Up @@ -4048,43 +4081,34 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
}
return nil
})
mkGetPtsRec = func(t *testing.T, ptp protectedts.Provider, clock *hlc.Clock) func() *ptpb.Record {
return func() (r *ptpb.Record) {
t.Helper()
require.NoError(t, ptp.Refresh(ctx, clock.Now()))
ptp.Iterate(ctx, userSpan.Key, userSpan.EndKey, func(record *ptpb.Record) (wantMore bool) {
r = record
return false
})
return r
mkGetProtections = func(t *testing.T, ptp protectedts.Provider,
srv serverutils.TestServerInterface, ptsReader spanconfig.ProtectedTSReader,
span roachpb.Span) func() []hlc.Timestamp {
return func() (r []hlc.Timestamp) {
require.NoError(t,
spanconfigptsreader.TestingRefreshPTSState(ctx, t, ptsReader, srv.Clock().Now()))
protections, _, err := ptsReader.GetProtectionTimestamps(ctx, span)
require.NoError(t, err)
return protections
}
}
mkCheckRecord = func(t *testing.T, tableID int) func(r *ptpb.Record) error {
expectedKeys := map[string]struct{}{
string(keys.SystemSQLCodec.TablePrefix(uint32(tableID))): {},
string(keys.SystemSQLCodec.TablePrefix(keys.DescriptorTableID)): {},
}
return func(ptr *ptpb.Record) error {
if ptr == nil {
return errors.Errorf("expected protected timestamp")
}
require.Equal(t, len(ptr.DeprecatedSpans), len(expectedKeys), ptr.DeprecatedSpans, expectedKeys)
for _, s := range ptr.DeprecatedSpans {
require.Contains(t, expectedKeys, string(s.Key))
}
return nil
checkProtection = func(protections []hlc.Timestamp) error {
if len(protections) == 0 {
return errors.New("expected protected timestamp to exist")
}
return nil
}
checkNoRecord = func(ptr *ptpb.Record) error {
if ptr != nil {
return errors.Errorf("expected protected timestamp to not exist, found %v", ptr)
checkNoProtection = func(protections []hlc.Timestamp) error {
if len(protections) != 0 {
return errors.Errorf("expected protected timestamp to not exist, found %v", protections)
}
return nil
}
mkWaitForRecordCond = func(t *testing.T, getRecord func() *ptpb.Record, check func(record *ptpb.Record) error) func() {
mkWaitForProtectionCond = func(t *testing.T, getProtection func() []hlc.Timestamp,
check func(protection []hlc.Timestamp) error) func() {
return func() {
t.Helper()
testutils.SucceedsSoon(t, func() error { return check(getRecord()) })
testutils.SucceedsSoon(t, func() error { return check(getProtection()) })
}
}
)
Expand All @@ -4093,6 +4117,8 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
defer close(done)
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `SET CLUSTER SETTING kv.protectedts.poll_interval = '10ms';`)
sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms';`)
sqlDB.Exec(t, `ALTER RANGE default CONFIGURE ZONE USING gc.ttlseconds = 100`)
sqlDB.Exec(t, `ALTER RANGE system CONFIGURE ZONE USING gc.ttlseconds = 100`)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
Expand All @@ -4107,19 +4133,44 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
context.Background(), &f.Server().ClusterSettings().SV, 100*time.Millisecond)

ptp := f.Server().DistSQLServer().(*distsql.ServerImpl).ServerConfig.ProtectedTimestampProvider
getPtsRec := mkGetPtsRec(t, ptp, f.Server().Clock())
waitForRecord := mkWaitForRecordCond(t, getPtsRec, mkCheckRecord(t, tableID))
waitForNoRecord := mkWaitForRecordCond(t, getPtsRec, checkNoRecord)
store, err := f.Server().GetStores().(*kvserver.Stores).GetStore(f.Server().GetFirstStoreID())
require.NoError(t, err)
ptsReader := store.GetStoreConfig().ProtectedTimestampReader

// Setup helpers on the system.descriptors table.
descriptorTableKey := keys.SystemSQLCodec.TablePrefix(keys.DescriptorTableID)
descriptorTableSpan := roachpb.Span{
Key: descriptorTableKey, EndKey: descriptorTableKey.PrefixEnd(),
}
getDescriptorTableProtection := mkGetProtections(t, ptp, f.Server(), ptsReader,
descriptorTableSpan)
waitForDescriptorTableProtection := mkWaitForProtectionCond(t, getDescriptorTableProtection,
checkProtection)
waitForNoDescriptorTableProtection := mkWaitForProtectionCond(t, getDescriptorTableProtection,
checkNoProtection)

// Setup helpers on the user table.
tableKey := keys.SystemSQLCodec.TablePrefix(uint32(tableID))
tableSpan := roachpb.Span{
Key: tableKey, EndKey: tableKey.PrefixEnd(),
}
getTableProtection := mkGetProtections(t, ptp, f.Server(), ptsReader, tableSpan)
waitForTableProtection := mkWaitForProtectionCond(t, getTableProtection, checkProtection)
waitForNoTableProtection := mkWaitForProtectionCond(t, getTableProtection, checkNoProtection)
waitForBlocked := requestBlockedScan()
waitForRecordAdvanced := func(ts hlc.Timestamp) {
check := func(ptr *ptpb.Record) error {
if ptr != nil && !ptr.Timestamp.LessEq(ts) {
return nil
waitForProtectionAdvanced := func(ts hlc.Timestamp, getProtection func() []hlc.Timestamp) {
check := func(protections []hlc.Timestamp) error {
if len(protections) != 0 {
for _, p := range protections {
if p.LessEq(ts) {
return errors.Errorf("expected protected timestamp to exceed %v, found %v", ts, p)
}
}
}
return errors.Errorf("expected protected timestamp to exceed %v, found %v", ts, ptr.Timestamp)
return nil
}

mkWaitForRecordCond(t, getPtsRec, check)()
mkWaitForProtectionCond(t, getProtection, check)()
}

foo := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved`)
Expand All @@ -4128,7 +4179,7 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
// Ensure that there's a protected timestamp on startup that goes
// away after the initial scan.
unblock := waitForBlocked()
require.NotNil(t, getPtsRec())
waitForTableProtection()
unblock()
assertPayloads(t, foo, []string{
`foo: [1]->{"after": {"a": 1, "b": "a"}}`,
Expand All @@ -4138,7 +4189,7 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
`foo: [8]->{"after": {"a": 8, "b": "e"}}`,
})
resolved, _ := expectResolvedTimestamp(t, foo)
waitForRecordAdvanced(resolved)
waitForProtectionAdvanced(resolved, getTableProtection)
}

{
Expand All @@ -4147,7 +4198,8 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
waitForBlocked = requestBlockedScan()
sqlDB.Exec(t, `ALTER TABLE foo ADD COLUMN c INT NOT NULL DEFAULT 1`)
unblock := waitForBlocked()
waitForRecord()
waitForTableProtection()
waitForDescriptorTableProtection()
unblock()
assertPayloads(t, foo, []string{
`foo: [1]->{"after": {"a": 1, "b": "a", "c": 1}}`,
Expand All @@ -4157,7 +4209,8 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
`foo: [8]->{"after": {"a": 8, "b": "e", "c": 1}}`,
})
resolved, _ := expectResolvedTimestamp(t, foo)
waitForRecordAdvanced(resolved)
waitForProtectionAdvanced(resolved, getTableProtection)
waitForProtectionAdvanced(resolved, getDescriptorTableProtection)
}

{
Expand All @@ -4166,9 +4219,11 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
waitForBlocked = requestBlockedScan()
sqlDB.Exec(t, `ALTER TABLE foo ADD COLUMN d INT NOT NULL DEFAULT 2`)
_ = waitForBlocked()
waitForRecord()
waitForTableProtection()
waitForDescriptorTableProtection()
sqlDB.Exec(t, `CANCEL JOB $1`, foo.(cdctest.EnterpriseTestFeed).JobID())
waitForNoRecord()
waitForNoTableProtection()
waitForNoDescriptorTableProtection()
}
}, feedTestNoTenants, withArgsFn(func(args *base.TestServerArgs) {
storeKnobs := &kvserver.StoreTestingKnobs{}
Expand Down
Loading

0 comments on commit 9da1051

Please sign in to comment.