Skip to content

Commit

Permalink
rowenc: error when encoding NULLs for PK columns
Browse files Browse the repository at this point in the history
This commit adds a validation check to rowenc.EncodeIndexKey to have it
return an error when it encodes a NULL value for a primary key column.

Fixes cockroachdb#69867.

Release justification: Low risk addition of a correctness check.

Release note: None
  • Loading branch information
Marius Posta committed Oct 28, 2021
1 parent f3a2e36 commit 03064dc
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 28 deletions.
8 changes: 8 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/create_as
Original file line number Diff line number Diff line change
Expand Up @@ -362,3 +362,11 @@ SELECT * FROM t
----
1 1 false
2 2 true

# Regression test for #69867
statement error pgcode 23502 null value in column "x" violates not-null constraint
BEGIN;
CREATE TABLE foo69867 (x PRIMARY KEY) AS VALUES (1), (NULL);

statement ok
ROLLBACK
66 changes: 42 additions & 24 deletions pkg/sql/rowenc/index_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
Expand Down Expand Up @@ -67,14 +68,27 @@ func EncodeIndexKey(
values []tree.Datum,
keyPrefix []byte,
) (key []byte, containsNull bool, err error) {
return EncodePartialIndexKey(
var colIDWithNullVal descpb.ColumnID
key, colIDWithNullVal, err = EncodePartialIndexKey(
tableDesc,
index,
len(index.ColumnIDs), /* encode all columns */
colMap,
values,
keyPrefix,
)
containsNull = colIDWithNullVal != 0
if err == nil && containsNull && tableDesc.GetPrimaryIndexID() == index.ID {
col, findErr := tableDesc.FindColumnByID(colIDWithNullVal)
if findErr != nil {
return nil, true, errors.WithAssertionFailure(findErr)
}
if col.Nullable {
return nil, true, errors.AssertionFailedf("primary key column %q should not be nullable", col.Name)
}
return nil, true, sqlerrors.NewNonNullViolationError(col.Name)
}
return key, containsNull, err
}

// EncodePartialIndexSpan creates the minimal key span for the key specified by the
Expand All @@ -90,7 +104,9 @@ func EncodePartialIndexSpan(
) (span roachpb.Span, containsNull bool, err error) {
var key roachpb.Key
var endKey roachpb.Key
key, containsNull, err = EncodePartialIndexKey(tableDesc, index, numCols, colMap, values, keyPrefix)
var colIDWithNullVal descpb.ColumnID
key, colIDWithNullVal, err = EncodePartialIndexKey(tableDesc, index, numCols, colMap, values, keyPrefix)
containsNull = colIDWithNullVal != 0
if err != nil {
return span, containsNull, err
}
Expand All @@ -116,16 +132,16 @@ func EncodePartialIndexKey(
colMap map[descpb.ColumnID]int,
values []tree.Datum,
keyPrefix []byte,
) (key []byte, containsNull bool, err error) {
var colIDs, extraColIDs []descpb.ColumnID
) (key []byte, colIDWithNullVal descpb.ColumnID, err error) {
var colIDs, keySuffixColIDs []descpb.ColumnID
if numCols <= len(index.ColumnIDs) {
colIDs = index.ColumnIDs[:numCols]
} else {
if index.Unique || numCols > len(index.ColumnIDs)+len(index.ExtraColumnIDs) {
return nil, false, errors.Errorf("encoding too many columns (%d)", numCols)
return nil, colIDWithNullVal, errors.Errorf("encoding too many columns (%d)", numCols)
}
colIDs = index.ColumnIDs
extraColIDs = index.ExtraColumnIDs[:numCols-len(index.ColumnIDs)]
keySuffixColIDs = index.ExtraColumnIDs[:numCols-len(index.ColumnIDs)]
}

// We know we will append to the key which will cause the capacity to grow so
Expand All @@ -151,17 +167,15 @@ func EncodePartialIndexKey(
length = len(colIDs)
partial = true
}
var n bool
key, n, err = EncodeColumns(colIDs[:length], dirs[:length], colMap, values, key)
key, colIDWithNullVal, err = EncodeColumns(colIDs[:length], dirs[:length], colMap, values, key)
if err != nil {
return nil, false, err
return nil, colIDWithNullVal, err
}
containsNull = containsNull || n
if partial {
// Early stop. Note that if we had exactly SharedPrefixLen columns
// remaining, we want to append the next tableID/indexID pair because
// that results in a more specific key.
return key, containsNull, nil
return key, colIDWithNullVal, nil
}
colIDs, dirs = colIDs[length:], dirs[length:]
// Each ancestor is separated by an interleaved
Expand All @@ -172,19 +186,23 @@ func EncodePartialIndexKey(
key = EncodePartialTableIDIndexID(key, tableDesc.GetID(), index.ID)
}

var n bool
key, n, err = EncodeColumns(colIDs, dirs, colMap, values, key)
var keyColIDWithNullVal, keySuffixColIDWithNullVal descpb.ColumnID
key, keyColIDWithNullVal, err = EncodeColumns(colIDs, dirs, colMap, values, key)
if colIDWithNullVal == 0 {
colIDWithNullVal = keyColIDWithNullVal
}
if err != nil {
return nil, false, err
return nil, colIDWithNullVal, err
}
containsNull = containsNull || n

key, n, err = EncodeColumns(extraColIDs, nil /* directions */, colMap, values, key)
key, keySuffixColIDWithNullVal, err = EncodeColumns(keySuffixColIDs, nil /* directions */, colMap, values, key)
if colIDWithNullVal == 0 {
colIDWithNullVal = keySuffixColIDWithNullVal
}
if err != nil {
return nil, false, err
return nil, colIDWithNullVal, err
}
containsNull = containsNull || n
return key, containsNull, nil
return key, colIDWithNullVal, nil
}

type directions []descpb.IndexDescriptor_Direction
Expand Down Expand Up @@ -1671,22 +1689,22 @@ func EncodeColumns(
colMap map[descpb.ColumnID]int,
values []tree.Datum,
keyPrefix []byte,
) (key []byte, containsNull bool, err error) {
) (key []byte, colIDWithNullVal descpb.ColumnID, err error) {
key = keyPrefix
for colIdx, id := range columnIDs {
val := findColumnValue(id, colMap, values)
if val == tree.DNull {
containsNull = true
colIDWithNullVal = id
}

dir, err := directions.get(colIdx)
if err != nil {
return nil, containsNull, err
return nil, colIDWithNullVal, err
}

if key, err = EncodeTableKey(key, val, dir); err != nil {
return nil, containsNull, err
return nil, colIDWithNullVal, err
}
}
return key, containsNull, nil
return key, colIDWithNullVal, nil
}
8 changes: 4 additions & 4 deletions pkg/sql/rowenc/index_encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func TestIndexKey(t *testing.T) {
valuesLen := randutil.RandIntInRange(rng, len(t.primaryInterleaves)+1, len(t.primaryInterleaves)+10)
t.primaryValues = make([]tree.Datum, valuesLen)
for j := range t.primaryValues {
t.primaryValues[j] = RandDatum(rng, types.Int, true)
t.primaryValues[j] = RandDatum(rng, types.Int, false /* nullOk */)
}

t.secondaryInterleaves = make([]descpb.ID, rng.Intn(10))
Expand All @@ -187,7 +187,7 @@ func TestIndexKey(t *testing.T) {
valuesLen = randutil.RandIntInRange(rng, len(t.secondaryInterleaves)+1, len(t.secondaryInterleaves)+10)
t.secondaryValues = make([]tree.Datum, valuesLen)
for j := range t.secondaryValues {
t.secondaryValues[j] = RandDatum(rng, types.Int, true)
t.secondaryValues[j] = RandDatum(rng, types.Int, true /* nullOk */)
}

tests = append(tests, t)
Expand Down Expand Up @@ -691,11 +691,11 @@ func TestIndexKeyEquivSignature(t *testing.T) {

// Column values should be at the beginning of the
// remaining bytes of the key.
colVals, null, err := EncodeColumns(desc.PrimaryIndex.ColumnIDs, desc.PrimaryIndex.ColumnDirections, colMap, tc.table.values, nil /*key*/)
colVals, nullColID, err := EncodeColumns(desc.PrimaryIndex.ColumnIDs, desc.PrimaryIndex.ColumnDirections, colMap, tc.table.values, nil /*key*/)
if err != nil {
t.Fatal(err)
}
if null {
if nullColID != 0 {
t.Fatalf("unexpected null values when encoding expected column values")
}

Expand Down

0 comments on commit 03064dc

Please sign in to comment.