Skip to content

Commit

Permalink
rowenc: error when encoding NULLs for PK columns
Browse files Browse the repository at this point in the history
This commit adds a validation check to rowenc.EncodeIndexKey to have it
return an error when it encodes a NULL value for a primary key column.

Fixes cockroachdb#69867.

Release note: None
  • Loading branch information
Marius Posta committed Oct 13, 2021
1 parent e67cf96 commit aeff00b
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 30 deletions.
8 changes: 8 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/create_as
Original file line number Diff line number Diff line change
Expand Up @@ -362,3 +362,11 @@ SELECT * FROM t
----
1 1 false
2 2 true

# Regression test for #69867
statement error pgcode 23502 null value in column "x" violates not-null constraint
BEGIN;
CREATE TABLE foo69867 (x PRIMARY KEY) AS VALUES (1), (NULL);

statement ok
ROLLBACK
1 change: 1 addition & 0 deletions pkg/sql/rowenc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//pkg/sql/lex",
"//pkg/sql/parser",
"//pkg/sql/sem/tree",
"//pkg/sql/sqlerrors",
"//pkg/sql/types",
"//pkg/util",
"//pkg/util/bitarray",
Expand Down
75 changes: 49 additions & 26 deletions pkg/sql/rowenc/index_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/inverted"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
Expand Down Expand Up @@ -68,14 +69,27 @@ func EncodeIndexKey(
values []tree.Datum,
keyPrefix []byte,
) (key []byte, containsNull bool, err error) {
return EncodePartialIndexKey(
var colIDWithNullVal descpb.ColumnID
key, colIDWithNullVal, err = EncodePartialIndexKey(
tableDesc,
index,
index.NumKeyColumns(), /* encode all columns */
colMap,
values,
keyPrefix,
)
containsNull = colIDWithNullVal != 0
if err == nil && containsNull && index.Primary() {
col, findErr := tableDesc.FindColumnWithID(colIDWithNullVal)
if findErr != nil {
return nil, true, errors.WithAssertionFailure(findErr)
}
if col.IsNullable() {
return nil, true, errors.AssertionFailedf("primary key column %q should not be nullable", col.GetName())
}
return nil, true, sqlerrors.NewNonNullViolationError(col.GetName())
}
return key, containsNull, err
}

// EncodePartialIndexSpan creates the minimal key span for the key specified by the
Expand All @@ -91,7 +105,9 @@ func EncodePartialIndexSpan(
) (span roachpb.Span, containsNull bool, err error) {
var key roachpb.Key
var endKey roachpb.Key
key, containsNull, err = EncodePartialIndexKey(tableDesc, index, numCols, colMap, values, keyPrefix)
var colIDWithNullVal descpb.ColumnID
key, colIDWithNullVal, err = EncodePartialIndexKey(tableDesc, index, numCols, colMap, values, keyPrefix)
containsNull = colIDWithNullVal != 0
if err != nil {
return span, containsNull, err
}
Expand All @@ -117,16 +133,16 @@ func EncodePartialIndexKey(
colMap catalog.TableColMap,
values []tree.Datum,
keyPrefix []byte,
) (key []byte, containsNull bool, err error) {
var colIDs, extraColIDs []descpb.ColumnID
) (key []byte, colIDWithNullVal descpb.ColumnID, err error) {
var colIDs, keySuffixColIDs []descpb.ColumnID
if numCols <= index.NumKeyColumns() {
colIDs = index.IndexDesc().KeyColumnIDs[:numCols]
} else {
if index.IsUnique() || numCols > index.NumKeyColumns()+index.NumKeySuffixColumns() {
return nil, false, errors.Errorf("encoding too many columns (%d)", numCols)
return nil, colIDWithNullVal, errors.Errorf("encoding too many columns (%d)", numCols)
}
colIDs = index.IndexDesc().KeyColumnIDs
extraColIDs = index.IndexDesc().KeySuffixColumnIDs[:numCols-index.NumKeyColumns()]
keySuffixColIDs = index.IndexDesc().KeySuffixColumnIDs[:numCols-index.NumKeyColumns()]
}

// We know we will append to the key which will cause the capacity to grow so
Expand All @@ -152,17 +168,15 @@ func EncodePartialIndexKey(
length = len(colIDs)
partial = true
}
var n bool
key, n, err = EncodeColumns(colIDs[:length], dirs[:length], colMap, values, key)
key, colIDWithNullVal, err = EncodeColumns(colIDs[:length], dirs[:length], colMap, values, key)
if err != nil {
return nil, false, err
return nil, colIDWithNullVal, err
}
containsNull = containsNull || n
if partial {
// Early stop. Note that if we had exactly SharedPrefixLen columns
// remaining, we want to append the next tableID/indexID pair because
// that results in a more specific key.
return key, containsNull, nil
return key, colIDWithNullVal, nil
}
colIDs, dirs = colIDs[length:], dirs[length:]
// Each ancestor is separated by an interleaved
Expand All @@ -173,19 +187,23 @@ func EncodePartialIndexKey(
key = EncodePartialTableIDIndexID(key, tableDesc.GetID(), index.GetID())
}

var n bool
key, n, err = EncodeColumns(colIDs, dirs, colMap, values, key)
var keyColIDWithNullVal, keySuffixColIDWithNullVal descpb.ColumnID
key, keyColIDWithNullVal, err = EncodeColumns(colIDs, dirs, colMap, values, key)
if colIDWithNullVal == 0 {
colIDWithNullVal = keyColIDWithNullVal
}
if err != nil {
return nil, false, err
return nil, colIDWithNullVal, err
}
containsNull = containsNull || n

key, n, err = EncodeColumns(extraColIDs, nil /* directions */, colMap, values, key)
key, keySuffixColIDWithNullVal, err = EncodeColumns(keySuffixColIDs, nil /* directions */, colMap, values, key)
if colIDWithNullVal == 0 {
colIDWithNullVal = keySuffixColIDWithNullVal
}
if err != nil {
return nil, false, err
return nil, colIDWithNullVal, err
}
containsNull = containsNull || n
return key, containsNull, nil
return key, colIDWithNullVal, nil
}

type directions []descpb.IndexDescriptor_Direction
Expand Down Expand Up @@ -1209,8 +1227,13 @@ func EncodeSecondaryIndex(

// Add the extra columns - they are encoded in ascending order which is done
// by passing nil for the encoding directions.
extraKey, _, err := EncodeColumns(secondaryIndex.IndexDesc().KeySuffixColumnIDs, nil,
colMap, values, nil)
extraKey, _, err := EncodeColumns(
secondaryIndex.IndexDesc().KeySuffixColumnIDs,
nil, /* directions */
colMap,
values,
nil, /* keyPrefix */
)
if err != nil {
return []IndexEntry{}, err
}
Expand Down Expand Up @@ -1851,24 +1874,24 @@ func EncodeColumns(
colMap catalog.TableColMap,
values []tree.Datum,
keyPrefix []byte,
) (key []byte, containsNull bool, err error) {
) (key []byte, colIDWithNullVal descpb.ColumnID, err error) {
key = keyPrefix
for colIdx, id := range columnIDs {
val := findColumnValue(id, colMap, values)
if val == tree.DNull {
containsNull = true
colIDWithNullVal = id
}

dir, err := directions.get(colIdx)
if err != nil {
return nil, containsNull, err
return nil, colIDWithNullVal, err
}

if key, err = EncodeTableKey(key, val, dir); err != nil {
return nil, containsNull, err
return nil, colIDWithNullVal, err
}
}
return key, containsNull, nil
return key, colIDWithNullVal, nil
}

// growKey returns a new key with the same contents as the given key and with
Expand Down
14 changes: 10 additions & 4 deletions pkg/sql/rowenc/index_encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func TestIndexKey(t *testing.T) {
valuesLen := randutil.RandIntInRange(rng, len(t.primaryInterleaves)+1, len(t.primaryInterleaves)+10)
t.primaryValues = make([]tree.Datum, valuesLen)
for j := range t.primaryValues {
t.primaryValues[j] = randgen.RandDatum(rng, types.Int, true)
t.primaryValues[j] = randgen.RandDatum(rng, types.Int, false /* nullOk */)
}

t.secondaryInterleaves = make([]descpb.ID, rng.Intn(10))
Expand All @@ -195,7 +195,7 @@ func TestIndexKey(t *testing.T) {
valuesLen = randutil.RandIntInRange(rng, len(t.secondaryInterleaves)+1, len(t.secondaryInterleaves)+10)
t.secondaryValues = make([]tree.Datum, valuesLen)
for j := range t.secondaryValues {
t.secondaryValues[j] = randgen.RandDatum(rng, types.Int, true)
t.secondaryValues[j] = randgen.RandDatum(rng, types.Int, true /* nullOk */)
}

tests = append(tests, t)
Expand Down Expand Up @@ -1089,11 +1089,17 @@ func TestIndexKeyEquivSignature(t *testing.T) {
// Column values should be at the beginning of the
// remaining bytes of the key.
pkIndexDesc := desc.GetPrimaryIndex().IndexDesc()
colVals, null, err := EncodeColumns(pkIndexDesc.KeyColumnIDs, pkIndexDesc.KeyColumnDirections, colMap, tc.table.values, nil /*key*/)
colVals, nullColID, err := EncodeColumns(
pkIndexDesc.KeyColumnIDs,
pkIndexDesc.KeyColumnDirections,
colMap,
tc.table.values,
nil, /* keyPrefix */
)
if err != nil {
t.Fatal(err)
}
if null {
if nullColID != 0 {
t.Fatalf("unexpected null values when encoding expected column values")
}

Expand Down

0 comments on commit aeff00b

Please sign in to comment.