From ba3a621d00419c9201938e911005838e552e039c Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Fri, 6 Jan 2023 11:23:47 -0500 Subject: [PATCH] changefeedccl: add new fine-grained permissions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This change updates permissions semantics related to creating and managing changefeeds. The `CONTROLCHANGEFEED` role option will be deprecated in the future (see https://github.com/cockroachdb/cockroach/issues/94757). With this change, usages of `CONTROLCHANGEFEED` will come with a deprecation warning. Its existing behavior (see rules for creating changefeeds below) remains the same. The `SELECT` and `CHANGEFEED` privileges will be used for changefeeds henceforth: The `SELECT` privilege on a set of tables allows a user to run core changefeeds against them. The `CHANGEFEED` privilege on a set of tables allows a user to run enterprise changefeeds on them, and also manage the underlying changefeed job (ie. view, pause, cancel, and resume the job). Notably, a new cluster setting `changefeed.permissions.enforce_external_connections` is added and set to `false` by default. Enabling this setting restricts users with `CHANGEFEED` on a set of tables to create enterprise changefeeds into external connections only. To use a given external connection, a user typically needs the `USAGE` privilege on it. Note `ALTER DEFAULT PRIVILEGES` can be used with both the `CHANGEFEED` and `SELECT` privileges to assign course-grained permissions (ie. assign permissions to all tables in a schema rather than manually assign them for each table). Before this change, to create a changefeed, these checks were made in order: (a) If the user has the `CONTROLCHANGEFEED` role, then they require the `SELECT`privilege on all targeted tables (b) Otherwise, the user requires the `CHANGEFEED` privilege on all targeted tables With this change, these checks are updated: (a) If the user has the `CONTROLCHANGEFEED` role, then they require the `SELECT` privilege on all targeted tables. Note: creating a changefeed this way will now produce a deprecation notice. (b) If the changefeed is a core changefeed, they require the `SELECT` privilege on all targeted tables (c) Otherwise, the user requires the `CHANGEFEED` privilege on all targeted tables. Note: If `changefeed.permissions.enforce_external_connections` (disabled by default) is set to true, then the user will only be able to create a changefeed into an external connection which they have the `USAGE` privilege on. Before this change, to manage a changefeed job `J` (defined a viewing, pausing, resuming, and canceling), a user `U` could do so if they met at least one of the following conditions: (a) `U` is an admin (b) `U` is not an admin and `J` is owned by `U` (only for SHOW JOBS) (c) `U` is not an admin, `J` is not owned by an admin, and `U` has the `CONTROLJOB` role With this change, the conditions are updated: (a) `U` is an admin (b) `U` is not an admin and `J` is owned by `U` (only for `SHOW JOBS` or `SHOW CHANGEFEED JOBS`) (c) `U` is not an admin, `J` is not owned by an admin, and `U` has the `CONTROLJOB` role (d) `U` is not an admin, `J` is not owned by an admin, `J` is a changefeed job, and `U` has the `CHANGEFEED` privilege on targeted tables Before this change, permissions related to altering changefeeds with `ALTER CHANGEFEED` were not explicitly defined (we did not have tests to assert its behavior, but there were some permissions checks regardless). Basically, a user needed access to view a job (ie. look up it’s job ID via `SHOW JOBS`) and they needed to be able to create a new job. After all, `ALTER CHANGEFEED` is essentially the same as creating a new job after stopping the old one. With this change, the same rules apply: the user needs to be able to access the existing job and to be able to create a new changefeed with the new rules introduced in this change respectively. Fixes: https://github.com/cockroachdb/cockroach/issues/94756 Fixes: https://github.com/cockroachdb/cockroach/issues/92261 Fixes: https://github.com/cockroachdb/cockroach/issues/87884 Fixes: https://github.com/cockroachdb/cockroach/issues/85082 Informs: https://github.com/cockroachdb/cockroach/issues/94759 Informs: https://github.com/cockroachdb/cockroach/issues/94757 Epic: CRDB-21508 Epic: CRDB-19709 Release note (enterprise change): The `CONTROLCHANGEFEED` role option will be deprecated in the future (see https://github.com/cockroachdb/cockroach/issues/94757). With this change, usages of `CONTROLCHANGEFEED` will come with a deprecation warning. Its existing behavior (see rules for creating changefeeds above) remains the same. The `SELECT` and `CHANGEFEED` privileges will be used for changefeeds henceforth: The `SELECT` privilege on a set of tables allows a user to run core changefeeds against them. The `CHANGEFEED` privilege on a set of tables allows a user to run enterprise changefeeds on them, and also manage the underlying changefeed job (ie. view, pause, cancel, and resume the job). Notably, a new cluster setting `changefeed.permissions.enforce_external_connections` is added and set to `false` by default. Enabling this setting restricts users with `CHANGEFEED` on a set of tables to create enterprise changefeeds into external connections only. To use a given external connection, a user typically needs the `USAGE` privilege on it. Note `ALTER DEFAULT PRIVILEGES` can be used with both both the `CHANGEFEED` and `SELECT` privileges to assign coarse-grained permissions (ie. assign permissions to all tables in a schema rather than manually assign them for each table). --- pkg/BUILD.bazel | 4 + pkg/ccl/changefeedccl/BUILD.bazel | 3 + .../changefeedccl/alter_changefeed_stmt.go | 30 +- .../changefeedccl/alter_changefeed_test.go | 225 +++++++++++++++ pkg/ccl/changefeedccl/cdctest/BUILD.bazel | 1 + pkg/ccl/changefeedccl/cdctest/testfeed.go | 12 +- pkg/ccl/changefeedccl/changefeed_stmt.go | 165 +++++++++-- pkg/ccl/changefeedccl/changefeed_test.go | 256 +++++++++++++++--- .../changefeedccl/changefeedbase/settings.go | 11 + pkg/ccl/changefeedccl/helpers_test.go | 46 +++- pkg/ccl/changefeedccl/scheduled_changefeed.go | 8 - .../scheduled_changefeed_test.go | 66 +++-- .../show_changefeed_jobs_test.go | 54 ++++ pkg/ccl/changefeedccl/testfeed_test.go | 23 +- .../testdata/logic_test/changefeed | 31 ++- pkg/jobs/privilege/BUILD.bazel | 42 +++ pkg/jobs/privilege/privilege.go | 145 ++++++++++ pkg/jobs/privilege/privilege_test.go | 236 ++++++++++++++++ pkg/sql/BUILD.bazel | 1 + pkg/sql/alter_role.go | 5 + pkg/sql/authorization.go | 12 + pkg/sql/control_jobs.go | 43 +-- pkg/sql/crdb_internal.go | 33 +-- pkg/sql/create_role.go | 5 + pkg/sql/delegate/show_changefeed_jobs.go | 12 +- pkg/sql/logictest/testdata/logic_test/jobs | 8 +- .../logictest/testdata/logic_test/run_control | 11 - pkg/sql/planhook.go | 3 + pkg/sql/planner.go | 14 + pkg/sql/roleoption/role_option.go | 7 + 30 files changed, 1328 insertions(+), 184 deletions(-) create mode 100644 pkg/jobs/privilege/BUILD.bazel create mode 100644 pkg/jobs/privilege/privilege.go create mode 100644 pkg/jobs/privilege/privilege_test.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index b243e8fb6a48..43a6268abb1f 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -169,6 +169,7 @@ ALL_TESTS = [ "//pkg/internal/team:team_test", "//pkg/jobs/joberror:joberror_test", "//pkg/jobs/jobsprotectedts:jobsprotectedts_test", + "//pkg/jobs/privilege:privilege_test", "//pkg/jobs:jobs_test", "//pkg/keys:keys_test", "//pkg/kv/bulk:bulk_test", @@ -1114,6 +1115,8 @@ GO_TARGETS = [ "//pkg/jobs/jobsprotectedts:jobsprotectedts", "//pkg/jobs/jobsprotectedts:jobsprotectedts_test", "//pkg/jobs/jobstest:jobstest", + "//pkg/jobs/privilege:privilege", + "//pkg/jobs/privilege:privilege_test", "//pkg/jobs:jobs", "//pkg/jobs:jobs_test", "//pkg/keys:keys", @@ -2513,6 +2516,7 @@ GET_X_DATA_TARGETS = [ "//pkg/jobs/jobspb:get_x_data", "//pkg/jobs/jobsprotectedts:get_x_data", "//pkg/jobs/jobstest:get_x_data", + "//pkg/jobs/privilege:get_x_data", "//pkg/keys:get_x_data", "//pkg/keysbase:get_x_data", "//pkg/kv:get_x_data", diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index da22e7e8285f..d99442563767 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -62,6 +62,7 @@ go_library( "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/jobs/jobsprotectedts", + "//pkg/jobs/privilege", "//pkg/keys", "//pkg/kv", "//pkg/kv/kvclient/kvcoord", @@ -105,6 +106,7 @@ go_library( "//pkg/sql/sem/volatility", "//pkg/sql/sessiondatapb", "//pkg/sql/sqlutil", + "//pkg/sql/syntheticprivilege", "//pkg/sql/types", "//pkg/util", "//pkg/util/admission", @@ -234,6 +236,7 @@ go_test( "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/bootstrap", + "//pkg/sql/catalog/catpb", "//pkg/sql/catalog/colinfo", "//pkg/sql/catalog/descbuilder", "//pkg/sql/catalog/descpb", diff --git a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go index d3b9fc399f3c..48c41e67c5f1 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + jobsprivilege "github.com/cockroachdb/cockroach/pkg/jobs/privilege" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/telemetry" @@ -100,6 +101,12 @@ func alterChangefeedPlanHook( return err } + jobPayload := job.Payload() + + if err := jobsprivilege.Authorize(ctx, p, jobID, &jobPayload, jobsprivilege.NoFlag); err != nil { + return err + } + prevDetails, ok := job.Details().(jobspb.ChangefeedDetails) if !ok { return errors.Errorf(`job %d is not changefeed job`, jobID) @@ -123,11 +130,12 @@ func alterChangefeedPlanHook( return err } - newTargets, newProgress, newStatementTime, originalSpecs, err := generateNewTargets( + newTargets, newProgress, newStatementTime, originalSpecs, err := generateAndValidateNewTargets( ctx, exprEval, p, alterChangefeedStmt.Cmds, newOptions.AsMap(), // TODO: Remove .AsMap() prevDetails, job.Progress(), + newSinkURI, ) if err != nil { return err @@ -320,7 +328,7 @@ func generateNewOpts( return changefeedbase.MakeStatementOptions(newOptions), sinkURI, nil } -func generateNewTargets( +func generateAndValidateNewTargets( ctx context.Context, exprEval exprutil.Evaluator, p sql.PlanHookState, @@ -328,6 +336,7 @@ func generateNewTargets( opts map[string]string, prevDetails jobspb.ChangefeedDetails, prevProgress jobspb.Progress, + sinkURI string, ) ( tree.ChangefeedTargets, *jobspb.Progress, @@ -490,6 +499,7 @@ func generateNewTargets( existingTargetSpans := fetchSpansForDescs(p, existingTargetDescs) var newTargetDescs []catalog.Descriptor for _, target := range v.Targets { + desc, found, err := getTargetDesc(ctx, p, descResolver, target.TableName) if err != nil { return nil, nil, hlc.Timestamp{}, nil, err @@ -501,6 +511,7 @@ func generateNewTargets( tree.ErrString(&target), ) } + k := targetKey{TableID: desc.GetID(), FamilyName: target.FamilyName} newTargets[k] = target newTableDescs[desc.GetID()] = desc @@ -546,6 +557,7 @@ func generateNewTargets( tree.ErrString(&target), ) } + newTableDescs[desc.GetID()] = desc delete(newTargets, k) } telemetry.CountBucketed(telemetryPath+`.dropped_targets`, int64(len(v.Targets))) @@ -580,6 +592,20 @@ func generateNewTargets( newTargetList = append(newTargetList, target) } + hasSelectPrivOnAllTables := true + hasChangefeedPrivOnAllTables := true + for _, desc := range newTableDescs { + hasSelect, hasChangefeed, err := checkPrivilegesForDescriptor(ctx, p, desc) + if err != nil { + return nil, nil, hlc.Timestamp{}, nil, err + } + hasSelectPrivOnAllTables = hasSelectPrivOnAllTables && hasSelect + hasChangefeedPrivOnAllTables = hasChangefeedPrivOnAllTables && hasChangefeed + } + if err := verifyUserCanCreateChangefeed(ctx, p, sinkURI, hasSelectPrivOnAllTables, hasChangefeedPrivOnAllTables); err != nil { + return nil, nil, hlc.Timestamp{}, nil, err + } + if err := validateNewTargets(ctx, p, newTargetList, newJobProgress, newJobStatementTime); err != nil { return nil, nil, hlc.Timestamp{}, nil, err } diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index 367c0966f030..a673397111fb 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -10,19 +10,25 @@ package changefeedccl import ( "context" + gosql "database/sql" "fmt" + "net/url" "sync/atomic" "testing" "time" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/server/telemetry" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/tests" @@ -40,6 +46,155 @@ import ( "github.com/stretchr/testify/require" ) +// TestAlterChangefeedAddTargetPrivileges tests permissions for +// users creating new changefeeds while altering them. +func TestAlterChangefeedAddTargetPrivileges(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + DisableDefaultTestTenant: true, + Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + DistSQL: &execinfra.TestingKnobs{ + Changefeed: &TestingKnobs{ + WrapSink: func(s Sink, _ jobspb.JobID) Sink { + if _, ok := s.(*externalConnectionKafkaSink); ok { + return s + } + return &externalConnectionKafkaSink{sink: s} + }, + }, + }, + }, + }) + ctx := context.Background() + s := srv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + + rootDB := sqlutils.MakeSQLRunner(db) + rootDB.Exec(t, `CREATE TYPE type_a as enum ('a')`) + rootDB.Exec(t, `CREATE TABLE table_a (id int, type type_a)`) + rootDB.Exec(t, `CREATE TABLE table_b (id int, type type_a)`) + rootDB.Exec(t, `CREATE TABLE table_c (id int, type type_a)`) + rootDB.Exec(t, `CREATE USER feedCreator`) + rootDB.Exec(t, `GRANT SELECT ON table_a TO feedCreator`) + rootDB.Exec(t, `GRANT CHANGEFEED ON table_a TO feedCreator`) + rootDB.Exec(t, `CREATE EXTERNAL CONNECTION "first" AS 'kafka://nope'`) + rootDB.Exec(t, `GRANT USAGE ON EXTERNAL CONNECTION first TO feedCreator`) + rootDB.Exec(t, `INSERT INTO table_a(id) values (0)`) + + rootDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) + enableEnterprise := utilccl.TestingDisableEnterprise() + enableEnterprise() + + withUser := func(t *testing.T, user string, fn func(*sqlutils.SQLRunner)) { + password := `password` + rootDB.Exec(t, fmt.Sprintf(`ALTER USER %s WITH PASSWORD '%s'`, user, password)) + + pgURL := url.URL{ + Scheme: "postgres", + User: url.UserPassword(user, password), + Host: s.SQLAddr(), + } + db2, err := gosql.Open("postgres", pgURL.String()) + if err != nil { + t.Fatal(err) + } + defer db2.Close() + userDB := sqlutils.MakeSQLRunner(db2) + + fn(userDB) + } + + t.Run("using-changefeed-grant", func(t *testing.T) { + rootDB.Exec(t, `CREATE EXTERNAL CONNECTION "second" AS 'kafka://nope'`) + rootDB.Exec(t, `CREATE USER user1`) + rootDB.Exec(t, `GRANT CHANGEFEED ON table_a TO user1`) + + var jobID int + withUser(t, "feedCreator", func(userDB *sqlutils.SQLRunner) { + row := userDB.QueryRow(t, "CREATE CHANGEFEED for table_a INTO 'external://first'") + row.Scan(&jobID) + userDB.Exec(t, `PAUSE JOB $1`, jobID) + waitForJobStatus(userDB, t, catpb.JobID(jobID), `paused`) + }) + + // user1 is missing the CHANGEFEED privilege on table_b and table_c. + withUser(t, "user1", func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, + "user user1 requires the CHANGEFEED privilege on all target tables to be able to run an enterprise changefeed", + fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='external://second'", jobID), + ) + }) + rootDB.Exec(t, `GRANT CHANGEFEED ON table_b TO user1`) + withUser(t, "user1", func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, + "user user1 requires the CHANGEFEED privilege on all target tables to be able to run an enterprise changefeed", + fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='external://second'", jobID), + ) + }) + rootDB.Exec(t, `GRANT CHANGEFEED ON table_c TO user1`) + withUser(t, "user1", func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, + fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='external://second'", jobID), + ) + }) + + // With require_external_connection_sink enabled, the user requires USAGE on the external connection. + rootDB.Exec(t, "SET CLUSTER SETTING changefeed.permissions.require_external_connection_sink = true") + withUser(t, "user1", func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, + "user user1 does not have USAGE privilege on external_connection second", + fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='external://second'", jobID), + ) + }) + rootDB.Exec(t, `GRANT USAGE ON EXTERNAL CONNECTION second TO user1`) + withUser(t, "user1", func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, + fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='external://second'", jobID), + ) + }) + rootDB.Exec(t, "SET CLUSTER SETTING changefeed.permissions.require_external_connection_sink = false") + }) + + // TODO(#94757): remove CONTROLCHANGEFEED entirely + t.Run("using-controlchangefeed-roleoption", func(t *testing.T) { + rootDB.Exec(t, `CREATE USER user2 WITH CONTROLCHANGEFEED`) + rootDB.Exec(t, `GRANT CHANGEFEED ON table_a TO user2`) + rootDB.Exec(t, `GRANT SELECT ON table_a TO user2`) + + var jobID int + withUser(t, "feedCreator", func(userDB *sqlutils.SQLRunner) { + row := userDB.QueryRow(t, "CREATE CHANGEFEED for table_a INTO 'kafka://foo'") + row.Scan(&jobID) + userDB.Exec(t, `PAUSE JOB $1`, jobID) + waitForJobStatus(userDB, t, catpb.JobID(jobID), `paused`) + }) + + // user2 is missing the SELECT privilege on table_b and table_c. + withUser(t, "user2", func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, + "pq: user user2 with CONTROLCHANGEFEED role option requires the SELECT privilege on all target tables to be able to run an enterprise changefeed", + fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='kafka://bar'", jobID), + ) + }) + rootDB.Exec(t, `GRANT SELECT ON table_b TO user2`) + withUser(t, "user2", func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, + "pq: user user2 with CONTROLCHANGEFEED role option requires the SELECT privilege on all target tables to be able to run an enterprise changefeed", + fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='kafka://bar'", jobID), + ) + }) + rootDB.Exec(t, `GRANT SELECT ON table_c TO user2`) + withUser(t, "user2", func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, + fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='kafka://bar'", jobID), + ) + }) + }) +} + func TestAlterChangefeedAddTarget(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1419,3 +1574,73 @@ func TestAlterChangefeedWithOldCursorFromCreateChangefeed(t *testing.T) { cdcTest(t, testFn, feedTestEnterpriseSinks) } + +// TestChangefeedJobControl tests if a user can modify and existing changefeed +// based on their privileges. +func TestAlterChangefeedAccessControl(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + ChangefeedJobPermissionsTestSetup(t, s) + rootDB := sqlutils.MakeSQLRunner(s.DB) + + createFeed := func(stmt string) (cdctest.EnterpriseTestFeed, func()) { + successfulFeed := feed(t, f, stmt) + closeCf := func() { + closeFeed(t, successfulFeed) + } + _, err := successfulFeed.Next() + require.NoError(t, err) + return successfulFeed.(cdctest.EnterpriseTestFeed), closeCf + } + + // Create a changefeed and pause it. + var currentFeed cdctest.EnterpriseTestFeed + var closeCf func() + asUser(t, f, `feedCreator`, func(_ *sqlutils.SQLRunner) { + currentFeed, closeCf = createFeed(`CREATE CHANGEFEED FOR table_a, table_b`) + }) + rootDB.Exec(t, "PAUSE job $1", currentFeed.JobID()) + waitForJobStatus(rootDB, t, currentFeed.JobID(), `paused`) + + // Verify who can modify the existing changefeed. + asUser(t, f, `userWithAllGrants`, func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d DROP table_b`, currentFeed.JobID())) + }) + asUser(t, f, `adminUser`, func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d ADD table_b`, currentFeed.JobID())) + }) + // jobController can access the job, but will hit an error re-creating the changefeed. + asUser(t, f, `jobController`, func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, "pq: user jobcontroller requires the CHANGEFEED privilege on all target tables to be able to run an enterprise changefeed", fmt.Sprintf(`ALTER CHANGEFEED %d DROP table_b`, currentFeed.JobID())) + }) + asUser(t, f, `userWithSomeGrants`, func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, "pq: user userwithsomegrants does not have CHANGEFEED privilege on relation table_b", fmt.Sprintf(`ALTER CHANGEFEED %d ADD table_b`, currentFeed.JobID())) + }) + asUser(t, f, `regularUser`, func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, "pq: user regularuser does not have CHANGEFEED privilege on relation (table_a|table_b)", fmt.Sprintf(`ALTER CHANGEFEED %d ADD table_b`, currentFeed.JobID())) + }) + closeCf() + + // No one can modify changefeeds created by admins, except for admins. + // In this case, the root user creates the changefeed. + currentFeed, closeCf = createFeed(`CREATE CHANGEFEED FOR table_a, table_b`) + asUser(t, f, `adminUser`, func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, "PAUSE job $1", currentFeed.JobID()) + require.NoError(t, currentFeed.WaitForStatus(func(s jobs.Status) bool { + return s == jobs.StatusPaused + })) + }) + asUser(t, f, `userWithAllGrants`, func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, "pq: only admins can control jobs owned by other admins", fmt.Sprintf(`ALTER CHANGEFEED %d ADD table_b`, currentFeed.JobID())) + }) + asUser(t, f, `jobController`, func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, "pq: only admins can control jobs owned by other admins", fmt.Sprintf(`ALTER CHANGEFEED %d ADD table_b`, currentFeed.JobID())) + }) + closeCf() + } + + // Only enterprise sinks create jobs. + cdcTest(t, testFn, feedTestEnterpriseSinks) +} diff --git a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel index e77a2feb2cb8..7555a3595208 100644 --- a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel @@ -26,6 +26,7 @@ go_library( "//pkg/sql/catalog", "//pkg/sql/catalog/descs", "//pkg/sql/sem/tree", + "//pkg/testutils/sqlutils", "//pkg/util", "//pkg/util/fsm", "//pkg/util/hlc", diff --git a/pkg/ccl/changefeedccl/cdctest/testfeed.go b/pkg/ccl/changefeedccl/cdctest/testfeed.go index d61124291778..91e52d5d13dd 100644 --- a/pkg/ccl/changefeedccl/cdctest/testfeed.go +++ b/pkg/ccl/changefeedccl/cdctest/testfeed.go @@ -13,6 +13,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) @@ -21,12 +22,11 @@ type TestFeedFactory interface { // Feed creates a new TestFeed. Feed(create string, args ...interface{}) (TestFeed, error) - // AsUser connects to the database as the specified user, - // calls fn(), then goes back to using the same root - // connection. Will return an error if the initial connection - // to the database fails, but fn is responsible for failing - // the test on other errors. - AsUser(user string, fn func()) error + // AsUser connects to the database as the specified user, calls fn() with the + // user's connection, then goes back to using the same root connection. Will + // return an error if the initial connection to the database fails, but fn is + // responsible for failing the test on other errors. + AsUser(user string, fn func(runner *sqlutils.SQLRunner)) error } // TestFeedMessage represents one row update or resolved timestamp message from diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 9c43e5907ac6..bbb09fd61047 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -22,6 +22,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/cloud" + "github.com/cockroachdb/cockroach/pkg/cloud/externalconn" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/docs" "github.com/cockroachdb/cockroach/pkg/featureflag" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -45,6 +47,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/roleoption" "github.com/cockroachdb/cockroach/pkg/sql/sem/asof" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -390,8 +393,18 @@ func createChangefeedJobRecord( statementTime = initialHighWater } + checkPrivs := true if !changefeedStmt.alterChangefeedAsOf.IsEmpty() { statementTime = changefeedStmt.alterChangefeedAsOf + // When altering a changefeed, we generate target descriptors below + // based on a timestamp in the past. For example, this may be the + // last highwater timestamp of a paused changefeed. + // This is a problem because any privilege checks done on these + // descriptors will be out of date. + // To solve this problem, we validate the descriptors + // in the alterChangefeedPlanHook at the statement time. + // Thus, we can skip the check here. + checkPrivs = false } endTime := hlc.Timestamp{} @@ -429,7 +442,8 @@ func createChangefeedJobRecord( } targets, tables, err := getTargetsAndTables(ctx, p, targetDescs, changefeedStmt.Targets, - changefeedStmt.originalSpecs, opts.ShouldUseFullStatementTimeName()) + changefeedStmt.originalSpecs, opts.ShouldUseFullStatementTimeName(), sinkURI) + if err != nil { return nil, err } @@ -442,6 +456,8 @@ func createChangefeedJobRecord( TargetSpecifications: targets, } specs := AllTargets(details) + hasSelectPrivOnAllTables := true + hasChangefeedPrivOnAllTables := true for _, desc := range targetDescs { if table, isTable := desc.(catalog.TableDescriptor); isTable { if err := changefeedvalidators.ValidateTable(specs, table, tolerances); err != nil { @@ -450,6 +466,18 @@ func createChangefeedJobRecord( for _, warning := range changefeedvalidators.WarningsForTable(table, tolerances) { p.BufferClientNotice(ctx, pgnotice.Newf("%s", warning)) } + + hasSelect, hasChangefeed, err := checkPrivilegesForDescriptor(ctx, p, desc) + if err != nil { + return nil, err + } + hasSelectPrivOnAllTables = hasSelectPrivOnAllTables && hasSelect + hasChangefeedPrivOnAllTables = hasChangefeedPrivOnAllTables && hasChangefeed + } + } + if checkPrivs { + if err := verifyUserCanCreateChangefeed(ctx, p, sinkURI, hasSelectPrivOnAllTables, hasChangefeedPrivOnAllTables); err != nil { + return nil, err } } @@ -704,23 +732,12 @@ func getTargetsAndTables( rawTargets tree.ChangefeedTargets, originalSpecs map[tree.ChangefeedTarget]jobspb.ChangefeedTargetSpecification, fullTableName bool, + sinkURI string, ) ([]jobspb.ChangefeedTargetSpecification, jobspb.ChangefeedTargets, error) { tables := make(jobspb.ChangefeedTargets, len(targetDescs)) targets := make([]jobspb.ChangefeedTargetSpecification, len(rawTargets)) seen := make(map[jobspb.ChangefeedTargetSpecification]tree.ChangefeedTarget) - hasControlChangefeed, err := p.HasRoleOption(ctx, roleoption.CONTROLCHANGEFEED) - if err != nil { - return nil, nil, err - } - - var requiredPrivilegePerTable privilege.Kind - if hasControlChangefeed { - requiredPrivilegePerTable = privilege.SELECT - } else { - requiredPrivilegePerTable = privilege.CHANGEFEED - } - for i, ct := range rawTargets { desc, ok := targetDescs[ct.TableName] if !ok { @@ -731,10 +748,6 @@ func getTargetsAndTables( return nil, nil, errors.Errorf(`CHANGEFEED cannot target %s`, tree.AsString(&ct)) } - if err := p.CheckPrivilege(ctx, desc, requiredPrivilegePerTable); err != nil { - return nil, nil, errors.WithHint(err, `Users with CONTROLCHANGEFEED need SELECT, other users need CHANGEFEED.`) - } - if spec, ok := originalSpecs[ct]; ok { targets[i] = spec if table, ok := tables[td.GetID()]; ok { @@ -782,9 +795,127 @@ func getTargetsAndTables( } seen[targets[i]] = ct } + return targets, tables, nil } +func checkPrivilegesForDescriptor( + ctx context.Context, p sql.PlanHookState, desc catalog.Descriptor, +) (hasSelect bool, hasChangefeed bool, err error) { + if desc.GetObjectType() != privilege.Table { + return false, false, errors.AssertionFailedf("expected descriptor %d to be a table descriptor. instead found: %s ", desc.GetID(), desc.GetObjectType()) + } + + hasSelect, hasChangefeed = true, true + if err = p.CheckPrivilege(ctx, desc, privilege.SELECT); err != nil { + if !sql.IsInsufficientPrivilegeError(err) { + return false, false, err + } + hasSelect = false + } + if err = p.CheckPrivilege(ctx, desc, privilege.CHANGEFEED); err != nil { + if !sql.IsInsufficientPrivilegeError(err) { + return false, false, err + } + hasChangefeed = false + } + return hasSelect, hasChangefeed, nil +} + +// verifyUserCanCreateChangefeed performs changefeed creation privilege checks, returning a +// pgcode.InsufficientPrivilege error if the check fails. +// +// TODO(#94757): remove CONTROLCHANGEFEED entirely +// Admins can create any kind of changefeed. For non-admins: +// - The first check which is performed is checking if a user has CONTROLCHANGEFEED. If so, +// we enforce that they require privilege.SELECT on all target tables. Such as user +// can use any sink. +// - To create a core changefeed, a user requires privilege.SELECT on all targeted tables. +// - To create an enterprise changefeed, the user requires privilege.CHANGEFEED on all tables. +// If changefeedbase.EnforceExternalConnectionsForChangefeedPriv is enabled, then the changefeed +// must be used with an external connection and the user requires privilege.USAGE on it. +func verifyUserCanCreateChangefeed( + ctx context.Context, + p sql.PlanHookState, + sinkURI string, + hasSelectPrivOnAllTables bool, + hasChangefeedPrivOnAllTables bool, +) error { + isAdmin, err := p.HasAdminRole(ctx) + if err != nil { + return err + } + if isAdmin { + return nil + } + + hasControlChangefeed, err := p.HasRoleOption(ctx, roleoption.CONTROLCHANGEFEED) + if err != nil { + return err + } + if hasControlChangefeed { + if !hasSelectPrivOnAllTables { + return pgerror.Newf(pgcode.InsufficientPrivilege, + "user %s with %s role option requires the %s privilege on all target tables to be able to run an enterprise changefeed", + p.User(), roleoption.CONTROLCHANGEFEED, privilege.SELECT) + } + p.BufferClientNotice(ctx, pgnotice.Newf("You are creating a changefeed as a user with the %s role option. %s", + roleoption.CONTROLCHANGEFEED, roleoption.ControlChangefeedDeprecationNoticeMsg)) + return nil + } + + if sinkURI == "" { + if !hasSelectPrivOnAllTables { + return pgerror.Newf(pgcode.InsufficientPrivilege, + `user %s requires the %s privilege on all target tables to be able to run a core changefeed`, + p.User(), privilege.SELECT) + } + return nil + } + + if !hasChangefeedPrivOnAllTables { + return pgerror.Newf(pgcode.InsufficientPrivilege, + `user %s requires the %s privilege on all target tables to be able to run an enterprise changefeed`, + p.User(), privilege.CHANGEFEED) + } + + // Version gate for external connections. + enforceExternalConnections := changefeedbase.RequireExternalConnectionSink.Get(&p.ExecCfg().Settings.SV) + if enforceExternalConnections && !p.ExecCfg().Settings.Version.IsActive(ctx, clusterversion.V22_2SystemExternalConnectionsTable) { + return errors.WithHintf(pgerror.Newf(pgcode.FeatureNotSupported, + "version %v must be finalized to create an External Connection", + clusterversion.ByKey(clusterversion.V22_2SystemExternalConnectionsTable)), + `the %s privilege on all tables is only sufficient for external connection sinks`, privilege.CHANGEFEED) + } + + if enforceExternalConnections { + url, err := url.Parse(sinkURI) + if err != nil { + return errors.Newf("failed to parse url %s", sinkURI) + } + if url.Scheme == changefeedbase.SinkSchemeExternalConnection { + ec, err := externalconn.LoadExternalConnection(ctx, url.Host, p.ExecCfg().InternalExecutor, p.Txn()) + if err != nil { + return errors.Wrap(err, "failed to load external connection object") + } + ecPriv := &syntheticprivilege.ExternalConnectionPrivilege{ + ConnectionName: ec.ConnectionName(), + } + if err := p.CheckPrivilege(ctx, ecPriv, privilege.USAGE); err != nil { + return err + } + } else { + return pgerror.Newf( + pgcode.InsufficientPrivilege, + `the %s privilege on all tables can only be used with external connection sinks. see cluster setting %s`, + privilege.CHANGEFEED, changefeedbase.RequireExternalConnectionSink.Key(), + ) + } + } + + return nil +} + func validateSink( ctx context.Context, p sql.PlanHookState, diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 566ae519327a..ef7646ae1fbd 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -2455,18 +2455,17 @@ func TestChangefeedEachColumnFamilySchemaChanges(t *testing.T) { cdcTest(t, testFn) } -func TestChangefeedAuthorization(t *testing.T) { +func TestCoreChangefeedRequiresSelectPrivilege(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { rootDB := sqlutils.MakeSQLRunner(s.DB) - rootDB.Exec(t, `create user guest`) - rootDB.Exec(t, `create user feedcreator with controlchangefeed`) - rootDB.Exec(t, `create type type_a as enum ('a')`) - rootDB.Exec(t, `create table table_a (id int, type type_a)`) - rootDB.Exec(t, `create table table_b (id int, type type_a)`) - rootDB.Exec(t, `insert into table_a(id) values (0)`) + rootDB.Exec(t, `CREATE USER user1`) + rootDB.Exec(t, `CREATE TYPE type_a as enum ('a')`) + rootDB.Exec(t, `CREATE TABLE table_a (id int, type type_a)`) + rootDB.Exec(t, `CREATE TABLE table_b (id int, type type_a)`) + rootDB.Exec(t, `INSERT INTO table_a(id) values (0)`) expectSuccess := func(stmt string) { successfulFeed := feed(t, f, stmt) @@ -2475,30 +2474,163 @@ func TestChangefeedAuthorization(t *testing.T) { require.NoError(t, err) } - // Users with CONTROLCHANGEFEED need SELECT privileges as well. - asUser(t, f, `feedcreator`, func() { + asUser(t, f, `user1`, func(_ *sqlutils.SQLRunner) { expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR table_a`, - `user feedcreator does not have SELECT privilege on relation table_a`) + `user user1 requires the SELECT privilege on all target tables to be able to run a core changefeed`) + }) + rootDB.Exec(t, `GRANT SELECT ON table_a TO user1`) + asUser(t, f, `user1`, func(_ *sqlutils.SQLRunner) { + expectSuccess(`CREATE CHANGEFEED FOR table_a`) + }) + asUser(t, f, `user1`, func(_ *sqlutils.SQLRunner) { + expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR table_a, table_b`, + `user user1 requires the SELECT privilege on all target tables to be able to run a core changefeed`) }) - rootDB.Exec(t, `GRANT SELECT ON table_a TO guest`) - rootDB.Exec(t, `GRANT CHANGEFEED ON table_b TO guest`) - rootDB.Exec(t, `GRANT SELECT ON table_a TO feedcreator`) - - // Users without the controlchangefeed role option need the CHANGEFEED privilege - // on every referenced table. - asUser(t, f, `guest`, func() { - expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR table_a, table_b -- as guest`, - `CHANGEFEED privilege`) + rootDB.Exec(t, `GRANT SELECT ON table_b TO user1`) + asUser(t, f, `user1`, func(_ *sqlutils.SQLRunner) { + expectSuccess(`CREATE CHANGEFEED FOR table_a, table_b`) }) + } + cdcTest(t, testFn, feedTestForceSink("sinkless")) +} - // Users with controlchangefeed need the SELECT privilege on every table. - asUser(t, f, `feedcreator`, func() { - expectSuccess(`CREATE CHANGEFEED FOR table_a`) +// TODO(#94757): remove CONTROLCHANGEFEED entirely +func TestControlChangefeedRoleOption(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) - expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR table_a, table_b -- as feedcreator`, - `user feedcreator does not have SELECT privilege on relation table_b`) + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + rootDB := sqlutils.MakeSQLRunner(s.DB) + rootDB.Exec(t, `CREATE USER user1 WITH CONTROLCHANGEFEED`) + rootDB.Exec(t, `CREATE TYPE type_a as enum ('a')`) + rootDB.Exec(t, `CREATE TABLE table_a (id int, type type_a)`) + rootDB.Exec(t, `CREATE TABLE table_b (id int, type type_a)`) + rootDB.Exec(t, `INSERT INTO table_a(id) values (0)`) + + expectSuccess := func(stmt string) { + successfulFeed := feed(t, f, stmt) + defer closeFeed(t, successfulFeed) + _, err := successfulFeed.Next() + require.NoError(t, err) + } + + asUser(t, f, `user1`, func(_ *sqlutils.SQLRunner) { + expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR table_a, table_b`, + `pq: user user1 with CONTROLCHANGEFEED role option requires the SELECT privilege on all target tables to be able to run an enterprise changefeed`) + }) + rootDB.Exec(t, `GRANT SELECT ON table_a TO user1`) + asUser(t, f, `user1`, func(_ *sqlutils.SQLRunner) { + expectErrCreatingFeed(t, f, `CREATE CHANGEFEED FOR table_a, table_b`, + `pq: user user1 with CONTROLCHANGEFEED role option requires the SELECT privilege on all target tables to be able to run an enterprise changefeed`) }) + rootDB.Exec(t, `GRANT SELECT ON table_b TO user1`) + asUser(t, f, `user1`, func(_ *sqlutils.SQLRunner) { + expectSuccess(`CREATE CHANGEFEED FOR table_a`) + }) + } + cdcTest(t, testFn, feedTestOmitSinks("sinkless")) +} + +func TestChangefeedCreateAuthorizationWithChangefeedPriv(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + DisableDefaultTestTenant: true, + Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + DistSQL: &execinfra.TestingKnobs{ + Changefeed: &TestingKnobs{ + WrapSink: func(s Sink, _ jobspb.JobID) Sink { + if _, ok := s.(*externalConnectionKafkaSink); ok { + return s + } + return &externalConnectionKafkaSink{sink: s} + }, + }, + }, + }, + }) + ctx := context.Background() + s := srv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + + rootDB := sqlutils.MakeSQLRunner(db) + rootDB.Exec(t, `CREATE USER user1`) + rootDB.Exec(t, `CREATE TYPE type_a as enum ('a')`) + rootDB.Exec(t, `CREATE TABLE table_a (id int, type type_a)`) + rootDB.Exec(t, `CREATE TABLE table_b (id int, type type_a)`) + rootDB.Exec(t, `INSERT INTO table_a(id) values (0)`) + rootDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) + enableEnterprise := utilccl.TestingDisableEnterprise() + enableEnterprise() + + withUser := func(t *testing.T, user string, fn func(*sqlutils.SQLRunner)) { + password := `password` + rootDB.Exec(t, fmt.Sprintf(`ALTER USER %s WITH PASSWORD '%s'`, user, password)) + + pgURL := url.URL{ + Scheme: "postgres", + User: url.UserPassword(user, password), + Host: s.SQLAddr(), + } + db2, err := gosql.Open("postgres", pgURL.String()) + if err != nil { + t.Fatal(err) + } + defer db2.Close() + userDB := sqlutils.MakeSQLRunner(db2) + + fn(userDB) + } + + rootDB.Exec(t, `CREATE EXTERNAL CONNECTION "nope" AS 'kafka://nope'`) + + withUser(t, "user1", func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, + "user user1 requires the CHANGEFEED privilege on all target tables to be able to run an enterprise changefeed", + "CREATE CHANGEFEED for table_a, table_b INTO 'external://nope'", + ) + }) + rootDB.Exec(t, "GRANT CHANGEFEED ON table_a TO user1") + withUser(t, "user1", func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, + "user user1 requires the CHANGEFEED privilege on all target tables to be able to run an enterprise changefeed", + "CREATE CHANGEFEED for table_a, table_b INTO 'external://nope'", + ) + }) + rootDB.Exec(t, "GRANT CHANGEFEED ON table_b TO user1") + withUser(t, "user1", func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, + "CREATE CHANGEFEED for table_a, table_b INTO 'external://nope'", + ) + }) + + // With require_external_connection_sink enabled, the user requires USAGE on the external connection. + rootDB.Exec(t, "SET CLUSTER SETTING changefeed.permissions.require_external_connection_sink = true") + withUser(t, "user1", func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, + "pq: the CHANGEFEED privilege on all tables can only be used with external connection sinks", + "CREATE CHANGEFEED for table_a, table_b INTO 'kafka://nope'", + ) + }) + rootDB.Exec(t, "GRANT USAGE ON EXTERNAL CONNECTION nope to user1") + withUser(t, "user1", func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, + "CREATE CHANGEFEED for table_a, table_b INTO 'external://nope'", + ) + }) + rootDB.Exec(t, "SET CLUSTER SETTING changefeed.permissions.require_external_connection_sink = false") +} + +func TestChangefeedGrant(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + rootDB := sqlutils.MakeSQLRunner(s.DB) + rootDB.Exec(t, `create user guest`) // GRANT CHANGEFEED ON DATABASE is an error. rootDB.ExpectErr(t, `invalid privilege type CHANGEFEED for database`, `GRANT CHANGEFEED ON DATABASE d TO guest`) @@ -2510,17 +2642,6 @@ func TestChangefeedAuthorization(t *testing.T) { `INSERT INTO table_c values (0)`, ) - asUser(t, f, `guest`, func() { - expectSuccess(`CREATE CHANGEFEED FOR table_c`) - }) - - // GRANT CHANGEFED ON prefix.* grants CHANGEFEED on all current tables with that prefix. - rootDB.Exec(t, `GRANT CHANGEFEED ON d.public.* TO guest`) - rootDB.Exec(t, `GRANT SELECT ON d.* TO guest`) - asUser(t, f, `guest`, func() { - expectSuccess(`CREATE CHANGEFEED FOR table_c`) - }) - // SHOW GRANTS includes CHANGEFEED privileges. var count int rootDB.QueryRow(t, `select count(*) from [show grants] where privilege_type = 'CHANGEFEED';`).Scan(&count) @@ -2530,6 +2651,71 @@ func TestChangefeedAuthorization(t *testing.T) { cdcTest(t, testFn) } +// TestChangefeedJobControl tests if a user can control a changefeed +// based on their permissions. +func TestChangefeedJobControl(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + ChangefeedJobPermissionsTestSetup(t, s) + + createFeed := func(stmt string) (cdctest.EnterpriseTestFeed, func()) { + successfulFeed := feed(t, f, stmt) + closeCf := func() { + closeFeed(t, successfulFeed) + } + _, err := successfulFeed.Next() + require.NoError(t, err) + return successfulFeed.(cdctest.EnterpriseTestFeed), closeCf + } + + // Create a changefeed and assert who can control the job. + var currentFeed cdctest.EnterpriseTestFeed + var closeCf func() + asUser(t, f, `feedCreator`, func(_ *sqlutils.SQLRunner) { + currentFeed, closeCf = createFeed(`CREATE CHANGEFEED FOR table_a, table_b`) + }) + asUser(t, f, `adminUser`, func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, "PAUSE job $1", currentFeed.JobID()) + waitForJobStatus(userDB, t, currentFeed.JobID(), "paused") + }) + asUser(t, f, `userWithAllGrants`, func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, "RESUME job $1", currentFeed.JobID()) + waitForJobStatus(userDB, t, currentFeed.JobID(), "running") + }) + asUser(t, f, `jobController`, func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, "RESUME job $1", currentFeed.JobID()) + waitForJobStatus(userDB, t, currentFeed.JobID(), "running") + }) + asUser(t, f, `userWithSomeGrants`, func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, "pq: user userwithsomegrants does not have CHANGEFEED privilege on relation table_b", "PAUSE job $1", currentFeed.JobID()) + }) + asUser(t, f, `regularUser`, func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, "pq: user regularuser does not have CHANGEFEED privilege on relation (table_a|table_b)", "PAUSE job $1", currentFeed.JobID()) + }) + closeCf() + + // No one can modify changefeeds created by admins, except for admins. + // In this case, the root user creates the changefeed. + currentFeed, closeCf = createFeed(`CREATE CHANGEFEED FOR table_a, table_b`) + asUser(t, f, `adminUser`, func(userDB *sqlutils.SQLRunner) { + userDB.Exec(t, "PAUSE job $1", currentFeed.JobID()) + waitForJobStatus(userDB, t, currentFeed.JobID(), "paused") + }) + asUser(t, f, `userWithAllGrants`, func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, "pq: only admins can control jobs owned by other admins", "PAUSE job $1", currentFeed.JobID()) + }) + asUser(t, f, `jobController`, func(userDB *sqlutils.SQLRunner) { + userDB.ExpectErr(t, "pq: only admins can control jobs owned by other admins", "PAUSE job $1", currentFeed.JobID()) + }) + closeCf() + } + + // Only enterprise sinks create jobs. + cdcTest(t, testFn, feedTestEnterpriseSinks) +} + func TestChangefeedColumnFamilyAvro(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index 607acb0a35e1..2e6b01c76049 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -270,3 +270,14 @@ var EventConsumerElasticCPUControlEnabled = settings.RegisterBoolSetting( "determines whether changefeed event processing integrates with elastic CPU control", true, ) + +// RequireExternalConnectionSink is used to restrict non-admins with the CHANGEFEED privilege +// to create changefeeds to external connections only. +var RequireExternalConnectionSink = settings.RegisterBoolSetting( + settings.TenantWritable, + "changefeed.permissions.require_external_connection_sink", + "if enabled, this settings restricts users with the CHANGEFEED privilege"+ + " to create changefeeds with external connection sinks only."+ + " see https://www.cockroachlabs.com/docs/stable/create-external-connection.html", + false, +) diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 70716810cc55..fa1b4a0f073b 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -630,7 +630,9 @@ func feed( return feed } -func asUser(t testing.TB, f cdctest.TestFeedFactory, user string, fn func()) { +func asUser( + t testing.TB, f cdctest.TestFeedFactory, user string, fn func(runner *sqlutils.SQLRunner), +) { t.Helper() require.NoError(t, f.AsUser(user, fn)) } @@ -1045,3 +1047,45 @@ func TestingSetIncludeParquetMetadata() func() { includeParquetTestMetadata = false } } + +// ChangefeedJobPermissionsTestSetup creates entities and users with various permissions +// for tests which test access control for changefeed jobs. +// +// This helper creates the following: +// +// UDT type_a +// TABLE table_a (with column type_a) +// TABLE table_b (with column type_a) +// USER adminUser (with admin privs) +// USER feedCreator (with CHANGEFEED priv on table_a and table_b) +// USER jobController (with the CONTROLJOB role option) +// USER userWithAllGrants (with CHANGEFEED on table_a and table b) +// USER userWithSomeGrants (with CHANGEFEED on table_a only) +// USER regularUser (with no privs) +func ChangefeedJobPermissionsTestSetup(t *testing.T, s TestServer) { + rootDB := sqlutils.MakeSQLRunner(s.DB) + + rootDB.Exec(t, `CREATE TYPE type_a as enum ('a')`) + rootDB.Exec(t, `CREATE TABLE table_a (id int, type type_a)`) + rootDB.Exec(t, `CREATE TABLE table_b (id int, type type_a)`) + rootDB.Exec(t, `INSERT INTO table_a(id) values (0)`) + rootDB.Exec(t, `INSERT INTO table_b(id) values (0)`) + + rootDB.Exec(t, `CREATE USER adminUser`) + rootDB.Exec(t, `GRANT ADMIN TO adminUser`) + + rootDB.Exec(t, `CREATE USER feedCreator`) + rootDB.Exec(t, `GRANT CHANGEFEED ON table_a TO feedCreator`) + rootDB.Exec(t, `GRANT CHANGEFEED ON table_b TO feedCreator`) + + rootDB.Exec(t, `CREATE USER jobController with CONTROLJOB`) + + rootDB.Exec(t, `CREATE USER userWithAllGrants`) + rootDB.Exec(t, `GRANT CHANGEFEED ON table_a TO userWithAllGrants`) + rootDB.Exec(t, `GRANT CHANGEFEED ON table_b TO userWithAllGrants`) + + rootDB.Exec(t, `CREATE USER userWithSomeGrants`) + rootDB.Exec(t, `GRANT CHANGEFEED ON table_a TO userWithSomeGrants`) + + rootDB.Exec(t, `CREATE USER regularUser`) +} diff --git a/pkg/ccl/changefeedccl/scheduled_changefeed.go b/pkg/ccl/changefeedccl/scheduled_changefeed.go index 0f2d32ef574b..55895518f170 100644 --- a/pkg/ccl/changefeedccl/scheduled_changefeed.go +++ b/pkg/ccl/changefeedccl/scheduled_changefeed.go @@ -33,7 +33,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice" - "github.com/cockroachdb/cockroach/pkg/sql/roleoption" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" @@ -547,13 +546,6 @@ func doCreateChangefeedSchedule( spec *scheduledChangefeedSpec, resultsCh chan<- tree.Datums, ) error { - hasControlChangefeed, err := p.HasRoleOption(ctx, roleoption.CONTROLCHANGEFEED) - if err != nil { - return err - } - if !hasControlChangefeed { - return errors.Newf("User needs CONTROLCHANGEFEED role to schedule changefeeds.") - } env := sql.JobSchedulerEnv(p.ExecCfg()) diff --git a/pkg/ccl/changefeedccl/scheduled_changefeed_test.go b/pkg/ccl/changefeedccl/scheduled_changefeed_test.go index 8aa43b15373e..01691e0fe453 100644 --- a/pkg/ccl/changefeedccl/scheduled_changefeed_test.go +++ b/pkg/ccl/changefeedccl/scheduled_changefeed_test.go @@ -21,13 +21,16 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedpb" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobstest" "github.com/cockroachdb/cockroach/pkg/scheduledjobs" "github.com/cockroachdb/cockroach/pkg/scheduledjobs/schedulebase" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" @@ -281,29 +284,54 @@ RECURRING '@hourly' WITH SCHEDULE OPTIONS on_execution_failure = 'pause', first_ } } -func TestCreateChangefeedScheduleRequiresChangefeedRole(t *testing.T) { +// TestCreateChangefeedScheduleChecksPermissionsDuringDryRun verifies +// that we perform a dry run of creating the changefeed (performs +// permissions checks). +func TestCreateChangefeedScheduleChecksPermissionsDuringDryRun(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - th, cleanup := newTestHelper(t) - defer cleanup() - - th.sqlDB.Exec(t, `CREATE USER testuser`) - pgURL, cleanupFunc := sqlutils.PGUrl( - t, th.server.ServingSQLAddr(), - "TestCreateSchedule-testuser", url.User("testuser"), - ) - defer cleanupFunc() - - testuser, err := gosql.Open("postgres", pgURL.String()) - require.NoError(t, err) - defer func() { - require.NoError(t, testuser.Close()) - }() + srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + DisableDefaultTestTenant: true, + Knobs: base.TestingKnobs{ + JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), + DistSQL: &execinfra.TestingKnobs{ + Changefeed: &TestingKnobs{ + WrapSink: func(s Sink, _ jobspb.JobID) Sink { + if _, ok := s.(*externalConnectionKafkaSink); ok { + return s + } + return &externalConnectionKafkaSink{sink: s} + }, + }, + }, + }, + }) + ctx := context.Background() + s := srv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + rootDB := sqlutils.MakeSQLRunner(db) + rootDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`) + enableEnterprise := utilccl.TestingDisableEnterprise() + enableEnterprise() + + rootDB.Exec(t, `CREATE TABLE table_a (i int)`) + rootDB.Exec(t, `CREATE USER testuser WITH PASSWORD 'test'`) + + pgURL := url.URL{ + Scheme: "postgres", + User: url.UserPassword("testuser", "test"), + Host: s.SQLAddr(), + } + db2, err := gosql.Open("postgres", pgURL.String()) + if err != nil { + t.Fatal(err) + } + defer db2.Close() + userDB := sqlutils.MakeSQLRunner(db2) - _, err = testuser.Exec( - "CREATE SCHEDULE FOR CHANGEFEED TABLE system.jobs INTO 'somewhere' WITH initial_scan = 'only' RECURRING '@daily'") - require.Regexp(t, "needs CONTROLCHANGEFEED role", err) + userDB.ExpectErr(t, "Failed to dry run create changefeed: user testuser requires the CHANGEFEED privilege on all target tables to be able to run an enterprise changefeed", + "CREATE SCHEDULE FOR CHANGEFEED TABLE table_a INTO 'somewhere' WITH initial_scan = 'only' RECURRING '@daily'") } // TestCreateChangefeedScheduleIfNotExists: checks if adding IF NOT EXISTS will diff --git a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go index 52525a5c4e0d..c62389027c83 100644 --- a/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go +++ b/pkg/ccl/changefeedccl/show_changefeed_jobs_test.go @@ -13,6 +13,7 @@ import ( "fmt" "net/url" "sort" + "strconv" "strings" "testing" @@ -428,3 +429,56 @@ func TestShowChangefeedJobsAlterChangefeed(t *testing.T) { // Force kafka to validate topics cdcTest(t, testFn, feedTestForceSink("kafka")) } + +func TestShowChangefeedJobsAuthorization(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + ChangefeedJobPermissionsTestSetup(t, s) + + var jobID jobspb.JobID + createFeed := func(stmt string) { + successfulFeed := feed(t, f, stmt) + defer closeFeed(t, successfulFeed) + _, err := successfulFeed.Next() + require.NoError(t, err) + jobID = successfulFeed.(cdctest.EnterpriseTestFeed).JobID() + } + rootDB := sqlutils.MakeSQLRunner(s.DB) + + // Create a changefeed and assert who can see it. + asUser(t, f, `feedCreator`, func(userDB *sqlutils.SQLRunner) { + createFeed(`CREATE CHANGEFEED FOR table_a, table_b`) + }) + expectedJobIDStr := strconv.Itoa(int(jobID)) + asUser(t, f, `adminUser`, func(userDB *sqlutils.SQLRunner) { + userDB.CheckQueryResults(t, `SELECT job_id FROM [SHOW CHANGEFEED JOBS]`, [][]string{{expectedJobIDStr}}) + }) + asUser(t, f, `userWithAllGrants`, func(userDB *sqlutils.SQLRunner) { + userDB.CheckQueryResults(t, `SELECT job_id FROM [SHOW CHANGEFEED JOBS]`, [][]string{{expectedJobIDStr}}) + }) + asUser(t, f, `userWithSomeGrants`, func(userDB *sqlutils.SQLRunner) { + userDB.CheckQueryResults(t, `SELECT job_id FROM [SHOW CHANGEFEED JOBS]`, [][]string{}) + }) + asUser(t, f, `jobController`, func(userDB *sqlutils.SQLRunner) { + userDB.CheckQueryResults(t, `SELECT job_id FROM [SHOW CHANGEFEED JOBS]`, [][]string{{expectedJobIDStr}}) + }) + asUser(t, f, `regularUser`, func(userDB *sqlutils.SQLRunner) { + userDB.CheckQueryResults(t, `SELECT job_id FROM [SHOW CHANGEFEED JOBS]`, [][]string{}) + }) + + // Assert behavior when one of the tables is dropped. + rootDB.Exec(t, "DROP TABLE table_b") + // Having CHANGEFEED on only table_a is now sufficient. + asUser(t, f, `userWithSomeGrants`, func(userDB *sqlutils.SQLRunner) { + userDB.CheckQueryResults(t, `SELECT job_id FROM [SHOW CHANGEFEED JOBS]`, [][]string{{expectedJobIDStr}}) + }) + asUser(t, f, `regularUser`, func(userDB *sqlutils.SQLRunner) { + userDB.CheckQueryResults(t, `SELECT job_id FROM [SHOW CHANGEFEED JOBS]`, [][]string{}) + }) + } + + // Only enterprise sinks create jobs. + cdcTest(t, testFn, feedTestEnterpriseSinks) +} diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go index d42f8be30c3d..1fa532836641 100644 --- a/pkg/ccl/changefeedccl/testfeed_test.go +++ b/pkg/ccl/changefeedccl/testfeed_test.go @@ -46,6 +46,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" @@ -74,7 +75,7 @@ func makeSinklessFeedFactory( return &sinklessFeedFactory{s: s, sink: sink, sinkForUser: sinkForUser} } -func (f *sinklessFeedFactory) AsUser(user string, fn func()) error { +func (f *sinklessFeedFactory) AsUser(user string, fn func(*sqlutils.SQLRunner)) error { prevSink := f.sink password := `hunter2` if err := setPassword(user, password, f.sink); err != nil { @@ -83,7 +84,19 @@ func (f *sinklessFeedFactory) AsUser(user string, fn func()) error { defer func() { f.sink = prevSink }() var cleanup func() f.sink, cleanup = f.sinkForUser(user, password) - fn() + pgconn := url.URL{ + Scheme: "postgres", + User: url.UserPassword(user, password), + Host: f.Server().SQLAddr(), + Path: `d`, + } + db2, err := gosql.Open("postgres", pgconn.String()) + if err != nil { + return err + } + defer db2.Close() + userDB := sqlutils.MakeSQLRunner(db2) + fn(userDB) cleanup() return nil } @@ -665,7 +678,7 @@ func (e *enterpriseFeedFactory) jobsTableConn() *gosql.DB { // AsUser uses the previous (assumed to be root) connection to ensure // the user has the ability to authenticate, and saves it to poll // job status, then implements TestFeedFactory.AsUser(). -func (e *enterpriseFeedFactory) AsUser(user string, fn func()) error { +func (e *enterpriseFeedFactory) AsUser(user string, fn func(*sqlutils.SQLRunner)) error { prevDB := e.db e.rootDB = e.db defer func() { e.db = prevDB }() @@ -685,8 +698,10 @@ func (e *enterpriseFeedFactory) AsUser(user string, fn func()) error { return err } defer db2.Close() + userDB := sqlutils.MakeSQLRunner(db2) + e.db = db2 - fn() + fn(userDB) return nil } diff --git a/pkg/ccl/logictestccl/testdata/logic_test/changefeed b/pkg/ccl/logictestccl/testdata/logic_test/changefeed index a6c12066df09..bec3b0165132 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/changefeed +++ b/pkg/ccl/logictestccl/testdata/logic_test/changefeed @@ -10,35 +10,42 @@ user root # Test granting CONTROLCHANGEFEED. statement ok -ALTER USER testuser CONTROLCHANGEFEED; GRANT CONNECT ON DATABASE test TO testuser +query T noticetrace +ALTER USER testuser CONTROLCHANGEFEED +---- +NOTICE: The role option CONTROLCHANGEFEED will be removed in a future release, please switch to using the CHANGEFEED privilege for target tables instead: https://www.cockroachlabs.com/docs/stable/create-changefeed.html#required-privileges + user testuser # We should pass the CONTROLCHANGEFEED permission check but error on missing # SELECT privileges. -statement error user testuser does not have SELECT privilege on relation t -CREATE CHANGEFEED FOR t +statement error pq: user testuser with CONTROLCHANGEFEED role option requires the SELECT privilege on all target tables to be able to run an enterprise changefeed +CREATE CHANGEFEED FOR t INTO 'null://sink' with initial_scan='only' -# Test revoking CONTROLCHANGEFEED. user root +# Test granting SELECT. statement ok -ALTER USER testuser NOCONTROLCHANGEFEED; GRANT SELECT ON TABLE t TO testuser user testuser -statement error user testuser does not have CHANGEFEED privilege on relation t -CREATE CHANGEFEED FOR t - -# The CHANGEFEED privilege can be granted granularly. +# Test the deprecation notice for CONTROLCHANGEFEED +query T noticetrace +CREATE CHANGEFEED FOR t INTO 'null://sink' with initial_scan='only' +---- +NOTICE: You are creating a changefeed as a user with the CONTROLCHANGEFEED role option. The role option CONTROLCHANGEFEED will be removed in a future release, please switch to using the CHANGEFEED privilege for target tables instead: https://www.cockroachlabs.com/docs/stable/create-changefeed.html#required-privileges +# Test revoking CONTROLCHANGEFEED. user root + statement ok -GRANT CHANGEFEED ON table t TO testuser +ALTER USER testuser NOCONTROLCHANGEFEED; +GRANT SELECT ON TABLE t TO testuser user testuser -statement ok -CREATE CHANGEFEED FOR t with initial_scan='only' +statement error user testuser requires the CHANGEFEED privilege on all target tables to be able to run an enterprise changefeed +CREATE CHANGEFEED FOR t INTO 'null://sink' with initial_scan='only' diff --git a/pkg/jobs/privilege/BUILD.bazel b/pkg/jobs/privilege/BUILD.bazel new file mode 100644 index 000000000000..de6fc7b28cda --- /dev/null +++ b/pkg/jobs/privilege/BUILD.bazel @@ -0,0 +1,42 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "privilege", + srcs = ["privilege.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/jobs/privilege", + visibility = ["//visibility:public"], + deps = [ + "//pkg/jobs/jobspb", + "//pkg/security/username", + "//pkg/sql/catalog", + "//pkg/sql/catalog/descpb", + "//pkg/sql/pgwire/pgcode", + "//pkg/sql/pgwire/pgerror", + "//pkg/sql/privilege", + "//pkg/sql/roleoption", + "//pkg/util/intsets", + "@com_github_cockroachdb_errors//:errors", + ], +) + +go_test( + name = "privilege_test", + srcs = ["privilege_test.go"], + args = ["-test.timeout=295s"], + deps = [ + ":privilege", + "//pkg/jobs/jobspb", + "//pkg/security/username", + "//pkg/sql/catalog", + "//pkg/sql/catalog/descpb", + "//pkg/sql/pgwire/pgcode", + "//pkg/sql/pgwire/pgerror", + "//pkg/sql/privilege", + "//pkg/sql/roleoption", + "//pkg/util/randutil", + "@com_github_stretchr_testify//assert", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/jobs/privilege/privilege.go b/pkg/jobs/privilege/privilege.go new file mode 100644 index 000000000000..f190081fd490 --- /dev/null +++ b/pkg/jobs/privilege/privilege.go @@ -0,0 +1,145 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package privilege + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/privilege" + "github.com/cockroachdb/cockroach/pkg/sql/roleoption" + "github.com/cockroachdb/cockroach/pkg/util/intsets" + "github.com/cockroachdb/errors" +) + +// An AccessFlag is used determine to if a special rule should be applied +// when checking if a user can access a job. +type AccessFlag int + +const ( + // NoFlag is a placeholder for no special rule. + NoFlag AccessFlag = iota + // UserCanAccessOwnJob permits users to access jobs if they are the job owner. + UserCanAccessOwnJob +) + +type accessFlagSet struct { + inner intsets.Fast +} + +func (s *accessFlagSet) contains(flag AccessFlag) bool { + return s.inner.Contains(int(flag)) +} + +func makeAccessFlagSet(flags []AccessFlag) accessFlagSet { + flagsSet := intsets.MakeFast() + for _, f := range flags { + flagsSet.Add(int(f)) + } + return accessFlagSet{inner: flagsSet} +} + +// AuthorizationAccessor is an interface for checking authorization on jobs. +type AuthorizationAccessor interface { + // CheckPrivilegeForTableID mirrors sql.AuthorizationAccessor. + CheckPrivilegeForTableID(ctx context.Context, tableID descpb.ID, privilege privilege.Kind) error + + // HasRoleOption mirrors sql.AuthorizationAccessor. + HasRoleOption(ctx context.Context, roleOption roleoption.Option) (bool, error) + + // UserHasAdminRole mirrors sql.AuthorizationAccessor. + UserHasAdminRole(ctx context.Context, user username.SQLUsername) (bool, error) + + // HasAdminRole mirrors sql.AuthorizationAccessor. + HasAdminRole(ctx context.Context) (bool, error) + + // User mirrors sql.PlanHookState. + User() username.SQLUsername +} + +// changefeedPrivilegeCheck determines if a user has access to the changefeed defined +// by the supplied payload. +func changefeedPrivilegeCheck( + ctx context.Context, a AuthorizationAccessor, specs []jobspb.ChangefeedTargetSpecification, +) error { + + for _, spec := range specs { + err := a.CheckPrivilegeForTableID(ctx, spec.TableID, privilege.CHANGEFEED) + if err != nil { + // When performing SHOW JOBS or SHOW CHANGEFEED JOBS, there may be old changefeed + // records that reference tables which have been dropped or are being + // dropped. In this case, we would prefer to skip the permissions check on + // the dropped descriptor. + if pgerror.GetPGCode(err) == pgcode.UndefinedTable || errors.Is(err, catalog.ErrDescriptorDropped) { + continue + } + + return err + } + } + return nil +} + +// Authorize returns an error if the user should not be able to access the job. +func Authorize( + ctx context.Context, + a AuthorizationAccessor, + jobID jobspb.JobID, + payload *jobspb.Payload, + flag AccessFlag, +) error { + userIsAdmin, err := a.HasAdminRole(ctx) + if err != nil { + return err + } + + userHasControlJob, err := a.HasRoleOption(ctx, roleoption.CONTROLJOB) + if err != nil { + return err + } + + jobOwnerUser := payload.UsernameProto.Decode() + jobOwnerIsAdmin, err := a.UserHasAdminRole(ctx, jobOwnerUser) + if err != nil { + return err + } + + if jobOwnerIsAdmin { + if !userIsAdmin { + return pgerror.Newf(pgcode.InsufficientPrivilege, + "only admins can control jobs owned by other admins") + } + return nil + } + + if (userHasControlJob) || (flag == UserCanAccessOwnJob && a.User() == jobOwnerUser) { + return nil + } + + switch payload.Type() { + case jobspb.TypeChangefeed: + specs, ok := payload.UnwrapDetails().(jobspb.ChangefeedDetails) + if !ok { + return errors.Newf("could not unwrap details from the payload of job %d", jobID) + } + + return changefeedPrivilegeCheck(ctx, a, specs.TargetSpecifications) + default: + return pgerror.Newf(pgcode.InsufficientPrivilege, + "user %s does not have %s privilege for job $d", + a.User(), roleoption.CONTROLJOB, jobID) + } +} diff --git a/pkg/jobs/privilege/privilege_test.go b/pkg/jobs/privilege/privilege_test.go new file mode 100644 index 000000000000..51cf56a87106 --- /dev/null +++ b/pkg/jobs/privilege/privilege_test.go @@ -0,0 +1,236 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package privilege_test + +import ( + "context" + "math/rand" + "testing" + + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + jobsprivilege "github.com/cockroachdb/cockroach/pkg/jobs/privilege" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/privilege" + "github.com/cockroachdb/cockroach/pkg/sql/roleoption" + "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/stretchr/testify/assert" +) + +type testAuthAccessor struct { + user username.SQLUsername + + // set of role options that the user has + roleOptions map[roleoption.Option]struct{} + + // set of descriptors which the user has privilege.CHANGEFEED on + changeFeedPrivileges map[descpb.ID]struct{} + // set of descriptors which are dropped + droppedDescriptors map[descpb.ID]struct{} + + // set of all usernames who are admins + admins map[string]struct{} + + rand *rand.Rand +} + +var _ jobsprivilege.AuthorizationAccessor = &testAuthAccessor{} + +func (a *testAuthAccessor) CheckPrivilegeForTableID( + _ context.Context, tableID descpb.ID, _ privilege.Kind, +) error { + if _, ok := a.droppedDescriptors[tableID]; ok { + if a.rand.Int31n(2) == 0 { + return catalog.ErrDescriptorDropped + } else { + return pgerror.New(pgcode.UndefinedTable, "foo") + } + } + + if _, ok := a.changeFeedPrivileges[tableID]; !ok { + return pgerror.New(pgcode.InsufficientPrivilege, "foo") + } + return nil +} + +func (a *testAuthAccessor) HasRoleOption( + _ context.Context, roleOption roleoption.Option, +) (bool, error) { + _, ok := a.roleOptions[roleOption] + return ok, nil +} + +func (a *testAuthAccessor) UserHasAdminRole( + _ context.Context, user username.SQLUsername, +) (bool, error) { + _, ok := a.admins[user.Normalized()] + return ok, nil +} + +func (a *testAuthAccessor) HasAdminRole(ctx context.Context) (bool, error) { + _, ok := a.admins[a.user.Normalized()] + return ok, nil +} + +func (a *testAuthAccessor) User() username.SQLUsername { + return a.user +} + +func makeChangefeedPayload(owner string, tableIDs []descpb.ID) *jobspb.Payload { + specs := make([]jobspb.ChangefeedTargetSpecification, len(tableIDs)) + for i, tableID := range tableIDs { + specs[i] = jobspb.ChangefeedTargetSpecification{ + TableID: tableID, + } + } + return &jobspb.Payload{ + Details: jobspb.WrapPayloadDetails(jobspb.ChangefeedDetails{ + TargetSpecifications: specs, + }), + UsernameProto: username.MakeSQLUsernameFromPreNormalizedString(owner).EncodeProto(), + } +} + +func makeBackupPayload(owner string) *jobspb.Payload { + return &jobspb.Payload{ + Details: jobspb.WrapPayloadDetails(jobspb.BackupDetails{}), + UsernameProto: username.MakeSQLUsernameFromPreNormalizedString(owner).EncodeProto(), + } +} + +func TestPrivileges(t *testing.T) { + rng, seed := randutil.NewTestRand() + t.Logf("random seed: %d", seed) + + for _, tc := range []struct { + name string + + user username.SQLUsername + roleOptions map[roleoption.Option]struct{} + changeFeedPrivileges map[descpb.ID]struct{} + droppedDescriptors map[descpb.ID]struct{} + admins map[string]struct{} + + payload *jobspb.Payload + flag jobsprivilege.AccessFlag + + canAccess bool + userErr error + }{ + { + name: "controljob-sufficient-for-non-admin-jobs", + + user: username.MakeSQLUsernameFromPreNormalizedString("user1"), + roleOptions: map[roleoption.Option]struct{}{ + roleoption.CONTROLJOB: {}, + }, + admins: map[string]struct{}{}, + payload: makeBackupPayload("user2"), + flag: jobsprivilege.NoFlag, + + canAccess: true, + }, + { + name: "controljob-insufficient-for-admin-jobs", + user: username.MakeSQLUsernameFromPreNormalizedString("user1"), + roleOptions: map[roleoption.Option]struct{}{ + roleoption.CONTROLJOB: {}, + }, + admins: map[string]struct{}{"user2": {}}, + payload: makeBackupPayload("user2"), + flag: jobsprivilege.NoFlag, + canAccess: false, + userErr: pgerror.New(pgcode.InsufficientPrivilege, "foo"), + }, + { + name: "users-access-their-own-jobs", + user: username.MakeSQLUsernameFromPreNormalizedString("user1"), + roleOptions: map[roleoption.Option]struct{}{}, + admins: map[string]struct{}{}, + payload: makeBackupPayload("user1"), + flag: jobsprivilege.UserCanAccessOwnJob, + canAccess: true, + }, + { + name: "users-cannot-access-their-own-jobs", + user: username.MakeSQLUsernameFromPreNormalizedString("user1"), + roleOptions: map[roleoption.Option]struct{}{}, + admins: map[string]struct{}{}, + payload: makeBackupPayload("user1"), + flag: jobsprivilege.NoFlag, + canAccess: false, + userErr: pgerror.New(pgcode.InsufficientPrivilege, "foo"), + }, + { + name: "admins-see-admin-jobs", + user: username.MakeSQLUsernameFromPreNormalizedString("user1"), + roleOptions: map[roleoption.Option]struct{}{}, + admins: map[string]struct{}{"user2": {}, "user1": {}}, + payload: makeBackupPayload("user2"), + flag: jobsprivilege.NoFlag, + canAccess: true, + }, + { + name: "changefeed-privilege-on-all-tables", + user: username.MakeSQLUsernameFromPreNormalizedString("user1"), + roleOptions: map[roleoption.Option]struct{}{}, + admins: map[string]struct{}{}, + changeFeedPrivileges: map[descpb.ID]struct{}{0: {}, 1: {}, 2: {}}, + + payload: makeChangefeedPayload("user2", []descpb.ID{0, 1, 2}), + flag: jobsprivilege.NoFlag, + canAccess: true, + }, + { + name: "changefeed-privilege-on-some-tables", + user: username.MakeSQLUsernameFromPreNormalizedString("user1"), + roleOptions: map[roleoption.Option]struct{}{}, + admins: map[string]struct{}{}, + changeFeedPrivileges: map[descpb.ID]struct{}{0: {}, 1: {}}, + + payload: makeChangefeedPayload("user2", []descpb.ID{0, 1, 2}), + flag: jobsprivilege.NoFlag, + canAccess: false, + userErr: pgerror.New(pgcode.InsufficientPrivilege, "foo"), + }, + { + name: "changefeed-priv-on-some-tables-with-dropped", + user: username.MakeSQLUsernameFromPreNormalizedString("user1"), + roleOptions: map[roleoption.Option]struct{}{}, + admins: map[string]struct{}{}, + changeFeedPrivileges: map[descpb.ID]struct{}{0: {}, 1: {}}, + droppedDescriptors: map[descpb.ID]struct{}{2: {}}, + + payload: makeChangefeedPayload("user2", []descpb.ID{0, 1, 2}), + flag: jobsprivilege.NoFlag, + + canAccess: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + testAuth := &testAuthAccessor{ + user: tc.user, + roleOptions: tc.roleOptions, + changeFeedPrivileges: tc.changeFeedPrivileges, + droppedDescriptors: tc.droppedDescriptors, + admins: tc.admins, + rand: rng, + } + + ctx := context.Background() + err := jobsprivilege.Authorize(ctx, testAuth, 0, tc.payload, tc.flag) + assert.Equal(t, pgerror.GetPGCode(tc.userErr), pgerror.GetPGCode(err)) + }) + } +} diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 590d64225ad9..c19f699eb3de 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -290,6 +290,7 @@ go_library( "//pkg/gossip", "//pkg/jobs", "//pkg/jobs/jobspb", + "//pkg/jobs/privilege", "//pkg/keys", "//pkg/kv", "//pkg/kv/kvclient/kvcoord", diff --git a/pkg/sql/alter_role.go b/pkg/sql/alter_role.go index dbe6f1d9b18d..624ded1da8c1 100644 --- a/pkg/sql/alter_role.go +++ b/pkg/sql/alter_role.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/paramparse" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice" "github.com/cockroachdb/cockroach/pkg/sql/roleoption" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -97,6 +98,10 @@ func (p *planner) AlterRoleNode( return nil, err } + if roleOptions.Contains(roleoption.CONTROLCHANGEFEED) { + p.BufferClientNotice(ctx, pgnotice.Newf(roleoption.ControlChangefeedDeprecationNoticeMsg)) + } + roleName, err := decodeusername.FromRoleSpec( p.SessionData(), username.PurposeValidation, roleSpec, ) diff --git a/pkg/sql/authorization.go b/pkg/sql/authorization.go index f72101f8727e..b4ea87d034f7 100644 --- a/pkg/sql/authorization.go +++ b/pkg/sql/authorization.go @@ -74,6 +74,12 @@ type userRoleMembership map[username.SQLUsername]bool // AuthorizationAccessor for checking authorization (e.g. desc privileges). type AuthorizationAccessor interface { + // CheckPrivilegeForTableID verifies that the user has `privilege` on the table + // denoted by `tableID`. + CheckPrivilegeForTableID( + ctx context.Context, tableID descpb.ID, privilege privilege.Kind, + ) error + // CheckPrivilege verifies that the user has `privilege` on `descriptor`. CheckPrivilegeForUser( ctx context.Context, privilegeObject privilege.Object, privilege privilege.Kind, user username.SQLUsername, @@ -920,3 +926,9 @@ func insufficientPrivilegeError( "user %s does not have %s privilege on %s %s", user, kind, typeForError, object.GetName()) } + +// IsInsufficientPrivilegeError returns true if the error is a pgerror +// with code pgcode.InsufficientPrivilege. +func IsInsufficientPrivilegeError(err error) bool { + return pgerror.GetPGCode(err) == pgcode.InsufficientPrivilege +} diff --git a/pkg/sql/control_jobs.go b/pkg/sql/control_jobs.go index 0c3b86dd8f05..fc335233b606 100644 --- a/pkg/sql/control_jobs.go +++ b/pkg/sql/control_jobs.go @@ -15,10 +15,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + jobsprivilege "github.com/cockroachdb/cockroach/pkg/jobs/privilege" "github.com/cockroachdb/cockroach/pkg/server/telemetry" - "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" - "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" - "github.com/cockroachdb/cockroach/pkg/sql/roleoption" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/errors" @@ -43,26 +41,6 @@ func (n *controlJobsNode) FastPathResults() (int, bool) { } func (n *controlJobsNode) startExec(params runParams) error { - userIsAdmin, err := params.p.HasAdminRole(params.ctx) - if err != nil { - return err - } - - // users can pause/resume/cancel jobs owned by non-admin users - // if they have CONTROLJOBS privilege. - if !userIsAdmin { - hasControlJob, err := params.p.HasRoleOption(params.ctx, roleoption.CONTROLJOB) - if err != nil { - return err - } - - if !hasControlJob { - return pgerror.Newf(pgcode.InsufficientPrivilege, - "user %s does not have %s privilege", - params.p.User(), roleoption.CONTROLJOB) - } - } - if n.desiredStatus != jobs.StatusPaused && len(n.reason) > 0 { return errors.AssertionFailedf("status %v is not %v and thus does not support a reason %v", n.desiredStatus, jobs.StatusPaused, n.reason) @@ -93,21 +71,10 @@ func (n *controlJobsNode) startExec(params runParams) error { return err } - if job != nil { - owner := job.Payload().UsernameProto.Decode() - - if !userIsAdmin { - ok, err := params.p.UserHasAdminRole(params.ctx, owner) - if err != nil { - return err - } - - // Owner is an admin but user executing the statement is not. - if ok { - return pgerror.Newf(pgcode.InsufficientPrivilege, - "only admins can control jobs owned by other admins") - } - } + payload := job.Payload() + if err := jobsprivilege.Authorize(params.ctx, params.p, + job.ID(), &payload, jobsprivilege.NoFlag); err != nil { + return err } switch n.desiredStatus { diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 7acc6f5add92..62dc8c72b893 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + jobsprivilege "github.com/cockroachdb/cockroach/pkg/jobs/privilege" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" @@ -945,16 +946,6 @@ func populateSystemJobsTableRows( matched := false - user := p.User() - userIsAdmin, err := p.UserHasAdminRole(ctx, user) - if err != nil { - return matched, err - } - userHasControlJobRoleOption, err := p.HasRoleOption(ctx, roleoption.CONTROLJOB) - if err != nil { - return matched, err - } - // Note: we query system.jobs as root, so we must be careful about which rows we return. it, err := p.ExtendedEvalContext().ExecCfg.InternalExecutor.QueryIteratorEx(ctx, "system-jobs-scan", @@ -988,25 +979,21 @@ func populateSystemJobsTableRows( } currentRow := it.Cur() + jobID, err := strconv.Atoi(currentRow[jobIdIdx].String()) + if err != nil { + return matched, err + } payloadBytes := currentRow[jobPayloadIdx] payload, err := jobs.UnmarshalPayload(payloadBytes) if err != nil { return matched, wrapPayloadUnMarshalError(err, currentRow[jobIdIdx]) } - jobOwnerUser := payload.UsernameProto.Decode() - jobOwnerIsAdmin, err := p.UserHasAdminRole(ctx, jobOwnerUser) - if err != nil { - return matched, err - } - // The user can access the row if the meet one of the conditions: - // 1. The user is an admin. - // 2. The job is owned by the user. - // 3. The user has CONTROLJOB privilege and the job is not owned by - // an admin. - if canAccess := userIsAdmin || (!jobOwnerIsAdmin && - userHasControlJobRoleOption) || user == jobOwnerUser; !canAccess { - continue + if err := jobsprivilege.Authorize(ctx, p, jobspb.JobID(jobID), payload, jobsprivilege.UserCanAccessOwnJob); err != nil { + if IsInsufficientPrivilegeError(err) { + continue + } + return matched, err } if err := addRow(currentRow...); err != nil { diff --git a/pkg/sql/create_role.go b/pkg/sql/create_role.go index edda320745f3..465083bf8ad5 100644 --- a/pkg/sql/create_role.go +++ b/pkg/sql/create_role.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/decodeusername" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice" "github.com/cockroachdb/cockroach/pkg/sql/roleoption" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" @@ -77,6 +78,10 @@ func (p *planner) CreateRoleNode( return nil, err } + if roleOptions.Contains(roleoption.CONTROLCHANGEFEED) { + p.BufferClientNotice(ctx, pgnotice.Newf(roleoption.ControlChangefeedDeprecationNoticeMsg)) + } + if err := roleOptions.CheckRoleOptionConflicts(); err != nil { return nil, err } diff --git a/pkg/sql/delegate/show_changefeed_jobs.go b/pkg/sql/delegate/show_changefeed_jobs.go index 45c45e294874..748d12738bf1 100644 --- a/pkg/sql/delegate/show_changefeed_jobs.go +++ b/pkg/sql/delegate/show_changefeed_jobs.go @@ -13,7 +13,6 @@ package delegate import ( "fmt" - "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" ) @@ -33,7 +32,8 @@ WITH payload AS ( payload, false, true )->'changefeed' AS changefeed_details FROM - system.jobs + crdb_internal.system_jobs + WHERE job_type = 'CHANGEFEED' ) SELECT job_id, @@ -70,20 +70,18 @@ FROM ) var whereClause, orderbyClause string - typePredicate := fmt.Sprintf("job_type = '%s'", jobspb.TypeChangefeed) - if n.Jobs == nil { // The query intends to present: // - first all the running jobs sorted in order of start time, // - then all completed jobs sorted in order of completion time. - whereClause = fmt.Sprintf( - `WHERE %s AND (finished IS NULL OR finished > now() - '12h':::interval)`, typePredicate) + whereClause = + `WHERE (finished IS NULL OR finished > now() - '12h':::interval)` // The "ORDER BY" clause below exploits the fact that all // running jobs have finished = NULL. orderbyClause = `ORDER BY COALESCE(finished, now()) DESC, started DESC` } else { // Limit the jobs displayed to the select statement in n.Jobs. - whereClause = fmt.Sprintf(`WHERE %s AND job_id in (%s)`, typePredicate, n.Jobs.String()) + whereClause = fmt.Sprintf(`WHERE job_id in (%s)`, n.Jobs.String()) } sqlStmt := fmt.Sprintf("%s %s %s", selectClause, whereClause, orderbyClause) diff --git a/pkg/sql/logictest/testdata/logic_test/jobs b/pkg/sql/logictest/testdata/logic_test/jobs index b88a951e1892..64c9c89a729e 100644 --- a/pkg/sql/logictest/testdata/logic_test/jobs +++ b/pkg/sql/logictest/testdata/logic_test/jobs @@ -176,7 +176,13 @@ user testuser # testuser should no longer have the ability to control jobs. statement error pq: user testuser does not have CONTROLJOB privilege -PAUSE JOB (SELECT job_id FROM [SHOW JOBS] WHERE user_name = 'testuser2' AND job_type = 'SCHEMA CHANGE GC' AND description LIKE 'GC for DROP%') +PAUSE JOB (SELECT $job_id) + +statement error pq: user testuser does not have CONTROLJOB privilege +CANCEL JOB (SELECT $job_id) + +statement error pq: user testuser does not have CONTROLJOB privilege +RESUME JOB (SELECT $job_id) user root diff --git a/pkg/sql/logictest/testdata/logic_test/run_control b/pkg/sql/logictest/testdata/logic_test/run_control index 3582529fc4aa..25cc0159a3d7 100644 --- a/pkg/sql/logictest/testdata/logic_test/run_control +++ b/pkg/sql/logictest/testdata/logic_test/run_control @@ -98,17 +98,6 @@ CANCEL SESSION 'aaa'::NAME query error odd length hex string CANCEL QUERY 'aaa'::NAME -user testuser - -query error pq: user testuser does not have CONTROLJOB privilege -CANCEL JOB 1 - -query error pq: user testuser does not have CONTROLJOB privilege -PAUSE JOB 1 - -query error pq: user testuser does not have CONTROLJOB privilege -RESUME JOB 1 - user root query T rowsort diff --git a/pkg/sql/planhook.go b/pkg/sql/planhook.go index ab6a2e5cbdfa..a22e699e396d 100644 --- a/pkg/sql/planhook.go +++ b/pkg/sql/planhook.go @@ -13,6 +13,7 @@ package sql import ( "context" + jobsprivilege "github.com/cockroachdb/cockroach/pkg/jobs/privilege" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" @@ -126,6 +127,8 @@ type PlanHookState interface { CreateTenant(ctx context.Context, name roachpb.TenantName) (roachpb.TenantID, error) } +var _ jobsprivilege.AuthorizationAccessor = PlanHookState(nil) + // AddPlanHook adds a hook used to short-circuit creating a planNode from a // tree.Statement. If the func returned by the hook is non-nil, it is used to // construct a planNode that runs that func in a goroutine during Start. diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index c973ccf19cc9..e922b9cc65eb 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -35,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/exprutil" "github.com/cockroachdb/cockroach/pkg/sql/idxusage" "github.com/cockroachdb/cockroach/pkg/sql/parser" + "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/querycache" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/transform" @@ -515,6 +516,7 @@ func (p *planner) EvalContext() *eval.Context { return &p.extendedEvalCtx.Context } +// Descriptors implements the PlanHookState interface. func (p *planner) Descriptors() *descs.Collection { return p.extendedEvalCtx.Descs } @@ -602,6 +604,18 @@ func (p *planner) ResolveTableName(ctx context.Context, tn *tree.TableName) (tre return tree.ID(desc.GetID()), nil } +// CheckPrivilegeForTableID implements the AuthorizationAccessor interface. +// Requires a valid transaction to be open. +func (p *planner) CheckPrivilegeForTableID( + ctx context.Context, tableID descpb.ID, privilege privilege.Kind, +) error { + desc, err := p.LookupTableByID(ctx, tableID) + if err != nil { + return err + } + return p.CheckPrivilegeForUser(ctx, desc, privilege, p.User()) +} + // LookupTableByID looks up a table, by the given descriptor ID. Based on the // CommonLookupFlags, it could use or skip the Collection cache. func (p *planner) LookupTableByID( diff --git a/pkg/sql/roleoption/role_option.go b/pkg/sql/roleoption/role_option.go index 0c30d98b06a9..66240bce37e6 100644 --- a/pkg/sql/roleoption/role_option.go +++ b/pkg/sql/roleoption/role_option.go @@ -68,6 +68,13 @@ const ( NOVIEWCLUSTERSETTING ) +// ControlChangefeedDeprecationNoticeMsg is a user friendly notice which should be shown when CONTROLCHANGEFEED is used +// +// TODO(#94757): remove CONTROLCHANGEFEED entirely +const ControlChangefeedDeprecationNoticeMsg = "The role option CONTROLCHANGEFEED will be removed in a future release," + + " please switch to using the CHANGEFEED privilege for target tables instead:" + + " https://www.cockroachlabs.com/docs/stable/create-changefeed.html#required-privileges" + // toSQLStmts is a map of Kind -> SQL statement string for applying the // option to the role. var toSQLStmts = map[Option]string{