Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rowenc: various improvements to IndexFetchSpec initialization #76795

Merged
merged 5 commits into from
Feb 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions pkg/sql/catalog/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,11 +345,6 @@ type TableDescriptor interface {
// 1. Whether the index is a mutation
// 2. if so, is it in state DELETE_AND_WRITE_ONLY
GetIndexMutationCapabilities(id descpb.IndexID) (isMutation, isWriteOnly bool)
// KeysPerRow returns the maximum number of keys used to encode a row for the
// given index. If a secondary index doesn't store any columns, then it only
// has one k/v pair, but if it stores some columns, it can return up to one
// k/v pair per family in the table, just like a primary index.
KeysPerRow(id descpb.IndexID) (int, error)

// AllIndexes returns a slice with all indexes, public and non-public,
// in the underlying proto, in their canonical order:
Expand Down Expand Up @@ -509,6 +504,16 @@ type TableDescriptor interface {
// stored columns in the specified Index.
IndexStoredColumns(idx Index) []Column

// IndexKeysPerRow returns the maximum number of keys used to encode a row for
// the given index. If a secondary index doesn't store any columns, then it
// only has one k/v pair, but if it stores some columns, it can return up to
// one k/v pair per family in the table, just like a primary index.
IndexKeysPerRow(idx Index) int

// IndexFetchSpecKeyAndSuffixColumns returns information about the key and
// suffix columns, suitable for populating a descpb.IndexFetchSpec.
IndexFetchSpecKeyAndSuffixColumns(idx Index) []descpb.IndexFetchSpec_KeyColumn

// FindColumnWithID returns the first column found whose ID matches the
// provided target ID, in the canonical order.
// If no column is found then an error is also returned.
Expand Down Expand Up @@ -552,6 +557,10 @@ type TableDescriptor interface {
// IDs are unique per table, but not unique globally.
GetNextFamilyID() descpb.FamilyID

// FamilyDefaultColumns returns the default column IDs for families with a
// default column. See IndexFetchSpec.FamilyDefaultColumns.
FamilyDefaultColumns() []descpb.IndexFetchSpec_FamilyDefaultColumn

// HasColumnBackfillMutation returns whether the table has any queued column
// mutations that require a backfill.
HasColumnBackfillMutation() bool
Expand Down
91 changes: 72 additions & 19 deletions pkg/sql/catalog/tabledesc/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,28 +251,30 @@ func (w column) HasGeneratedAsIdentitySequenceOption() bool {

// columnCache contains precomputed slices of catalog.Column interfaces.
type columnCache struct {
all []catalog.Column
public []catalog.Column
writable []catalog.Column
deletable []catalog.Column
nonDrop []catalog.Column
visible []catalog.Column
accessible []catalog.Column
readable []catalog.Column
withUDTs []catalog.Column
system []catalog.Column
index []indexColumnCache
all []catalog.Column
public []catalog.Column
writable []catalog.Column
deletable []catalog.Column
nonDrop []catalog.Column
visible []catalog.Column
accessible []catalog.Column
readable []catalog.Column
withUDTs []catalog.Column
system []catalog.Column
familyDefaultColumns []descpb.IndexFetchSpec_FamilyDefaultColumn
index []indexColumnCache
}

type indexColumnCache struct {
all []catalog.Column
allDirs []descpb.IndexDescriptor_Direction
key []catalog.Column
keyDirs []descpb.IndexDescriptor_Direction
stored []catalog.Column
keySuffix []catalog.Column
full []catalog.Column
fullDirs []descpb.IndexDescriptor_Direction
all []catalog.Column
allDirs []descpb.IndexDescriptor_Direction
key []catalog.Column
keyDirs []descpb.IndexDescriptor_Direction
stored []catalog.Column
keySuffix []catalog.Column
full []catalog.Column
fullDirs []descpb.IndexDescriptor_Direction
keyAndSuffix []descpb.IndexFetchSpec_KeyColumn
}

// newColumnCache returns a fresh fully-populated columnCache struct for the
Expand Down Expand Up @@ -336,6 +338,20 @@ func newColumnCache(desc *descpb.TableDescriptor, mutations *mutationCache) *col
lazyAllocAppendColumn(&c.withUDTs, col, numDeletable)
}
}

// Populate familyDefaultColumns.
for i := range desc.Families {
if f := &desc.Families[i]; f.DefaultColumnID != 0 {
if c.familyDefaultColumns == nil {
c.familyDefaultColumns = make([]descpb.IndexFetchSpec_FamilyDefaultColumn, 0, len(desc.Families)-i)
}
c.familyDefaultColumns = append(c.familyDefaultColumns, descpb.IndexFetchSpec_FamilyDefaultColumn{
FamilyID: f.ID,
DefaultColumnID: f.DefaultColumnID,
})
}
}

// Populate the per-index column cache
c.index = make([]indexColumnCache, 0, 1+len(desc.Indexes)+len(mutations.indexes))
c.index = append(c.index, makeIndexColumnCache(&desc.PrimaryIndex, c.all))
Expand Down Expand Up @@ -372,6 +388,43 @@ func makeIndexColumnCache(idx *descpb.IndexDescriptor, all []catalog.Column) (ic
}
ic.full = ic.all[:nFull]
ic.fullDirs = ic.allDirs[:nFull]

// Populate keyAndSuffix. Note that this method can be called on an incomplete
// (mutable) descriptor (e.g. as part of initializing a new descriptor); this
// code needs to tolerate any descriptor state (like having no key columns, or
// having uninitialized column IDs).
var invertedColumnID descpb.ColumnID
if nKey > 0 && idx.Type == descpb.IndexDescriptor_INVERTED {
invertedColumnID = idx.InvertedColumnID()
}
var compositeIDs catalog.TableColSet
for _, colID := range idx.CompositeColumnIDs {
compositeIDs.Add(colID)
}
ic.keyAndSuffix = make([]descpb.IndexFetchSpec_KeyColumn, nKey+nKeySuffix)
for i := range ic.keyAndSuffix {
col := ic.all[i]
if col == nil {
ic.keyAndSuffix[i].Name = "invalid"
continue
}
colID := col.GetID()
typ := col.GetType()
if colID != 0 && colID == invertedColumnID {
typ = idx.InvertedColumnKeyType()
}
ic.keyAndSuffix[i] = descpb.IndexFetchSpec_KeyColumn{
IndexFetchSpec_Column: descpb.IndexFetchSpec_Column{
Name: col.GetName(),
ColumnID: colID,
Type: typ,
IsNonNullable: !col.IsNullable(),
},
Direction: ic.allDirs[i],
IsComposite: compositeIDs.Contains(colID),
IsInverted: colID == invertedColumnID,
}
}
return ic
}

Expand Down
16 changes: 6 additions & 10 deletions pkg/sql/catalog/tabledesc/structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,13 @@ func (desc *wrapper) GetParentSchemaID() descpb.ID {
return parentSchemaID
}

// KeysPerRow implements the TableDescriptor interface.
func (desc *wrapper) KeysPerRow(indexID descpb.IndexID) (int, error) {
if desc.PrimaryIndex.ID == indexID {
return len(desc.Families), nil
}
idx, err := desc.FindIndexWithID(indexID)
if err != nil {
return 0, err
// IndexKeysPerRow implements the TableDescriptor interface.
func (desc *wrapper) IndexKeysPerRow(idx catalog.Index) int {
if desc.PrimaryIndex.ID == idx.GetID() {
return len(desc.Families)
}
if idx.NumSecondaryStoredColumns() == 0 || len(desc.Families) == 1 {
return 1, nil
return 1
}
// Calculate the number of column families used by the secondary index. We
// only need to look at the stored columns because column families are only
Expand All @@ -132,7 +128,7 @@ func (desc *wrapper) KeysPerRow(indexID descpb.IndexID) (int, error) {
}
}
}
return numUsedFamilies, nil
return numUsedFamilies
}

// BuildIndexName returns an index name that is not equal to any
Expand Down
7 changes: 3 additions & 4 deletions pkg/sql/catalog/tabledesc/structured_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,10 +723,9 @@ func TestKeysPerRow(t *testing.T) {

desc := desctestutils.TestingGetPublicTableDescriptor(db, keys.SystemSQLCodec, "d", tableName)
require.NotNil(t, desc)
keys, err := desc.KeysPerRow(test.indexID)
if err != nil {
t.Fatal(err)
}
idx, err := desc.FindIndexWithID(test.indexID)
require.NoError(t, err)
keys := desc.IndexKeysPerRow(idx)
if test.expected != keys {
t.Errorf("expected %d keys got %d", test.expected, keys)
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/sql/catalog/tabledesc/table_desc.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,11 @@ func (desc *wrapper) SystemColumns() []catalog.Column {
return desc.getExistingOrNewColumnCache().system
}

// FamilyDefaultColumns implements the TableDescriptor interface.
func (desc *wrapper) FamilyDefaultColumns() []descpb.IndexFetchSpec_FamilyDefaultColumn {
return desc.getExistingOrNewColumnCache().familyDefaultColumns
}

// PublicColumnIDs implements the TableDescriptor interface.
func (desc *wrapper) PublicColumnIDs() []descpb.ColumnID {
cols := desc.PublicColumns()
Expand Down Expand Up @@ -492,6 +497,16 @@ func (desc *wrapper) IndexStoredColumns(idx catalog.Index) []catalog.Column {
return nil
}

// IndexFetchSpecKeyAndSuffixColumns implements the TableDescriptor interface.
func (desc *wrapper) IndexFetchSpecKeyAndSuffixColumns(
idx catalog.Index,
) []descpb.IndexFetchSpec_KeyColumn {
if ic := desc.getExistingOrNewIndexColumnCache(idx); ic != nil {
return ic.keyAndSuffix
}
return nil
}

// getExistingOrNewIndexColumnCache is a convenience method for Index*Columns
// methods.
func (desc *wrapper) getExistingOrNewIndexColumnCache(idx catalog.Index) *indexColumnCache {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/drop_index
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ DROP INDEX t_secondary CASCADE;
ALTER TABLE t DROP COLUMN b;
INSERT INTO t SELECT a + 1 FROM t;

statement error pgcode 23505 duplicate key value got decoding error: column \"b\" \(2\) is not public
statement error pgcode 23505 duplicate key value violates unique constraint "t_secondary"\nDETAIL: Key \(b\)=\(0\.0\) already exists
UPSERT INTO t SELECT a + 1 FROM t;

statement ok
Expand Down
86 changes: 20 additions & 66 deletions pkg/sql/rowenc/index_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@
package rowenc

import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/errors"
)

Expand All @@ -34,9 +33,7 @@ func InitIndexFetchSpec(
index catalog.Index,
fetchColumnIDs []descpb.ColumnID,
) error {
oldKeyAndSuffixCols := s.KeyAndSuffixColumns
oldFetchedCols := s.FetchedColumns
oldFamilies := s.FamilyDefaultColumns
*s = descpb.IndexFetchSpec{
Version: descpb.IndexFetchSpecVersionInitial,
TableName: table.GetName(),
Expand All @@ -48,80 +45,28 @@ func InitIndexFetchSpec(
NumKeySuffixColumns: uint32(index.NumKeySuffixColumns()),
}

indexID := index.GetID()
maxKeysPerRow, err := table.KeysPerRow(indexID)
if err != nil {
return err
}
maxKeysPerRow := table.IndexKeysPerRow(index)
s.MaxKeysPerRow = uint32(maxKeysPerRow)
s.KeyPrefixLength = uint32(len(MakeIndexKeyPrefix(codec, table.GetID(), indexID)))
s.KeyPrefixLength = uint32(len(codec.TenantPrefix()) +
encoding.EncodedLengthUvarintAscending(uint64(s.TableID)) +
encoding.EncodedLengthUvarintAscending(uint64(index.GetID())))

s.FamilyDefaultColumns = table.FamilyDefaultColumns()

families := table.GetFamilies()
for i := range families {
f := &families[i]
if f.DefaultColumnID != 0 {
if s.FamilyDefaultColumns == nil {
s.FamilyDefaultColumns = oldFamilies[:0]
}
s.FamilyDefaultColumns = append(s.FamilyDefaultColumns, descpb.IndexFetchSpec_FamilyDefaultColumn{
FamilyID: f.ID,
DefaultColumnID: f.DefaultColumnID,
})
}
if f.ID > s.MaxFamilyID {
s.MaxFamilyID = f.ID
if id := families[i].ID; id > s.MaxFamilyID {
s.MaxFamilyID = id
}
}

indexCols := table.IndexColumns(index)
keyDirs := table.IndexFullColumnDirections(index)
compositeIDs := index.CollectCompositeColumnIDs()
s.KeyAndSuffixColumns = table.IndexFetchSpecKeyAndSuffixColumns(index)

var invertedColumnID descpb.ColumnID
if index.GetType() == descpb.IndexDescriptor_INVERTED {
invertedColumnID = index.InvertedColumnID()
}

mkCol := func(col catalog.Column, colID descpb.ColumnID) descpb.IndexFetchSpec_Column {
typ := col.GetType()
if colID == invertedColumnID {
typ = index.InvertedColumnKeyType()
}
return descpb.IndexFetchSpec_Column{
Name: col.GetName(),
ColumnID: colID,
Type: typ,
IsNonNullable: !col.IsNullable() && col.Public(),
}
}

numKeyCols := index.NumKeyColumns() + index.NumKeySuffixColumns()
if cap(oldKeyAndSuffixCols) >= numKeyCols {
s.KeyAndSuffixColumns = oldKeyAndSuffixCols[:numKeyCols]
} else {
s.KeyAndSuffixColumns = make([]descpb.IndexFetchSpec_KeyColumn, numKeyCols)
}
for i := range s.KeyAndSuffixColumns {
col := indexCols[i]
if !col.Public() {
// Key columns must be public.
return fmt.Errorf("column %q (%d) is not public", col.GetName(), col.GetID())
}
colID := col.GetID()
dir := descpb.IndexDescriptor_ASC
// If this is a unique index, the suffix columns are not part of the full
// index columns and are always ascending.
if i < len(keyDirs) {
dir = keyDirs[i]
}
s.KeyAndSuffixColumns[i] = descpb.IndexFetchSpec_KeyColumn{
IndexFetchSpec_Column: mkCol(col, colID),
Direction: dir,
IsComposite: compositeIDs.Contains(colID),
IsInverted: colID == invertedColumnID,
}
}

if cap(oldFetchedCols) >= len(fetchColumnIDs) {
s.FetchedColumns = oldFetchedCols[:len(fetchColumnIDs)]
} else {
Expand All @@ -132,7 +77,16 @@ func InitIndexFetchSpec(
if err != nil {
return err
}
s.FetchedColumns[i] = mkCol(col, colID)
typ := col.GetType()
if colID == invertedColumnID {
typ = index.InvertedColumnKeyType()
}
s.FetchedColumns[i] = descpb.IndexFetchSpec_Column{
Name: col.GetName(),
ColumnID: colID,
Type: typ,
IsNonNullable: !col.IsNullable() && col.Public(),
}
}

// In test builds, verify that we aren't trying to fetch columns that are not
Expand Down
5 changes: 1 addition & 4 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,10 +484,7 @@ func newJoinReader(
)
jr.streamerInfo.unlimitedMemMonitor.Start(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon, mon.BoundAccount{})
jr.streamerInfo.budgetAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount()
jr.streamerInfo.maxKeysPerRow, err = jr.desc.KeysPerRow(jr.index.GetID())
if err != nil {
return nil, err
}
jr.streamerInfo.maxKeysPerRow = jr.desc.IndexKeysPerRow(jr.index)
} else {
// When not using the Streamer API, we want to limit the batch size hint
// to at most half of the workmem limit. Note that it is ok if it is set
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/span/span_splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func MakeSplitter(
// * The index either has just 1 family (so we'll make a GetRequest) or we
// need fewer than every column family in the table (otherwise we'd just
// make a big ScanRequest).
// TODO(radu): should we be using index.KeysPerRow() instead?
// TODO(radu): should we be using IndexKeysPerRow() instead?
numFamilies := len(table.GetFamilies())
if numFamilies > 1 && len(neededFamilies) == numFamilies {
return NoopSplitter()
Expand Down
Loading