Skip to content

Commit

Permalink
changefeedccl: add new fine-grained permissions
Browse files Browse the repository at this point in the history
This change updates permissions semantics related to creating and managing changefeeds.

The `CONTROLCHANGEFEED` role option will be deprecated in the future (see cockroachdb#94757). With this change, usages of `CONTROLCHANGEFEED` will come with a deprecation warning. Its existing behavior (see rules for creating changefeeds below) remains the same.

The `SELECT` and `CHANGEFEED` privileges will be used for changefeeds henceforth:

The `SELECT` privilege on a set of tables allows a user to run core changefeeds against them.

The `CHANGEFEED` privilege on a set of tables allows a user to run enterprise changefeeds on them, and also manage the underlying changefeed job (ie. view, pause, cancel, and resume the job). Notably, a new cluster setting `changefeed.permissions.enforce_external_connections` is added and set to `false` by default. Enabling this setting restricts users with `CHANGEFEED` on a set of tables to create enterprise changefeeds into external connections only. To use a given external connection, a user typically needs the `USAGE` privilege on it.

Note `ALTER DEFAULT PRIVILEGES` can be used with both the `CHANGEFEED` and `SELECT` privileges to assign course-grained permissions (ie. assign permissions to all tables in a schema rather than manually assign them for each table).

Before this change, to create a changefeed, these checks were made in order:
(a) If the user has the `CONTROLCHANGEFEED` role, then they require the `SELECT`privilege
     on all targeted tables
(b) Otherwise, the user requires  the `CHANGEFEED` privilege on all targeted tables
With this change, these checks are updated:
(a) If the user has the `CONTROLCHANGEFEED` role, then they require the `SELECT`
     privilege on all targeted tables. Note: creating a changefeed this way will now produce a
     deprecation notice.
(b) If the changefeed is a core changefeed, they require the `SELECT` privilege on all targeted
     tables
(c) Otherwise, the user requires the `CHANGEFEED` privilege on all targeted tables. Note: If
     `changefeed.permissions.enforce_external_connections` (disabled by default) is set to true,
     then the user will only be able to create a changefeed into an external connection which they
     have the `USAGE` privilege on.

Before this change, to manage a changefeed job `J` (defined a viewing, pausing, resuming, and canceling), a user `U` could do so if they met at least one of the following conditions:
(a) `U` is an admin
(b) `U` is not an admin and `J` is owned by `U`  (only for SHOW JOBS)
(c) `U` is not an admin, `J` is not owned by an admin, and `U` has the `CONTROLJOB` role
With this change, the conditions are updated:
(a) `U` is an admin
(b) `U` is not an admin and `J` is owned by `U`  (only for `SHOW JOBS` or `SHOW
     CHANGEFEED JOBS`)
(c) `U` is not an admin, `J` is not owned by an admin, and `U` has the `CONTROLJOB` role
(d) `U` is not an admin, `J` is not owned by an admin, `J` is a changefeed job, and `U` has
     the `CHANGEFEED` privilege on targeted tables

Before this change, permissions related to altering changefeeds with `ALTER CHANGEFEED` were not explicitly defined (we did not have tests to assert its behavior, but there were some permissions checks regardless). Basically, a user needed access to view a job (ie. look up it’s job ID via `SHOW JOBS`) and they needed to be able to create a new job. After all, `ALTER CHANGEFEED` is essentially the same as creating a new job after stopping the old one.

With this change, the same rules apply: the user needs to be able to access the existing job and to be able to create a new changefeed with the new rules introduced in this change respectively.

Fixes: cockroachdb#94756
Fixes: cockroachdb#92261
Fixes: cockroachdb#87884
Fixes: cockroachdb#85082
Informs: cockroachdb#94759
Informs: cockroachdb#94757
Epic: CRDB-21508
Epic: CRDB-19709

Release note (enterprise change):

The `CONTROLCHANGEFEED` role option will be deprecated in the future (see cockroachdb#94757). With this change, usages of `CONTROLCHANGEFEED` will come with a deprecation warning. Its existing behavior (see rules for creating changefeeds above) remains the same.

The `SELECT` and `CHANGEFEED` privileges will be used for changefeeds henceforth:

The `SELECT` privilege on a set of tables allows a user to run core changefeeds against them.

The `CHANGEFEED` privilege on a set of tables allows a user to run enterprise changefeeds on them, and also manage the underlying changefeed job (ie. view, pause, cancel, and resume the job). Notably, a new cluster setting `changefeed.permissions.enforce_external_connections` is added and set to `false` by default. Enabling this setting restricts users with `CHANGEFEED` on a set of tables to create enterprise changefeeds into external connections only. To use a given external connection, a user typically needs the `USAGE` privilege on it.

Note `ALTER DEFAULT PRIVILEGES` can be used with both both the `CHANGEFEED` and `SELECT` privileges to assign coarse-grained permissions (ie. assign permissions to all tables in a schema rather than manually assign them for each table).
  • Loading branch information
jayshrivastava committed Jan 9, 2023
1 parent 7317b25 commit f8607e3
Show file tree
Hide file tree
Showing 24 changed files with 950 additions and 182 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ go_library(
"//pkg/sql/sem/volatility",
"//pkg/sql/sessiondatapb",
"//pkg/sql/sqlutil",
"//pkg/sql/syntheticprivilege",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/admission",
Expand Down Expand Up @@ -234,6 +235,7 @@ go_test(
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/catpb",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descbuilder",
"//pkg/sql/catalog/descpb",
Expand Down
34 changes: 32 additions & 2 deletions pkg/ccl/changefeedccl/alter_changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,17 @@ func alterChangefeedPlanHook(
return err
}

jobPayload := job.Payload()

// Having control job is not enough to allow them to modify the changefeed.
canAccess, userErr, err := sql.JobTypeSpecificPrivilegeCheck(ctx, p, jobID, &jobPayload, false)
if err != nil {
return err
}
if !canAccess {
return userErr
}

prevDetails, ok := job.Details().(jobspb.ChangefeedDetails)
if !ok {
return errors.Errorf(`job %d is not changefeed job`, jobID)
Expand All @@ -123,11 +134,12 @@ func alterChangefeedPlanHook(
return err
}

newTargets, newProgress, newStatementTime, originalSpecs, err := generateNewTargets(
newTargets, newProgress, newStatementTime, originalSpecs, err := generateAndValidateNewTargets(
ctx, exprEval, p,
alterChangefeedStmt.Cmds,
newOptions.AsMap(), // TODO: Remove .AsMap()
prevDetails, job.Progress(),
newSinkURI,
)
if err != nil {
return err
Expand Down Expand Up @@ -320,14 +332,15 @@ func generateNewOpts(
return changefeedbase.MakeStatementOptions(newOptions), sinkURI, nil
}

func generateNewTargets(
func generateAndValidateNewTargets(
ctx context.Context,
exprEval exprutil.Evaluator,
p sql.PlanHookState,
alterCmds tree.AlterChangefeedCmds,
opts map[string]string,
prevDetails jobspb.ChangefeedDetails,
prevProgress jobspb.Progress,
sinkURI string,
) (
tree.ChangefeedTargets,
*jobspb.Progress,
Expand Down Expand Up @@ -490,6 +503,7 @@ func generateNewTargets(
existingTargetSpans := fetchSpansForDescs(p, existingTargetDescs)
var newTargetDescs []catalog.Descriptor
for _, target := range v.Targets {

desc, found, err := getTargetDesc(ctx, p, descResolver, target.TableName)
if err != nil {
return nil, nil, hlc.Timestamp{}, nil, err
Expand All @@ -501,6 +515,7 @@ func generateNewTargets(
tree.ErrString(&target),
)
}

k := targetKey{TableID: desc.GetID(), FamilyName: target.FamilyName}
newTargets[k] = target
newTableDescs[desc.GetID()] = desc
Expand Down Expand Up @@ -546,6 +561,7 @@ func generateNewTargets(
tree.ErrString(&target),
)
}
newTableDescs[desc.GetID()] = desc
delete(newTargets, k)
}
telemetry.CountBucketed(telemetryPath+`.dropped_targets`, int64(len(v.Targets)))
Expand Down Expand Up @@ -580,6 +596,20 @@ func generateNewTargets(
newTargetList = append(newTargetList, target)
}

hasSelectPrivOnAllTables := true
hasChangefeedPrivOnAllTables := true
for _, desc := range newTableDescs {
hasSelect, hasChangefeed, err := checkPrivilegesForDescriptor(ctx, p, desc)
if err != nil {
return nil, nil, hlc.Timestamp{}, nil, err
}
hasSelectPrivOnAllTables = hasSelectPrivOnAllTables && hasSelect
hasChangefeedPrivOnAllTables = hasChangefeedPrivOnAllTables && hasChangefeed
}
if err := verifyUserCanCreateChangefeed(ctx, p, sinkURI, hasSelectPrivOnAllTables, hasChangefeedPrivOnAllTables); err != nil {
return nil, nil, hlc.Timestamp{}, nil, err
}

if err := validateNewTargets(ctx, p, newTargetList, newJobProgress, newJobStatementTime); err != nil {
return nil, nil, hlc.Timestamp{}, nil, err
}
Expand Down
225 changes: 225 additions & 0 deletions pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,25 @@ package changefeedccl

import (
"context"
gosql "database/sql"
"fmt"
"net/url"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
Expand All @@ -40,6 +46,155 @@ import (
"github.com/stretchr/testify/require"
)

// TestAlterChangefeedAddTargetPrivileges tests permissions for
// users creating new changefeeds while altering them.
func TestAlterChangefeedAddTargetPrivileges(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

srv, db, _ := serverutils.StartServer(t, base.TestServerArgs{
DisableDefaultTestTenant: true,
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
DistSQL: &execinfra.TestingKnobs{
Changefeed: &TestingKnobs{
WrapSink: func(s Sink, _ jobspb.JobID) Sink {
if _, ok := s.(*externalConnectionKafkaSink); ok {
return s
}
return &externalConnectionKafkaSink{sink: s}
},
},
},
},
})
ctx := context.Background()
s := srv.(*server.TestServer)
defer s.Stopper().Stop(ctx)

rootDB := sqlutils.MakeSQLRunner(db)
rootDB.Exec(t, `CREATE TYPE type_a as enum ('a')`)
rootDB.Exec(t, `CREATE TABLE table_a (id int, type type_a)`)
rootDB.Exec(t, `CREATE TABLE table_b (id int, type type_a)`)
rootDB.Exec(t, `CREATE TABLE table_c (id int, type type_a)`)
rootDB.Exec(t, `CREATE USER feedCreator`)
rootDB.Exec(t, `GRANT SELECT ON table_a TO feedCreator`)
rootDB.Exec(t, `GRANT CHANGEFEED ON table_a TO feedCreator`)
rootDB.Exec(t, `CREATE EXTERNAL CONNECTION "first" AS 'kafka://nope'`)
rootDB.Exec(t, `GRANT USAGE ON EXTERNAL CONNECTION first TO feedCreator`)
rootDB.Exec(t, `INSERT INTO table_a(id) values (0)`)

rootDB.Exec(t, `SET CLUSTER SETTING kv.rangefeed.enabled = true`)
enableEnterprise := utilccl.TestingDisableEnterprise()
enableEnterprise()

withUser := func(t *testing.T, user string, fn func(*sqlutils.SQLRunner)) {
password := `password`
rootDB.Exec(t, fmt.Sprintf(`ALTER USER %s WITH PASSWORD '%s'`, user, password))

pgURL := url.URL{
Scheme: "postgres",
User: url.UserPassword(user, password),
Host: s.SQLAddr(),
}
db2, err := gosql.Open("postgres", pgURL.String())
if err != nil {
t.Fatal(err)
}
defer db2.Close()
userDB := sqlutils.MakeSQLRunner(db2)

fn(userDB)
}

t.Run("using-changefeed-grant", func(t *testing.T) {
rootDB.Exec(t, `CREATE EXTERNAL CONNECTION "second" AS 'kafka://nope'`)
rootDB.Exec(t, `CREATE USER user1`)
rootDB.Exec(t, `GRANT CHANGEFEED ON table_a TO user1`)

var jobID int
withUser(t, "feedCreator", func(userDB *sqlutils.SQLRunner) {
row := userDB.QueryRow(t, "CREATE CHANGEFEED for table_a INTO 'external://first'")
row.Scan(&jobID)
userDB.Exec(t, `PAUSE JOB $1`, jobID)
waitForJobStatus(userDB, t, catpb.JobID(jobID), `paused`)
})

// user1 is missing the CHANGEFEED privilege on table_b and table_c.
withUser(t, "user1", func(userDB *sqlutils.SQLRunner) {
userDB.ExpectErr(t,
"user user1 requires the CHANGEFEED privilege on all target tables to be able to run an enterprise changefeed",
fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='external://second'", jobID),
)
})
rootDB.Exec(t, `GRANT CHANGEFEED ON table_b TO user1`)
withUser(t, "user1", func(userDB *sqlutils.SQLRunner) {
userDB.ExpectErr(t,
"user user1 requires the CHANGEFEED privilege on all target tables to be able to run an enterprise changefeed",
fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='external://second'", jobID),
)
})
rootDB.Exec(t, `GRANT CHANGEFEED ON table_c TO user1`)
withUser(t, "user1", func(userDB *sqlutils.SQLRunner) {
userDB.Exec(t,
fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='external://second'", jobID),
)
})

// With require_external_connection_sink enabled, the user requires USAGE on the external connection.
rootDB.Exec(t, "SET CLUSTER SETTING changefeed.permissions.require_external_connection_sink = true")
withUser(t, "user1", func(userDB *sqlutils.SQLRunner) {
userDB.ExpectErr(t,
"user user1 does not have USAGE privilege on external_connection second",
fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='external://second'", jobID),
)
})
rootDB.Exec(t, `GRANT USAGE ON EXTERNAL CONNECTION second TO user1`)
withUser(t, "user1", func(userDB *sqlutils.SQLRunner) {
userDB.Exec(t,
fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='external://second'", jobID),
)
})
rootDB.Exec(t, "SET CLUSTER SETTING changefeed.permissions.require_external_connection_sink = false")
})

// TODO(#94757): remove CONTROLCHANGEFEED entirely
t.Run("using-controlchangefeed-roleoption", func(t *testing.T) {
rootDB.Exec(t, `CREATE USER user2 WITH CONTROLCHANGEFEED`)
rootDB.Exec(t, `GRANT CHANGEFEED ON table_a TO user2`)
rootDB.Exec(t, `GRANT SELECT ON table_a TO user2`)

var jobID int
withUser(t, "feedCreator", func(userDB *sqlutils.SQLRunner) {
row := userDB.QueryRow(t, "CREATE CHANGEFEED for table_a INTO 'kafka://foo'")
row.Scan(&jobID)
userDB.Exec(t, `PAUSE JOB $1`, jobID)
waitForJobStatus(userDB, t, catpb.JobID(jobID), `paused`)
})

// user2 is missing the SELECT privilege on table_b and table_c.
withUser(t, "user2", func(userDB *sqlutils.SQLRunner) {
userDB.ExpectErr(t,
"pq: user user2 with CONTROLCHANGEFEED role option requires the SELECT privilege on all target tables to be able to run an enterprise changefeed",
fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='kafka://bar'", jobID),
)
})
rootDB.Exec(t, `GRANT SELECT ON table_b TO user2`)
withUser(t, "user2", func(userDB *sqlutils.SQLRunner) {
userDB.ExpectErr(t,
"pq: user user2 with CONTROLCHANGEFEED role option requires the SELECT privilege on all target tables to be able to run an enterprise changefeed",
fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='kafka://bar'", jobID),
)
})
rootDB.Exec(t, `GRANT SELECT ON table_c TO user2`)
withUser(t, "user2", func(userDB *sqlutils.SQLRunner) {
userDB.Exec(t,
fmt.Sprintf("ALTER CHANGEFEED %d ADD table_b, table_c set sink='kafka://bar'", jobID),
)
})
})
}

func TestAlterChangefeedAddTarget(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -1419,3 +1574,73 @@ func TestAlterChangefeedWithOldCursorFromCreateChangefeed(t *testing.T) {

cdcTest(t, testFn, feedTestEnterpriseSinks)
}

// TestChangefeedJobControl tests if a user can modify and existing changefeed
// based on their privileges.
func TestAlterChangefeedAccessControl(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
ChangefeedJobPermissionsTestSetup(t, s)
rootDB := sqlutils.MakeSQLRunner(s.DB)

createFeed := func(stmt string) (cdctest.EnterpriseTestFeed, func()) {
successfulFeed := feed(t, f, stmt)
closeCf := func() {
closeFeed(t, successfulFeed)
}
_, err := successfulFeed.Next()
require.NoError(t, err)
return successfulFeed.(cdctest.EnterpriseTestFeed), closeCf
}

// Create a changefeed and pause it.
var currentFeed cdctest.EnterpriseTestFeed
var closeCf func()
asUser(t, f, `feedCreator`, func(_ *sqlutils.SQLRunner) {
currentFeed, closeCf = createFeed(`CREATE CHANGEFEED FOR table_a, table_b`)
})
rootDB.Exec(t, "PAUSE job $1", currentFeed.JobID())
waitForJobStatus(rootDB, t, currentFeed.JobID(), `paused`)

// Verify who can modify the existing changefeed.
asUser(t, f, `userWithAllGrants`, func(userDB *sqlutils.SQLRunner) {
userDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d DROP table_b`, currentFeed.JobID()))
})
asUser(t, f, `adminUser`, func(userDB *sqlutils.SQLRunner) {
userDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d ADD table_b`, currentFeed.JobID()))
})
// jobController can access the job, but will hit an error re-creating the changefeed.
asUser(t, f, `jobController`, func(userDB *sqlutils.SQLRunner) {
userDB.ExpectErr(t, "pq: user jobcontroller requires the CHANGEFEED privilege on all target tables to be able to run an enterprise changefeed", fmt.Sprintf(`ALTER CHANGEFEED %d DROP table_b`, currentFeed.JobID()))
})
asUser(t, f, `userWithSomeGrants`, func(userDB *sqlutils.SQLRunner) {
userDB.ExpectErr(t, "pq: user userwithsomegrants does not have CHANGEFEED privilege on relation table_b", fmt.Sprintf(`ALTER CHANGEFEED %d ADD table_b`, currentFeed.JobID()))
})
asUser(t, f, `regularUser`, func(userDB *sqlutils.SQLRunner) {
userDB.ExpectErr(t, "pq: user regularuser does not have CHANGEFEED privilege on relation (table_a|table_b)", fmt.Sprintf(`ALTER CHANGEFEED %d ADD table_b`, currentFeed.JobID()))
})
closeCf()

// No one can modify changefeeds created by admins, except for admins.
// In this case, the root user creates the changefeed.
currentFeed, closeCf = createFeed(`CREATE CHANGEFEED FOR table_a, table_b`)
asUser(t, f, `adminUser`, func(userDB *sqlutils.SQLRunner) {
userDB.Exec(t, "PAUSE job $1", currentFeed.JobID())
require.NoError(t, currentFeed.WaitForStatus(func(s jobs.Status) bool {
return s == jobs.StatusPaused
}))
})
asUser(t, f, `userWithAllGrants`, func(userDB *sqlutils.SQLRunner) {
userDB.ExpectErr(t, "pq: only admins can control jobs owned by other admins", fmt.Sprintf(`ALTER CHANGEFEED %d ADD table_b`, currentFeed.JobID()))
})
asUser(t, f, `jobController`, func(userDB *sqlutils.SQLRunner) {
userDB.ExpectErr(t, "pq: only admins can control jobs owned by other admins", fmt.Sprintf(`ALTER CHANGEFEED %d ADD table_b`, currentFeed.JobID()))
})
closeCf()
}

// Only enterprise sinks create jobs.
cdcTest(t, testFn, feedTestEnterpriseSinks)
}
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/cdctest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_library(
"//pkg/sql/catalog",
"//pkg/sql/catalog/descs",
"//pkg/sql/sem/tree",
"//pkg/testutils/sqlutils",
"//pkg/util",
"//pkg/util/fsm",
"//pkg/util/hlc",
Expand Down
12 changes: 6 additions & 6 deletions pkg/ccl/changefeedccl/cdctest/testfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

Expand All @@ -21,12 +22,11 @@ type TestFeedFactory interface {
// Feed creates a new TestFeed.
Feed(create string, args ...interface{}) (TestFeed, error)

// AsUser connects to the database as the specified user,
// calls fn(), then goes back to using the same root
// connection. Will return an error if the initial connection
// to the database fails, but fn is responsible for failing
// the test on other errors.
AsUser(user string, fn func()) error
// AsUser connects to the database as the specified user, calls fn() with the
// user's connection, then goes back to using the same root connection. Will
// return an error if the initial connection to the database fails, but fn is
// responsible for failing the test on other errors.
AsUser(user string, fn func(runner *sqlutils.SQLRunner)) error
}

// TestFeedMessage represents one row update or resolved timestamp message from
Expand Down
Loading

0 comments on commit f8607e3

Please sign in to comment.