Skip to content

Commit

Permalink
row,schemachanger: use StoreColumnIDs/Names in primary index
Browse files Browse the repository at this point in the history
In a recent commit, the StoreColumnIDs and StoreColumnNames slices in
primary indexes were populated when previously they had simply been
empty. We simply assumed that all non-virtual columns in a table would
be stored in the primary index: primary key columns in the key, the rest
in the value.

This commit breaks that assumption by using the StoreColumnIDs slice to
determine what goes into the primary index. This makes it possible for
the new schema changer to add columns safely, preventing unwanted writes
to the old primary index while the schema change is underway.

Fixes #59149.

Release note: None
  • Loading branch information
Marius Posta committed Jun 21, 2021
1 parent eb8e1c8 commit d8ebd2b
Show file tree
Hide file tree
Showing 12 changed files with 413 additions and 505 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/row/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1442,7 +1442,7 @@ func (rf *Fetcher) checkPrimaryIndexDatumEncodings(ctx context.Context) error {
continue
}

if skip, err := rh.skipColumnInPK(colID, rowVal.Datum); err != nil {
if skip, err := rh.skipColumnNotInPrimaryIndexValue(colID, rowVal.Datum); err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err, "unable to determine skip")
} else if skip {
continue
Expand Down
26 changes: 13 additions & 13 deletions pkg/sql/row/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ type rowHelper struct {

// Computed and cached.
primaryIndexKeyPrefix []byte
primaryIndexCols catalog.TableColSet
primaryIndexKeyCols catalog.TableColSet
primaryIndexValueCols catalog.TableColSet
sortedColumnFamilies map[descpb.FamilyID][]descpb.ColumnID
}

Expand Down Expand Up @@ -130,20 +131,19 @@ func (rh *rowHelper) encodeSecondaryIndexes(
return rh.indexEntries, nil
}

// skipColumnInPK returns true if the value at column colID does not need
// to be encoded because it is already part of the primary key. Composite
// skipColumnNotInPrimaryIndexValue returns true if the value at column colID
// does not need to be encoded, either because it is already part of the primary
// key, or because it is not part of the primary index altogether. Composite
// datums are considered too, so a composite datum in a PK will return false.
// TODO(dan): This logic is common and being moved into TableDescriptor (see
// #6233). Once it is, use the shared one.
func (rh *rowHelper) skipColumnInPK(colID descpb.ColumnID, value tree.Datum) (bool, error) {
if rh.primaryIndexCols.Empty() {
for i := 0; i < rh.TableDesc.GetPrimaryIndex().NumKeyColumns(); i++ {
pkColID := rh.TableDesc.GetPrimaryIndex().GetKeyColumnID(i)
rh.primaryIndexCols.Add(pkColID)
}
func (rh *rowHelper) skipColumnNotInPrimaryIndexValue(
colID descpb.ColumnID, value tree.Datum,
) (bool, error) {
if rh.primaryIndexKeyCols.Empty() {
rh.primaryIndexKeyCols = rh.TableDesc.GetPrimaryIndex().CollectKeyColumnIDs()
rh.primaryIndexValueCols = rh.TableDesc.GetPrimaryIndex().CollectPrimaryStoredColumnIDs()
}
if !rh.primaryIndexCols.Contains(colID) {
return false, nil
if !rh.primaryIndexKeyCols.Contains(colID) {
return !rh.primaryIndexValueCols.Contains(colID), nil
}
if cdatum, ok := value.(tree.CompositeDatum); ok {
// Composite columns are encoded in both the key and the value.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/row/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func prepareInsertOrUpdateBatch(
continue
}

if skip, err := helper.skipColumnInPK(colID, values[idx]); err != nil {
if skip, err := helper.skipColumnNotInPrimaryIndexValue(colID, values[idx]); err != nil {
return nil, err
} else if skip {
continue
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/schemachanger/scbuild/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ go_library(
"//pkg/sql/sqltelemetry",
"//pkg/sql/types",
"//pkg/util/errorutil/unimplemented",
"//pkg/util/protoutil",
"//pkg/util/sequence",
"@com_github_cockroachdb_errors//:errors",
"@com_github_lib_pq//oid",
Expand Down
75 changes: 24 additions & 51 deletions pkg/sql/schemachanger/scbuild/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/sequence"
"github.com/cockroachdb/errors"
"github.com/lib/pq/oid"
Expand Down Expand Up @@ -428,8 +427,8 @@ func (b *buildContext) addOrUpdatePrimaryIndexTargetsForAddColumn(
if t, ok := n.Element().(*scpb.PrimaryIndex); ok &&
b.outputNodes[i].Target.Direction == scpb.Target_ADD &&
t.TableID == table.GetID() {
t.StoreColumnIDs = append(t.StoreColumnIDs, colID)
t.StoreColumnNames = append(t.StoreColumnNames, colName)
t.Index.StoreColumnIDs = append(t.Index.StoreColumnIDs, colID)
t.Index.StoreColumnNames = append(t.Index.StoreColumnNames, colName)
return t.Index.ID
}
}
Expand All @@ -448,37 +447,23 @@ func (b *buildContext) addOrUpdatePrimaryIndexTargetsForAddColumn(
)
newIdx.ID = idxID

var storeColIDs []descpb.ColumnID
var storeColNames []string
for _, col := range table.PublicColumns() {
containsCol := false
for _, id := range newIdx.KeyColumnIDs {
if id == col.GetID() {
containsCol = true
break
}
}
if !containsCol {
storeColIDs = append(storeColIDs, col.GetID())
storeColNames = append(storeColNames, col.GetName())
}
if !table.GetPrimaryIndex().CollectKeyColumnIDs().Contains(colID) &&
!table.GetPrimaryIndex().CollectPrimaryStoredColumnIDs().Contains(colID) {
newIdx.StoreColumnIDs = append(newIdx.StoreColumnIDs, colID)
newIdx.StoreColumnNames = append(newIdx.StoreColumnNames, colName)
}

b.addNode(scpb.Target_ADD, &scpb.PrimaryIndex{
TableID: table.GetID(),
Index: newIdx,
OtherPrimaryIndexID: table.GetPrimaryIndexID(),
StoreColumnIDs: append(storeColIDs, colID),
StoreColumnNames: append(storeColNames, colName),
})

// Drop the existing primary index.
b.addNode(scpb.Target_DROP, &scpb.PrimaryIndex{
TableID: table.GetID(),
Index: table.GetPrimaryIndex().IndexDescDeepCopy(),
OtherPrimaryIndexID: newIdx.ID,
StoreColumnIDs: storeColIDs,
StoreColumnNames: storeColNames,
})

return idxID
Expand All @@ -494,10 +479,10 @@ func (b *buildContext) addOrUpdatePrimaryIndexTargetsForDropColumn(
if t, ok := n.Element().(*scpb.PrimaryIndex); ok &&
n.Target.Direction == scpb.Target_ADD &&
t.TableID == table.GetID() {
for j := range t.StoreColumnIDs {
if t.StoreColumnIDs[j] == colID {
t.StoreColumnIDs = append(t.StoreColumnIDs[:j], t.StoreColumnIDs[j+1:]...)
t.StoreColumnNames = append(t.StoreColumnNames[:j], t.StoreColumnNames[j+1:]...)
for j := range t.Index.StoreColumnIDs {
if t.Index.StoreColumnIDs[j] == colID {
t.Index.StoreColumnIDs = append(t.Index.StoreColumnIDs[:j], t.Index.StoreColumnIDs[j+1:]...)
t.Index.StoreColumnNames = append(t.Index.StoreColumnNames[:j], t.Index.StoreColumnNames[j+1:]...)
return t.Index.ID
}

Expand All @@ -509,7 +494,7 @@ func (b *buildContext) addOrUpdatePrimaryIndexTargetsForDropColumn(
// Create a new primary index, identical to the existing one except for its
// ID and name.
idxID = b.nextIndexID(table)
newIdx := protoutil.Clone(table.GetPrimaryIndex().IndexDesc()).(*descpb.IndexDescriptor)
newIdx := table.GetPrimaryIndex().IndexDescDeepCopy()
newIdx.Name = tabledesc.GenerateUniqueName(
"new_primary_key",
func(name string) bool {
Expand All @@ -519,44 +504,32 @@ func (b *buildContext) addOrUpdatePrimaryIndexTargetsForDropColumn(
},
)
newIdx.ID = idxID

var addStoreColIDs []descpb.ColumnID
var addStoreColNames []string
var dropStoreColIDs []descpb.ColumnID
var dropStoreColNames []string
for _, col := range table.PublicColumns() {
containsCol := false
for _, id := range newIdx.KeyColumnIDs {
if id == col.GetID() {
containsCol = true
break
}
for j, id := range newIdx.KeyColumnIDs {
if id == colID {
newIdx.KeyColumnIDs = append(newIdx.KeyColumnIDs[:j], newIdx.KeyColumnIDs[j+1:]...)
newIdx.KeyColumnNames = append(newIdx.KeyColumnNames[:j], newIdx.KeyColumnNames[j+1:]...)
break
}
if !containsCol {
if colID != col.GetID() {
addStoreColIDs = append(addStoreColIDs, col.GetID())
addStoreColNames = append(addStoreColNames, col.GetName())
}
dropStoreColIDs = append(dropStoreColIDs, col.GetID())
dropStoreColNames = append(dropStoreColNames, col.GetName())
}
for j, id := range newIdx.StoreColumnIDs {
if id == colID {
newIdx.StoreColumnIDs = append(newIdx.StoreColumnIDs[:j], newIdx.StoreColumnIDs[j+1:]...)
newIdx.StoreColumnNames = append(newIdx.StoreColumnNames[:j], newIdx.StoreColumnNames[j+1:]...)
break
}
}

b.addNode(scpb.Target_ADD, &scpb.PrimaryIndex{
TableID: table.GetID(),
Index: *newIdx,
Index: newIdx,
OtherPrimaryIndexID: table.GetPrimaryIndexID(),
StoreColumnIDs: addStoreColIDs,
StoreColumnNames: addStoreColNames,
})

// Drop the existing primary index.
b.addNode(scpb.Target_DROP, &scpb.PrimaryIndex{
TableID: table.GetID(),
Index: *(protoutil.Clone(table.GetPrimaryIndex().IndexDesc()).(*descpb.IndexDescriptor)),
Index: table.GetPrimaryIndex().IndexDescDeepCopy(),
OtherPrimaryIndexID: idxID,
StoreColumnIDs: dropStoreColIDs,
StoreColumnNames: dropStoreColNames,
})
return idxID
}
Expand Down
56 changes: 24 additions & 32 deletions pkg/sql/schemachanger/scbuild/testdata/alter_table
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ ALTER TABLE defaultdb.foo ADD COLUMN j INT
name: new_primary_key
partitioning: {}
sharded: {}
storeColumnIds:
- 2
storeColumnNames:
- j
unique: true
version: 4
otherPrimaryIndexId: 1
storeColumnIds:
- 2
storeColumnNames:
- j
tableId: 52
- DROP PrimaryIndex:{DescID: 52, ElementName: "primary", IndexID: 1}
state: PUBLIC
Expand Down Expand Up @@ -102,13 +102,13 @@ ALTER TABLE defaultdb.foo ADD COLUMN j INT DEFAULT 123
name: new_primary_key
partitioning: {}
sharded: {}
storeColumnIds:
- 2
storeColumnNames:
- j
unique: true
version: 4
otherPrimaryIndexId: 1
storeColumnIds:
- 2
storeColumnNames:
- j
tableId: 52
- DROP PrimaryIndex:{DescID: 52, ElementName: "primary", IndexID: 1}
state: PUBLIC
Expand Down Expand Up @@ -183,15 +183,15 @@ ALTER TABLE defaultdb.foo ADD COLUMN k INT DEFAULT 456;
name: new_primary_key
partitioning: {}
sharded: {}
storeColumnIds:
- 2
- 3
storeColumnNames:
- j
- k
unique: true
version: 4
otherPrimaryIndexId: 1
storeColumnIds:
- 2
- 3
storeColumnNames:
- j
- k
tableId: 52
- DROP PrimaryIndex:{DescID: 52, ElementName: "primary", IndexID: 1}
state: PUBLIC
Expand Down Expand Up @@ -251,13 +251,13 @@ ALTER TABLE defaultdb.foo ADD COLUMN a INT AS (i+1) STORED
name: new_primary_key
partitioning: {}
sharded: {}
storeColumnIds:
- 2
storeColumnNames:
- a
unique: true
version: 4
otherPrimaryIndexId: 1
storeColumnIds:
- 2
storeColumnNames:
- a
tableId: 52
- DROP PrimaryIndex:{DescID: 52, ElementName: "primary", IndexID: 1}
state: PUBLIC
Expand Down Expand Up @@ -334,13 +334,13 @@ ALTER TABLE defaultdb.bar ADD COLUMN b INT;
name: new_primary_key
partitioning: {}
sharded: {}
storeColumnIds:
- 2
storeColumnNames:
- a
unique: true
version: 4
otherPrimaryIndexId: 1
storeColumnIds:
- 2
storeColumnNames:
- a
tableId: 52
- ADD PrimaryIndex:{DescID: 53, ElementName: "new_primary_key", IndexID: 2}
state: ABSENT
Expand All @@ -362,17 +362,13 @@ ALTER TABLE defaultdb.bar ADD COLUMN b INT;
sharded: {}
storeColumnIds:
- 1
- 3
storeColumnNames:
- j
- b
unique: true
version: 4
otherPrimaryIndexId: 1
storeColumnIds:
- 1
- 3
storeColumnNames:
- j
- b
tableId: 53
- DROP PrimaryIndex:{DescID: 52, ElementName: "primary", IndexID: 1}
state: PUBLIC
Expand Down Expand Up @@ -421,8 +417,4 @@ ALTER TABLE defaultdb.bar ADD COLUMN b INT;
unique: true
version: 4
otherPrimaryIndexId: 2
storeColumnIds:
- 1
storeColumnNames:
- j
tableId: 53
6 changes: 2 additions & 4 deletions pkg/sql/schemachanger/scexec/executor_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,12 @@ func TestSchemaChanger(t *testing.T) {
KeyColumnIDs: []descpb.ColumnID{1},
KeyColumnNames: []string{"i"},
KeyColumnDirections: []descpb.IndexDescriptor_Direction{descpb.IndexDescriptor_ASC},
StoreColumnIDs: []descpb.ColumnID{2},
StoreColumnNames: []string{"j"},
Unique: true,
Type: descpb.IndexDescriptor_FORWARD,
},
OtherPrimaryIndexID: fooTable.GetPrimaryIndexID(),
StoreColumnIDs: []descpb.ColumnID{2},
StoreColumnNames: []string{"j"},
}),
scpb.NewTarget(scpb.Target_ADD, &scpb.Column{
TableID: fooTable.GetID(),
Expand All @@ -272,8 +272,6 @@ func TestSchemaChanger(t *testing.T) {
Type: descpb.IndexDescriptor_FORWARD,
},
OtherPrimaryIndexID: 2,
StoreColumnIDs: []descpb.ColumnID{},
StoreColumnNames: []string{},
}),
}

Expand Down
Loading

0 comments on commit d8ebd2b

Please sign in to comment.