Skip to content

Commit

Permalink
sql/schemachanger: create function declarative
Browse files Browse the repository at this point in the history
This commits implements CREATE FUNCTION statements in the
declarative schema changer.

It introduces a few new things:
(1) A new `DESCRIPTOR_ADDING` state for a newly created descritpor.
In this state the descriptor is visible only in a cache of
`mutationVisitorState` so that other operations in the same statement
can see the new descriptor. At this state, descriptor is not written
to collection and storage yet.
(2) Introduce operations to update `ObjectParent`, `Owner` and
`UserPrivileges` elements.
(3) add helper to able to create user privileges from default privileges
of database and schemas.

The rest of the commit is mainly implementing the business logic.

Release note: None
  • Loading branch information
chengxiong-ruan committed Feb 1, 2023
1 parent 58f4bb3 commit e0648f0
Show file tree
Hide file tree
Showing 103 changed files with 6,671 additions and 240 deletions.
10 changes: 10 additions & 0 deletions pkg/ccl/schemachangerccl/backup_base_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions pkg/ccl/schemachangerccl/schemachanger_ccl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/sctest"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

func newCluster(t *testing.T, knobs *scexec.TestingKnobs) (*gosql.DB, func()) {
_, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
func newCluster(
t *testing.T, knobs *scexec.TestingKnobs,
) (serverutils.TestServerInterface, *gosql.DB, func()) {
c, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 3 /* numServers */, base.TestingKnobs{
SQLDeclarativeSchemaChanger: knobs,
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
Expand All @@ -34,7 +37,7 @@ func newCluster(t *testing.T, knobs *scexec.TestingKnobs) (*gosql.DB, func()) {
},
},
)
return sqlDB, cleanup
return c.Server(0), sqlDB, cleanup
}

func TestDecomposeToElements(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ upsert descriptor #104
+ statementTag: DROP DATABASE
+ targetRanks: <redacted>
+ targets: <redacted>
defaultPrivileges: {}
id: 104
modificationTime: {}
...
regionEnumId: 106
survivalGoal: REGION_FAILURE
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,7 @@ go_test(
"//pkg/sql/rowenc/valueside",
"//pkg/sql/rowexec",
"//pkg/sql/rowinfra",
"//pkg/sql/schemachanger/scexec",
"//pkg/sql/scrub",
"//pkg/sql/scrub/scrubtestutils",
"//pkg/sql/sem/builtins",
Expand Down
39 changes: 27 additions & 12 deletions pkg/sql/catalog/catprivilege/default_privilege.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,40 +412,55 @@ func applyDefaultPrivileges(
grantOptionList privilege.List,
) {
userPriv := p.FindOrCreateUser(user)
if privilege.ALL.IsSetIn(userPriv.WithGrantOption) && privilege.ALL.IsSetIn(userPriv.Privileges) {
newUserPrivs, newUserGrantOptions := ApplyDefaultPrivileges(
userPriv.Privileges, userPriv.WithGrantOption, privList, grantOptionList,
)
userPriv.Privileges = newUserPrivs
userPriv.WithGrantOption = newUserGrantOptions
}

func ApplyDefaultPrivileges(
userPrivs uint64,
userGrantOptions uint64,
defaultPrivs privilege.List,
defaultGrantOptions privilege.List,
) (newUserPrivs uint64, newUserGrantOptions uint64) {
if privilege.ALL.IsSetIn(userGrantOptions) && privilege.ALL.IsSetIn(userPrivs) {
// User already has 'ALL' privilege: no-op.
// If userPriv.WithGrantOption has ALL, then userPriv.Privileges must also have ALL.
// It is possible however for userPriv.Privileges to have ALL but userPriv.WithGrantOption to not have ALL
return
return userPrivs, userGrantOptions
}

privBits := privList.ToBitField()
grantBits := grantOptionList.ToBitField()
privBits := defaultPrivs.ToBitField()
grantBits := defaultGrantOptions.ToBitField()

// Should not be possible for a privilege to be in grantOptionList that is not in privList.
if !privilege.ALL.IsSetIn(privBits) {
for _, grantOption := range grantOptionList {
for _, grantOption := range defaultGrantOptions {
if privBits&grantOption.Mask() == 0 {
return
return userPrivs, userGrantOptions
}
}
}

if privilege.ALL.IsSetIn(privBits) {
userPriv.Privileges = privilege.ALL.Mask()
userPrivs = privilege.ALL.Mask()
} else {
if !privilege.ALL.IsSetIn(userPriv.Privileges) {
userPriv.Privileges |= privBits
if !privilege.ALL.IsSetIn(userPrivs) {
userPrivs |= privBits
}
}

if privilege.ALL.IsSetIn(grantBits) {
userPriv.WithGrantOption = privilege.ALL.Mask()
userGrantOptions = privilege.ALL.Mask()
} else {
if !privilege.ALL.IsSetIn(userPriv.WithGrantOption) {
userPriv.WithGrantOption |= grantBits
if !privilege.ALL.IsSetIn(userGrantOptions) {
userGrantOptions |= grantBits
}
}

return userPrivs, userGrantOptions
}

func setRoleHasAllOnTargetObject(
Expand Down
7 changes: 1 addition & 6 deletions pkg/sql/catalog/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,12 +955,7 @@ func FilterAddingDescriptor(desc Descriptor) error {
if !desc.Adding() {
return nil
}
// For the time being, only table descriptors can be in the adding state.
tbl, err := AsTableDescriptor(desc)
if err != nil {
return errors.HandleAsAssertionFailure(err)
}
return pgerror.WithCandidateCode(newAddingTableError(tbl), pgcode.ObjectNotInPrerequisiteState)
return pgerror.WithCandidateCode(newAddingDescriptorError(desc), pgcode.ObjectNotInPrerequisiteState)
}

// TableLookupFn is used to resolve a table from an ID, particularly when
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/catalog/descriptor_id_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,8 @@ func (d *DescriptorIDSet) Remove(id descpb.ID) {
func (d DescriptorIDSet) Difference(o DescriptorIDSet) DescriptorIDSet {
return DescriptorIDSet{set: d.set.Difference(o.set)}
}

// Union returns the union of d and o as a new set.
func (d DescriptorIDSet) Union(o DescriptorIDSet) DescriptorIDSet {
return DescriptorIDSet{set: d.set.Union(o.set)}
}
29 changes: 18 additions & 11 deletions pkg/sql/catalog/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,30 @@ type inactiveDescriptorError struct {
// Only tables (or materialized view) can be in the adding state, and this will
// be true for the foreseeable future, so the error message remains a
// table-specific version.
type addingTableError struct {
type addingDescriptorError struct {
cause error
}

func newAddingTableError(desc TableDescriptor) error {
typStr := "table"
if desc.IsView() && desc.IsPhysicalTable() {
typStr = "materialized view"
func newAddingDescriptorError(desc Descriptor) error {
var typStr string
desc.DescriptorType()
switch t := desc.(type) {
case TableDescriptor:
typStr = "table"
if t.IsView() && t.IsPhysicalTable() {
typStr = "materialized view"
}
default:
typStr = string(desc.DescriptorType())
}
return &addingTableError{
return &addingDescriptorError{
cause: errors.Errorf("%s %q is being added", typStr, desc.GetName()),
}
}

func (a *addingTableError) Error() string { return a.cause.Error() }
func (a *addingDescriptorError) Error() string { return a.cause.Error() }

func (a *addingTableError) Unwrap() error { return a.cause }
func (a *addingDescriptorError) Unwrap() error { return a.cause }

// ErrDescriptorDropped is returned when the descriptor is being dropped.
// TODO (lucy): Make the error message specific to each descriptor type (e.g.,
Expand All @@ -62,9 +69,9 @@ func (i *inactiveDescriptorError) Error() string { return i.cause.Error() }

func (i *inactiveDescriptorError) Unwrap() error { return i.cause }

// HasAddingTableError returns true if the error contains errTableAdding.
func HasAddingTableError(err error) bool {
return errors.HasType(err, (*addingTableError)(nil))
// HasAddingDescriptorError returns true if the error contains errTableAdding.
func HasAddingDescriptorError(err error) bool {
return errors.HasType(err, (*addingDescriptorError)(nil))
}

// NewInactiveDescriptorError wraps an error in a new inactiveDescriptorError.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/funcdesc/func_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (desc *immutable) Public() bool {

// Adding implements the catalog.Descriptor interface.
func (desc *immutable) Adding() bool {
return false
return desc.State == descpb.DescriptorState_ADD
}

// Dropped implements the catalog.Descriptor interface.
Expand Down
66 changes: 62 additions & 4 deletions pkg/sql/create_function_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
Expand All @@ -43,6 +44,7 @@ func TestCreateFunction(t *testing.T) {
tDB := sqlutils.MakeSQLRunner(sqlDB)

tDB.Exec(t, `
SET use_declarative_schema_changer = 'on';
CREATE TABLE t(
a INT PRIMARY KEY,
b INT,
Expand All @@ -54,16 +56,20 @@ CREATE SEQUENCE sq1;
CREATE TABLE t2(a INT PRIMARY KEY);
CREATE VIEW v AS SELECT a FROM t2;
CREATE TYPE notmyworkday AS ENUM ('Monday', 'Tuesday');
`,
)

tDB.Exec(t, `
CREATE FUNCTION f(a notmyworkday) RETURNS INT IMMUTABLE LANGUAGE SQL AS $$
SELECT a FROM t;
SELECT b FROM t@t_idx_b;
SELECT c FROM t@t_idx_c;
SELECT a FROM v;
SELECT nextval('sq1');
$$;
CREATE SCHEMA test_sc;
`,
)
`)

tDB.Exec(t, `CREATE SCHEMA test_sc;`)

err := sql.TestingDescsTxn(ctx, s, func(ctx context.Context, txn isql.Txn, col *descs.Collection) error {
funcDesc, err := col.ByIDWithLeased(txn.KV()).WithoutNonPublic().Get().Function(ctx, 110)
Expand Down Expand Up @@ -146,7 +152,7 @@ SELECT nextval(105:::REGCLASS);`,
require.NoError(t, err)
}

func TestCreateFunctionGating(t *testing.T) {
func TestGatingCreateFunction(t *testing.T) {
defer leaktest.AfterTest(t)()

t.Run("new_schema_changer_version_enabled", func(t *testing.T) {
Expand Down Expand Up @@ -326,3 +332,55 @@ SELECT nextval(107:::REGCLASS);`,
})
require.NoError(t, err)
}

func TestCreateFunctionVisibilityInExplicitTransaction(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testingKnob := &scexec.TestingKnobs{}
ctx := context.Background()
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
SQLDeclarativeSchemaChanger: testingKnob,
},
})
defer s.Stopper().Stop(ctx)
tDB := sqlutils.MakeSQLRunner(sqlDB)

tDB.Exec(t, `SET use_declarative_schema_changer = 'unsafe_always'`)
tDB.Exec(t, `CREATE TABLE t (a INT PRIMARY KEY, b INT NOT NULL)`)
tDB.Exec(t, `INSERT INTO t VALUES (1,1), (2,1)`)

// Make sure that everything is rolled back if post commit job fails.
_, err := sqlDB.Exec(`
BEGIN;
CREATE FUNCTION f() RETURNS INT LANGUAGE SQL AS $$ SELECT 1 $$;
CREATE UNIQUE INDEX idx ON t(b);
COMMIT;
`)
require.Error(t, err, "")
require.Contains(t, err.Error(), "transaction committed but schema change aborted")
_, err = sqlDB.Exec(`SELECT f()`)
require.Error(t, err, "")
require.Contains(t, err.Error(), "unknown function: f(): function undefined")

// Make data valid for the unique index so that the job won't fail.
tDB.Exec(t, `DELETE FROM t WHERE a = 2`)

// Make sure function cannot be used before job completes.
testingKnob.RunBeforeBackfill = func() error {
_, err = sqlDB.Exec(`SELECT f()`)
require.Error(t, err, "")
require.Contains(t, err.Error(), `function "f" is being added`)
return nil
}

//tDB.Exec(t, `SET CLUSTER SETTING jobs.debug.pausepoints='newschemachanger.before.exec'`)
_, err = sqlDB.Exec(`
BEGIN;
CREATE FUNCTION f() RETURNS INT LANGUAGE SQL AS $$ SELECT 1 $$;
CREATE UNIQUE INDEX idx ON t(b);
COMMIT;
`)
tDB.Exec(t, `SELECT f()`)
}
11 changes: 10 additions & 1 deletion pkg/sql/logictest/testdata/logic_test/udf
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ CREATE TABLE ab (
statement error pq: unimplemented: user-defined functions with SETOF return types are not supported
CREATE FUNCTION f(a int) RETURNS SETOF INT LANGUAGE SQL AS 'SELECT 1'

statement error pq: cannot set leakproof on function with non-immutable volatility: STABLE
statement error .*cannot set leakproof on function with non-immutable volatility: STABLE.*
CREATE FUNCTION f(a int) RETURNS INT LEAKPROOF STABLE LANGUAGE SQL AS 'SELECT 1'

statement error pq: return type mismatch in function declared to return int\nDETAIL: Actual return type is string
Expand Down Expand Up @@ -1788,6 +1788,9 @@ DELETE FROM system.eventlog;
statement ok
CREATE FUNCTION f_test_log() RETURNS INT LANGUAGE SQL AS $$ SELECT 1 $$;

# TODO(chengxiong): remove this test condition when event logging is moved build
# time in declarative schema changer.
onlyif config local-legacy-schema-changer
query TTTT retry
WITH tmp AS (
SELECT "eventType" AS etype, info::JSONB AS info_json
Expand All @@ -1801,6 +1804,7 @@ create_function 203 "test.public.f_test_log" "CREATE FUNCTION test.public.f_t
statement ok
CREATE OR REPLACE FUNCTION f_test_log() RETURNS INT LANGUAGE SQL AS $$ SELECT 2 $$;

onlyif config local-legacy-schema-changer
query TTTT retry
WITH tmp AS (
SELECT "eventType" AS etype, info::JSONB AS info_json
Expand All @@ -1815,6 +1819,7 @@ create_function 203 "test.public.f_test_log" "CREATE OR REPLACE FUNCTION test
statement ok
ALTER FUNCTION f_test_log RENAME TO f_test_log_new;

onlyif config local-legacy-schema-changer
query TTTTT retry
WITH tmp AS (
SELECT "eventType" AS etype, info::JSONB AS info_json
Expand All @@ -1831,6 +1836,7 @@ ALTER FUNCTION f_test_log_new RENAME TO f_test_log;
statement ok
ALTER FUNCTION f_test_log OWNER TO u_test_event;

onlyif config local-legacy-schema-changer
query TTTTT retry
WITH tmp AS (
SELECT "eventType" AS etype, info::JSONB AS info_json
Expand All @@ -1844,6 +1850,7 @@ alter_function_owner 203 "test.public.f_test_log" "u_test_event" "ALTER FUNC
statement ok
ALTER FUNCTION f_test_log SET SCHEMA sc_test_event;

onlyif config local-legacy-schema-changer
query TTTTT retry
WITH tmp AS (
SELECT "eventType" AS etype, info::JSONB AS info_json
Expand All @@ -1859,6 +1866,7 @@ ALTER FUNCTION sc_test_event.f_test_log SET SCHEMA public;
ALTER FUNCTION f_test_log IMMUTABLE;
DROP FUNCTION f_test_log;

onlyif config local-legacy-schema-changer
query TTTT retry
WITH tmp AS (
SELECT "eventType" AS etype, info::JSONB AS info_json
Expand All @@ -1869,6 +1877,7 @@ SELECT etype, info_json->'DescriptorID', info_json->'FunctionName', info_json->'
----
alter_function_options 203 "test.public.f_test_log" "ALTER FUNCTION \"\".\"\".f_test_log IMMUTABLE"

onlyif config local-legacy-schema-changer
query TTTT retry
WITH tmp AS (
SELECT "eventType" AS etype, info::JSONB AS info_json
Expand Down
Loading

0 comments on commit e0648f0

Please sign in to comment.