Skip to content

Commit

Permalink
sql: avoid writing to column families that do not exist in the primar…
Browse files Browse the repository at this point in the history
…y index

Previously, if a new column family was added during an add
column and an update/insert occurred concurrently, we could
end up writing to this new column family in any primary index.
This was inadequate because if the primary index did not store
the column, then runtime will have trouble reading data from this
table, since after a rollback the added column / column family
will get cleaned up from the table descriptor. To address this,
this patch avoids writing any columns not stored within an index
descriptor.

Fixes: #99950

Release note (bug fix): Concurrent DML while adding
a new column with a new column family can lead to
corruption in the existing primary index. If a rollback
occurs the table may no longer be accessible.
  • Loading branch information
fqazi committed Mar 30, 2023
1 parent e7b37a7 commit 5dff0e5
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 7 deletions.
8 changes: 7 additions & 1 deletion pkg/sql/row/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,13 @@ func prepareInsertOrUpdateBatch(
if !ok {
continue
}

// Skip any values with a default ID not stored in the primary index,
// which can happen if we are adding new columns.
if skip, err := helper.skipColumnNotInPrimaryIndexValue(family.DefaultColumnID, values[idx]); err != nil {
return nil, err
} else if skip {
continue
}
typ := fetchedCols[idx].GetType()
marshaled, err := valueside.MarshalLegacy(typ, values[idx])
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/sql/rowenc/index_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,14 @@ func EncodePrimaryIndex(
// The decoders expect that column family 0 is encoded with a TUPLE value tag, so we
// don't want to use the untagged value encoding.
if len(family.ColumnIDs) == 1 && family.ColumnIDs[0] == family.DefaultColumnID && family.ID != 0 {
// Single column value families which are not stored can be skipped, these
// may exist temporarily while adding a column.
if !storedColumns.Contains(family.DefaultColumnID) {
if cdatum, ok := values[colMap.GetDefault(family.DefaultColumnID)].(tree.CompositeDatum); !ok ||
!cdatum.IsComposite() {
return nil
}
}
datum := findColumnValue(family.DefaultColumnID, colMap, values)
// We want to include this column if its value is non-null or
// we were requested to include all of the columns.
Expand Down
6 changes: 0 additions & 6 deletions pkg/sql/schema_changer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3144,9 +3144,6 @@ CREATE TABLE t.test (
// backfill. This should have the same number of Puts
// as there are CPuts above.
fmt.Sprintf("Put /Table/%d/3/2/0 -> /BYTES/0x0a030a1302", tableID),
// TODO (rohany): this k/v is spurious and should be removed
// when #45343 is fixed.
fmt.Sprintf("Put /Table/%d/3/2/1/1 -> /BYTES/0x0a020104", tableID),
fmt.Sprintf("Put /Table/%d/3/2/2/1 -> /BYTES/0x0a030a3306", tableID),
fmt.Sprintf("Put /Table/%d/3/2/4/1 -> /BYTES/0x0a02010c", tableID),

Expand Down Expand Up @@ -3185,7 +3182,6 @@ CREATE TABLE t.test (
// The temporary indexes are delete-preserving -- they
// should see the delete and issue Puts.
fmt.Sprintf("Put (delete) /Table/%d/3/2/0", tableID),
fmt.Sprintf("Put (delete) /Table/%d/3/2/1/1", tableID),
fmt.Sprintf("Put (delete) /Table/%d/3/2/2/1", tableID),
fmt.Sprintf("Put (delete) /Table/%d/3/2/3/1", tableID),
fmt.Sprintf("Put (delete) /Table/%d/3/2/4/1", tableID),
Expand Down Expand Up @@ -3213,7 +3209,6 @@ CREATE TABLE t.test (
// The temporary index for the newly added index sees
// a Put in all families.
fmt.Sprintf("Put /Table/%d/3/3/0 -> /BYTES/0x0a030a1302", tableID),
fmt.Sprintf("Put /Table/%d/3/3/1/1 -> /BYTES/0x0a020106", tableID),
fmt.Sprintf("Put /Table/%d/3/3/2/1 -> /BYTES/0x0a030a3306", tableID),
fmt.Sprintf("Put /Table/%d/3/3/4/1 -> /BYTES/0x0a02010c", tableID),
// TODO(ssd): double-check that this trace makes
Expand Down Expand Up @@ -3243,7 +3238,6 @@ CREATE TABLE t.test (
// The temporary index sees a Put in all families even though
// only some are changing. This is expected.
fmt.Sprintf("Put /Table/%d/3/3/0 -> /BYTES/0x0a030a1302", tableID),
fmt.Sprintf("Put /Table/%d/3/3/1/1 -> /BYTES/0x0a020106", tableID),
fmt.Sprintf("Put /Table/%d/3/3/3/1 -> /BYTES/0x0a02010a", tableID),
}
require.Equal(t, expected, scanToArray(rows))
Expand Down

0 comments on commit 5dff0e5

Please sign in to comment.