Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-24.1: changefeedccl: protect system.role_members using protected timestamps #131648

Merged
merged 1 commit into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/protected_timestamps.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ var systemTablesToProtect = []descpb.ID{
keys.DescriptorTableID,
keys.CommentsTableID,
keys.ZonesTableID,
// Required for CDC Queries.
keys.RoleMembersTableID,
// TODO(#128806): identify and add any more required tables (such as, possibly, `keys.UsersTableID`)
}

func makeTargetToProtect(targets changefeedbase.Targets) *ptpb.Target {
// NB: We add 1 because we're also going to protect system.descriptors.
// We protect system.descriptors because a changefeed needs all of the history
// of table descriptors to version data.
tablesToProtect := make(descpb.IDs, 0, targets.NumUniqueTables()+len(systemTablesToProtect))
_ = targets.EachTableID(func(id descpb.ID) error {
tablesToProtect = append(tablesToProtect, id)
Expand Down
51 changes: 47 additions & 4 deletions pkg/ccl/changefeedccl/protected_timestamps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -447,9 +448,9 @@ func TestChangefeedCanceledWhenPTSIsOld(t *testing.T) {
cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks)
}

// TestPTSRecordProtectsTargetsAndDescriptorTable tests that descriptors are not
// GC'd when they are protected by a PTS record.
func TestPTSRecordProtectsTargetsAndDescriptorTable(t *testing.T) {
// TestPTSRecordProtectsTargetsAndSystemTables tests that descriptors and other
// required tables are not GC'd when they are protected by a PTS record.
func TestPTSRecordProtectsTargetsAndSystemTables(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

Expand All @@ -459,6 +460,8 @@ func TestPTSRecordProtectsTargetsAndDescriptorTable(t *testing.T) {
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `ALTER DATABASE system CONFIGURE ZONE USING gc.ttlseconds = 1`)
sqlDB.Exec(t, "CREATE TABLE foo (a INT, b STRING)")
sqlDB.Exec(t, `CREATE USER test`)
sqlDB.Exec(t, `GRANT admin TO test`)
ts := s.Clock().Now()
ctx := context.Background()

Expand Down Expand Up @@ -516,6 +519,10 @@ func TestPTSRecordProtectsTargetsAndDescriptorTable(t *testing.T) {
// Alter foo few times, then force GC at ts-1.
sqlDB.Exec(t, "ALTER TABLE foo ADD COLUMN c STRING")
sqlDB.Exec(t, "ALTER TABLE foo ADD COLUMN d STRING")

// Remove this entry from role_members.
sqlDB.Exec(t, "REVOKE admin FROM test")

time.Sleep(2 * time.Second)
// If you want to GC all system tables:
//
Expand All @@ -528,11 +535,16 @@ func TestPTSRecordProtectsTargetsAndDescriptorTable(t *testing.T) {
gcTestTableRange("system", "descriptor")
gcTestTableRange("system", "zones")
gcTestTableRange("system", "comments")
gcTestTableRange("system", "role_members")

// We can still fetch table descriptors because of protected timestamp record.
// We can still fetch table descriptors and role members because of protected timestamp record.
asOf := ts
_, err := fetchTableDescriptors(ctx, &execCfg, targets, asOf)
require.NoError(t, err)
// The role_members entry we removed is still visible at the asOf time because of the PTS record.
rms, err := fetchRoleMembers(ctx, &execCfg, asOf)
require.NoError(t, err)
require.Contains(t, rms, []string{"admin", "test"})
}

// TestChangefeedUpdateProtectedTimestamp tests that changefeeds using the
Expand Down Expand Up @@ -671,3 +683,34 @@ func TestChangefeedMigratesProtectedTimestamps(t *testing.T) {

cdcTestWithSystem(t, testFn, feedTestEnterpriseSinks)
}

func fetchRoleMembers(
ctx context.Context, execCfg *sql.ExecutorConfig, ts hlc.Timestamp,
) ([][]string, error) {
var roleMembers [][]string
err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
if err := txn.KV().SetFixedTimestamp(ctx, ts); err != nil {
return err
}
it, err := txn.QueryIteratorEx(ctx, "test-get-role-members", txn.KV(), sessiondata.NoSessionDataOverride, "SELECT role, member FROM system.role_members")
if err != nil {
return err
}
defer func() { _ = it.Close() }()

var ok bool
for ok, err = it.Next(ctx); ok && err == nil; ok, err = it.Next(ctx) {
role, member := string(tree.MustBeDString(it.Cur()[0])), string(tree.MustBeDString(it.Cur()[1]))
roleMembers = append(roleMembers, []string{role, member})
}
if err != nil {
return err
}

return nil
})
if err != nil {
return nil, err
}
return roleMembers, nil
}
7 changes: 6 additions & 1 deletion pkg/upgrade/upgrades/v24_1_migrate_pts_records_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,19 @@ func TestMigrateOldStlePTSRecords(t *testing.T) {
allTargets = append(allTargets, []catid.DescID{
keys.DescriptorTableID,
keys.ZonesTableID,
keys.RoleMembersTableID,
keys.CommentsTableID,
tableDesc.GetID()})
descIDsArr = append(descIDsArr, tableDesc.GetID())
allTables = append(allTables, tbl)
}
_, err = sqlDB.Exec(fmt.Sprintf("create changefeed for %s INTO 'null://'", strings.Join(allTables, ",")))
require.NoError(t, err)
descIDsArr = append(descIDsArr, keys.DescriptorTableID, keys.ZonesTableID, keys.CommentsTableID)
descIDsArr = append(descIDsArr,
keys.DescriptorTableID,
keys.ZonesTableID,
keys.RoleMembersTableID,
keys.CommentsTableID)
sort.Slice(descIDsArr, func(i int, j int) bool {
return descIDsArr[i] < descIDsArr[j]
})
Expand Down