Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv,changefeedccl: make kv.rangefeed.enabled a TenantReadOnly setting #79224

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,8 +478,11 @@ func validateSettings(ctx context.Context, p sql.PlanHookState) error {
// Changefeeds are based on the Rangefeed abstraction, which
// requires the `kv.rangefeed.enabled` setting to be true.
if !kvserver.RangefeedEnabled.Get(&p.ExecCfg().Settings.SV) {
return errors.Errorf("rangefeeds require the kv.rangefeed.enabled setting. See %s",
docs.URL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`))
docsURL := docs.URL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`)
if !p.ExecCfg().Codec.ForSystemTenant() {
return errors.Errorf("rangefeeds require the system cluster operator set kv.rangefeed.enable to true for this tenant. See %s", docsURL)
}
return errors.Errorf("rangefeeds require the kv.rangefeed.enabled setting. See %s", docsURL)
}

ok, err := p.HasRoleOption(ctx, roleoption.CONTROLCHANGEFEED)
Expand Down
7 changes: 4 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func TestChangefeedTenants(t *testing.T) {

tenantServer, tenantDB := serverutils.StartTenant(t, kvServer, tenantArgs)
tenantSQL := sqlutils.MakeSQLRunner(tenantDB)
tenantSQL.Exec(t, serverSetupStatements)
tenantSQL.Exec(t, tenantSetupStatements)

tenantSQL.Exec(t, `CREATE TABLE foo_in_tenant (pk INT PRIMARY KEY)`)
t.Run("changefeed on non-tenant table fails", func(t *testing.T) {
Expand Down Expand Up @@ -445,7 +445,7 @@ func TestChangefeedTenantsExternalIOEnabled(t *testing.T) {

tenantServer, tenantDB := serverutils.StartTenant(t, kvServer, tenantArgs)
tenantSQL := sqlutils.MakeSQLRunner(tenantDB)
tenantSQL.Exec(t, serverSetupStatements)
tenantSQL.Exec(t, tenantSetupStatements)
tenantSQL.Exec(t, `CREATE TABLE foo_in_tenant (pk INT PRIMARY KEY)`)

t.Run("sinkful changefeed works", func(t *testing.T) {
Expand Down Expand Up @@ -952,7 +952,8 @@ func TestChangefeedExternalIODisabled(t *testing.T) {
})
defer s.Stopper().Stop(ctx)
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, serverSetupStatements)
sqlDB.Exec(t, hostSetupStatements)
sqlDB.Exec(t, tenantSetupStatements)
sqlDB.Exec(t, "CREATE TABLE target_table (pk INT PRIMARY KEY)")
for _, proto := range disallowedSinkProtos {
sqlDB.ExpectErr(t, "Outbound IO is disabled by configuration, cannot create changefeed",
Expand Down
19 changes: 13 additions & 6 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,15 @@ func expectResolvedTimestampAvro(t testing.TB, f cdctest.TestFeed) hlc.Timestamp
return parseTimeToHLC(t, resolved.(map[string]interface{})[`string`].(string))
}

var serverSetupStatements = `
var hostSetupStatements = `
ALTER TENANT ALL SET CLUSTER SETTING kv.rangefeed.enabled = true;
SET CLUSTER SETTING kv.rangefeed.enabled = true;
SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s';
SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms';
`

var tenantSetupStatements = `
SET CLUSTER SETTING sql.defaults.vectorize=on;
SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms';
CREATE DATABASE d;
`

Expand Down Expand Up @@ -359,7 +363,9 @@ func startTestFullServer(
}
}()

_, err = db.ExecContext(ctx, serverSetupStatements)
_, err = db.ExecContext(ctx, hostSetupStatements)
require.NoError(t, err)
_, err = db.ExecContext(ctx, tenantSetupStatements)
require.NoError(t, err)

if region := serverArgsRegion(args); region != "" {
Expand Down Expand Up @@ -399,7 +405,9 @@ func startTestCluster(t testing.TB) (serverutils.TestClusterInterface, *gosql.DB
require.NoError(t, err)
}
}()
_, err = db.ExecContext(ctx, serverSetupStatements)
_, err = db.ExecContext(ctx, hostSetupStatements)
require.NoError(t, err)
_, err = db.ExecContext(ctx, tenantSetupStatements)
require.NoError(t, err)

_, err = db.ExecContext(ctx, `ALTER DATABASE d PRIMARY REGION "us-east1"`)
Expand Down Expand Up @@ -431,8 +439,7 @@ func startTestTenant(
}

tenantServer, tenantDB := serverutils.StartTenant(t, kvServer, tenantArgs)
// Re-run setup on the tenant as well
_, err := tenantDB.ExecContext(ctx, serverSetupStatements)
_, err := tenantDB.ExecContext(ctx, tenantSetupStatements)
require.NoError(t, err)

server := &testServerShim{tenantServer, kvServer}
Expand Down
19 changes: 19 additions & 0 deletions pkg/cli/initial_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,28 @@ func runInitialSQL(
}
}

if err := enableRangefeeds(ctx, s); err != nil {
return err
}

return nil
}

func enableRangefeeds(ctx context.Context, s *server.Server) error {
return s.RunLocalSQL(ctx, func(ctx context.Context, ie *sql.InternalExecutor) error {
enableRangefeedStmt := "SET CLUSTER SETTING kv.rangefeed.enabled = true"
if _, err := ie.Exec(ctx, "enable-rangefeeds", nil, enableRangefeedStmt); err != nil {
return err
}

enableRangfeedForTenantsStmt := "ALTER TENANT ALL SET CLUSTER SETTING kv.rangefeed.enabled = true"
if _, err := ie.Exec(ctx, "enable-rangefeeds-for-tenants", nil, enableRangfeedForTenantsStmt); err != nil {
return err
}
return nil
})
}

// createAdminUser creates an admin user with the given name.
func createAdminUser(ctx context.Context, s *server.Server, adminUser, adminPassword string) error {
return s.RunLocalSQL(ctx,
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import (
// ranges and ranges covering tables in the system database); this setting
// covers everything else.
var RangefeedEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
settings.TenantReadOnly,
"kv.rangefeed.enabled",
"if set, rangefeed registration is enabled",
false,
Expand Down