Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv,changefeedccl: make kv.rangefeed.enabled a TenantReadOnly setting #79224

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
@@ -478,8 +478,11 @@ func validateSettings(ctx context.Context, p sql.PlanHookState) error {
// Changefeeds are based on the Rangefeed abstraction, which
// requires the `kv.rangefeed.enabled` setting to be true.
if !kvserver.RangefeedEnabled.Get(&p.ExecCfg().Settings.SV) {
return errors.Errorf("rangefeeds require the kv.rangefeed.enabled setting. See %s",
docs.URL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`))
docsURL := docs.URL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`)
if !p.ExecCfg().Codec.ForSystemTenant() {
return errors.Errorf("rangefeeds require the system cluster operator set kv.rangefeed.enable to true for this tenant. See %s", docsURL)
}
return errors.Errorf("rangefeeds require the kv.rangefeed.enabled setting. See %s", docsURL)
}

ok, err := p.HasRoleOption(ctx, roleoption.CONTROLCHANGEFEED)
7 changes: 4 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
@@ -374,7 +374,7 @@ func TestChangefeedTenants(t *testing.T) {

tenantServer, tenantDB := serverutils.StartTenant(t, kvServer, tenantArgs)
tenantSQL := sqlutils.MakeSQLRunner(tenantDB)
tenantSQL.Exec(t, serverSetupStatements)
tenantSQL.Exec(t, tenantSetupStatements)

tenantSQL.Exec(t, `CREATE TABLE foo_in_tenant (pk INT PRIMARY KEY)`)
t.Run("changefeed on non-tenant table fails", func(t *testing.T) {
@@ -445,7 +445,7 @@ func TestChangefeedTenantsExternalIOEnabled(t *testing.T) {

tenantServer, tenantDB := serverutils.StartTenant(t, kvServer, tenantArgs)
tenantSQL := sqlutils.MakeSQLRunner(tenantDB)
tenantSQL.Exec(t, serverSetupStatements)
tenantSQL.Exec(t, tenantSetupStatements)
tenantSQL.Exec(t, `CREATE TABLE foo_in_tenant (pk INT PRIMARY KEY)`)

t.Run("sinkful changefeed works", func(t *testing.T) {
@@ -952,7 +952,8 @@ func TestChangefeedExternalIODisabled(t *testing.T) {
})
defer s.Stopper().Stop(ctx)
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, serverSetupStatements)
sqlDB.Exec(t, hostSetupStatements)
sqlDB.Exec(t, tenantSetupStatements)
sqlDB.Exec(t, "CREATE TABLE target_table (pk INT PRIMARY KEY)")
for _, proto := range disallowedSinkProtos {
sqlDB.ExpectErr(t, "Outbound IO is disabled by configuration, cannot create changefeed",
19 changes: 13 additions & 6 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
@@ -305,11 +305,15 @@ func expectResolvedTimestampAvro(t testing.TB, f cdctest.TestFeed) hlc.Timestamp
return parseTimeToHLC(t, resolved.(map[string]interface{})[`string`].(string))
}

var serverSetupStatements = `
var hostSetupStatements = `
ALTER TENANT ALL SET CLUSTER SETTING kv.rangefeed.enabled = true;
SET CLUSTER SETTING kv.rangefeed.enabled = true;
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s';
SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms';
`

var tenantSetupStatements = `
SET CLUSTER SETTING sql.defaults.vectorize=on;
SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms';
CREATE DATABASE d;
`

@@ -359,7 +363,9 @@ func startTestFullServer(
}
}()

_, err = db.ExecContext(ctx, serverSetupStatements)
_, err = db.ExecContext(ctx, hostSetupStatements)
require.NoError(t, err)
_, err = db.ExecContext(ctx, tenantSetupStatements)
require.NoError(t, err)

if region := serverArgsRegion(args); region != "" {
@@ -399,7 +405,9 @@ func startTestCluster(t testing.TB) (serverutils.TestClusterInterface, *gosql.DB
require.NoError(t, err)
}
}()
_, err = db.ExecContext(ctx, serverSetupStatements)
_, err = db.ExecContext(ctx, hostSetupStatements)
require.NoError(t, err)
_, err = db.ExecContext(ctx, tenantSetupStatements)
require.NoError(t, err)

_, err = db.ExecContext(ctx, `ALTER DATABASE d PRIMARY REGION "us-east1"`)
@@ -431,8 +439,7 @@ func startTestTenant(
}

tenantServer, tenantDB := serverutils.StartTenant(t, kvServer, tenantArgs)
// Re-run setup on the tenant as well
_, err := tenantDB.ExecContext(ctx, serverSetupStatements)
_, err := tenantDB.ExecContext(ctx, tenantSetupStatements)
require.NoError(t, err)

server := &testServerShim{tenantServer, kvServer}
19 changes: 19 additions & 0 deletions pkg/cli/initial_sql.go
Original file line number Diff line number Diff line change
@@ -56,9 +56,28 @@ func runInitialSQL(
}
}

if err := enableRangefeeds(ctx, s); err != nil {
return err
}

return nil
}

func enableRangefeeds(ctx context.Context, s *server.Server) error {
return s.RunLocalSQL(ctx, func(ctx context.Context, ie *sql.InternalExecutor) error {
enableRangefeedStmt := "SET CLUSTER SETTING kv.rangefeed.enabled = true"
if _, err := ie.Exec(ctx, "enable-rangefeeds", nil, enableRangefeedStmt); err != nil {
return err
}

enableRangfeedForTenantsStmt := "ALTER TENANT ALL SET CLUSTER SETTING kv.rangefeed.enabled = true"
if _, err := ie.Exec(ctx, "enable-rangefeeds-for-tenants", nil, enableRangfeedForTenantsStmt); err != nil {
return err
}
return nil
})
}

// createAdminUser creates an admin user with the given name.
func createAdminUser(ctx context.Context, s *server.Server, adminUser, adminPassword string) error {
return s.RunLocalSQL(ctx,
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
@@ -43,7 +43,7 @@ import (
// ranges and ranges covering tables in the system database); this setting
// covers everything else.
var RangefeedEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
settings.TenantReadOnly,
"kv.rangefeed.enabled",
"if set, rangefeed registration is enabled",
false,