From 032a3f7519fa809d5f593a7e6bd8962d0339d092 Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Sat, 24 Jun 2023 00:19:53 -0400 Subject: [PATCH 1/5] sqltelemetry: add missing schema telemetry CREATE [ SCHEMA | INDEX | FUNCTION | TYPE ] and ALTER FUNCTION did not have any telemetry, but they should. Release note: None --- .../create_index/create_index.side_effects | 1 + pkg/sql/alter_function.go | 7 +++ pkg/sql/create_function.go | 5 +++ pkg/sql/create_type.go | 2 + .../internal/scbuildstmt/create_function.go | 1 + .../internal/scbuildstmt/create_index.go | 1 + .../internal/scbuildstmt/create_schema.go | 1 + .../internal/scbuildstmt/dependencies.go | 4 ++ pkg/sql/schemachanger/scdeps/build_deps.go | 6 +++ .../scdeps/sctestdeps/test_deps.go | 6 +++ .../create_function.side_effects | 1 + .../create_function_in_txn.side_effects | 2 + .../create_index/create_index.side_effects | 1 + ...te_schema_separate_statements.side_effects | 2 + .../create_schema/create_schema.side_effects | 1 + ...op_schema_separate_statements.side_effects | 2 + ...ate_index_separate_statements.side_effects | 1 + pkg/sql/testdata/telemetry/schema | 45 +++++++++++++++++++ 18 files changed, 89 insertions(+) diff --git a/pkg/ccl/schemachangerccl/testdata/end_to_end/create_index/create_index.side_effects b/pkg/ccl/schemachangerccl/testdata/end_to_end/create_index/create_index.side_effects index d431d285a633..7b5a580fb895 100644 --- a/pkg/ccl/schemachangerccl/testdata/end_to_end/create_index/create_index.side_effects +++ b/pkg/ccl/schemachangerccl/testdata/end_to_end/create_index/create_index.side_effects @@ -13,6 +13,7 @@ CREATE INDEX id1 begin transaction #1 # begin StatementPhase checking for feature: CREATE INDEX +increment telemetry for sql.schema.create_index write *eventpb.CreateIndex to event log: indexName: id1 mutationId: 1 diff --git a/pkg/sql/alter_function.go b/pkg/sql/alter_function.go index 1a742d384dd1..3fa2bcb95a69 100644 --- a/pkg/sql/alter_function.go +++ b/pkg/sql/alter_function.go @@ -14,6 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/funcdesc" @@ -22,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" @@ -63,6 +65,8 @@ func (p *planner) AlterFunctionOptions( } func (n *alterFunctionOptionsNode) startExec(params runParams) error { + telemetry.Inc(sqltelemetry.SchemaChangeAlterCounter("function")) + fnDesc, err := params.p.mustGetMutableFunctionForAlter(params.ctx, &n.n.Function) if err != nil { return err @@ -147,6 +151,7 @@ func (p *planner) AlterFunctionRename( } func (n *alterFunctionRenameNode) startExec(params runParams) error { + telemetry.Inc(sqltelemetry.SchemaChangeAlterCounter("function")) // TODO(chengxiong): add validation that a function can not be altered if it's // referenced by other objects. This is needed when want to allow function // references. 
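As a point of reference for the counters added in these hunks and asserted later in this patch's testdata (sql.schema.create_function, sql.schema.alter_function, ...), here is a minimal, self-contained sketch of how such a helper plausibly maps an object kind to its feature-usage key. It is an illustration, not the actual pkg/sql/sqltelemetry implementation; only telemetry.Inc, telemetry.GetCounter, and the key format are taken from the patch and its test expectations, and the package and function names below are made up.

package sqltelemetrysketch

import "github.com/cockroachdb/cockroach/pkg/server/telemetry"

// schemaChangeCreateCounter is a hypothetical stand-in for
// sqltelemetry.SchemaChangeCreateCounter. The "sql.schema.create_<object>"
// key format matches the side-effect logs ("increment telemetry for
// sql.schema.create_index") and the feature-usage expectations added in
// this patch; the real helper may be implemented differently.
func schemaChangeCreateCounter(object string) telemetry.Counter {
	return telemetry.GetCounter("sql.schema.create_" + object)
}

// A call site records one usage per statement execution, e.g.
// telemetry.Inc(schemaChangeCreateCounter("function")), which surfaces as
// sql.schema.create_function in telemetry reports.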
@@ -220,6 +225,7 @@ func (p *planner) AlterFunctionSetOwner( } func (n *alterFunctionSetOwnerNode) startExec(params runParams) error { + telemetry.Inc(sqltelemetry.SchemaChangeAlterCounter("function")) fnDesc, err := params.p.mustGetMutableFunctionForAlter(params.ctx, &n.n.Function) if err != nil { return err @@ -280,6 +286,7 @@ func (p *planner) AlterFunctionSetSchema( } func (n *alterFunctionSetSchemaNode) startExec(params runParams) error { + telemetry.Inc(sqltelemetry.SchemaChangeAlterCounter("function")) // TODO(chengxiong): add validation that a function can not be altered if it's // referenced by other objects. This is needed when want to allow function // references. diff --git a/pkg/sql/create_function.go b/pkg/sql/create_function.go index e0b9a147d9cf..a80b194db6aa 100644 --- a/pkg/sql/create_function.go +++ b/pkg/sql/create_function.go @@ -15,6 +15,7 @@ import ( "fmt" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catprivilege" @@ -28,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" + "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" @@ -68,6 +70,9 @@ func (n *createFunctionNode) startExec(params runParams) error { if scDesc.SchemaKind() == catalog.SchemaTemporary { return unimplemented.NewWithIssue(104687, "cannot create UDFs under a temporary schema") } + + telemetry.Inc(sqltelemetry.SchemaChangeCreateCounter("function")) + mutScDesc, err := params.p.descCollection.MutableByName(params.p.Txn()).Schema(params.ctx, n.dbDesc, n.scDesc.GetName()) if err != nil { return err diff --git a/pkg/sql/create_type.go b/pkg/sql/create_type.go index 383b524190e8..83bd3910171b 100644 --- a/pkg/sql/create_type.go +++ b/pkg/sql/create_type.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catprivilege" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -80,6 +81,7 @@ func (p *planner) CreateType(ctx context.Context, n *tree.CreateType) (planNode, } func (n *createTypeNode) startExec(params runParams) error { + telemetry.Inc(sqltelemetry.SchemaChangeCreateCounter("type")) // Check if a type with the same name exists already. 
g := params.p.Descriptors().ByName(params.p.Txn()).MaybeGet() _, typ, err := descs.PrefixAndType(params.ctx, g, n.typeName) diff --git a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/create_function.go b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/create_function.go index e484b64bcedb..7a24cf3ea076 100644 --- a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/create_function.go +++ b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/create_function.go @@ -28,6 +28,7 @@ func CreateFunction(b BuildCtx, n *tree.CreateFunction) { if n.Replace { panic(scerrors.NotImplementedError(n)) } + b.IncrementSchemaChangeCreateCounter("function") dbElts, scElts := b.ResolvePrefix(n.FuncName.ObjectNamePrefix, privilege.CREATE) _, _, sc := scpb.FindSchema(scElts) diff --git a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/create_index.go b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/create_index.go index 15f3ed781b3e..c9ec566fe59e 100644 --- a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/create_index.go +++ b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/create_index.go @@ -47,6 +47,7 @@ import ( // CreateIndex implements CREATE INDEX. func CreateIndex(b BuildCtx, n *tree.CreateIndex) { + b.IncrementSchemaChangeCreateCounter("index") // Resolve the table name and start building the new index element. relationElements := b.ResolveRelation(n.Table.ToUnresolvedObjectName(), ResolveParams{ IsExistenceOptional: false, diff --git a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/create_schema.go b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/create_schema.go index 6a16e4a54854..3b9dafa795b6 100644 --- a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/create_schema.go +++ b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/create_schema.go @@ -40,6 +40,7 @@ func CreateSchema(b BuildCtx, n *tree.CreateSchema) { } } + b.IncrementSchemaChangeCreateCounter("schema") sqltelemetry.IncrementUserDefinedSchemaCounter(sqltelemetry.UserDefinedSchemaCreate) dbElts := b.ResolveDatabase(n.Schema.CatalogName, ResolveParams{ IsExistenceOptional: false, diff --git a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/dependencies.go b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/dependencies.go index d3565f4e755c..b5e1ec3d6088 100644 --- a/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/dependencies.go +++ b/pkg/sql/schemachanger/scbuild/internal/scbuildstmt/dependencies.go @@ -148,6 +148,10 @@ type TreeAnnotator interface { // Telemetry allows incrementing schema change telemetry counters. type Telemetry interface { + // IncrementSchemaChangeCreateCounter increments the selected CREATE telemetry + // counter. + IncrementSchemaChangeCreateCounter(counterType string) + // IncrementSchemaChangeAlterCounter increments the selected ALTER telemetry // counter. IncrementSchemaChangeAlterCounter(counterType string, extra ...string) diff --git a/pkg/sql/schemachanger/scdeps/build_deps.go b/pkg/sql/schemachanger/scdeps/build_deps.go index c5ac37b7223f..4a30c829e643 100644 --- a/pkg/sql/schemachanger/scdeps/build_deps.go +++ b/pkg/sql/schemachanger/scdeps/build_deps.go @@ -385,6 +385,12 @@ func (d *buildDeps) IncrementSchemaChangeAlterCounter(counterType string, extra telemetry.Inc(sqltelemetry.SchemaChangeAlterCounterWithExtra(counterType, maybeExtra)) } +// IncrementSchemaChangeCreateCounter implements the scbuild.Dependencies +// interface. 
+func (d *buildDeps) IncrementSchemaChangeCreateCounter(counterType string) { + telemetry.Inc(sqltelemetry.SchemaChangeCreateCounter(counterType)) +} + // IncrementSchemaChangeDropCounter implements the scbuild.Dependencies // interface. func (d *buildDeps) IncrementSchemaChangeDropCounter(counterType string) { diff --git a/pkg/sql/schemachanger/scdeps/sctestdeps/test_deps.go b/pkg/sql/schemachanger/scdeps/sctestdeps/test_deps.go index ace119fc7602..59c156658ca5 100644 --- a/pkg/sql/schemachanger/scdeps/sctestdeps/test_deps.go +++ b/pkg/sql/schemachanger/scdeps/sctestdeps/test_deps.go @@ -110,6 +110,12 @@ func (s *TestState) IncrementSchemaChangeDropCounter(counterType string) { s.LogSideEffectf("increment telemetry for sql.schema.drop_%s", counterType) } +// IncrementSchemaChangeCreateCounter implements the scbuild.Dependencies +// interface. +func (s *TestState) IncrementSchemaChangeCreateCounter(counterType string) { + s.LogSideEffectf("increment telemetry for sql.schema.create_%s", counterType) +} + // IncrementSchemaChangeAddColumnTypeCounter implements the scbuild.Dependencies // interface. func (s *TestState) IncrementSchemaChangeAddColumnTypeCounter(typeName string) { diff --git a/pkg/sql/schemachanger/testdata/end_to_end/create_function/create_function.side_effects b/pkg/sql/schemachanger/testdata/end_to_end/create_function/create_function.side_effects index 00dddd1d89ca..ef8a75e8ade2 100644 --- a/pkg/sql/schemachanger/testdata/end_to_end/create_function/create_function.side_effects +++ b/pkg/sql/schemachanger/testdata/end_to_end/create_function/create_function.side_effects @@ -31,6 +31,7 @@ $$; begin transaction #1 # begin StatementPhase checking for feature: CREATE FUNCTION +increment telemetry for sql.schema.create_function write *eventpb.CreateFunction to event log: functionName: defaultdb.public.f sql: diff --git a/pkg/sql/schemachanger/testdata/end_to_end/create_function_in_txn/create_function_in_txn.side_effects b/pkg/sql/schemachanger/testdata/end_to_end/create_function_in_txn/create_function_in_txn.side_effects index 1288ff20cad4..26c119bc2ab5 100644 --- a/pkg/sql/schemachanger/testdata/end_to_end/create_function_in_txn/create_function_in_txn.side_effects +++ b/pkg/sql/schemachanger/testdata/end_to_end/create_function_in_txn/create_function_in_txn.side_effects @@ -11,6 +11,7 @@ CREATE UNIQUE INDEX idx ON t(b); begin transaction #1 # begin StatementPhase checking for feature: CREATE FUNCTION +increment telemetry for sql.schema.create_function write *eventpb.CreateFunction to event log: functionName: defaultdb.public.t sql: @@ -68,6 +69,7 @@ upsert descriptor #101 - version: "1" + version: "2" checking for feature: CREATE INDEX +increment telemetry for sql.schema.create_index write *eventpb.CreateIndex to event log: indexName: idx mutationId: 1 diff --git a/pkg/sql/schemachanger/testdata/end_to_end/create_index/create_index.side_effects b/pkg/sql/schemachanger/testdata/end_to_end/create_index/create_index.side_effects index ca0540455c7f..5d486d1d5823 100644 --- a/pkg/sql/schemachanger/testdata/end_to_end/create_index/create_index.side_effects +++ b/pkg/sql/schemachanger/testdata/end_to_end/create_index/create_index.side_effects @@ -13,6 +13,7 @@ CREATE INDEX idx1 ON t (v) WHERE (v = 'a'); begin transaction #1 # begin StatementPhase checking for feature: CREATE INDEX +increment telemetry for sql.schema.create_index increment telemetry for sql.schema.partial_index write *eventpb.CreateIndex to event log: indexName: idx1 diff --git 
a/pkg/sql/schemachanger/testdata/end_to_end/create_index_create_schema_separate_statements/create_index_create_schema_separate_statements.side_effects b/pkg/sql/schemachanger/testdata/end_to_end/create_index_create_schema_separate_statements/create_index_create_schema_separate_statements.side_effects index c1e76d6a6bf7..3ac79baea285 100644 --- a/pkg/sql/schemachanger/testdata/end_to_end/create_index_create_schema_separate_statements/create_index_create_schema_separate_statements.side_effects +++ b/pkg/sql/schemachanger/testdata/end_to_end/create_index_create_schema_separate_statements/create_index_create_schema_separate_statements.side_effects @@ -12,6 +12,7 @@ CREATE SCHEMA sc; begin transaction #1 # begin StatementPhase checking for feature: CREATE INDEX +increment telemetry for sql.schema.create_index increment telemetry for sql.schema.partial_index write *eventpb.CreateIndex to event log: indexName: idx @@ -95,6 +96,7 @@ upsert descriptor #104 - version: "1" + version: "2" checking for feature: CREATE SCHEMA +increment telemetry for sql.schema.create_schema write *eventpb.CreateSchema to event log: owner: root schemaName: defaultdb.sc diff --git a/pkg/sql/schemachanger/testdata/end_to_end/create_schema/create_schema.side_effects b/pkg/sql/schemachanger/testdata/end_to_end/create_schema/create_schema.side_effects index 45aa1e5bf42b..bfd2dc696c35 100644 --- a/pkg/sql/schemachanger/testdata/end_to_end/create_schema/create_schema.side_effects +++ b/pkg/sql/schemachanger/testdata/end_to_end/create_schema/create_schema.side_effects @@ -8,6 +8,7 @@ CREATE SCHEMA sc; begin transaction #1 # begin StatementPhase checking for feature: CREATE SCHEMA +increment telemetry for sql.schema.create_schema write *eventpb.CreateSchema to event log: owner: root schemaName: defaultdb.sc diff --git a/pkg/sql/schemachanger/testdata/end_to_end/create_schema_drop_schema_separate_statements/create_schema_drop_schema_separate_statements.side_effects b/pkg/sql/schemachanger/testdata/end_to_end/create_schema_drop_schema_separate_statements/create_schema_drop_schema_separate_statements.side_effects index d816c814614b..aa2694c4301b 100644 --- a/pkg/sql/schemachanger/testdata/end_to_end/create_schema_drop_schema_separate_statements/create_schema_drop_schema_separate_statements.side_effects +++ b/pkg/sql/schemachanger/testdata/end_to_end/create_schema_drop_schema_separate_statements/create_schema_drop_schema_separate_statements.side_effects @@ -10,6 +10,7 @@ CREATE SCHEMA sc; begin transaction #1 # begin StatementPhase checking for feature: CREATE SCHEMA +increment telemetry for sql.schema.create_schema write *eventpb.CreateSchema to event log: owner: root schemaName: defaultdb.sc @@ -74,6 +75,7 @@ upsert descriptor #104 + state: DROP version: "1" checking for feature: CREATE SCHEMA +increment telemetry for sql.schema.create_schema write *eventpb.CreateSchema to event log: owner: root schemaName: defaultdb.sc diff --git a/pkg/sql/schemachanger/testdata/end_to_end/drop_column_create_index_separate_statements/drop_column_create_index_separate_statements.side_effects b/pkg/sql/schemachanger/testdata/end_to_end/drop_column_create_index_separate_statements/drop_column_create_index_separate_statements.side_effects index 251910b4f3a9..844afc147524 100644 --- a/pkg/sql/schemachanger/testdata/end_to_end/drop_column_create_index_separate_statements/drop_column_create_index_separate_statements.side_effects +++ 
b/pkg/sql/schemachanger/testdata/end_to_end/drop_column_create_index_separate_statements/drop_column_create_index_separate_statements.side_effects @@ -207,6 +207,7 @@ upsert descriptor #104 - version: "1" + version: "2" checking for feature: CREATE INDEX +increment telemetry for sql.schema.create_index write *eventpb.CreateIndex to event log: indexName: idx mutationId: 1 diff --git a/pkg/sql/testdata/telemetry/schema b/pkg/sql/testdata/telemetry/schema index 968be5093bb0..afc7bb704f05 100644 --- a/pkg/sql/testdata/telemetry/schema +++ b/pkg/sql/testdata/telemetry/schema @@ -124,9 +124,54 @@ sql.schema.schema_changer_mode.legacy feature-usage CREATE FUNCTION f() RETURNS INT AS $$ SELECT 1 $$ LANGUAGE SQL IMMUTABLE ---- +sql.schema.create_function sql.schema.schema_changer_mode.declarative feature-usage ALTER FUNCTION f() OWNER TO admin ---- +sql.schema.alter_function +sql.schema.schema_changer_mode.legacy + +feature-usage +ALTER FUNCTION f() SET SCHEMA public +---- +sql.schema.alter_function +sql.schema.schema_changer_mode.legacy + +feature-usage +ALTER FUNCTION f() VOLATILE +---- +sql.schema.alter_function +sql.schema.schema_changer_mode.legacy + +feature-usage +ALTER FUNCTION f() RENAME TO g +---- +sql.schema.alter_function +sql.schema.schema_changer_mode.legacy + +feature-usage +CREATE SCHEMA a +---- +sql.schema.create_schema +sql.schema.schema_changer_mode.declarative + +feature-usage +CREATE INDEX ON t(b) +---- +sql.schema.create_index +sql.schema.get_virtual_table.pg_catalog.pg_attribute +sql.schema.schema_changer_mode.declarative + +feature-usage +CREATE TYPE enum_typ AS ENUM ('a', 'b') +---- +sql.schema.create_type +sql.schema.schema_changer_mode.legacy + +feature-usage +CREATE TYPE composite_typ AS (a int, b int) +---- +sql.schema.create_type sql.schema.schema_changer_mode.legacy
From 5bbf4e3dbb9a8be93b652fd8e52bee459cebf950 Mon Sep 17 00:00:00 2001 From: Chengxiong Ruan Date: Mon, 26 Jun 2023 16:41:36 -0400 Subject: [PATCH 2/5] sql: disallow cross-database type references in CTAS Fixes: #105393 Release note (bug fix): Previously, cross-database type references could sneak in through `CREATE TABLE ... AS` statements if the source table was from another database and any of its columns was of a user-defined type. This introduced a bug where the source table could be dropped and the type could no longer be found for the CTAS table. This commit disallows such CTAS statements as a fix. --- pkg/sql/create_table.go | 22 ++++++++++++++ .../logictest/testdata/logic_test/create_as | 26 +++++++++++++++++++ 2 files changed, 48 insertions(+)
diff --git a/pkg/sql/create_table.go b/pkg/sql/create_table.go index 69954750b5d1..8fafc599f905 100644 --- a/pkg/sql/create_table.go +++ b/pkg/sql/create_table.go @@ -39,6 +39,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver" "github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" "github.com/cockroachdb/cockroach/pkg/sql/paramparse" "github.com/cockroachdb/cockroach/pkg/sql/parser" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" @@ -1223,6 +1224,27 @@ func newTableDescIfAs( } } + // Check if there is any reference to a user-defined type that belongs to + // another database, which is not allowed. + for _, def := range p.Defs { + if d, ok := def.(*tree.ColumnTableDef); ok { + // In CTAS, ColumnTableDefs are generated from resultColumns, which are + // resolved already.
So we may cast it to *types.T directly without + // resolving it again. + typ := d.Type.(*types.T) + if typ.UserDefined() { + tn, typDesc, err := params.p.GetTypeDescriptor(params.ctx, typedesc.UserDefinedTypeOIDToID(typ.Oid())) + if err != nil { + return nil, err + } + if typDesc.GetParentID() != db.GetID() { + return nil, pgerror.Newf( + pgcode.FeatureNotSupported, "cross database type references are not supported: %s", tn.String()) + } + } + } + } + desc, err = newTableDesc( params, p, diff --git a/pkg/sql/logictest/testdata/logic_test/create_as b/pkg/sql/logictest/testdata/logic_test/create_as index 2aeb3b033081..6c74254bea15 100644 --- a/pkg/sql/logictest/testdata/logic_test/create_as +++ b/pkg/sql/logictest/testdata/logic_test/create_as @@ -382,3 +382,29 @@ query I SELECT * FROM tab_from_seq ---- 2 + +# Regression test for #105393 +subtest regression_105393 + +statement ok +CREATE DATABASE db105393_1; +CREATE DATABASE db105393_2; +USE db105393_1; +CREATE TYPE e105393 AS ENUM ('a'); +CREATE TABLE t105393 (a INT PRIMARY KEY, b e105393); +USE db105393_2; + +statement error pq: cross database type references are not supported: db105393_1.public.e105393 +CREATE TABLE e105393 AS TABLE db105393_1.public.t105393; + +statement error pq: cross database type references are not supported: db105393_1.public.e105393 +CREATE TABLE e105393 AS SELECT * FROM db105393_1.public.t105393; + +statement error pq: cross database type references are not supported: db105393_1.public.e105393 +CREATE TABLE e105393 AS SELECT b FROM db105393_1.public.t105393; + +statement error pq: cross database type references are not supported: db105393_1.public.e105393 +CREATE TABLE e105393 (a PRIMARY KEY, b) AS TABLE db105393_1.public.t105393; + +statement error pq: cross database type references are not supported: db105393_1.public.e105393 +CREATE TABLE e105393 (b) AS SELECT b FROM db105393_1.public.t105393; From 5879ef785b5599a47fa5e70cafaf4d1d94382070 Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Tue, 27 Jun 2023 07:22:16 +0000 Subject: [PATCH 3/5] storage: make `storage.max_sync_duration` public and `TenantReadOnly` Users have asked why this setting is not public, this patch makes it so. Furthermore, these settings were `TenantWritable`. We do not want these to be writable by tenants, where they can potentially cause problems on SQL nodes, considering e.g. SQL disk spilling uses Pebble. 
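For context on the class and visibility change described above, the sketch below shows the general registration pattern with a hypothetical setting; settings.RegisterDurationSetting, settings.TenantReadOnly, and the chained WithPublic() call are taken from the diff that follows, while the setting name, package name, and default here are illustrative assumptions.

package storagesketch

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/settings"
)

// exampleMaxOpDuration is a hypothetical setting that follows the same
// pattern this patch applies to storage.max_sync_duration: TenantReadOnly
// prevents secondary tenants from overriding the value, while WithPublic
// surfaces the setting in the generated settings documentation.
var exampleMaxOpDuration = settings.RegisterDurationSetting(
	settings.TenantReadOnly,
	"storage.example.max_op_duration", // hypothetical name
	"illustrative duration setting registered as public and tenant read-only",
	20*time.Second,
).WithPublic()

// Server-side code then reads the current value, typically as
// exampleMaxOpDuration.Get(&st.SV) for a *cluster.Settings st.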
Epic: none Release note: None --- docs/generated/settings/settings-for-tenants.txt | 2 ++ docs/generated/settings/settings.html | 2 ++ pkg/storage/pebble.go | 8 ++++---- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index dd0e1f995881..45edefdb0c43 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -292,6 +292,8 @@ sql.ttl.default_delete_rate_limit integer 0 default delete rate limit for all TT sql.ttl.default_select_batch_size integer 500 default amount of rows to select in a single query during a TTL job tenant-rw sql.ttl.job.enabled boolean true whether the TTL job is enabled tenant-rw sql.txn_fingerprint_id_cache.capacity integer 100 the maximum number of txn fingerprint IDs stored tenant-rw +storage.max_sync_duration duration 20s maximum duration for disk operations; any operations that take longer than this setting trigger a warning log entry or process crash tenant-ro +storage.max_sync_duration.fatal.enabled boolean true if true, fatal the process when a disk operation exceeds storage.max_sync_duration tenant-ro timeseries.storage.enabled boolean true if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere tenant-rw timeseries.storage.resolution_10s.ttl duration 240h0m0s the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion. tenant-rw timeseries.storage.resolution_30m.ttl duration 2160h0m0s the maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion. tenant-rw diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 1d1131109f16..b0dfa63e450e 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -245,6 +245,8 @@
sql.ttl.default_select_batch_size | integer | 500 | default amount of rows to select in a single query during a TTL job | Serverless/Dedicated/Self-Hosted
sql.ttl.job.enabled | boolean | true | whether the TTL job is enabled | Serverless/Dedicated/Self-Hosted
sql.txn_fingerprint_id_cache.capacity | integer | 100 | the maximum number of txn fingerprint IDs stored | Serverless/Dedicated/Self-Hosted
+storage.max_sync_duration | duration | 20s | maximum duration for disk operations; any operations that take longer than this setting trigger a warning log entry or process crash | Serverless/Dedicated/Self-Hosted (read-only)
+storage.max_sync_duration.fatal.enabled | boolean | true | if true, fatal the process when a disk operation exceeds storage.max_sync_duration | Serverless/Dedicated/Self-Hosted (read-only)
storage.value_blocks.enabled | boolean | true | set to true to enable writing of value blocks in sstables | Dedicated/Self-Hosted
timeseries.storage.enabled | boolean | true | if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere | Serverless/Dedicated/Self-Hosted
timeseries.storage.resolution_10s.ttl
duration240h0m0sthe maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion.Serverless/Dedicated/Self-Hosted diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 878ffa6091e4..1a0e5be0c685 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -65,21 +65,21 @@ var maxSyncDurationDefault = envutil.EnvOrDefaultDuration("COCKROACH_ENGINE_MAX_ // MaxSyncDuration is the threshold above which an observed engine sync duration // triggers either a warning or a fatal error. var MaxSyncDuration = settings.RegisterDurationSetting( - settings.TenantWritable, + settings.TenantReadOnly, "storage.max_sync_duration", "maximum duration for disk operations; any operations that take longer"+ " than this setting trigger a warning log entry or process crash", maxSyncDurationDefault, -) +).WithPublic() // MaxSyncDurationFatalOnExceeded governs whether disk stalls longer than // MaxSyncDuration fatal the Cockroach process. Defaults to true. var MaxSyncDurationFatalOnExceeded = settings.RegisterBoolSetting( - settings.TenantWritable, + settings.TenantReadOnly, "storage.max_sync_duration.fatal.enabled", "if true, fatal the process when a disk operation exceeds storage.max_sync_duration", true, -) +).WithPublic() // ValueBlocksEnabled controls whether older versions of MVCC keys in the same // sstable will have their values written to value blocks. This only affects From a2aa8b72b9233b35ce8e2393b5b4fc7a36a2b8ac Mon Sep 17 00:00:00 2001 From: Arul Ajmani Date: Thu, 8 Jun 2023 14:28:40 -0400 Subject: [PATCH 4/5] concurrency: use lock modes to find conflicts during lock table scans This patch majorly refactors the lock table scan codepath, all in the name of shared locks. At its core is a desire to use lock modes to perform conflict resolution between an incoming request and locks held on one particular key. In doing so, we rip out tryActiveWait. At a high level (for a particular key), a request's journey looks like the following: - It first checks if the transaction already holds a lock at a equal or higher lock strength (read: It's good enough for its use). If this is the case, it can proceed without any bookkeeping. - It then checks if any finalized transactions hold locks on the key. Such locks do not conflict, but need to be resolved before the transaction can evaluate. They're thus accumulated for later. - The request then enqueues itself in the appropriate wait queue. - It then determines if it needs to actively wait at this lock because of a conflict. If that's the case, the lock table scan short circuits. - Otherwise, the request lays a claim (if it can) before proceeding with its scan. 
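To make the mode-based conflict check described in the list above concrete, here is a small, self-contained sketch using only lock.MakeModeIntent, lock.MakeModeNone, and lock.Conflicts as they appear in the diff below; the timestamps, package name, and expected outcomes in the comments are illustrative assumptions rather than part of the patch.

package lockmodesketch

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func main() {
	st := cluster.MakeTestingClusterSettings()

	// Model a held intent at timestamp 10, the same way the patch models an
	// existing lock holder via lock.MakeModeIntent.
	held := lock.MakeModeIntent(hlc.Timestamp{WallTime: 10})

	// A non-locking read below the holder's timestamp should not conflict.
	read := lock.MakeModeNone(hlc.Timestamp{WallTime: 5}, isolation.Serializable)
	fmt.Println(lock.Conflicts(held, read, &st.SV)) // expected: false

	// Another transaction's intent on the same key should conflict.
	write := lock.MakeModeIntent(hlc.Timestamp{WallTime: 20})
	fmt.Println(lock.Conflicts(held, write, &st.SV)) // expected: true
}

Conflict resolution thus reduces to a single check over a pair of lock modes, which is what allows scanAndMaybeEnqueue (below) to replace the bespoke branching that tryActiveWait carried.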
Closes #102210 Release note: None --- pkg/kv/kvserver/concurrency/lock_table.go | 738 +++++++++++------- .../testdata/lock_table/queue_length_exceeded | 8 +- 2 files changed, 462 insertions(+), 284 deletions(-) diff --git a/pkg/kv/kvserver/concurrency/lock_table.go b/pkg/kv/kvserver/concurrency/lock_table.go index 02ac77a9eaa8..4e46637a03a1 100644 --- a/pkg/kv/kvserver/concurrency/lock_table.go +++ b/pkg/kv/kvserver/concurrency/lock_table.go @@ -19,6 +19,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/lockspanset" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -582,6 +583,18 @@ func (g *lockTableGuardImpl) updateStateToDoneWaitingLocked() { g.mu.state = waitingState{kind: doneWaiting} } +// startWaitingWithWaitingState modifies state on the request's guard to let it +// start waiting. +func (g *lockTableGuardImpl) startWaitingWithWaitingState(ws waitingState, notify bool) { + g.key = ws.key + + g.mu.Lock() + defer g.mu.Unlock() + g.mu.startWait = true + g.mu.curLockWaitStart = g.lt.clock.PhysicalTime() + g.maybeUpdateWaitingStateLocked(ws, notify) +} + // maybeUpdateWaitingStateLocked updates the request's waiting state if the // supplied state is meaningfully different[1]. The request's state change // channel is signaled if the waiting state is updated and the caller has @@ -614,7 +627,7 @@ func (g *lockTableGuardImpl) updateWaitingStateLocked(newState waitingState) { if newState.kind == doneWaiting { panic(errors.AssertionFailedf("unexpected waiting state kind: %d", newState.kind)) } - newState.guardStrength = g.str // copy over the strength which caused the conflict + newState.guardStrength = g.curStrength() // copy over the strength which caused the conflict g.mu.state = newState } @@ -654,7 +667,7 @@ func (g *lockTableGuardImpl) CheckOptimisticNoConflicts( ltRange := &lockState{key: startKey, endKey: span.EndKey} for iter.FirstOverlap(ltRange); iter.Valid(); iter.NextOverlap(ltRange) { l := iter.Cur() - if !l.isNonConflictingLock(g, g.str) { + if !l.isNonConflictingLock(g, g.curStrength()) { return false } } @@ -739,9 +752,28 @@ func (g *lockTableGuardImpl) isSameTxn(txn *enginepb.TxnMeta) bool { return g.txn != nil && g.txn.ID == txn.ID } -// TODO(arul): get rid of this once tryActiveWait is cleaned up. -func (g *lockTableGuardImpl) isSameTxnAsReservation(ws waitingState) bool { - return !ws.held && g.isSameTxn(ws.txn) +// curStrength returns the lock strength of the current lock span being scanned +// by the request. Lock spans declared by a request are iterated from strongest +// to weakest, and the return value of this method is mutable as the request's +// scan progresses from lock to lock. +func (g *lockTableGuardImpl) curStrength() lock.Strength { + return g.str +} + +// getCurLockMode returns the lock mode of the current lock being scanned by the +// request. The value returned by this method are mutable as the request's scan +// of the lock table progresses from lock to lock. 
+func (g *lockTableGuardImpl) curLockMode() lock.Mode { + var reqMode lock.Mode + switch g.curStrength() { + case lock.None: + reqMode = lock.MakeModeNone(g.ts, isolation.Serializable) + case lock.Intent: + reqMode = lock.MakeModeIntent(g.ts) + default: + panic(fmt.Sprintf("unhandled request strength: %s", g.curStrength())) + } + return reqMode } // takeToResolveUnreplicated returns the list of unreplicated locks accumulated @@ -767,7 +799,7 @@ func (g *lockTableGuardImpl) takeToResolveUnreplicated() []roachpb.LockUpdate { // // ACQUIRES: g.mu. func (g *lockTableGuardImpl) resumeScan(notify bool) { - spans := g.spans.GetSpans(g.str) + spans := g.spans.GetSpans(g.curStrength()) var span *roachpb.Span resumingInSameSpan := false if g.index == -1 || len(spans[g.index].EndKey) == 0 { @@ -815,14 +847,15 @@ func (g *lockTableGuardImpl) resumeScan(notify bool) { // Else, past the lock where it stopped waiting. We may not // encounter that lock since it may have been garbage collected. } - wait := l.tryActiveWait(g, g.str, notify, g.lt.clock) - if wait { + conflicts := l.scanAndMaybeEnqueue(g, notify) + if conflicts { return } } resumingInSameSpan = false span = stepToNextSpan(g) } + if len(g.toResolve) > 0 { j := 0 // Some of the locks in g.toResolve may already have been claimed by @@ -1310,6 +1343,7 @@ func (l *lockState) addToMetrics(m *LockTableMetrics, now time.Time) { // claimed the lock. The claimant transaction may have changed, so there may be // inconsistencies with waitSelf and waitForDistinguished states that need // changing. +// // REQUIRES: l.mu is locked. func (l *lockState) informActiveWaiters() { if l.waitingReaders.Len() == 0 && l.queuedWriters.Len() == 0 { @@ -1439,13 +1473,6 @@ func (l *lockState) claimantTxn() (_ *enginepb.TxnMeta, held bool) { panic("no queued writers or lock holder; no one should be waiting on the lock") } qg := l.queuedWriters.Front().Value.(*queuedGuard) - if qg.active || qg.guard.txn == nil { - // TODO(arul): uncomment this assertion once tryActiveWait has been - // refactored, and we no longer call into this method before readjusting - // the queued of writers to make the first one inactive. - //panic("first queued writer should be transactional and inactive") - return qg.guard.txnMeta(), false - } return qg.guard.txnMeta(), false } @@ -1506,6 +1533,7 @@ func (l *lockState) tryMakeNewDistinguished() { // Returns true iff the lockState is empty, i.e., there is no lock holder and no // waiters. +// // REQUIRES: l.mu is locked. func (l *lockState) isEmptyLock() bool { if l.holder.locked { @@ -1623,6 +1651,16 @@ func (l *lockState) getLockHolder() (*enginepb.TxnMeta, hlc.Timestamp) { return l.holder.holder[index].txn, l.holder.holder[index].ts } +// getLockMode returns the Mode with which a lock is held. +// +// REQUIRES: l.mu is locked. +func (l *lockState) getLockMode() lock.Mode { + lockHolderTxn, lockHolderTS := l.getLockHolder() + assert(lockHolderTxn != nil, "cannot get lock mode of an unheld lock") + + return lock.MakeModeIntent(lockHolderTS) +} + // Removes the current lock holder from the lock. // REQUIRES: l.mu is locked. func (l *lockState) clearLockHolder() { @@ -1633,118 +1671,178 @@ func (l *lockState) clearLockHolder() { } } -// tryActiveWait decides whether the request g, with locking strength str, -// should actively wait at this lock or not. It adjusts the data-structures -// appropriately if the request needs to wait. 
The notify parameter is true iff -// the request's new state channel should be notified -- it is set to false when -// the call to tryActiveWait is happening due to an event for a different -// request or transaction (like a lock release) since in that case the channel -// is notified first and the call to tryActiveWait() happens later in -// lockTableGuard.CurState(). -// -// It uses the txnStatusCache to decide that the caller does not need to wait on -// a lock of a transaction that is already finalized or is pending but pushed -// above the request's read timestamp (for non-locking readers). -// -// - For unreplicated locks, this method will silently remove (or update) the -// lock and proceed as normal. +// scanAndMaybeEnqueue scans all locks held on the receiver's key and performs +// conflict resolution with the supplied request. It may[1] enqueue the request +// in the receiver's wait queues. The return value indicates whether the caller +// should suspend its scan of the lock table or not; otherwise, it is free[2] +// to proceed. // -// - For replicated locks the behavior is more complicated since we need to -// resolve the intent. We desire: -// A. batching of intent resolution. -// B. minimize races where intent resolution is being performed by multiple -// requests. -// C. minimize races where the intent has not yet been resolved but has been -// removed from the lock table, thereby causing some other request to -// evaluate wastefully and discover the intent. +// [1] To understand when a request is enqueued or not, it's useful to consider +// 3 separate cases: +// 1. Transactional requests of the locking nature are always enqueued. +// 2. Transactional requests that are non-locking are only enqueued if there is a +// conflict. +// 3. Non-transactional {non-locking,write} requests are only enqueued if there +// is a conflict. // -// For A, the caller of tryActiveWait will accumulate the LockUpdates. For B, -// we only generate a LockUpdate here if this request is either a reader, or -// the first writer in the queue, i.e., it is only blocked by the lock -// holder. This prevents races between multiple writers in doing resolution -// but not between multiple readers and between readers and writers. We could -// be more conservative in only doing the intent resolution if the waiter was -// equivalent to a distinguished-waiter, but there it no guarantee that that -// distinguished waiter will do intent resolution in a timely manner (since -// it could block waiting on some other lock). Instead, the caller of -// tryActiveWait makes a best-effort to reduce racing (explained below). For -// C, the caller of tryActiveWait removes the lock from the in-memory -// data-structure only if the request does not need to wait anywhere, which -// means it will immediately proceed to intent resolution. Additionally, if -// the lock has already been removed, it suggests that some other request has -// already claimed intent resolution (or done it), so this request does not -// need to do the resolution. +// [2] Locks belonging to a finalized transaction do not cause the caller to +// wait on them. Likewise, locks belonging to in-progress transactions that are +// known to be pushed to a non-conflicting timestamp (read: higher ts than the +// request's) do not cause the caller to wait on them either. However, in both +// these cases, such locks (may) need to be resolved before the request can +// evaluate. The guard's state is modified to indicate if there are locks that +// need resolution. 
// -// Ideally, we would strengthen B and C -- a request should make a claim on -// intent resolution for a set of keys, and will either resolve the intent, -// or due to an error will return that claim so others can do so. A -// replicated lock (intent) would not be removed from the in-memory -// data-structure until it was actually gone. -// TODO(sumeer): do this cleaner solution for batched intent resolution. -// -// In the future we'd like to augment the lockTable with an understanding of -// finalized but not yet resolved locks. These locks will allow conflicting -// transactions to proceed with evaluation without the need to first remove -// all traces of them via a round of replication. This is discussed in more -// detail in #41720. Specifically, see mention of "contention footprint" and -// COMMITTED_BUT_NOT_REMOVABLE. -// Also, resolving these locks/intents would proceed without latching, so we -// would not rely on MVCC scanning to add discovered locks to the lock table, -// since the discovered locks may be stale. -// -// The return value is true iff it is actively waiting. -// Acquires l.mu, g.mu. -func (l *lockState) tryActiveWait( - g *lockTableGuardImpl, str lock.Strength, notify bool, clock *hlc.Clock, -) (wait bool) { +// REQUIRES: l.mu to be locked. +func (l *lockState) scanAndMaybeEnqueue(g *lockTableGuardImpl, notify bool) (wait bool) { l.mu.Lock() defer l.mu.Unlock() + if l.isEmptyLock() { + return false /* wait */ + } - switch str { - case lock.None, lock.Intent: - default: - panic(errors.AssertionFailedf("unexpected lock strength %s", str)) + // It is possible that the lock is already held by this request's + // transaction, and it is held with a lock strength good enough for it. + if l.alreadyHoldsLockAndIsAllowedToProceed(g) { + return false /* wait */ } - // It is possible that this lock is empty and has not yet been deleted. - if l.isEmptyLock() { - return false + if g.curStrength() == lock.None { + conflicts := l.maybeEnqueueNonLockingReadRequest(g) + if conflicts { + ws := l.constructWaitingState(g) + g.startWaitingWithWaitingState(ws, notify) + return true /* wait */ + } + return false /* wait */ } - // Lock is not empty. - lockHolderTxn, lockHolderTS := l.getLockHolder() - if lockHolderTxn != nil && g.isSameTxn(lockHolderTxn) { - // Already locked by this txn. + // We're purely dealing with locking requests from here on out. + + maxQueueLengthExceeded := l.enqueueLockingRequest(g) + if maxQueueLengthExceeded { + // NB: Requests that encounter a lock wait-queue that is longer than + // what they're willing to wait for are rejected by the lock table + // waiter based on the waiting state we'll construct here. + ws := l.constructWaitingState(g) + ws.kind = waitQueueMaxLengthExceeded + g.startWaitingWithWaitingState(ws, notify) + // Return true, not because we want to wait, but because we want + // this request to be rejected in the lock table waiter. + return true /* wait */ + } + + if l.shouldRequestActivelyWait(g) { + ws := l.constructWaitingState(g) + g.startWaitingWithWaitingState(ws, notify) + // TODO(arul): In the future, when we extend the lock table to consider + // UPDATE locks as well, we'll need to add a call to informActiveWaiters + // here. Consider the following construction: + // + // keyA: [SHARED, UPDATE] + // waitQueue: [{r1: UPDATE(seq=10)}] + // g: {r2: Exclusive(seq=9)} + // + // Previously, the r1 was waiting on the UPDATE lock that is held. However, + // once r2 slots in front of it, r2 is waiting on the SHARED lock. 
To + // prevent cases where different waiters are pushing different transactions, + // we'll need to notify r1 to push the SHARED lock instead. To do so, we + // need to call informActiveWaiters. Note that informActiveWaiters elides + // updates if they're not meaningful, so we can get away with being less + // precise in handling the more general case at this level. + return true /* wait */ + } + + l.claimBeforeProceeding(g) + // Inform any active waiters that (may) need to be made aware that this + // request acquired a claim. + l.informActiveWaiters() + return false /* wait */ +} + +// constructWaitingState constructs the waiting state the supplied request +// should use to wait in the receiver's lock wait-queues. +// +// REQUIRES: l.mu to be locked. +func (l *lockState) constructWaitingState(g *lockTableGuardImpl) waitingState { + waitForState := waitingState{ + kind: waitFor, + key: l.key, + queuedWriters: l.queuedWriters.Len(), + queuedReaders: l.waitingReaders.Len(), + held: true, + } + txn, held := l.claimantTxn() + waitForState.held = held + waitForState.txn = txn + if g.isSameTxn(waitForState.txn) { + waitForState.kind = waitSelf + } else if l.distinguishedWaiter == g { + waitForState.kind = waitForDistinguished + } + return waitForState +} + +// alreadyHoldsLockAndIsAllowedToProceed returns true if the request, referenced +// by the supplied lock table guard, is allowed to proceed because its +// transaction already holds the lock with an equal or higher lock strength. +// Otherwise, false is returned. +// +// REQUIRES: l.mu to be locked. +func (l *lockState) alreadyHoldsLockAndIsAllowedToProceed(g *lockTableGuardImpl) bool { + lockHolderTxn, _ := l.getLockHolder() + if lockHolderTxn == nil { + return false // no one holds the lock + } + if !g.isSameTxn(lockHolderTxn) { return false } + heldMode := l.getLockMode() + // Check if the lock is already held by the guard's transaction with an equal + // or higher lock strength. If it is, we're good to go. Otherwise, the request + // is trying to promote a lock it previously acquired. In such cases, the + // existence of a lock with weaker strength doesn't do much for this request. + // It's no different than the case where its trying to acquire a fresh lock. + return g.curStrength() <= heldMode.Strength +} - var replicatedLockFinalizedTxn *roachpb.Transaction - var unreplicatedLockFinalizedTxn *roachpb.Transaction - if lockHolderTxn != nil { - finalizedTxn, ok := g.lt.txnStatusCache.finalizedTxns.get(lockHolderTxn.ID) - if ok { - if l.holder.holder[lock.Replicated].txn == nil { - g.toResolveUnreplicated = append( - g.toResolveUnreplicated, roachpb.MakeLockUpdate(finalizedTxn, roachpb.Span{Key: l.key})) - unreplicatedLockFinalizedTxn = finalizedTxn - } else { - replicatedLockFinalizedTxn = finalizedTxn - } +// conflictsWithLockHolder returns true if the request, referenced by the +// supplied lockTableGuardImpl, conflicts with the lock holder. Non-conflicting +// requests are allowed to proceed; conflicting requests must actively wait for +// the lock to be released. +// +// Locks held by transactions that are known to be finalized are considered +// non-conflicting. However, the caller may be responsible for cleaning them up +// before proceeding. +// +// REQUIRES: l.mu is locked. +// REQUIRES: the transaction, to which the request belongs, should not be a lock +// holder. 
+func (l *lockState) conflictsWithLockHolder(g *lockTableGuardImpl) bool { + lockHolderTxn, _ := l.getLockHolder() + if lockHolderTxn == nil { + return false // the lock isn't held; no conflict to speak of + } + // We should never get here if the lock is already held by another request + // from the same transaction; this should already be checked in + // alreadyHoldLockAndIsAllowedToProceed. + assert(!g.isSameTxn(lockHolderTxn), "lock already held by the request's transaction") + finalizedTxn, ok := g.lt.txnStatusCache.finalizedTxns.get(lockHolderTxn.ID) + if ok { + up := roachpb.MakeLockUpdate(finalizedTxn, roachpb.Span{Key: l.key}) + // The lock belongs to a finalized transaction. There's no conflict, but the + // lock must be resolved -- accumulate it on the appropriate slice. + if l.holder.holder[lock.Replicated].txn == nil { + g.toResolveUnreplicated = append(g.toResolveUnreplicated, up) + } else { + g.toResolve = append(g.toResolve, up) } + return false } - if str == lock.None { - if lockHolderTxn == nil { - // Non locking reads only care about locks, not reservations. - return false - } - // Locked by some other txn. - // TODO(arul): this will need to change once we start supporting different - // lock strengths. - if g.ts.Less(lockHolderTS) { - return false - } + // The lock is held by a different, un-finalized transaction. + if g.curStrength() == lock.None { // If the non-locking reader is reading at a higher timestamp than the lock // holder, but it knows that the lock holder has been pushed above its read // timestamp, it can proceed after rewriting the lock at its transaction's @@ -1763,15 +1861,14 @@ func (l *lockState) tryActiveWait( if ok && g.ts.Less(pushedTxn.WriteTimestamp) { up := roachpb.MakeLockUpdate(pushedTxn, roachpb.Span{Key: l.key}) if l.holder.holder[lock.Replicated].txn == nil { - // Only held unreplicated. Accumulate a unreplicated lock update in - // case any other waiting readers can benefit from the pushed - // timestamp. + // Only held unreplicated. Accumulate it as an unreplicated lock to + // resolve, in case any other waiting readers can benefit from the + // pushed timestamp. // // TODO(arul): this case is only possible while non-locking reads // block on Exclusive locks. Once non-locking reads start only // blocking on intents, it can be removed and asserted against. - g.toResolveUnreplicated = append( - g.toResolveUnreplicated, up) + g.toResolveUnreplicated = append(g.toResolveUnreplicated, up) } else { // Resolve to push the replicated intent. g.toResolve = append(g.toResolve, up) @@ -1783,179 +1880,265 @@ func (l *lockState) tryActiveWait( g.mu.Lock() _, alsoLocksWithHigherStrength := g.mu.locks[l] g.mu.Unlock() - - // If the request already has this lock in its locks map, it must also be - // acquiring this lock at a higher strength. It must either be a reservation - // holder or an inactive waiter at this lock. The former has already been - // handled above. For the latter to be possible, the request must have had - // its reservation broken. Since this is a weaker lock strength, we defer to - // the stronger lock strength and continuing with our scan. - // - // NB: If we were not defer to the stronger lock strength and start waiting - // here, we would end up doing so in the wrong wait queue (queuedReaders vs. - // queuedWriters). - // - // TODO(arul): the queued{Readers,Writers} names are going to change, as - // described in the Shared locks RFC. Reword this comment when that happens. 
- // - // Non-transactional requests cannot make reservations or acquire locks. - // They can only perform reads or writes, which means they can only have - // lock spans with strength {None,Intent}. However, because they cannot make - // reservations, we can not detect a key is being accessed with both None - // and Intent locking strengths, like we can for transactional requests. In - // some rare cases, the lock is now held at a timestamp that is not - // compatible with this request and it will wait here -- there's no - // correctness issue in doing so. - // - // TODO(arul): It seems like the above paragraph is implying a writing - // non-transactional request will end up waiting in the same queue as - // non-locking readers, and that's fine. We should revisit if we want to - // store non-transactional writers with other locking requests, as described - // in the shared locks RFC -- non-transactional requests race with readers - // and reservation holders anyway, so I'm not entirely sure what we get by - // storing them in the same queue as locking requests. if alsoLocksWithHigherStrength { + // If the request already has this lock in its locks map, it must also be + // trying to acquire this lock at a higher strength. For it to be here, it + // must have a (possibly joint) claim on this lock. The claim may have been + // broken since, but that's besides the point -- we defer to the stronger + // lock strength and continue with our scan. + // + // NB: If we were to not defer to the stronger lock strength and start + // waiting here, we could potentially end up doing so in the wrong wait + // queue (queuedReaders vs. queuedWriters). There wouldn't be a correctness + // issue in doing so, but it isn't ideal. + // + // NB: Non-transactional requests do not make claims or acquire locks. They + // can only perform reads or writes, which means they can only have lock + // spans with strength {None,Intent}. However, because they cannot make + // claims on locks, we can not detect a key is being accessed with both None + // and Intent locking strengths, like we can for transactional requests. In + // some rare cases, the lock may now be held at a timestamp that is not + // compatible with this request, and it will wait here -- there's no + // correctness issue in doing so. + // + // TODO(arul): I'm not entirely sure I understand why we have the + // g.str == lock.None condition above. We do need it, because taking it + // out breaks some tests. Will need to figure this out when trying to + // extend the lock table to work with multiple lock strengths. return false } } - if !l.holder.locked && l.queuedWriters.Len() > 0 { - qg := l.queuedWriters.Front().Value.(*queuedGuard) - if qg.guard == g { - // Already claimed by this request. - return false - } - // A non-transactional write request never makes or breaks claims, and only - // waits for a claim if the claim holder has a lower seqNum. Note that `str - // == lock.None && lockHolderTxn == nil` was already checked above. - if g.txn == nil && qg.guard.seqNum > g.seqNum { - // Claimed by a request with a higher seqNum and g is a non-transactional - // request. Ignore the claim. - return false - } - } - - // Incompatible with whoever is holding lock or reservation. + // The held lock neither belongs to the request's transaction (which has + // special handling above) nor to a transaction that has been finalized. Check + // for conflicts. 
+ return lock.Conflicts(l.getLockMode(), g.curLockMode(), &g.lt.settings.SV) +} - waitForState := waitingState{ - kind: waitFor, - key: l.key, - queuedWriters: l.queuedWriters.Len(), - queuedReaders: l.waitingReaders.Len(), - } - waitForState.txn, waitForState.held = l.claimantTxn() +// maybeEnqueueNonLockingReadRequest enqueues a read request in the receiver's +// wait queue if the reader conflicts with the lock; otherwise, it's a no-op. +// A boolean is returned indicating whether the read request conflicted with +// the lock or not. +// +// REQUIRES: l.mu to be locked. +func (l *lockState) maybeEnqueueNonLockingReadRequest(g *lockTableGuardImpl) (conflicts bool) { + assert(g.curStrength() == lock.None, "unexpected locking strength; expected read") + if !l.conflictsWithLockHolder(g) { + return false // no conflict, no need to enqueue + } + l.waitingReaders.PushFront(g) + // This request may be a candidate to become a distinguished waiter if one + // doesn't exist yet; try making it such. + l.maybeMakeDistinguishedWaiter(g) + g.mu.Lock() + defer g.mu.Unlock() + g.mu.locks[l] = struct{}{} + return true +} - // May need to wait. - wait = true +// enqueueLockingRequest enqueues the supplied locking request in the receiver's +// lock wait queue. The locking request is wrapped in a queuedGuard which +// denotes it is actively waiting at the receiver. Note that the request may +// already be present in the lock's wait queue, in which case, the queuedGuard +// is modified to reflect its status as an active waiter, if necessary. +// +// Locking requests have a maximum queue length bound configured for them above +// which they refuse to wait. If the receiver's wait queue is longer than this +// configured bound the request is not enqueued; instead, a boolean indicating +// this case is returned to the caller. +// +// REQUIRES: l.mu to be locked. +func (l *lockState) enqueueLockingRequest(g *lockTableGuardImpl) (maxQueueLengthExceeded bool) { + assert(g.curStrength() != lock.None, "should only be called with a locking request") g.mu.Lock() defer g.mu.Unlock() - if str == lock.Intent { - var qg *queuedGuard - if _, inQueue := g.mu.locks[l]; inQueue { - // Already in queue and must be in the right position, so mark as active - // waiter there. We expect this to be rare. - for e := l.queuedWriters.Front(); e != nil; e = e.Next() { - qqg := e.Value.(*queuedGuard) - if qqg.guard == g { - qg = qqg - break - } - } - if qg == nil { - panic("lockTable bug") - } - // Tentative. See below. - qg.active = true - } else { - // Not in queue so insert as active waiter. The active waiter - // designation is tentative (see below). - qg = &queuedGuard{ - guard: g, - active: true, - } - if curLen := l.queuedWriters.Len(); curLen == 0 { - l.queuedWriters.PushFront(qg) - } else if g.maxWaitQueueLength > 0 && curLen >= g.maxWaitQueueLength { - // The wait-queue is longer than the request is willing to wait for. - // Instead of entering the queue, immediately reject the request. For - // simplicity, we are not finding the position of this writer in the - // queue and rejecting the tail of the queue above the max length. That - // would be more fair, but more complicated, and we expect that the - // common case is that this waiter will be at the end of the queue. 
- g.mu.startWait = true - state := waitForState - state.kind = waitQueueMaxLengthExceeded - g.maybeUpdateWaitingStateLocked(state, notify) - // NOTE: we return wait=true not because the request is waiting, but - // because it should not continue scanning for conflicting locks. - return true - } else { - var e *list.Element - for e = l.queuedWriters.Back(); e != nil; e = e.Prev() { - qqg := e.Value.(*queuedGuard) - if qqg.guard.seqNum < qg.guard.seqNum { - break - } - } - if e == nil { - l.queuedWriters.PushFront(qg) - } else { - l.queuedWriters.InsertAfter(qg, e) - } - } - g.mu.locks[l] = struct{}{} - waitForState.queuedWriters = l.queuedWriters.Len() // update field - } - if (replicatedLockFinalizedTxn != nil || - unreplicatedLockFinalizedTxn != nil || - !l.holder.locked) && - l.queuedWriters.Front().Value.(*queuedGuard) == qg { - _ = unreplicatedLockFinalizedTxn - // First waiter, so should not wait. NB: this inactive waiter can be - // non-transactional. - qg.active = false - wait = false - // If this request was previously designated as a distinguished waiter, - // and is now being marked inactive, clear out the designation. - if l.distinguishedWaiter == qg.guard { - l.distinguishedWaiter = nil + + // First, check if the request is already in the queue. This can happen if + // this function is called on behalf of a request that was previously was + // an inactive waiter at this lock and comes back around. + if _, inQueue := g.mu.locks[l]; inQueue { + // Find the request; it must already be in the correct position. + for e := l.queuedWriters.Front(); e != nil; e = e.Next() { + qqg := e.Value.(*queuedGuard) + if qqg.guard == g { + qqg.active = true // set the active status as true, in case it wasn't before + // Now that this request is actively waiting in the lock's wait queue, + // it may be a candidate for becoming the distinguished waiter (if one + // doesn't exist already). + l.maybeMakeDistinguishedWaiter(g) + return false /* maxQueueLengthExceeded */ } } - } else { - if replicatedLockFinalizedTxn != nil || unreplicatedLockFinalizedTxn != nil { - // Non-locking readers do not wait on finalized {replicated,unreplicated} - // locks. - wait = false - } else { - l.waitingReaders.PushFront(g) - g.mu.locks[l] = struct{}{} - waitForState.queuedReaders = l.waitingReaders.Len() // update field + panic("lock table bug") + } + + // Check if the lock's wait queue has room for one more request. + if g.maxWaitQueueLength > 0 && l.queuedWriters.Len() >= g.maxWaitQueueLength { + // The wait-queue is longer than the request is willing to wait for. + // Instead of entering the queue, immediately reject the request. For + // simplicity, we are not finding the position of this writer in the + // queue and rejecting the tail of the queue above the max length. That + // would be more fair, but more complicated, and we expect that the + // common case is that this waiter will be at the end of the queue. + return true /* maxQueueLengthExceeded */ + } + qg := &queuedGuard{ + guard: g, + active: true, + } + // The request isn't in the queue. Add it in the correct position, based on + // its sequence number. 
+ var e *list.Element
+ for e = l.queuedWriters.Back(); e != nil; e = e.Prev() {
+ qqg := e.Value.(*queuedGuard)
+ if qqg.guard.seqNum < qg.guard.seqNum {
+ break
 }
 }
- if !wait {
- if replicatedLockFinalizedTxn != nil {
- g.toResolve = append(
- g.toResolve, roachpb.MakeLockUpdate(replicatedLockFinalizedTxn, roachpb.Span{Key: l.key}))
+ if e == nil {
+ l.queuedWriters.PushFront(qg)
+ } else {
+ l.queuedWriters.InsertAfter(qg, e)
+ }
+ // This request may be a candidate to become a distinguished waiter if one
+ // doesn't exist yet; try making it such.
+ l.maybeMakeDistinguishedWaiter(g)
+ g.mu.locks[l] = struct{}{}
+ return false /* maxQueueLengthExceeded */
+}
+
+// maybeMakeDistinguishedWaiter designates the supplied request as the
+// distinguished waiter if no distinguished waiter exists. If there already is
+// a distinguished waiter, or the supplied request is not a candidate for
+// becoming one[1], the function is a no-op.
+//
+// [1] A request that belongs to the lock's claimant transaction is not eligible
+// to become a distinguished waiter.
+//
+// REQUIRES: l.mu to be locked.
+func (l *lockState) maybeMakeDistinguishedWaiter(g *lockTableGuardImpl) {
+ if l.distinguishedWaiter != nil {
+ return
+ }
+ claimantTxn, _ := l.claimantTxn()
+ if !g.isSameTxn(claimantTxn) {
+ // We only want to make this request the distinguished waiter if a
+ // different request from its transaction isn't the claimant.
+ l.distinguishedWaiter = g
+ }
+}
+
+// shouldRequestActivelyWait returns true iff the supplied request needs to
+// actively wait on the receiver.
+//
+// REQUIRES: l.mu to be locked.
+// REQUIRES: g.mu to be locked.
+func (l *lockState) shouldRequestActivelyWait(g *lockTableGuardImpl) bool {
+ if g.curStrength() == lock.None {
+ return true // non-locking read requests always actively wait
+ }
+
+ if l.conflictsWithLockHolder(g) {
+ return true
+ }
+
+ // Start at the head of the queue and iterate towards the back. As we
+ // iterate, we check for conflicts -- if a conflict is found, the request
+ // must actively wait; otherwise, it is free to proceed. Notably, we do not
+ // check the active/inactive status of the other requests as we're iterating
+ // through the head of the queue[1].
+ //
+ // [1] This means that there may be an active waiter in front of the request,
+ // and it may still decide to not actively wait on the receiver. This can only
+ // happen if there are UPDATE strengths in the mix; this is because
+ // constructions using just SHARED, EXCLUSIVE, and INTENT lock strengths would
+ // result in either a conflict, or in the head of the queue being comprised
+ // entirely of inactive waiters. The lock table currently does not support
+ // UPDATE locks. Even if it did, there would be no correctness issue with what
+ // we're doing here, as long as the queue is maintained in sequence number
+ // order.
+ for e := l.queuedWriters.Front(); e != nil; e = e.Next() {
+ qqg := e.Value.(*queuedGuard)
+ if qqg.guard == g {
+ // We found our request while scanning from the front without finding any
+ // conflicting waiters; no need to actively wait here.
+ return false
+ }
+ // TODO(arul): Inactive waiters will need to capture the strength at which
+ // they're trying to acquire a lock in their queuedGuard. We can't simply
+ // use the guard's curStrength (or curLockMode) -- inactive waiters may have
+ // mutated these values as they scan. For now, we can just use the intent
+ // lock mode as that's the only lock strength supported by the lock table.
+ waiterLockMode := lock.MakeModeIntent(qqg.guard.ts)
+ if lock.Conflicts(waiterLockMode, g.curLockMode(), &g.lt.settings.SV) {
+ return true
 }
- return false
 }
- // Make it an active waiter.
- g.key = l.key
- g.mu.startWait = true
- g.mu.curLockWaitStart = clock.PhysicalTime()
- if g.isSameTxnAsReservation(waitForState) {
- state := waitForState
- state.kind = waitSelf
- g.maybeUpdateWaitingStateLocked(state, notify)
- } else {
- state := waitForState
- if l.distinguishedWaiter == nil {
- l.distinguishedWaiter = g
- state.kind = waitForDistinguished
+ panic("lock table bug: enqueued request not found")
+}
+
+// claimBeforeProceeding adjusts data structures on the receiver such that the
+// supplied request lays a claim[1] on it before proceeding[2]. This method
+// should only be called by locking requests.
+//
+// Note that the request acquiring the claim may be a distinguished waiter at
+// this lock. In such cases, the distinguished status is cleared to allow the
+// request to proceed (inactive waiters cannot be distinguished waiters).
+// However, a new distinguished waiter is not chosen -- it's the caller's
+// responsibility to detect this case and actually choose one. Typically, this
+// is done using a call to informActiveWaiters.
+//
+// [1] Only transactional, locking requests can lay claims. Non-transactional
+// writers cannot.
+// [2] While non-transactional writers cannot lay claims, they do need to be
+// removed from the receiver's wait queue before proceeding. We do that here.
+//
+// REQUIRES: l.mu to be locked.
+func (l *lockState) claimBeforeProceeding(g *lockTableGuardImpl) {
+ assert(g.curStrength() != lock.None, "non-locking requests should not try to grab claims")
+
+ // We're dealing with either a locking, transactional request or a
+ // non-transactional writer. We handle these two cases differently, because
+ // non-transactional writers are special cased[1]:
+ // 1. If the request is a transactional, locking request, it acquires a
+ // (possibly joint) claim by marking itself as inactive in the receiver's wait
+ // queue.
+ // 2. If it's a non-transactional writer, we simply remove it from the queue.
+ // As such, it races with other readers and transactional locking requests.
+ //
+ // [1] Non-transactional writers are special cased, in that they cannot lay
+ // claims on a lock (mark themselves as inactive and proceed with their scan).
+ // This is because doing so could result in undetectable deadlocks, as our
+ // distributed deadlock detection algorithm relies on {Push,Query}Txn
+ // requests. Non-transactional writers, by definition, have no associated
+ // transaction a waiter can push.
+
+ // Find the request; iterate from the front, as requests proceeding are more
+ // likely to be closer to the front than the back.
+ for e := l.queuedWriters.Front(); e != nil; e = e.Next() {
+ qqg := e.Value.(*queuedGuard)
+ if qqg.guard == g {
+ // If the request was previously marked as a distinguished waiter, and is
+ // now able to claim the lock and proceed, clear the designation. Note
+ // that we're not choosing a new one to replace it; the responsibility of
+ // doing so is the caller's.
+ if g == l.distinguishedWaiter {
+ l.distinguishedWaiter = nil
+ }
+ if g.txn == nil {
+ // Non-transactional writer.
+ g.mu.Lock()
+ delete(g.mu.locks, l)
+ g.mu.Unlock()
+ l.queuedWriters.Remove(e)
+ } else {
+ // Transactional writer.
+ qqg.active = false // claim the lock + } + return } - g.maybeUpdateWaitingStateLocked(state, notify) } - return true + panic("lock table bug: did not find enqueued request") } func (l *lockState) isNonConflictingLock(g *lockTableGuardImpl, str lock.Strength) bool { @@ -2499,10 +2682,11 @@ func (l *lockState) removeWriter(e *list.Element) bool { delete(g.mu.locks, l) if qg.active { g.doneActivelyWaitingAtLock() - if g == l.distinguishedWaiter { - l.distinguishedWaiter = nil - return true - } + } + if g == l.distinguishedWaiter { + assert(qg.active, "distinguished waiter should be active") + l.distinguishedWaiter = nil + return true } return false } @@ -2916,8 +3100,8 @@ func (t *lockTableImpl) AddDiscoveredLock( // If the discoverer is a non-locking read, also check whether the lock's // holder is known to have been pushed above the reader's timestamp. See the - // comment in tryActiveWait for more details, including why we include the - // hasUncertaintyInterval condition. + // comment in scanAndMaybeEnqueue for more details, including why we include + // the hasUncertaintyInterval condition. if str == lock.None && !g.hasUncertaintyInterval() && t.batchPushedLockResolution() { pushedTxn, ok := g.lt.txnStatusCache.pendingTxns.get(intent.Txn.ID) if ok && g.ts.Less(pushedTxn.WriteTimestamp) { diff --git a/pkg/kv/kvserver/concurrency/testdata/lock_table/queue_length_exceeded b/pkg/kv/kvserver/concurrency/testdata/lock_table/queue_length_exceeded index 7b5de64a8c76..090880d533bb 100644 --- a/pkg/kv/kvserver/concurrency/testdata/lock_table/queue_length_exceeded +++ b/pkg/kv/kvserver/concurrency/testdata/lock_table/queue_length_exceeded @@ -241,12 +241,6 @@ scan r=req10 ---- start-waiting: true -# Note that the state here changed from waitForDistinguished to waitFor. This -# is because we're cheating slightly in this test by calling scan on a request -# that's already actively waiting at a lock, which is something that cannot -# happen outside of unit tests. tryActiveWait doesn't expect this, and doesn't -# handle this state transition -- we could teach it, but it would be just for -# this contrived test scenario. guard-state r=req10 ---- -new: state=waitFor txn=txn6 key="b" held=true guard-strength=Intent +new: state=waitForDistinguished txn=txn6 key="b" held=true guard-strength=Intent From d66290a02f65eec4f087da30c1decdeea4daa5c6 Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Mon, 26 Jun 2023 17:05:58 -0400 Subject: [PATCH 5/5] optbuilder: reset annotations when building CREATE FUNCTION In 22dabb08e76 we started overriding the annotations for each statement in the UDF body. We should reset them to the original values, so we don't accidentally leave the old reference. Release note: None --- pkg/sql/opt/optbuilder/create_function.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/sql/opt/optbuilder/create_function.go b/pkg/sql/opt/optbuilder/create_function.go index de8b88327f8d..3abcf9a0fba4 100644 --- a/pkg/sql/opt/optbuilder/create_function.go +++ b/pkg/sql/opt/optbuilder/create_function.go @@ -129,6 +129,10 @@ func (b *Builder) buildCreateFunction(cf *tree.CreateFunction, inScope *scope) ( // Reset the tracked dependencies for next statement. b.schemaDeps = nil b.schemaTypeDeps = intsets.Fast{} + + // Reset the annotations to the original values + b.evalCtx.Annotations = oldEvalCtxAnn + b.semaCtx.Annotations = oldSemaCtxAnn } // bodyScope is the base scope for each statement in the body. We add the