Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
65474: changefeedccl: Support enum columns in avro encoding r=[miretskiy] a=HonoreDB

In the JSON encoder, enums are rendered as strings,
and if a type is altered, this is reflected in future messages
but doesn't backfill. Because a backfill would be nontrivial,
we're doing the same thing in the avro encoder rather than use
the Avro enum type.

Note that enums are not a built-in type and therefore not
hit in avro_test.go. I'm hoping to figure out how to fix that
before merging this commit, or as a followup.

Release note (bug fix): Enum columns can be encoded in avro changefeeds

65683: server: Reject SQL Pods with an invalid tenant id. r=ajwerner a=rimadeodhar

This commit fixes an issue where attempting to start a SQL Pod
through the cockroach mt start-sql command would crash if the
tenant id had not been created beforehand. Even worse, the crash
would poison the tenant id, causing future
crdb_internal.create_sql_tenant calls to fail.
With this fix, the cockroach mt start-sql command returns an error
for nonexistent tenant ids and future calls to
crdb_internal.create_sql_tenant are successful.

Resolves: #64963

Release note (bug fix): The cockroach mt start-sql command with a
non existent tenant id returns an error. Previously, it would
crash and poison the tenant id for future usage.

65750: sql: fix schema privileges on descriptor read r=ajwerner a=RichardJCai

Release note (bug fix): Previously a schema's privilege descriptor
could become corrupted upon executing ALTER DATABASE ...
CONVERT TO SCHEMA, because privileges that are invalid on a schema
were copied over to it, rendering the schema unusable.

Fixes #65697

65866: bazel: add targets to generate cluster setting/functions documentation r=rail a=rickystewart

I validated that these work and have a clean `diff` with what's in tree.

Closes #65808.
Closes #65811.

Release note: None

65907:  opt: normalize x=True, x=False, x != True, and x != False to x or NOT x r=rytaft a=rytaft

This commit adds four normalization rules, `FoldEqTrue`, `FoldEqFalse`, 
`FoldNeTrue`, and `FoldNeFalse`, which normalize `x=True` to `x`, `x=False` to
`NOT x`, `x != True` to `NOT x`, and `x != False` to `x`. These rules are
important since they can unlock other types of optimizations, such
as constrained index scans.

Fixes #65684

Release note (performance improvement): Fixed an issue in the optimizer
that prevented spatial predicates of the form `(column && value) = true` from
being index-accelerated. These queries can now use a spatial index if one is
available.

65960: bazel: bump size of `//pkg/sql:sql_test` r=rail a=rickystewart

This has been routinely timing out in CI.

Release note: None

Co-authored-by: Aaron Zinger <[email protected]>
Co-authored-by: rimadeodhar <[email protected]>
Co-authored-by: richardjcai <[email protected]>
Co-authored-by: Ricky Stewart <[email protected]>
Co-authored-by: Rebecca Taft <[email protected]>
  • Loading branch information
6 people committed Jun 1, 2021
7 parents d4cf887 + 8113bed + 57cad09 + 6369e54 + e20d10a + 33babf1 + 8773980 commit 94e48ae
Show file tree
Hide file tree
Showing 30 changed files with 702 additions and 72 deletions.
13 changes: 13 additions & 0 deletions docs/generated/settings/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Generates settings.html by running the cockroach-short binary with
# `gen settings-list --format=html` and redirecting its standard output to
# the rule's output file.
genrule(
name = "settings",
outs = ["settings.html"],
cmd = "$(location //pkg/cmd/cockroach-short) gen settings-list --format=html > $@",
exec_tools = ["//pkg/cmd/cockroach-short"],
)

# Generates settings-for-tenants.txt by running the cockroach-short binary
# with `gen settings-list --without-system-only` and redirecting its standard
# output to the rule's output file.
genrule(
name = "settings_for_tenants",
outs = ["settings-for-tenants.txt"],
cmd = "$(location //pkg/cmd/cockroach-short) gen settings-list --without-system-only > $@",
exec_tools = ["//pkg/cmd/cockroach-short"],
)
17 changes: 17 additions & 0 deletions docs/generated/sql/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Generates the SQL function/operator documentation. The command runs docgen
# with `functions . --quiet` and then moves the four markdown files from the
# working directory to their declared output locations.
genrule(
name = "sql",
outs = [
"aggregates.md",
"functions.md",
"operators.md",
"window_functions.md",
],
cmd = """
$(location //pkg/cmd/docgen) functions . --quiet
mv aggregates.md $(location aggregates.md)
mv functions.md $(location functions.md)
mv operators.md $(location operators.md)
mv window_functions.md $(location window_functions.md)
""",
exec_tools = ["//pkg/cmd/docgen"],
)
3 changes: 3 additions & 0 deletions pkg/base/test_server_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,4 +268,7 @@ type TestTenantArgs struct {
// automatically open a connection to the server. That's equivalent to running
// SET DATABASE=foo, which works even if the database doesn't (yet) exist.
UseDatabase string

// Skip check for tenant existence when running the test.
SkipTenantCheck bool
}
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ go_test(
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/schemaexpr",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/catalog/typedesc",
"//pkg/sql/distsql",
"//pkg/sql/enum",
"//pkg/sql/execinfra",
"//pkg/sql/flowinfra",
"//pkg/sql/parser",
Expand Down
25 changes: 25 additions & 0 deletions pkg/ccl/changefeedccl/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ type avroDataRecord struct {

colIdxByFieldIdx map[int]int
fieldIdxByName map[string]int
fieldIdxByColIdx map[int]int
// Allocate Go native representation once, to avoid repeated map allocation
// when encoding.
native map[string]interface{}
Expand Down Expand Up @@ -468,6 +469,16 @@ func typeToAvroSchema(typ *types.T, reuseMap bool) (*avroSchemaField, error) {
return tree.ParseDJSON(x.(string))
},
)
case types.EnumFamily:
setNullable(
avroSchemaString,
func(d tree.Datum) (interface{}, error) {
return d.(*tree.DEnum).LogicalRep, nil
},
func(x interface{}) (tree.Datum, error) {
return tree.MakeDEnumFromLogicalRepresentation(typ, x.(string))
},
)
case types.ArrayFamily:
itemSchema, err := typeToAvroSchema(typ.ArrayContents(), false /*reuse map*/)
if err != nil {
Expand Down Expand Up @@ -545,6 +556,7 @@ func indexToAvroSchema(
},
fieldIdxByName: make(map[string]int),
colIdxByFieldIdx: make(map[int]int),
fieldIdxByColIdx: make(map[int]int),
}
colIdxByID := catalog.ColumnIDToOrdinalMap(tableDesc.PublicColumns())
for i := 0; i < index.NumKeyColumns(); i++ {
Expand All @@ -559,6 +571,7 @@ func indexToAvroSchema(
return nil, err
}
schema.colIdxByFieldIdx[len(schema.Fields)] = colIdx
schema.fieldIdxByColIdx[colIdx] = len(schema.Fields)
schema.fieldIdxByName[field.Name] = len(schema.Fields)
schema.Fields = append(schema.Fields, field)
}
Expand Down Expand Up @@ -598,6 +611,7 @@ func tableToAvroSchema(
},
fieldIdxByName: make(map[string]int),
colIdxByFieldIdx: make(map[int]int),
fieldIdxByColIdx: make(map[int]int),
}
for _, col := range tableDesc.PublicColumns() {
field, err := columnToAvroSchema(col)
Expand All @@ -606,6 +620,7 @@ func tableToAvroSchema(
}
schema.colIdxByFieldIdx[len(schema.Fields)] = col.Ordinal()
schema.fieldIdxByName[field.Name] = len(schema.Fields)
schema.fieldIdxByColIdx[col.Ordinal()] = len(schema.Fields)
schema.Fields = append(schema.Fields, field)
}
schemaJSON, err := json.Marshal(schema)
Expand Down Expand Up @@ -819,6 +834,16 @@ func (r *avroEnvelopeRecord) BinaryFromRow(
return r.codec.BinaryFromNative(buf, native)
}

// refreshTypeMetadata re-reads the type of every user-defined-type column of
// tbl and stores it on the matching field of this cached schema. Columns that
// have no corresponding field in the schema are ignored. The only
// user-defined type is enum, so this is usually a no-op.
func (r *avroDataRecord) refreshTypeMetadata(tbl catalog.TableDescriptor) {
	for _, udtCol := range tbl.UserDefinedTypeColumns() {
		idx, found := r.fieldIdxByColIdx[udtCol.Ordinal()]
		if !found {
			continue
		}
		r.Fields[idx].typ = udtCol.GetType()
	}
}

// decimalToRat converts one of our apd decimals to the format expected by the
// avro library we use. If the column has a fixed scale (which is always true if
// precision is set) this is roundtripable without information loss.
Expand Down
66 changes: 63 additions & 3 deletions pkg/ccl/changefeedccl/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/enum"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
Expand All @@ -43,6 +45,15 @@ import (
"golang.org/x/text/collate"
)

var (
	// testTypes maps a type's SQL name to the in-memory type registered for
	// tests.
	testTypes = make(map[string]*types.T)
	// testTypeResolver resolves type names against testTypes.
	testTypeResolver = tree.MakeTestingMapTypeResolver(testTypes)
)

// makeTestSemaCtx returns a fresh semantic context whose type resolver is
// testTypeResolver, so that types registered in testTypes can be resolved.
func makeTestSemaCtx() tree.SemaContext {
	semaCtx := tree.MakeSemaContext()
	semaCtx.TypeResolver = testTypeResolver
	return semaCtx
}

func parseTableDesc(createTableStmt string) (catalog.TableDescriptor, error) {
ctx := context.Background()
stmt, err := parser.ParseOne(createTableStmt)
Expand All @@ -56,7 +67,7 @@ func parseTableDesc(createTableStmt string) (catalog.TableDescriptor, error) {
st := cluster.MakeTestingClusterSettings()
const parentID = descpb.ID(keys.MaxReservedDescID + 1)
const tableID = descpb.ID(keys.MaxReservedDescID + 2)
semaCtx := tree.MakeSemaContext()
semaCtx := makeTestSemaCtx()
mutDesc, err := importccl.MakeSimpleTableDescriptor(
ctx, &semaCtx, st, createTable, parentID, keys.PublicSchemaID, tableID, importccl.NoFKs, hlc.UnixNano())
if err != nil {
Expand All @@ -67,7 +78,7 @@ func parseTableDesc(createTableStmt string) (catalog.TableDescriptor, error) {

func parseValues(tableDesc catalog.TableDescriptor, values string) ([]rowenc.EncDatumRow, error) {
ctx := context.Background()
semaCtx := tree.MakeSemaContext()
semaCtx := makeTestSemaCtx()
evalCtx := &tree.EvalContext{}

valuesStmt, err := parser.ParseOne(values)
Expand Down Expand Up @@ -136,7 +147,7 @@ func avroFieldMetadataToColDesc(metadata string) (*descpb.ColumnDescriptor, erro
}
def := parsed.AST.(*tree.AlterTable).Cmds[0].(*tree.AlterTableAddColumn).ColumnDef
ctx := context.Background()
semaCtx := tree.MakeSemaContext()
semaCtx := makeTestSemaCtx()
col, _, _, err := tabledesc.MakeColumnDefDescs(ctx, def, &semaCtx, &tree.EvalContext{})
return col, err
}
Expand All @@ -147,6 +158,37 @@ func randTime(rng *rand.Rand) time.Time {
return timeutil.Unix(0, rng.Int63())
}

// createEnum builds a thin, in-memory user-defined enum type with the given
// labels and registers it in testTypes under typeName's SQL string, which is
// the map backing testTypeResolver.
//
// It panics if the type cannot be constructed: this is a test-only helper
// with no error return, and silently registering a nil type would only
// surface later as a harder-to-diagnose failure.
func createEnum(enumLabels tree.EnumValueList, typeName tree.TypeName) *types.T {
	// One physical representation per label, in label order.
	members := make([]descpb.TypeDescriptor_EnumMember, len(enumLabels))
	physReps := enum.GenerateNEvenlySpacedBytes(len(enumLabels))
	for i := range enumLabels {
		members[i] = descpb.TypeDescriptor_EnumMember{
			LogicalRepresentation:  string(enumLabels[i]),
			PhysicalRepresentation: physReps[i],
			Capability:             descpb.TypeDescriptor_EnumMember_ALL,
		}
	}

	typeDesc := typedesc.NewBuilder(&descpb.TypeDescriptor{
		Name:        typeName.Type(),
		ID:          0,
		Kind:        descpb.TypeDescriptor_ENUM,
		EnumMembers: members,
		Version:     1,
	}).BuildCreatedMutableType()

	typ, err := typeDesc.MakeTypesT(context.Background(), &typeName, nil)
	if err != nil {
		panic(err)
	}

	testTypes[typeName.SQLString()] = typ
	return typ
}

func TestAvroSchema(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -278,6 +320,13 @@ func TestAvroSchema(t *testing.T) {
}
}

testEnum := createEnum(
tree.EnumValueList{tree.EnumValue(`open`), tree.EnumValue(`closed`)},
tree.MakeUnqualifiedTypeName(`switch`),
)

typesToTest = append(typesToTest, testEnum)

// Generate a test for each column type with a random datum of that type.
for _, typ := range typesToTest {
var datum tree.Datum
Expand Down Expand Up @@ -516,6 +565,9 @@ func TestAvroSchema(t *testing.T) {
{sqlType: `VARCHAR COLLATE "fr"`,
sql: `'Bonjour' COLLATE "fr"`,
avro: `{"string":"Bonjour"}`},
{sqlType: `switch`, // User-defined enum with values "open", "closed"
sql: `'open'`,
avro: `{"string":"open"}`},
}

for _, test := range goldens {
Expand Down Expand Up @@ -796,6 +848,14 @@ func BenchmarkEncodeBool(b *testing.B) {
benchmarkEncodeType(b, types.Bool, randEncDatumRow(types.Bool))
}

func BenchmarkEncodeEnum(b *testing.B) {
testEnum := createEnum(
tree.EnumValueList{tree.EnumValue(`open`), tree.EnumValue(`closed`)},
tree.MakeUnqualifiedTypeName(`switch`),
)
benchmarkEncodeType(b, testEnum, randEncDatumRow(testEnum))
}

func BenchmarkEncodeFloat(b *testing.B) {
benchmarkEncodeType(b, types.Float, randEncDatumRow(types.Float))
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,16 @@ func TestChangefeedUserDefinedTypes(t *testing.T) {
`tt: [3]->{"after": {"x": 3, "y": "hiya", "z": "bye"}}`,
`tt: [4]->{"after": {"x": 4, "y": "hello", "z": "cya"}}`,
})

// If we rename a value in an existing type, it doesn't count as a change
// but the rename is reflected in future changes.
sqlDB.Exec(t, `ALTER TYPE t RENAME VALUE 'hi' TO 'yo'`)
sqlDB.Exec(t, `UPDATE tt SET z='cya' where x=2`)

assertPayloads(t, cf, []string{
`tt: [2]->{"after": {"x": 2, "y": "yo", "z": "cya"}}`,
})

}

t.Run(`sinkless`, sinklessTest(testFn))
Expand Down
11 changes: 11 additions & 0 deletions pkg/ccl/changefeedccl/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,10 @@ func (e *confluentAvroEncoder) EncodeKey(ctx context.Context, row encodeRow) ([]
e.keyCache[cacheKey] = registered
}

if ok {
registered.schema.refreshTypeMetadata(row.tableDesc)
}

// https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format
header := []byte{
changefeedbase.ConfluentAvroWireFormatMagic,
Expand Down Expand Up @@ -432,6 +436,13 @@ func (e *confluentAvroEncoder) EncodeValue(ctx context.Context, row encodeRow) (
// TODO(dan): Bound the size of this cache.
e.valueCache[cacheKey] = registered
}
if ok {
registered.schema.after.refreshTypeMetadata(row.tableDesc)
if row.prevTableDesc != nil && registered.schema.before != nil {
registered.schema.before.refreshTypeMetadata(row.prevTableDesc)
}
}

var meta avroMetadata
if registered.schema.opts.updatedField {
meta = map[string]interface{}{
Expand Down
65 changes: 65 additions & 0 deletions pkg/ccl/changefeedccl/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,71 @@ func TestAvroCollatedString(t *testing.T) {
t.Run(`enterprise`, enterpriseTest(testFn))
}

// TestAvroEnum checks that enum columns are emitted as Avro strings by
// changefeeds using the Avro format. It covers nullable enum values, values
// added with ALTER TYPE ... ADD VALUE, values renamed with ALTER TYPE ...
// RENAME VALUE, and an enum column that is part of a compound primary key.
func TestAvroEnum(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
reg := cdctest.StartTestSchemaRegistry()
defer reg.Close()

sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TYPE status AS ENUM ('open', 'closed', 'inactive')`)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b status, c int default 0)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 'open')`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (2, null)`)

foo := feed(t, f, `CREATE CHANGEFEED FOR foo `+
`WITH format=$1, confluent_schema_registry=$2`,
changefeedbase.OptFormatAvro, reg.URL())
defer closeFeed(t, foo)
assertPayloadsAvro(t, reg, foo, []string{
`foo: {"a":{"long":1}}->{"after":{"foo":{"a":{"long":1},"b":{"string":"open"},"c":{"long":0}}}}`,
`foo: {"a":{"long":2}}->{"after":{"foo":{"a":{"long":2},"b":null,"c":{"long":0}}}}`,
})

// A value added to the enum after the changefeed started can be encoded.
sqlDB.Exec(t, `ALTER TYPE status ADD value 'review'`)
sqlDB.Exec(t, `INSERT INTO foo values (4, 'review')`)

assertPayloadsAvro(t, reg, foo, []string{
`foo: {"a":{"long":4}}->{"after":{"foo":{"a":{"long":4},"b":{"string":"review"},"c":{"long":0}}}}`,
})

// Renaming an enum value doesn't count as a change itself, but the new
// label is picked up by the encoder for subsequent messages: the update
// to row a=1 below is expected to report "active" rather than "open".
sqlDB.Exec(t, `ALTER TYPE status RENAME value 'open' to 'active'`)
sqlDB.Exec(t, `INSERT INTO foo values (3, 'active')`)
sqlDB.Exec(t, `UPDATE foo set c=1 where a=1`)

assertPayloadsAvro(t, reg, foo, []string{
`foo: {"a":{"long":3}}->{"after":{"foo":{"a":{"long":3},"b":{"string":"active"},"c":{"long":0}}}}`,
`foo: {"a":{"long":1}}->{"after":{"foo":{"a":{"long":1},"b":{"string":"active"},"c":{"long":1}}}}`,
})

// Enum can be part of a compound primary key
sqlDB.Exec(t, `CREATE TABLE soft_deletes (a INT, b status, c INT default 0, PRIMARY KEY (a,b))`)
sqlDB.Exec(t, `INSERT INTO soft_deletes values (0, 'active')`)

sd := feed(t, f, `CREATE CHANGEFEED FOR soft_deletes `+
`WITH format=$1, confluent_schema_registry=$2`,
changefeedbase.OptFormatAvro, reg.URL())
defer closeFeed(t, sd)
assertPayloadsAvro(t, reg, sd, []string{
`soft_deletes: {"a":{"long":0},"b":{"string":"active"}}->{"after":{"soft_deletes":{"a":{"long":0},"b":{"string":"active"},"c":{"long":0}}}}`,
})

// A renamed value is expected to show up under its new label in both the
// encoded key and the encoded value.
sqlDB.Exec(t, `ALTER TYPE status RENAME value 'active' to 'open'`)
sqlDB.Exec(t, `UPDATE soft_deletes set c=1 where a=0`)

assertPayloadsAvro(t, reg, sd, []string{
`soft_deletes: {"a":{"long":0},"b":{"string":"open"}}->{"after":{"soft_deletes":{"a":{"long":0},"b":{"string":"open"},"c":{"long":1}}}}`,
})

}

// Run the same scenario against both feed factories.
t.Run(`sinkless`, sinklessTest(testFn))
t.Run(`enterprise`, enterpriseTest(testFn))
}

func TestAvroSchemaNaming(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
18 changes: 18 additions & 0 deletions pkg/ccl/serverccl/server_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,21 @@ func TestIdleExit(t *testing.T) {
t.Error("stop on idle didn't trigger")
}
}

// TestNonExistentTenant starts a single-node test cluster and asserts that
// starting a tenant marked as Existing, with the tenant existence check
// skipped, fails with the "system DB uninitialized" error.
func TestNonExistentTenant(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{})
	defer tc.Stopper().Stop(ctx)

	tenantArgs := base.TestTenantArgs{
		TenantID:        serverutils.TestTenantID(),
		Existing:        true,
		SkipTenantCheck: true,
	}
	_, startErr := tc.Server(0).StartTenant(ctx, tenantArgs)
	require.Error(t, startErr)
	require.Equal(t, "system DB uninitialized, check if tenant is non existent", startErr.Error())
}
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ go_library(
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/catconstants",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
Expand Down
Loading

0 comments on commit 94e48ae

Please sign in to comment.