diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index da22e7e8285f..1f74737dbd5b 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -105,6 +105,7 @@ go_library( "//pkg/sql/sem/volatility", "//pkg/sql/sessiondatapb", "//pkg/sql/sqlutil", + "//pkg/sql/syntheticprivilege", "//pkg/sql/types", "//pkg/util", "//pkg/util/admission", @@ -234,6 +235,7 @@ go_test( "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/bootstrap", + "//pkg/sql/catalog/catpb", "//pkg/sql/catalog/colinfo", "//pkg/sql/catalog/descbuilder", "//pkg/sql/catalog/descpb", diff --git a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go index d3b9fc399f3c..67e84713768a 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go @@ -100,6 +100,17 @@ func alterChangefeedPlanHook( return err } + jobPayload := job.Payload() + + // Having control job is not enough to allow them to modify the changefeed. + canAccess, userErr, err := sql.JobTypeSpecificPrivilegeCheck(ctx, p, jobID, &jobPayload, false) + if err != nil { + return err + } + if !canAccess { + return userErr + } + prevDetails, ok := job.Details().(jobspb.ChangefeedDetails) if !ok { return errors.Errorf(`job %d is not changefeed job`, jobID) @@ -123,11 +134,12 @@ func alterChangefeedPlanHook( return err } - newTargets, newProgress, newStatementTime, originalSpecs, err := generateNewTargets( + newTargets, newProgress, newStatementTime, originalSpecs, err := generateAndValidateNewTargets( ctx, exprEval, p, alterChangefeedStmt.Cmds, newOptions.AsMap(), // TODO: Remove .AsMap() prevDetails, job.Progress(), + newSinkURI, ) if err != nil { return err @@ -320,7 +332,7 @@ func generateNewOpts( return changefeedbase.MakeStatementOptions(newOptions), sinkURI, nil } -func generateNewTargets( +func generateAndValidateNewTargets( ctx context.Context, exprEval exprutil.Evaluator, p sql.PlanHookState, @@ -328,6 +340,7 @@ func generateNewTargets( opts map[string]string, prevDetails jobspb.ChangefeedDetails, prevProgress jobspb.Progress, + sinkURI string, ) ( tree.ChangefeedTargets, *jobspb.Progress, @@ -490,6 +503,7 @@ func generateNewTargets( existingTargetSpans := fetchSpansForDescs(p, existingTargetDescs) var newTargetDescs []catalog.Descriptor for _, target := range v.Targets { + desc, found, err := getTargetDesc(ctx, p, descResolver, target.TableName) if err != nil { return nil, nil, hlc.Timestamp{}, nil, err @@ -501,6 +515,7 @@ func generateNewTargets( tree.ErrString(&target), ) } + k := targetKey{TableID: desc.GetID(), FamilyName: target.FamilyName} newTargets[k] = target newTableDescs[desc.GetID()] = desc @@ -546,6 +561,7 @@ func generateNewTargets( tree.ErrString(&target), ) } + newTableDescs[desc.GetID()] = desc delete(newTargets, k) } telemetry.CountBucketed(telemetryPath+`.dropped_targets`, int64(len(v.Targets))) @@ -580,6 +596,20 @@ func generateNewTargets( newTargetList = append(newTargetList, target) } + hasSelectPrivOnAllTables := true + hasChangefeedPrivOnAllTables := true + for _, desc := range newTableDescs { + hasSelect, hasChangefeed, err := checkPrivilegesForDescriptor(ctx, p, desc) + if err != nil { + return nil, nil, hlc.Timestamp{}, nil, err + } + hasSelectPrivOnAllTables = hasSelectPrivOnAllTables && hasSelect + hasChangefeedPrivOnAllTables = hasChangefeedPrivOnAllTables && hasChangefeed + } + if err := verifyUserCanCreateChangefeed(ctx, p, sinkURI, hasSelectPrivOnAllTables, hasChangefeedPrivOnAllTables); err != nil { + return nil, nil, hlc.Timestamp{}, nil, err + } + if err := validateNewTargets(ctx, p, newTargetList, newJobProgress, newJobStatementTime); err != nil { return nil, nil, hlc.Timestamp{}, nil, err } diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index 367c0966f030..a673397111fb 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -10,19 +10,25 @@ package changefeedccl import ( "context" + gosql "database/sql" "fmt" + "net/url" "sync/atomic" "testing" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/telemetry" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/tests" @@ -40,6 +46,155 @@ import ( "github.com/stretchr/testify/require" ) +// TestAlterChangefeedAddTargetPrivileges tests permissions for +// users creating new changefeeds while altering them. +func TestAlterChangefeedAddTargetPrivileges(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + DisableDefaultTestTenant: true, + Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + DistSQL: &execinfra.TestingKnobs{ + Changefeed: &TestingKnobs{ + WrapSink: func(s Sink, _ jobspb.JobID) Sink { + if _, ok := s.(*externalConnectionKafkaSink); ok { + return s + } + return &externalConnectionKafkaSink{sink: s} + }, + }, + }, + }, + }) + ctx := context.Background() + s := srv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + + rootDB := sqlutils.MakeSQLRunner(db) + rootDB.Exec(t, `CREATE TYPE type_a as enum ('a')`) + rootDB.Exec(t, `CREATE TABLE table_a (id int, type type_a)`) + rootDB.Exec(t, `CREATE TABLE table_b (id int, type type_a)`) + rootDB.Exec(t, `CREATE TABLE table_c (id int, type type_a)`) + rootDB.Exec(t, `CREATE USER feedCreator`) + rootDB.Exec(t, `GRANT SELECT ON table_a TO feedCreator`) + rootDB.Exec(t, `GRANT CHANGEFEED ON table_a TO feedCreator`) + rootDB.Exec(t, `CREATE EXTERNAL CONNECTION "first" AS 'kafka://nope'`) + rootDB.Exec(t, `GRANT USAGE ON EXTERNAL CONNECTION first TO feedCreator`) + rootDB.Exec(t, `INSERT INTO table_a(id) values (0)`) + + rootDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) + enableEnterprise := utilccl.TestingDisableEnterprise() + enableEnterprise() + + withUser := func(t *testing.T, user string, fn func(*sqlutils.SQLRunner)) { + password := `password` + rootDB.Exec(t, fmt.Sprintf(`ALTER USER %s WITH PASSWORD '%s'`, user, password)) + + pgURL := url.URL{ + Scheme: "postgres", + User: url.UserPassword(user, password), + Host: s.SQLAddr(), + } + db2, err := gosql.Open("postgres", pgURL.String()) + if err != nil { + t.Fatal(err) + } + defer db2.Close() + userDB := sqlutils.MakeSQLRunner(db2) + + fn(userDB) + } + + t.Run("using-changefeed-grant", func(t *testing.T) { + rootDB.Exec(t, `CREATE EXTERNAL CONNECTION "second" AS 'kafka://nope'`) + rootDB.Exec(t, `CREATE USER user1`) + rootDB.Exec(t, `GRANT CHANGEFEED ON table_a TO user1`) + + var jobID int + withUser(t, "feedCreator", func(userDB *sqlutils.SQLRunner) { + row := userDB.QueryRow(t, "CREATE CHANGEFEED for table_a INTO 'external://first'") + row.Scan(&jobID) + userDB.Exec(t, `PAUSE JOB $1`, jobID) + waitForJobStatus(userDB, t, catpb.JobID(jobID), `paused`) + }) + + // user1 is missing the CHANGEFEED privilege on table_b and table_c. + withUser(t, "user1", func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, + "user user1 requires the CHANGEFEED privilege on all target tables to be able to run an enterprise changefeed", + fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='external://second'", jobID), + ) + }) + rootDB.Exec(t, `GRANT CHANGEFEED ON table_b TO user1`) + withUser(t, "user1", func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, + "user user1 requires the CHANGEFEED privilege on all target tables to be able to run an enterprise changefeed", + fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='external://second'", jobID), + ) + }) + rootDB.Exec(t, `GRANT CHANGEFEED ON table_c TO user1`) + withUser(t, "user1", func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, + fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='external://second'", jobID), + ) + }) + + // With require_external_connection_sink enabled, the user requires USAGE on the external connection. + rootDB.Exec(t, "SET CLUSTER SETTING changefeed.permissions.require_external_connection_sink = true") + withUser(t, "user1", func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, + "user user1 does not have USAGE privilege on external_connection second", + fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='external://second'", jobID), + ) + }) + rootDB.Exec(t, `GRANT USAGE ON EXTERNAL CONNECTION second TO user1`) + withUser(t, "user1", func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, + fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='external://second'", jobID), + ) + }) + rootDB.Exec(t, "SET CLUSTER SETTING changefeed.permissions.require_external_connection_sink = false") + }) + + // TODO(#94757): remove CONTROLCHANGEFEED entirely + t.Run("using-controlchangefeed-roleoption", func(t *testing.T) { + rootDB.Exec(t, `CREATE USER user2 WITH CONTROLCHANGEFEED`) + rootDB.Exec(t, `GRANT CHANGEFEED ON table_a TO user2`) + rootDB.Exec(t, `GRANT SELECT ON table_a TO user2`) + + var jobID int + withUser(t, "feedCreator", func(userDB *sqlutils.SQLRunner) { + row := userDB.QueryRow(t, "CREATE CHANGEFEED for table_a INTO 'kafka://foo'") + row.Scan(&jobID) + userDB.Exec(t, `PAUSE JOB $1`, jobID) + waitForJobStatus(userDB, t, catpb.JobID(jobID), `paused`) + }) + + // user2 is missing the SELECT privilege on table_b and table_c. + withUser(t, "user2", func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, + "pq: user user2 with CONTROLCHANGEFEED role option requires the SELECT privilege on all target tables to be able to run an enterprise changefeed", + fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='kafka://bar'", jobID), + ) + }) + rootDB.Exec(t, `GRANT SELECT ON table_b TO user2`) + withUser(t, "user2", func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, + "pq: user user2 with CONTROLCHANGEFEED role option requires the SELECT privilege on all target tables to be able to run an enterprise changefeed", + fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='kafka://bar'", jobID), + ) + }) + rootDB.Exec(t, `GRANT SELECT ON table_c TO user2`) + withUser(t, "user2", func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, + fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='kafka://bar'", jobID), + ) + }) + }) +} + func TestAlterChangefeedAddTarget(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1419,3 +1574,73 @@ func TestAlterChangefeedWithOldCursorFromCreateChangefeed(t *testing.T) { cdcTest(t, testFn, feedTestEnterpriseSinks) } + +// TestChangefeedJobControl tests if a user can modify and existing changefeed +// based on their privileges. +func TestAlterChangefeedAccessControl(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + ChangefeedJobPermissionsTestSetup(t, s) + rootDB := sqlutils.MakeSQLRunner(s.DB) + + createFeed := func(stmt string) (cdctest.EnterpriseTestFeed, func()) { + successfulFeed := feed(t, f, stmt) + closeCf := func() { + closeFeed(t, successfulFeed) + } + _, err := successfulFeed.Next() + require.NoError(t, err) + return successfulFeed.(cdctest.EnterpriseTestFeed), closeCf + } + + // Create a changefeed and pause it. + var currentFeed cdctest.EnterpriseTestFeed + var closeCf func() + asUser(t, f, `feedCreator`, func(_ *sqlutils.SQLRunner) { + currentFeed, closeCf = createFeed(`CREATE CHANGEFEED FOR table_a, table_b`) + }) + rootDB.Exec(t, "PAUSE job $1", currentFeed.JobID()) + waitForJobStatus(rootDB, t, currentFeed.JobID(), `paused`) + + // Verify who can modify the existing changefeed. + asUser(t, f, `userWithAllGrants`, func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d DROP table_b`, currentFeed.JobID())) + }) + asUser(t, f, `adminUser`, func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d ADD table_b`, currentFeed.JobID())) + }) + // jobController can access the job, but will hit an error re-creating the changefeed. + asUser(t, f, `jobController`, func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, "pq: user jobcontroller requires the CHANGEFEED privilege on all target tables to be able to run an enterprise changefeed", fmt.Sprintf(`ALTER CHANGEFEED %d DROP table_b`, currentFeed.JobID())) + }) + asUser(t, f, `userWithSomeGrants`, func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, "pq: user userwithsomegrants does not have CHANGEFEED privilege on relation table_b", fmt.Sprintf(`ALTER CHANGEFEED %d ADD table_b`, currentFeed.JobID())) + }) + asUser(t, f, `regularUser`, func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, "pq: user regularuser does not have CHANGEFEED privilege on relation (table_a|table_b)", fmt.Sprintf(`ALTER CHANGEFEED %d ADD table_b`, currentFeed.JobID())) + }) + closeCf() + + // No one can modify changefeeds created by admins, except for admins. + // In this case, the root user creates the changefeed. + currentFeed, closeCf = createFeed(`CREATE CHANGEFEED FOR table_a, table_b`) + asUser(t, f, `adminUser`, func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, "PAUSE job $1", currentFeed.JobID()) + require.NoError(t, currentFeed.WaitForStatus(func(s jobs.Status) bool { + return s == jobs.StatusPaused + })) + }) + asUser(t, f, `userWithAllGrants`, func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, "pq: only admins can control jobs owned by other admins", fmt.Sprintf(`ALTER CHANGEFEED %d ADD table_b`, currentFeed.JobID())) + }) + asUser(t, f, `jobController`, func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, "pq: only admins can control jobs owned by other admins", fmt.Sprintf(`ALTER CHANGEFEED %d ADD table_b`, currentFeed.JobID())) + }) + closeCf() + } + + // Only enterprise sinks create jobs. + cdcTest(t, testFn, feedTestEnterpriseSinks) +} diff --git a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel index e77a2feb2cb8..7555a3595208 100644 --- a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel @@ -26,6 +26,7 @@ go_library( "//pkg/sql/catalog", "//pkg/sql/catalog/descs", "//pkg/sql/sem/tree", + "//pkg/testutils/sqlutils", "//pkg/util", "//pkg/util/fsm", "//pkg/util/hlc", diff --git a/pkg/ccl/changefeedccl/cdctest/testfeed.go b/pkg/ccl/changefeedccl/cdctest/testfeed.go index d61124291778..91e52d5d13dd 100644 --- a/pkg/ccl/changefeedccl/cdctest/testfeed.go +++ b/pkg/ccl/changefeedccl/cdctest/testfeed.go @@ -13,6 +13,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) @@ -21,12 +22,11 @@ type TestFeedFactory interface { // Feed creates a new TestFeed. Feed(create string, args ...interface{}) (TestFeed, error) - // AsUser connects to the database as the specified user, - // calls fn(), then goes back to using the same root - // connection. Will return an error if the initial connection - // to the database fails, but fn is responsible for failing - // the test on other errors. - AsUser(user string, fn func()) error + // AsUser connects to the database as the specified user, calls fn() with the + // user's connection, then goes back to using the same root connection. Will + // return an error if the initial connection to the database fails, but fn is + // responsible for failing the test on other errors. + AsUser(user string, fn func(runner *sqlutils.SQLRunner)) error } // TestFeedMessage represents one row update or resolved timestamp message from diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 9c43e5907ac6..bbb09fd61047 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -22,6 +22,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/cloud/externalconn" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/docs" "github.com/cockroachdb/cockroach/pkg/featureflag" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -45,6 +47,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/roleoption" "github.com/cockroachdb/cockroach/pkg/sql/sem/asof" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -390,8 +393,18 @@ func createChangefeedJobRecord( statementTime = initialHighWater } + checkPrivs := true if !changefeedStmt.alterChangefeedAsOf.IsEmpty() { statementTime = changefeedStmt.alterChangefeedAsOf + // When altering a changefeed, we generate target descriptors below + // based on a timestamp in the past. For example, this may be the + // last highwater timestamp of a paused changefeed. + // This is a problem because any privilege checks done on these + // descriptors will be out of date. + // To solve this problem, we validate the descriptors + // in the alterChangefeedPlanHook at the statement time. + // Thus, we can skip the check here. + checkPrivs = false } endTime := hlc.Timestamp{} @@ -429,7 +442,8 @@ func createChangefeedJobRecord( } targets, tables, err := getTargetsAndTables(ctx, p, targetDescs, changefeedStmt.Targets, - changefeedStmt.originalSpecs, opts.ShouldUseFullStatementTimeName()) + changefeedStmt.originalSpecs, opts.ShouldUseFullStatementTimeName(), sinkURI) + if err != nil { return nil, err } @@ -442,6 +456,8 @@ func createChangefeedJobRecord( TargetSpecifications: targets, } specs := AllTargets(details) + hasSelectPrivOnAllTables := true + hasChangefeedPrivOnAllTables := true for _, desc := range targetDescs { if table, isTable := desc.(catalog.TableDescriptor); isTable { if err := changefeedvalidators.ValidateTable(specs, table, tolerances); err != nil { @@ -450,6 +466,18 @@ func createChangefeedJobRecord( for _, warning := range changefeedvalidators.WarningsForTable(table, tolerances) { p.BufferClientNotice(ctx, pgnotice.Newf("%s", warning)) } + + hasSelect, hasChangefeed, err := checkPrivilegesForDescriptor(ctx, p, desc) + if err != nil { + return nil, err + } + hasSelectPrivOnAllTables = hasSelectPrivOnAllTables && hasSelect + hasChangefeedPrivOnAllTables = hasChangefeedPrivOnAllTables && hasChangefeed + } + } + if checkPrivs { + if err := verifyUserCanCreateChangefeed(ctx, p, sinkURI, hasSelectPrivOnAllTables, hasChangefeedPrivOnAllTables); err != nil { + return nil, err } } @@ -704,23 +732,12 @@ func getTargetsAndTables( rawTargets tree.ChangefeedTargets, originalSpecs map[tree.ChangefeedTarget]jobspb.ChangefeedTargetSpecification, fullTableName bool, + sinkURI string, ) ([]jobspb.ChangefeedTargetSpecification, jobspb.ChangefeedTargets, error) { tables := make(jobspb.ChangefeedTargets, len(targetDescs)) targets := make([]jobspb.ChangefeedTargetSpecification, len(rawTargets)) seen := make(map[jobspb.ChangefeedTargetSpecification]tree.ChangefeedTarget) - hasControlChangefeed, err := p.HasRoleOption(ctx, roleoption.CONTROLCHANGEFEED) - if err != nil { - return nil, nil, err - } - - var requiredPrivilegePerTable privilege.Kind - if hasControlChangefeed { - requiredPrivilegePerTable = privilege.SELECT - } else { - requiredPrivilegePerTable = privilege.CHANGEFEED - } - for i, ct := range rawTargets { desc, ok := targetDescs[ct.TableName] if !ok { @@ -731,10 +748,6 @@ func getTargetsAndTables( return nil, nil, errors.Errorf(`CHANGEFEED cannot target %s`, tree.AsString(&ct)) } - if err := p.CheckPrivilege(ctx, desc, requiredPrivilegePerTable); err != nil { - return nil, nil, errors.WithHint(err, `Users with CONTROLCHANGEFEED need SELECT, other users need CHANGEFEED.`) - } - if spec, ok := originalSpecs[ct]; ok { targets[i] = spec if table, ok := tables[td.GetID()]; ok { @@ -782,9 +795,127 @@ func getTargetsAndTables( } seen[targets[i]] = ct } + return targets, tables, nil } +func checkPrivilegesForDescriptor( + ctx context.Context, p sql.PlanHookState, desc catalog.Descriptor, +) (hasSelect bool, hasChangefeed bool, err error) { + if desc.GetObjectType() != privilege.Table { + return false, false, errors.AssertionFailedf("expected descriptor %d to be a table descriptor. instead found: %s ", desc.GetID(), desc.GetObjectType()) + } + + hasSelect, hasChangefeed = true, true + if err = p.CheckPrivilege(ctx, desc, privilege.SELECT); err != nil { + if !sql.IsInsufficientPrivilegeError(err) { + return false, false, err + } + hasSelect = false + } + if err = p.CheckPrivilege(ctx, desc, privilege.CHANGEFEED); err != nil { + if !sql.IsInsufficientPrivilegeError(err) { + return false, false, err + } + hasChangefeed = false + } + return hasSelect, hasChangefeed, nil +} + +// verifyUserCanCreateChangefeed performs changefeed creation privilege checks, returning a +// pgcode.InsufficientPrivilege error if the check fails. +// +// TODO(#94757): remove CONTROLCHANGEFEED entirely +// Admins can create any kind of changefeed. For non-admins: +// - The first check which is performed is checking if a user has CONTROLCHANGEFEED. If so, +// we enforce that they require privilege.SELECT on all target tables. Such as user +// can use any sink. +// - To create a core changefeed, a user requires privilege.SELECT on all targeted tables. +// - To create an enterprise changefeed, the user requires privilege.CHANGEFEED on all tables. +// If changefeedbase.EnforceExternalConnectionsForChangefeedPriv is enabled, then the changefeed +// must be used with an external connection and the user requires privilege.USAGE on it. +func verifyUserCanCreateChangefeed( + ctx context.Context, + p sql.PlanHookState, + sinkURI string, + hasSelectPrivOnAllTables bool, + hasChangefeedPrivOnAllTables bool, +) error { + isAdmin, err := p.HasAdminRole(ctx) + if err != nil { + return err + } + if isAdmin { + return nil + } + + hasControlChangefeed, err := p.HasRoleOption(ctx, roleoption.CONTROLCHANGEFEED) + if err != nil { + return err + } + if hasControlChangefeed { + if !hasSelectPrivOnAllTables { + return pgerror.Newf(pgcode.InsufficientPrivilege, + "user %s with %s role option requires the %s privilege on all target tables to be able to run an enterprise changefeed", + p.User(), roleoption.CONTROLCHANGEFEED, privilege.SELECT) + } + p.BufferClientNotice(ctx, pgnotice.Newf("You are creating a changefeed as a user with the %s role option. %s", + roleoption.CONTROLCHANGEFEED, roleoption.ControlChangefeedDeprecationNoticeMsg)) + return nil + } + + if sinkURI == "" { + if !hasSelectPrivOnAllTables { + return pgerror.Newf(pgcode.InsufficientPrivilege, + `user %s requires the %s privilege on all target tables to be able to run a core changefeed`, + p.User(), privilege.SELECT) + } + return nil + } + + if !hasChangefeedPrivOnAllTables { + return pgerror.Newf(pgcode.InsufficientPrivilege, + `user %s requires the %s privilege on all target tables to be able to run an enterprise changefeed`, + p.User(), privilege.CHANGEFEED) + } + + // Version gate for external connections. + enforceExternalConnections := changefeedbase.RequireExternalConnectionSink.Get(&p.ExecCfg().Settings.SV) + if enforceExternalConnections && !p.ExecCfg().Settings.Version.IsActive(ctx, clusterversion.V22_2SystemExternalConnectionsTable) { + return errors.WithHintf(pgerror.Newf(pgcode.FeatureNotSupported, + "version %v must be finalized to create an External Connection", + clusterversion.ByKey(clusterversion.V22_2SystemExternalConnectionsTable)), + `the %s privilege on all tables is only sufficient for external connection sinks`, privilege.CHANGEFEED) + } + + if enforceExternalConnections { + url, err := url.Parse(sinkURI) + if err != nil { + return errors.Newf("failed to parse url %s", sinkURI) + } + if url.Scheme == changefeedbase.SinkSchemeExternalConnection { + ec, err := externalconn.LoadExternalConnection(ctx, url.Host, p.ExecCfg().InternalExecutor, p.Txn()) + if err != nil { + return errors.Wrap(err, "failed to load external connection object") + } + ecPriv := &syntheticprivilege.ExternalConnectionPrivilege{ + ConnectionName: ec.ConnectionName(), + } + if err := p.CheckPrivilege(ctx, ecPriv, privilege.USAGE); err != nil { + return err + } + } else { + return pgerror.Newf( + pgcode.InsufficientPrivilege, + `the %s privilege on all tables can only be used with external connection sinks. see cluster setting %s`, + privilege.CHANGEFEED, changefeedbase.RequireExternalConnectionSink.Key(), + ) + } + } + + return nil +} + func validateSink( ctx context.Context, p sql.PlanHookState, diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 566ae519327a..56cfe660ec24 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -2455,18 +2455,17 @@ func TestChangefeedEachColumnFamilySchemaChanges(t *testing.T) { cdcTest(t, testFn) } -func TestChangefeedAuthorization(t *testing.T) { +func TestCoreChangefeedRequiresSelectPrivilege(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { rootDB := sqlutils.MakeSQLRunner(s.DB) - rootDB.Exec(t, `create user guest`) - rootDB.Exec(t, `create user feedcreator with controlchangefeed`) - rootDB.Exec(t, `create type type_a as enum ('a')`) - rootDB.Exec(t, `create table table_a (id int, type type_a)`) - rootDB.Exec(t, `create table table_b (id int, type type_a)`) - rootDB.Exec(t, `insert into table_a(id) values (0)`) + rootDB.Exec(t, `CREATE USER user1`) + rootDB.Exec(t, `CREATE TYPE type_a as enum ('a')`) + rootDB.Exec(t, `CREATE TABLE table_a (id int, type type_a)`) + rootDB.Exec(t, `CREATE TABLE table_b (id int, type type_a)`) + rootDB.Exec(t, `INSERT INTO table_a(id) values (0)`) expectSuccess := func(stmt string) { successfulFeed := feed(t, f, stmt) @@ -2475,30 +2474,163 @@ func TestChangefeedAuthorization(t *testing.T) { require.NoError(t, err) } - // Users with CONTROLCHANGEFEED need SELECT privileges as well. - asUser(t, f, `feedcreator`, func() { + asUser(t, f, `user1`, func(_ *sqlutils.SQLRunner) { expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR table_a`, - `user feedcreator does not have SELECT privilege on relation table_a`) + `user user1 requires the SELECT privilege on all target tables to be able to run a core changefeed`) + }) + rootDB.Exec(t, `GRANT SELECT ON table_a TO user1`) + asUser(t, f, `user1`, func(_ *sqlutils.SQLRunner) { + expectSuccess(`CREATE CHANGEFEED FOR table_a`) + }) + asUser(t, f, `user1`, func(_ *sqlutils.SQLRunner) { + expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR table_a, table_b`, + `user user1 requires the SELECT privilege on all target tables to be able to run a core changefeed`) }) - rootDB.Exec(t, `GRANT SELECT ON table_a TO guest`) - rootDB.Exec(t, `GRANT CHANGEFEED ON table_b TO guest`) - rootDB.Exec(t, `GRANT SELECT ON table_a TO feedcreator`) - - // Users without the controlchangefeed role option need the CHANGEFEED privilege - // on every referenced table. - asUser(t, f, `guest`, func() { - expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR table_a, table_b -- as guest`, - `CHANGEFEED privilege`) + rootDB.Exec(t, `GRANT SELECT ON table_b TO user1`) + asUser(t, f, `user1`, func(_ *sqlutils.SQLRunner) { + expectSuccess(`CREATE CHANGEFEED FOR table_a, table_b`) }) + } + cdcTest(t, testFn, feedTestForceSink("sinkless")) +} - // Users with controlchangefeed need the SELECT privilege on every table. - asUser(t, f, `feedcreator`, func() { - expectSuccess(`CREATE CHANGEFEED FOR table_a`) +// TODO(#94757): remove CONTROLCHANGEFEED entirely +func TestControlChangefeedRoleOption(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) - expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR table_a, table_b -- as feedcreator`, - `user feedcreator does not have SELECT privilege on relation table_b`) + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + rootDB := sqlutils.MakeSQLRunner(s.DB) + rootDB.Exec(t, `CREATE USER user1 WITH CONTROLCHANGEFEED`) + rootDB.Exec(t, `CREATE TYPE type_a as enum ('a')`) + rootDB.Exec(t, `CREATE TABLE table_a (id int, type type_a)`) + rootDB.Exec(t, `CREATE TABLE table_b (id int, type type_a)`) + rootDB.Exec(t, `INSERT INTO table_a(id) values (0)`) + + expectSuccess := func(stmt string) { + successfulFeed := feed(t, f, stmt) + defer closeFeed(t, successfulFeed) + _, err := successfulFeed.Next() + require.NoError(t, err) + } + + asUser(t, f, `user1`, func(_ *sqlutils.SQLRunner) { + expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR table_a, table_b`, + `pq: user user1 with CONTROLCHANGEFEED role option requires the SELECT privilege on all target tables to be able to run an enterprise changefeed`) + }) + rootDB.Exec(t, `GRANT SELECT ON table_a TO user1`) + asUser(t, f, `user1`, func(_ *sqlutils.SQLRunner) { + expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR table_a, table_b`, + `pq: user user1 with CONTROLCHANGEFEED role option requires the SELECT privilege on all target tables to be able to run an enterprise changefeed`) }) + rootDB.Exec(t, `GRANT SELECT ON table_b TO user1`) + asUser(t, f, `user1`, func(_ *sqlutils.SQLRunner) { + expectSuccess(`CREATE CHANGEFEED FOR table_a`) + }) + } + cdcTest(t, testFn, feedTestOmitSinks("sinkless")) +} + +func TestChangefeedCreateAuthorizationWithChangefeedPriv(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + DisableDefaultTestTenant: true, + Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + DistSQL: &execinfra.TestingKnobs{ + Changefeed: &TestingKnobs{ + WrapSink: func(s Sink, _ jobspb.JobID) Sink { + if _, ok := s.(*externalConnectionKafkaSink); ok { + return s + } + return &externalConnectionKafkaSink{sink: s} + }, + }, + }, + }, + }) + ctx := context.Background() + s := srv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + + rootDB := sqlutils.MakeSQLRunner(db) + rootDB.Exec(t, `CREATE USER user1`) + rootDB.Exec(t, `CREATE TYPE type_a as enum ('a')`) + rootDB.Exec(t, `CREATE TABLE table_a (id int, type type_a)`) + rootDB.Exec(t, `CREATE TABLE table_b (id int, type type_a)`) + rootDB.Exec(t, `INSERT INTO table_a(id) values (0)`) + rootDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) + enableEnterprise := utilccl.TestingDisableEnterprise() + enableEnterprise() + + withUser := func(t *testing.T, user string, fn func(*sqlutils.SQLRunner)) { + password := `password` + rootDB.Exec(t, fmt.Sprintf(`ALTER USER %s WITH PASSWORD '%s'`, user, password)) + + pgURL := url.URL{ + Scheme: "postgres", + User: url.UserPassword(user, password), + Host: s.SQLAddr(), + } + db2, err := gosql.Open("postgres", pgURL.String()) + if err != nil { + t.Fatal(err) + } + defer db2.Close() + userDB := sqlutils.MakeSQLRunner(db2) + + fn(userDB) + } + + rootDB.Exec(t, `CREATE EXTERNAL CONNECTION "nope" AS 'kafka://nope'`) + + withUser(t, "user1", func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, + "user user1 requires the CHANGEFEED privilege on all target tables to be able to run an enterprise changefeed", + "CREATE CHANGEFEED for table_a, table_b INTO 'external://nope'", + ) + }) + rootDB.Exec(t, "GRANT CHANGEFEED ON table_a TO user1") + withUser(t, "user1", func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, + "user user1 requires the CHANGEFEED privilege on all target tables to be able to run an enterprise changefeed", + "CREATE CHANGEFEED for table_a, table_b INTO 'external://nope'", + ) + }) + rootDB.Exec(t, "GRANT CHANGEFEED ON table_b TO user1") + withUser(t, "user1", func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, + "CREATE CHANGEFEED for table_a, table_b INTO 'external://nope'", + ) + }) + + // With enforce_external_connections enabled, the user requires USAGE on the external connection. + rootDB.Exec(t, "SET CLUSTER SETTING changefeed.permissions.enforce_external_connections = true") + withUser(t, "user1", func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, + "pq: the CHANGEFEED privilege on all tables can only be used with external connection sinks", + "CREATE CHANGEFEED for table_a, table_b INTO 'kafka://nope'", + ) + }) + rootDB.Exec(t, "GRANT USAGE ON EXTERNAL CONNECTION nope to user1") + withUser(t, "user1", func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, + "CREATE CHANGEFEED for table_a, table_b INTO 'external://nope'", + ) + }) + rootDB.Exec(t, "SET CLUSTER SETTING changefeed.permissions.enforce_external_connections = false") +} + +func TestChangefeedGrant(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + rootDB := sqlutils.MakeSQLRunner(s.DB) + rootDB.Exec(t, `create user guest`) // GRANT CHANGEFEED ON DATABASE is an error. rootDB.ExpectErr(t, `invalid privilege type CHANGEFEED for database`, `GRANT CHANGEFEED ON DATABASE d TO guest`) @@ -2510,17 +2642,6 @@ func TestChangefeedAuthorization(t *testing.T) { `INSERT INTO table_c values (0)`, ) - asUser(t, f, `guest`, func() { - expectSuccess(`CREATE CHANGEFEED FOR table_c`) - }) - - // GRANT CHANGEFED ON prefix.* grants CHANGEFEED on all current tables with that prefix. - rootDB.Exec(t, `GRANT CHANGEFEED ON d.public.* TO guest`) - rootDB.Exec(t, `GRANT SELECT ON d.* TO guest`) - asUser(t, f, `guest`, func() { - expectSuccess(`CREATE CHANGEFEED FOR table_c`) - }) - // SHOW GRANTS includes CHANGEFEED privileges. var count int rootDB.QueryRow(t, `select count(*) from [show grants] where privilege_type = 'CHANGEFEED';`).Scan(&count) @@ -2530,6 +2651,71 @@ func TestChangefeedAuthorization(t *testing.T) { cdcTest(t, testFn) } +// TestChangefeedJobControl tests if a user can control a changefeed +// based on their permissions. +func TestChangefeedJobControl(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + ChangefeedJobPermissionsTestSetup(t, s) + + createFeed := func(stmt string) (cdctest.EnterpriseTestFeed, func()) { + successfulFeed := feed(t, f, stmt) + closeCf := func() { + closeFeed(t, successfulFeed) + } + _, err := successfulFeed.Next() + require.NoError(t, err) + return successfulFeed.(cdctest.EnterpriseTestFeed), closeCf + } + + // Create a changefeed and assert who can control the job. + var currentFeed cdctest.EnterpriseTestFeed + var closeCf func() + asUser(t, f, `feedCreator`, func(_ *sqlutils.SQLRunner) { + currentFeed, closeCf = createFeed(`CREATE CHANGEFEED FOR table_a, table_b`) + }) + asUser(t, f, `adminUser`, func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, "PAUSE job $1", currentFeed.JobID()) + waitForJobStatus(userDB, t, currentFeed.JobID(), "paused") + }) + asUser(t, f, `userWithAllGrants`, func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, "RESUME job $1", currentFeed.JobID()) + waitForJobStatus(userDB, t, currentFeed.JobID(), "running") + }) + asUser(t, f, `jobController`, func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, "RESUME job $1", currentFeed.JobID()) + waitForJobStatus(userDB, t, currentFeed.JobID(), "running") + }) + asUser(t, f, `userWithSomeGrants`, func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, "pq: user userwithsomegrants does not have CHANGEFEED privilege on relation table_b", "PAUSE job $1", currentFeed.JobID()) + }) + asUser(t, f, `regularUser`, func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, "pq: user regularuser does not have CHANGEFEED privilege on relation (table_a|table_b)", "PAUSE job $1", currentFeed.JobID()) + }) + closeCf() + + // No one can modify changefeeds created by admins, except for admins. + // In this case, the root user creates the changefeed. + currentFeed, closeCf = createFeed(`CREATE CHANGEFEED FOR table_a, table_b`) + asUser(t, f, `adminUser`, func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, "PAUSE job $1", currentFeed.JobID()) + waitForJobStatus(userDB, t, currentFeed.JobID(), "paused") + }) + asUser(t, f, `userWithAllGrants`, func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, "pq: only admins can control jobs owned by other admins", "PAUSE job $1", currentFeed.JobID()) + }) + asUser(t, f, `jobController`, func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, "pq: only admins can control jobs owned by other admins", "PAUSE job $1", currentFeed.JobID()) + }) + closeCf() + } + + // Only enterprise sinks create jobs. + cdcTest(t, testFn, feedTestEnterpriseSinks) +} + func TestChangefeedColumnFamilyAvro(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index 607acb0a35e1..2e6b01c76049 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -270,3 +270,14 @@ var EventConsumerElasticCPUControlEnabled = settings.RegisterBoolSetting( "determines whether changefeed event processing integrates with elastic CPU control", true, ) + +// RequireExternalConnectionSink is used to restrict non-admins with the CHANGEFEED privilege +// to create changefeeds to external connections only. +var RequireExternalConnectionSink = settings.RegisterBoolSetting( + settings.TenantWritable, + "changefeed.permissions.require_external_connection_sink", + "if enabled, this settings restricts users with the CHANGEFEED privilege"+ + " to create changefeeds with external connection sinks only."+ + " see https://www.cockroachlabs.com/docs/stable/create-external-connection.html", + false, +) diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 70716810cc55..fa1b4a0f073b 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -630,7 +630,9 @@ func feed( return feed } -func asUser(t testing.TB, f cdctest.TestFeedFactory, user string, fn func()) { +func asUser( + t testing.TB, f cdctest.TestFeedFactory, user string, fn func(runner *sqlutils.SQLRunner), +) { t.Helper() require.NoError(t, f.AsUser(user, fn)) } @@ -1045,3 +1047,45 @@ func TestingSetIncludeParquetMetadata() func() { includeParquetTestMetadata = false } } + +// ChangefeedJobPermissionsTestSetup creates entities and users with various permissions +// for tests which test access control for changefeed jobs. +// +// This helper creates the following: +// +// UDT type_a +// TABLE table_a (with column type_a) +// TABLE table_b (with column type_a) +// USER adminUser (with admin privs) +// USER feedCreator (with CHANGEFEED priv on table_a and table_b) +// USER jobController (with the CONTROLJOB role option) +// USER userWithAllGrants (with CHANGEFEED on table_a and table b) +// USER userWithSomeGrants (with CHANGEFEED on table_a only) +// USER regularUser (with no privs) +func ChangefeedJobPermissionsTestSetup(t *testing.T, s TestServer) { + rootDB := sqlutils.MakeSQLRunner(s.DB) + + rootDB.Exec(t, `CREATE TYPE type_a as enum ('a')`) + rootDB.Exec(t, `CREATE TABLE table_a (id int, type type_a)`) + rootDB.Exec(t, `CREATE TABLE table_b (id int, type type_a)`) + rootDB.Exec(t, `INSERT INTO table_a(id) values (0)`) + rootDB.Exec(t, `INSERT INTO table_b(id) values (0)`) + + rootDB.Exec(t, `CREATE USER adminUser`) + rootDB.Exec(t, `GRANT ADMIN TO adminUser`) + + rootDB.Exec(t, `CREATE USER feedCreator`) + rootDB.Exec(t, `GRANT CHANGEFEED ON table_a TO feedCreator`) + rootDB.Exec(t, `GRANT CHANGEFEED ON table_b TO feedCreator`) + + rootDB.Exec(t, `CREATE USER jobController with CONTROLJOB`) + + rootDB.Exec(t, `CREATE USER userWithAllGrants`) + rootDB.Exec(t, `GRANT CHANGEFEED ON table_a TO userWithAllGrants`) + rootDB.Exec(t, `GRANT CHANGEFEED ON table_b TO userWithAllGrants`) + + rootDB.Exec(t, `CREATE USER userWithSomeGrants`) + rootDB.Exec(t, `GRANT CHANGEFEED ON table_a TO userWithSomeGrants`) + + rootDB.Exec(t, `CREATE USER regularUser`) +} diff --git a/pkg/ccl/changefeedccl/scheduled_changefeed.go b/pkg/ccl/changefeedccl/scheduled_changefeed.go index 0f2d32ef574b..55895518f170 100644 --- a/pkg/ccl/changefeedccl/scheduled_changefeed.go +++ b/pkg/ccl/changefeedccl/scheduled_changefeed.go @@ -33,7 +33,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice" - "github.com/cockroachdb/cockroach/pkg/sql/roleoption" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" @@ -547,13 +546,6 @@ func doCreateChangefeedSchedule( spec *scheduledChangefeedSpec, resultsCh chan<- tree.Datums, ) error { - hasControlChangefeed, err := p.HasRoleOption(ctx, roleoption.CONTROLCHANGEFEED) - if err != nil { - return err - } - if !hasControlChangefeed { - return errors.Newf("User needs CONTROLCHANGEFEED role to schedule changefeeds.") - } env := sql.JobSchedulerEnv(p.ExecCfg()) diff --git a/pkg/ccl/changefeedccl/scheduled_changefeed_test.go b/pkg/ccl/changefeedccl/scheduled_changefeed_test.go index 8aa43b15373e..01691e0fe453 100644 --- a/pkg/ccl/changefeedccl/scheduled_changefeed_test.go +++ b/pkg/ccl/changefeedccl/scheduled_changefeed_test.go @@ -21,13 +21,16 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedpb" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobstest" "github.com/cockroachdb/cockroach/pkg/scheduledjobs" "github.com/cockroachdb/cockroach/pkg/scheduledjobs/schedulebase" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" @@ -281,29 +284,54 @@ RECURRING '@hourly' WITH SCHEDULE OPTIONS on_execution_failure = 'pause', first_ } } -func TestCreateChangefeedScheduleRequiresChangefeedRole(t *testing.T) { +// TestCreateChangefeedScheduleChecksPermissionsDuringDryRun verifies +// that we perform a dry run of creating the changefeed (performs +// permissions checks). +func TestCreateChangefeedScheduleChecksPermissionsDuringDryRun(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - th, cleanup := newTestHelper(t) - defer cleanup() - - th.sqlDB.Exec(t, `CREATE USER testuser`) - pgURL, cleanupFunc := sqlutils.PGUrl( - t, th.server.ServingSQLAddr(), - "TestCreateSchedule-testuser", url.User("testuser"), - ) - defer cleanupFunc() - - testuser, err := gosql.Open("postgres", pgURL.String()) - require.NoError(t, err) - defer func() { - require.NoError(t, testuser.Close()) - }() + srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + DisableDefaultTestTenant: true, + Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + DistSQL: &execinfra.TestingKnobs{ + Changefeed: &TestingKnobs{ + WrapSink: func(s Sink, _ jobspb.JobID) Sink { + if _, ok := s.(*externalConnectionKafkaSink); ok { + return s + } + return &externalConnectionKafkaSink{sink: s} + }, + }, + }, + }, + }) + ctx := context.Background() + s := srv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + rootDB := sqlutils.MakeSQLRunner(db) + rootDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) + enableEnterprise := utilccl.TestingDisableEnterprise() + enableEnterprise() + + rootDB.Exec(t, `CREATE TABLE table_a (i int)`) + rootDB.Exec(t, `CREATE USER testuser WITH PASSWORD 'test'`) + + pgURL := url.URL{ + Scheme: "postgres", + User: url.UserPassword("testuser", "test"), + Host: s.SQLAddr(), + } + db2, err := gosql.Open("postgres", pgURL.String()) + if err != nil { + t.Fatal(err) + } + defer db2.Close() + userDB := sqlutils.MakeSQLRunner(db2) - _, err = testuser.Exec( - "CREATE SCHEDULE FOR CHANGEFEED TABLE system.jobs INTO 'somewhere' WITH initial_scan = 'only' RECURRING '@daily'") - require.Regexp(t, "needs CONTROLCHANGEFEED role", err) + userDB.ExpectErr(t, "Failed to dry run create changefeed: user testuser requires the CHANGEFEED privilege on all target tables to be able to run an enterprise changefeed", + "CREATE SCHEDULE FOR CHANGEFEED TABLE table_a INTO 'somewhere' WITH initial_scan = 'only' RECURRING '@daily'") } // TestCreateChangefeedScheduleIfNotExists: checks if adding IF NOT EXISTS will diff --git a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go index 52525a5c4e0d..50432acbeea8 100644 --- a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go +++ b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go @@ -13,6 +13,7 @@ import ( "fmt" "net/url" "sort" + "strconv" "strings" "testing" @@ -428,3 +429,45 @@ func TestShowChangefeedJobsAlterChangefeed(t *testing.T) { // Force kafka to validate topics cdcTest(t, testFn, feedTestForceSink("kafka")) } + +func TestShowChangefeedJobsAuthorization(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + ChangefeedJobPermissionsTestSetup(t, s) + + var jobID jobspb.JobID + createFeed := func(stmt string) { + successfulFeed := feed(t, f, stmt) + defer closeFeed(t, successfulFeed) + _, err := successfulFeed.Next() + require.NoError(t, err) + jobID = successfulFeed.(cdctest.EnterpriseTestFeed).JobID() + } + + // Create a changefeed and assert who can see it. + asUser(t, f, `feedCreator`, func(userDB *sqlutils.SQLRunner) { + createFeed(`CREATE CHANGEFEED FOR table_a, table_b`) + }) + expectedJobIDStr := strconv.Itoa(int(jobID)) + asUser(t, f, `adminUser`, func(userDB *sqlutils.SQLRunner) { + userDB.CheckQueryResults(t, `SELECT job_id FROM [SHOW CHANGEFEED JOBS]`, [][]string{{expectedJobIDStr}}) + }) + asUser(t, f, `userWithAllGrants`, func(userDB *sqlutils.SQLRunner) { + userDB.CheckQueryResults(t, `SELECT job_id FROM [SHOW CHANGEFEED JOBS]`, [][]string{{expectedJobIDStr}}) + }) + asUser(t, f, `userWithSomeGrants`, func(userDB *sqlutils.SQLRunner) { + userDB.CheckQueryResults(t, `SELECT job_id FROM [SHOW CHANGEFEED JOBS]`, [][]string{}) + }) + asUser(t, f, `jobController`, func(userDB *sqlutils.SQLRunner) { + userDB.CheckQueryResults(t, `SELECT job_id FROM [SHOW CHANGEFEED JOBS]`, [][]string{{expectedJobIDStr}}) + }) + asUser(t, f, `regularUser`, func(userDB *sqlutils.SQLRunner) { + userDB.CheckQueryResults(t, `SELECT job_id FROM [SHOW CHANGEFEED JOBS]`, [][]string{}) + }) + } + + // Only enterprise sinks create jobs. + cdcTest(t, testFn, feedTestEnterpriseSinks) +} diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index d42f8be30c3d..1fa532836641 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -46,6 +46,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" @@ -74,7 +75,7 @@ func makeSinklessFeedFactory( return &sinklessFeedFactory{s: s, sink: sink, sinkForUser: sinkForUser} } -func (f *sinklessFeedFactory) AsUser(user string, fn func()) error { +func (f *sinklessFeedFactory) AsUser(user string, fn func(*sqlutils.SQLRunner)) error { prevSink := f.sink password := `hunter2` if err := setPassword(user, password, f.sink); err != nil { @@ -83,7 +84,19 @@ func (f *sinklessFeedFactory) AsUser(user string, fn func()) error { defer func() { f.sink = prevSink }() var cleanup func() f.sink, cleanup = f.sinkForUser(user, password) - fn() + pgconn := url.URL{ + Scheme: "postgres", + User: url.UserPassword(user, password), + Host: f.Server().SQLAddr(), + Path: `d`, + } + db2, err := gosql.Open("postgres", pgconn.String()) + if err != nil { + return err + } + defer db2.Close() + userDB := sqlutils.MakeSQLRunner(db2) + fn(userDB) cleanup() return nil } @@ -665,7 +678,7 @@ func (e *enterpriseFeedFactory) jobsTableConn() *gosql.DB { // AsUser uses the previous (assumed to be root) connection to ensure // the user has the ability to authenticate, and saves it to poll // job status, then implements TestFeedFactory.AsUser(). -func (e *enterpriseFeedFactory) AsUser(user string, fn func()) error { +func (e *enterpriseFeedFactory) AsUser(user string, fn func(*sqlutils.SQLRunner)) error { prevDB := e.db e.rootDB = e.db defer func() { e.db = prevDB }() @@ -685,8 +698,10 @@ func (e *enterpriseFeedFactory) AsUser(user string, fn func()) error { return err } defer db2.Close() + userDB := sqlutils.MakeSQLRunner(db2) + e.db = db2 - fn() + fn(userDB) return nil } diff --git a/pkg/ccl/logictestccl/testdata/logic_test/changefeed b/pkg/ccl/logictestccl/testdata/logic_test/changefeed index a6c12066df09..bec3b0165132 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/changefeed +++ b/pkg/ccl/logictestccl/testdata/logic_test/changefeed @@ -10,35 +10,42 @@ user root # Test granting CONTROLCHANGEFEED. statement ok -ALTER USER testuser CONTROLCHANGEFEED; GRANT CONNECT ON DATABASE test TO testuser +query T noticetrace +ALTER USER testuser CONTROLCHANGEFEED +---- +NOTICE: The role option CONTROLCHANGEFEED will be removed in a future release, please switch to using the CHANGEFEED privilege for target tables instead: https://www.cockroachlabs.com/docs/stable/create-changefeed.html#required-privileges + user testuser # We should pass the CONTROLCHANGEFEED permission check but error on missing # SELECT privileges. -statement error user testuser does not have SELECT privilege on relation t -CREATE CHANGEFEED FOR t +statement error pq: user testuser with CONTROLCHANGEFEED role option requires the SELECT privilege on all target tables to be able to run an enterprise changefeed +CREATE CHANGEFEED FOR t INTO 'null://sink' with initial_scan='only' -# Test revoking CONTROLCHANGEFEED. user root +# Test granting SELECT. statement ok -ALTER USER testuser NOCONTROLCHANGEFEED; GRANT SELECT ON TABLE t TO testuser user testuser -statement error user testuser does not have CHANGEFEED privilege on relation t -CREATE CHANGEFEED FOR t - -# The CHANGEFEED privilege can be granted granularly. +# Test the deprecation notice for CONTROLCHANGEFEED +query T noticetrace +CREATE CHANGEFEED FOR t INTO 'null://sink' with initial_scan='only' +---- +NOTICE: You are creating a changefeed as a user with the CONTROLCHANGEFEED role option. The role option CONTROLCHANGEFEED will be removed in a future release, please switch to using the CHANGEFEED privilege for target tables instead: https://www.cockroachlabs.com/docs/stable/create-changefeed.html#required-privileges +# Test revoking CONTROLCHANGEFEED. user root + statement ok -GRANT CHANGEFEED ON table t TO testuser +ALTER USER testuser NOCONTROLCHANGEFEED; +GRANT SELECT ON TABLE t TO testuser user testuser -statement ok -CREATE CHANGEFEED FOR t with initial_scan='only' +statement error user testuser requires the CHANGEFEED privilege on all target tables to be able to run an enterprise changefeed +CREATE CHANGEFEED FOR t INTO 'null://sink' with initial_scan='only' diff --git a/pkg/sql/alter_role.go b/pkg/sql/alter_role.go index dbe6f1d9b18d..624ded1da8c1 100644 --- a/pkg/sql/alter_role.go +++ b/pkg/sql/alter_role.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/paramparse" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice" "github.com/cockroachdb/cockroach/pkg/sql/roleoption" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -97,6 +98,10 @@ func (p *planner) AlterRoleNode( return nil, err } + if roleOptions.Contains(roleoption.CONTROLCHANGEFEED) { + p.BufferClientNotice(ctx, pgnotice.Newf(roleoption.ControlChangefeedDeprecationNoticeMsg)) + } + roleName, err := decodeusername.FromRoleSpec( p.SessionData(), username.PurposeValidation, roleSpec, ) diff --git a/pkg/sql/authorization.go b/pkg/sql/authorization.go index f72101f8727e..33aa05c2eb35 100644 --- a/pkg/sql/authorization.go +++ b/pkg/sql/authorization.go @@ -920,3 +920,9 @@ func insufficientPrivilegeError( "user %s does not have %s privilege on %s %s", user, kind, typeForError, object.GetName()) } + +// IsInsufficientPrivilegeError returns true if the error is a pgerror +// with code pgcode.InsufficientPrivilege. +func IsInsufficientPrivilegeError(err error) bool { + return pgerror.GetPGCode(err) == pgcode.InsufficientPrivilege +} diff --git a/pkg/sql/control_jobs.go b/pkg/sql/control_jobs.go index 0c3b86dd8f05..09ac2cced3de 100644 --- a/pkg/sql/control_jobs.go +++ b/pkg/sql/control_jobs.go @@ -16,9 +16,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/server/telemetry" - "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" - "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" - "github.com/cockroachdb/cockroach/pkg/sql/roleoption" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/errors" @@ -43,26 +40,6 @@ func (n *controlJobsNode) FastPathResults() (int, bool) { } func (n *controlJobsNode) startExec(params runParams) error { - userIsAdmin, err := params.p.HasAdminRole(params.ctx) - if err != nil { - return err - } - - // users can pause/resume/cancel jobs owned by non-admin users - // if they have CONTROLJOBS privilege. - if !userIsAdmin { - hasControlJob, err := params.p.HasRoleOption(params.ctx, roleoption.CONTROLJOB) - if err != nil { - return err - } - - if !hasControlJob { - return pgerror.Newf(pgcode.InsufficientPrivilege, - "user %s does not have %s privilege", - params.p.User(), roleoption.CONTROLJOB) - } - } - if n.desiredStatus != jobs.StatusPaused && len(n.reason) > 0 { return errors.AssertionFailedf("status %v is not %v and thus does not support a reason %v", n.desiredStatus, jobs.StatusPaused, n.reason) @@ -93,21 +70,14 @@ func (n *controlJobsNode) startExec(params runParams) error { return err } - if job != nil { - owner := job.Payload().UsernameProto.Decode() - - if !userIsAdmin { - ok, err := params.p.UserHasAdminRole(params.ctx, owner) - if err != nil { - return err - } - - // Owner is an admin but user executing the statement is not. - if ok { - return pgerror.Newf(pgcode.InsufficientPrivilege, - "only admins can control jobs owned by other admins") - } - } + payload := job.Payload() + canAccess, userErr, err := JobTypeSpecificPrivilegeCheck(params.ctx, params.p, + job.ID(), &payload, false) + if err != nil { + return err + } + if !canAccess { + return userErr } switch n.desiredStatus { diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 7acc6f5add92..61272d1f94fb 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -945,16 +945,6 @@ func populateSystemJobsTableRows( matched := false - user := p.User() - userIsAdmin, err := p.UserHasAdminRole(ctx, user) - if err != nil { - return matched, err - } - userHasControlJobRoleOption, err := p.HasRoleOption(ctx, roleoption.CONTROLJOB) - if err != nil { - return matched, err - } - // Note: we query system.jobs as root, so we must be careful about which rows we return. it, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryIteratorEx(ctx, "system-jobs-scan", @@ -988,24 +978,22 @@ func populateSystemJobsTableRows( } currentRow := it.Cur() + jobID, err := strconv.Atoi(currentRow[jobIdIdx].String()) + if err != nil { + return matched, err + } payloadBytes := currentRow[jobPayloadIdx] payload, err := jobs.UnmarshalPayload(payloadBytes) if err != nil { return matched, wrapPayloadUnMarshalError(err, currentRow[jobIdIdx]) } - jobOwnerUser := payload.UsernameProto.Decode() - jobOwnerIsAdmin, err := p.UserHasAdminRole(ctx, jobOwnerUser) + + canAccess, _, err := JobTypeSpecificPrivilegeCheck(ctx, p, jobspb.JobID(jobID), payload, true) + if err != nil { return matched, err } - - // The user can access the row if the meet one of the conditions: - // 1. The user is an admin. - // 2. The job is owned by the user. - // 3. The user has CONTROLJOB privilege and the job is not owned by - // an admin. - if canAccess := userIsAdmin || (!jobOwnerIsAdmin && - userHasControlJobRoleOption) || user == jobOwnerUser; !canAccess { + if !canAccess { continue } @@ -1021,6 +1009,84 @@ func wrapPayloadUnMarshalError(err error, jobID tree.Datum) error { " consider deleting this job from system.jobs", jobID) } +// changefeedPrivilegeCheck determines if a user has access to the changefeed defined +// by the supplied payload. +func changefeedPrivilegeCheck( + ctx context.Context, p PlanHookState, payload *jobspb.Payload, +) (canAccess bool, userErr error, err error) { + privToCheck := privilege.CHANGEFEED + + specs := payload.UnwrapDetails().(jobspb.ChangefeedDetails).TargetSpecifications + + for _, spec := range specs { + tableDesc, err := p.(*planner).LookupTableByID(ctx, spec.TableID) + if err != nil { + return false, nil, err + } + + // We must distinguish between errors due to the privilege + // check failing and other errors. + err = p.CheckPrivilege(ctx, tableDesc, privToCheck) + if err != nil { + // Return the privilege error as a user error. + if IsInsufficientPrivilegeError(err) { + return false, err, nil + } + return false, nil, err + } + } + return true, nil, nil +} + +// JobTypeSpecificPrivilegeCheck returns true if the user should be able to access +// the job. If the returned value is false and err is nil, then userErr will be +// returned with an appropriate error that can be passed up to the user. +// allowSameUserAccess specifies if users can access their own jobs. +func JobTypeSpecificPrivilegeCheck( + ctx context.Context, + p PlanHookState, + jobID jobspb.JobID, + payload *jobspb.Payload, + allowSameUserAccess bool, +) (canAccess bool, userErr error, err error) { + userIsAdmin, err := p.HasAdminRole(ctx) + if err != nil { + return false, nil, err + } + + userHasControlJob, err := p.HasRoleOption(ctx, roleoption.CONTROLJOB) + if err != nil { + return false, nil, err + } + + jobOwnerUser := payload.UsernameProto.Decode() + jobOwnerIsAdmin, err := p.UserHasAdminRole(ctx, jobOwnerUser) + if err != nil { + return false, nil, err + } + + if jobOwnerIsAdmin { + if !userIsAdmin { + return false, pgerror.Newf(pgcode.InsufficientPrivilege, + "only admins can control jobs owned by other admins"), nil + } + return true, nil, nil + } + + if userHasControlJob || (allowSameUserAccess && p.User() == jobOwnerUser) { + return true, nil, nil + } + + switch payload.Type() { + case jobspb.TypeChangefeed: + return changefeedPrivilegeCheck(ctx, p, payload) + default: + return false, pgerror.Newf(pgcode.InsufficientPrivilege, + "user %s does not have %s privilege for job $d", + p.User(), roleoption.CONTROLJOB, jobID), nil + } +} + const ( jobsQSelect = `SELECT id, status, created, payload, progress, claim_session_id, claim_instance_id` // Note that we are querying crdb_internal.system_jobs instead of system.jobs directly. diff --git a/pkg/sql/create_role.go b/pkg/sql/create_role.go index edda320745f3..465083bf8ad5 100644 --- a/pkg/sql/create_role.go +++ b/pkg/sql/create_role.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/decodeusername" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice" "github.com/cockroachdb/cockroach/pkg/sql/roleoption" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" @@ -77,6 +78,10 @@ func (p *planner) CreateRoleNode( return nil, err } + if roleOptions.Contains(roleoption.CONTROLCHANGEFEED) { + p.BufferClientNotice(ctx, pgnotice.Newf(roleoption.ControlChangefeedDeprecationNoticeMsg)) + } + if err := roleOptions.CheckRoleOptionConflicts(); err != nil { return nil, err } diff --git a/pkg/sql/delegate/show_changefeed_jobs.go b/pkg/sql/delegate/show_changefeed_jobs.go index 45c45e294874..748d12738bf1 100644 --- a/pkg/sql/delegate/show_changefeed_jobs.go +++ b/pkg/sql/delegate/show_changefeed_jobs.go @@ -13,7 +13,6 @@ package delegate import ( "fmt" - "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" ) @@ -33,7 +32,8 @@ WITH payload AS ( payload, false, true )->'changefeed' AS changefeed_details FROM - system.jobs + crdb_internal.system_jobs + WHERE job_type = 'CHANGEFEED' ) SELECT job_id, @@ -70,20 +70,18 @@ FROM ) var whereClause, orderbyClause string - typePredicate := fmt.Sprintf("job_type = '%s'", jobspb.TypeChangefeed) - if n.Jobs == nil { // The query intends to present: // - first all the running jobs sorted in order of start time, // - then all completed jobs sorted in order of completion time. - whereClause = fmt.Sprintf( - `WHERE %s AND (finished IS NULL OR finished > now() - '12h':::interval)`, typePredicate) + whereClause = + `WHERE (finished IS NULL OR finished > now() - '12h':::interval)` // The "ORDER BY" clause below exploits the fact that all // running jobs have finished = NULL. orderbyClause = `ORDER BY COALESCE(finished, now()) DESC, started DESC` } else { // Limit the jobs displayed to the select statement in n.Jobs. - whereClause = fmt.Sprintf(`WHERE %s AND job_id in (%s)`, typePredicate, n.Jobs.String()) + whereClause = fmt.Sprintf(`WHERE job_id in (%s)`, n.Jobs.String()) } sqlStmt := fmt.Sprintf("%s %s %s", selectClause, whereClause, orderbyClause) diff --git a/pkg/sql/logictest/testdata/logic_test/jobs b/pkg/sql/logictest/testdata/logic_test/jobs index b88a951e1892..2d40bfb2e407 100644 --- a/pkg/sql/logictest/testdata/logic_test/jobs +++ b/pkg/sql/logictest/testdata/logic_test/jobs @@ -175,8 +175,14 @@ SELECT job_id FROM [SHOW JOBS] WHERE user_name = 'testuser2' AND job_type = 'SCH user testuser # testuser should no longer have the ability to control jobs. -statement error pq: user testuser does not have CONTROLJOB privilege -PAUSE JOB (SELECT job_id FROM [SHOW JOBS] WHERE user_name = 'testuser2' AND job_type = 'SCHEMA CHANGE GC' AND description LIKE 'GC for DROP%') +statement error pq: user testuser does not have CONTROLJOB privilege for job $job_id +PAUSE JOB (SELECT $job_id) + +statement error pq: user testuser does not have CONTROLJOB privilege for job $job_id +CANCEL JOB (SELECT $job_id) + +statement error pq: user testuser does not have CONTROLJOB privilege for job $job_id +RESUME JOB (SELECT $job_id) user root diff --git a/pkg/sql/logictest/testdata/logic_test/run_control b/pkg/sql/logictest/testdata/logic_test/run_control index 3582529fc4aa..25cc0159a3d7 100644 --- a/pkg/sql/logictest/testdata/logic_test/run_control +++ b/pkg/sql/logictest/testdata/logic_test/run_control @@ -98,17 +98,6 @@ CANCEL SESSION 'aaa'::NAME query error odd length hex string CANCEL QUERY 'aaa'::NAME -user testuser - -query error pq: user testuser does not have CONTROLJOB privilege -CANCEL JOB 1 - -query error pq: user testuser does not have CONTROLJOB privilege -PAUSE JOB 1 - -query error pq: user testuser does not have CONTROLJOB privilege -RESUME JOB 1 - user root query T rowsort diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index c973ccf19cc9..ebec8ad8d9f6 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -515,6 +515,7 @@ func (p *planner) EvalContext() *eval.Context { return &p.extendedEvalCtx.Context } +// Descriptors implements the PlanHookState interface. func (p *planner) Descriptors() *descs.Collection { return p.extendedEvalCtx.Descs } diff --git a/pkg/sql/roleoption/role_option.go b/pkg/sql/roleoption/role_option.go index 0c30d98b06a9..66240bce37e6 100644 --- a/pkg/sql/roleoption/role_option.go +++ b/pkg/sql/roleoption/role_option.go @@ -68,6 +68,13 @@ const ( NOVIEWCLUSTERSETTING ) +// ControlChangefeedDeprecationNoticeMsg is a user friendly notice which should be shown when CONTROLCHANGEFEED is used +// +// TODO(#94757): remove CONTROLCHANGEFEED entirely +const ControlChangefeedDeprecationNoticeMsg = "The role option CONTROLCHANGEFEED will be removed in a future release," + + " please switch to using the CHANGEFEED privilege for target tables instead:" + + " https://www.cockroachlabs.com/docs/stable/create-changefeed.html#required-privileges" + // toSQLStmts is a map of Kind -> SQL statement string for applying the // option to the role. var toSQLStmts = map[Option]string{