Merge #76795

76795: rowenc: various improvements to IndexFetchSpec initialization r=RaduBerinde a=RaduBerinde

First commit is #76788.

#### catalog: store IndexFetchSpec key columns in table descriptor

We now cache the IndexFetchSpec_KeyAndSuffixColumns in the table
descriptor, making it cheaper to populate an IndexFetchSpec.
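
With the cached slice in place, populating the spec's key columns reduces to copying precomputed metadata off the descriptor. A minimal sketch of the consuming side (the package and helper names here are illustrative, not part of this change):

```go
package rowencsketch

import (
	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
)

// populateKeyColumns copies the key/suffix column metadata that the table
// descriptor now caches; no per-call slice construction is needed.
func populateKeyColumns(
	s *descpb.IndexFetchSpec, table catalog.TableDescriptor, index catalog.Index,
) {
	s.KeyAndSuffixColumns = table.IndexFetchSpecKeyAndSuffixColumns(index)
}
```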

Release note: None

#### catalog: clean up KeysPerRow

This change renames KeysPerRow to IndexKeysPerRow and changes the
argument to a catalog.Index, removing the error case.
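
A sketch of the call-site effect, assuming a caller that already holds a resolved catalog.Index (the helper name is illustrative):

```go
package catalogsketch

import "github.com/cockroachdb/cockroach/pkg/sql/catalog"

// maxKeysPerRow contrasts the old and new call sites: KeysPerRow took an
// index ID and returned an error when the ID failed to resolve;
// IndexKeysPerRow takes the already-resolved index, so the error case is gone.
func maxKeysPerRow(desc catalog.TableDescriptor, idx catalog.Index) int {
	// Previously: n, err := desc.KeysPerRow(idx.GetID())
	return desc.IndexKeysPerRow(idx)
}
```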

Release note: None

#### catalog: cache family default columns in table descriptor

We now cache the `IndexFetchSpec_FamilyDefaultColumn`s in the table
descriptor, to avoid an allocation when creating an `IndexFetchSpec`.
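
A sketch of how InitIndexFetchSpec can now consume the cached slice (package and helper names are illustrative):

```go
package rowencsketch

import (
	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
)

// populateFamilyDefaults reuses the slice cached on the descriptor rather
// than rebuilding it from GetFamilies() on every call.
func populateFamilyDefaults(s *descpb.IndexFetchSpec, table catalog.TableDescriptor) {
	s.FamilyDefaultColumns = table.FamilyDefaultColumns()
}
```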

Release note: None

#### rowenc: calculate key prefix length without allocating

We now calculate the key prefix length directly instead of creating a
throw-away key.
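
The prefix is just the tenant prefix followed by the table ID and index ID, so its length can be computed arithmetically. A sketch of the computation, matching the new code in index_fetch.go (the standalone helper is illustrative):

```go
package rowencsketch

import (
	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
	"github.com/cockroachdb/cockroach/pkg/util/encoding"
)

// keyPrefixLength sums the encoded lengths of the prefix components instead
// of building a throw-away key with MakeIndexKeyPrefix and measuring it.
func keyPrefixLength(codec keys.SQLCodec, tableID descpb.ID, indexID descpb.IndexID) uint32 {
	return uint32(len(codec.TenantPrefix()) +
		encoding.EncodedLengthUvarintAscending(uint64(tableID)) +
		encoding.EncodedLengthUvarintAscending(uint64(indexID)))
}
```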

Release note: None

Co-authored-by: Radu Berinde <[email protected]>
craig[bot] and RaduBerinde committed Feb 20, 2022
2 parents d05e52e + 9515fc3 commit ae6d033
Showing 11 changed files with 169 additions and 110 deletions.
19 changes: 14 additions & 5 deletions pkg/sql/catalog/descriptor.go
@@ -345,11 +345,6 @@ type TableDescriptor interface {
// 1. Whether the index is a mutation
// 2. if so, is it in state DELETE_AND_WRITE_ONLY
GetIndexMutationCapabilities(id descpb.IndexID) (isMutation, isWriteOnly bool)
// KeysPerRow returns the maximum number of keys used to encode a row for the
// given index. If a secondary index doesn't store any columns, then it only
// has one k/v pair, but if it stores some columns, it can return up to one
// k/v pair per family in the table, just like a primary index.
KeysPerRow(id descpb.IndexID) (int, error)

// AllIndexes returns a slice with all indexes, public and non-public,
// in the underlying proto, in their canonical order:
@@ -509,6 +504,16 @@ type TableDescriptor interface {
// stored columns in the specified Index.
IndexStoredColumns(idx Index) []Column

// IndexKeysPerRow returns the maximum number of keys used to encode a row for
// the given index. If a secondary index doesn't store any columns, then it
// only has one k/v pair, but if it stores some columns, it can return up to
// one k/v pair per family in the table, just like a primary index.
IndexKeysPerRow(idx Index) int

// IndexFetchSpecKeyAndSuffixColumns returns information about the key and
// suffix columns, suitable for populating a descpb.IndexFetchSpec.
IndexFetchSpecKeyAndSuffixColumns(idx Index) []descpb.IndexFetchSpec_KeyColumn

// FindColumnWithID returns the first column found whose ID matches the
// provided target ID, in the canonical order.
// If no column is found then an error is also returned.
@@ -552,6 +557,10 @@ type TableDescriptor interface {
// IDs are unique per table, but not unique globally.
GetNextFamilyID() descpb.FamilyID

// FamilyDefaultColumns returns the default column IDs for families with a
// default column. See IndexFetchSpec.FamilyDefaultColumns.
FamilyDefaultColumns() []descpb.IndexFetchSpec_FamilyDefaultColumn

// HasColumnBackfillMutation returns whether the table has any queued column
// mutations that require a backfill.
HasColumnBackfillMutation() bool
91 changes: 72 additions & 19 deletions pkg/sql/catalog/tabledesc/column.go
@@ -251,28 +251,30 @@ func (w column) HasGeneratedAsIdentitySequenceOption() bool {

// columnCache contains precomputed slices of catalog.Column interfaces.
type columnCache struct {
all []catalog.Column
public []catalog.Column
writable []catalog.Column
deletable []catalog.Column
nonDrop []catalog.Column
visible []catalog.Column
accessible []catalog.Column
readable []catalog.Column
withUDTs []catalog.Column
system []catalog.Column
index []indexColumnCache
all []catalog.Column
public []catalog.Column
writable []catalog.Column
deletable []catalog.Column
nonDrop []catalog.Column
visible []catalog.Column
accessible []catalog.Column
readable []catalog.Column
withUDTs []catalog.Column
system []catalog.Column
familyDefaultColumns []descpb.IndexFetchSpec_FamilyDefaultColumn
index []indexColumnCache
}

type indexColumnCache struct {
all []catalog.Column
allDirs []descpb.IndexDescriptor_Direction
key []catalog.Column
keyDirs []descpb.IndexDescriptor_Direction
stored []catalog.Column
keySuffix []catalog.Column
full []catalog.Column
fullDirs []descpb.IndexDescriptor_Direction
all []catalog.Column
allDirs []descpb.IndexDescriptor_Direction
key []catalog.Column
keyDirs []descpb.IndexDescriptor_Direction
stored []catalog.Column
keySuffix []catalog.Column
full []catalog.Column
fullDirs []descpb.IndexDescriptor_Direction
keyAndSuffix []descpb.IndexFetchSpec_KeyColumn
}

// newColumnCache returns a fresh fully-populated columnCache struct for the
@@ -336,6 +338,20 @@ func newColumnCache(desc *descpb.TableDescriptor, mutations *mutationCache) *col
lazyAllocAppendColumn(&c.withUDTs, col, numDeletable)
}
}

// Populate familyDefaultColumns.
for i := range desc.Families {
if f := &desc.Families[i]; f.DefaultColumnID != 0 {
if c.familyDefaultColumns == nil {
c.familyDefaultColumns = make([]descpb.IndexFetchSpec_FamilyDefaultColumn, 0, len(desc.Families)-i)
}
c.familyDefaultColumns = append(c.familyDefaultColumns, descpb.IndexFetchSpec_FamilyDefaultColumn{
FamilyID: f.ID,
DefaultColumnID: f.DefaultColumnID,
})
}
}

// Populate the per-index column cache
c.index = make([]indexColumnCache, 0, 1+len(desc.Indexes)+len(mutations.indexes))
c.index = append(c.index, makeIndexColumnCache(&desc.PrimaryIndex, c.all))
@@ -372,6 +388,43 @@ func makeIndexColumnCache(idx *descpb.IndexDescriptor, all []catalog.Column) (ic
}
ic.full = ic.all[:nFull]
ic.fullDirs = ic.allDirs[:nFull]

// Populate keyAndSuffix. Note that this method can be called on an incomplete
// (mutable) descriptor (e.g. as part of initializing a new descriptor); this
// code needs to tolerate any descriptor state (like having no key columns, or
// having uninitialized column IDs).
var invertedColumnID descpb.ColumnID
if nKey > 0 && idx.Type == descpb.IndexDescriptor_INVERTED {
invertedColumnID = idx.InvertedColumnID()
}
var compositeIDs catalog.TableColSet
for _, colID := range idx.CompositeColumnIDs {
compositeIDs.Add(colID)
}
ic.keyAndSuffix = make([]descpb.IndexFetchSpec_KeyColumn, nKey+nKeySuffix)
for i := range ic.keyAndSuffix {
col := ic.all[i]
if col == nil {
ic.keyAndSuffix[i].Name = "invalid"
continue
}
colID := col.GetID()
typ := col.GetType()
if colID != 0 && colID == invertedColumnID {
typ = idx.InvertedColumnKeyType()
}
ic.keyAndSuffix[i] = descpb.IndexFetchSpec_KeyColumn{
IndexFetchSpec_Column: descpb.IndexFetchSpec_Column{
Name: col.GetName(),
ColumnID: colID,
Type: typ,
IsNonNullable: !col.IsNullable(),
},
Direction: ic.allDirs[i],
IsComposite: compositeIDs.Contains(colID),
IsInverted: colID == invertedColumnID,
}
}
return ic
}

16 changes: 6 additions & 10 deletions pkg/sql/catalog/tabledesc/structured.go
@@ -105,17 +105,13 @@ func (desc *wrapper) GetParentSchemaID() descpb.ID {
return parentSchemaID
}

// KeysPerRow implements the TableDescriptor interface.
func (desc *wrapper) KeysPerRow(indexID descpb.IndexID) (int, error) {
if desc.PrimaryIndex.ID == indexID {
return len(desc.Families), nil
}
idx, err := desc.FindIndexWithID(indexID)
if err != nil {
return 0, err
// IndexKeysPerRow implements the TableDescriptor interface.
func (desc *wrapper) IndexKeysPerRow(idx catalog.Index) int {
if desc.PrimaryIndex.ID == idx.GetID() {
return len(desc.Families)
}
if idx.NumSecondaryStoredColumns() == 0 || len(desc.Families) == 1 {
return 1, nil
return 1
}
// Calculate the number of column families used by the secondary index. We
// only need to look at the stored columns because column families are only
@@ -132,7 +128,7 @@ func (desc *wrapper) KeysPerRow(indexID descpb.IndexID) (int, error) {
}
}
}
return numUsedFamilies, nil
return numUsedFamilies
}

// BuildIndexName returns an index name that is not equal to any
7 changes: 3 additions & 4 deletions pkg/sql/catalog/tabledesc/structured_test.go
@@ -723,10 +723,9 @@ func TestKeysPerRow(t *testing.T) {

desc := desctestutils.TestingGetPublicTableDescriptor(db, keys.SystemSQLCodec, "d", tableName)
require.NotNil(t, desc)
keys, err := desc.KeysPerRow(test.indexID)
if err != nil {
t.Fatal(err)
}
idx, err := desc.FindIndexWithID(test.indexID)
require.NoError(t, err)
keys := desc.IndexKeysPerRow(idx)
if test.expected != keys {
t.Errorf("expected %d keys got %d", test.expected, keys)
}
15 changes: 15 additions & 0 deletions pkg/sql/catalog/tabledesc/table_desc.go
@@ -422,6 +422,11 @@ func (desc *wrapper) SystemColumns() []catalog.Column {
return desc.getExistingOrNewColumnCache().system
}

// FamilyDefaultColumns implements the TableDescriptor interface.
func (desc *wrapper) FamilyDefaultColumns() []descpb.IndexFetchSpec_FamilyDefaultColumn {
return desc.getExistingOrNewColumnCache().familyDefaultColumns
}

// PublicColumnIDs implements the TableDescriptor interface.
func (desc *wrapper) PublicColumnIDs() []descpb.ColumnID {
cols := desc.PublicColumns()
@@ -492,6 +497,16 @@ func (desc *wrapper) IndexStoredColumns(idx catalog.Index) []catalog.Column {
return nil
}

// IndexFetchSpecKeyAndSuffixColumns implements the TableDescriptor interface.
func (desc *wrapper) IndexFetchSpecKeyAndSuffixColumns(
idx catalog.Index,
) []descpb.IndexFetchSpec_KeyColumn {
if ic := desc.getExistingOrNewIndexColumnCache(idx); ic != nil {
return ic.keyAndSuffix
}
return nil
}

// getExistingOrNewIndexColumnCache is a convenience method for Index*Columns
// methods.
func (desc *wrapper) getExistingOrNewIndexColumnCache(idx catalog.Index) *indexColumnCache {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/drop_index
@@ -326,7 +326,7 @@ DROP INDEX t_secondary CASCADE;
ALTER TABLE t DROP COLUMN b;
INSERT INTO t SELECT a + 1 FROM t;

statement error pgcode 23505 duplicate key value got decoding error: column \"b\" \(2\) is not public
statement error pgcode 23505 duplicate key value violates unique constraint "t_secondary"\nDETAIL: Key \(b\)=\(0\.0\) already exists
UPSERT INTO t SELECT a + 1 FROM t;

statement ok
86 changes: 20 additions & 66 deletions pkg/sql/rowenc/index_fetch.go
@@ -11,12 +11,11 @@
package rowenc

import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/errors"
)

@@ -34,9 +33,7 @@ func InitIndexFetchSpec(
index catalog.Index,
fetchColumnIDs []descpb.ColumnID,
) error {
oldKeyAndSuffixCols := s.KeyAndSuffixColumns
oldFetchedCols := s.FetchedColumns
oldFamilies := s.FamilyDefaultColumns
*s = descpb.IndexFetchSpec{
Version: descpb.IndexFetchSpecVersionInitial,
TableName: table.GetName(),
@@ -48,80 +45,28 @@
NumKeySuffixColumns: uint32(index.NumKeySuffixColumns()),
}

indexID := index.GetID()
maxKeysPerRow, err := table.KeysPerRow(indexID)
if err != nil {
return err
}
maxKeysPerRow := table.IndexKeysPerRow(index)
s.MaxKeysPerRow = uint32(maxKeysPerRow)
s.KeyPrefixLength = uint32(len(MakeIndexKeyPrefix(codec, table.GetID(), indexID)))
s.KeyPrefixLength = uint32(len(codec.TenantPrefix()) +
encoding.EncodedLengthUvarintAscending(uint64(s.TableID)) +
encoding.EncodedLengthUvarintAscending(uint64(index.GetID())))

s.FamilyDefaultColumns = table.FamilyDefaultColumns()

families := table.GetFamilies()
for i := range families {
f := &families[i]
if f.DefaultColumnID != 0 {
if s.FamilyDefaultColumns == nil {
s.FamilyDefaultColumns = oldFamilies[:0]
}
s.FamilyDefaultColumns = append(s.FamilyDefaultColumns, descpb.IndexFetchSpec_FamilyDefaultColumn{
FamilyID: f.ID,
DefaultColumnID: f.DefaultColumnID,
})
}
if f.ID > s.MaxFamilyID {
s.MaxFamilyID = f.ID
if id := families[i].ID; id > s.MaxFamilyID {
s.MaxFamilyID = id
}
}

indexCols := table.IndexColumns(index)
keyDirs := table.IndexFullColumnDirections(index)
compositeIDs := index.CollectCompositeColumnIDs()
s.KeyAndSuffixColumns = table.IndexFetchSpecKeyAndSuffixColumns(index)

var invertedColumnID descpb.ColumnID
if index.GetType() == descpb.IndexDescriptor_INVERTED {
invertedColumnID = index.InvertedColumnID()
}

mkCol := func(col catalog.Column, colID descpb.ColumnID) descpb.IndexFetchSpec_Column {
typ := col.GetType()
if colID == invertedColumnID {
typ = index.InvertedColumnKeyType()
}
return descpb.IndexFetchSpec_Column{
Name: col.GetName(),
ColumnID: colID,
Type: typ,
IsNonNullable: !col.IsNullable() && col.Public(),
}
}

numKeyCols := index.NumKeyColumns() + index.NumKeySuffixColumns()
if cap(oldKeyAndSuffixCols) >= numKeyCols {
s.KeyAndSuffixColumns = oldKeyAndSuffixCols[:numKeyCols]
} else {
s.KeyAndSuffixColumns = make([]descpb.IndexFetchSpec_KeyColumn, numKeyCols)
}
for i := range s.KeyAndSuffixColumns {
col := indexCols[i]
if !col.Public() {
// Key columns must be public.
return fmt.Errorf("column %q (%d) is not public", col.GetName(), col.GetID())
}
colID := col.GetID()
dir := descpb.IndexDescriptor_ASC
// If this is a unique index, the suffix columns are not part of the full
// index columns and are always ascending.
if i < len(keyDirs) {
dir = keyDirs[i]
}
s.KeyAndSuffixColumns[i] = descpb.IndexFetchSpec_KeyColumn{
IndexFetchSpec_Column: mkCol(col, colID),
Direction: dir,
IsComposite: compositeIDs.Contains(colID),
IsInverted: colID == invertedColumnID,
}
}

if cap(oldFetchedCols) >= len(fetchColumnIDs) {
s.FetchedColumns = oldFetchedCols[:len(fetchColumnIDs)]
} else {
Expand All @@ -132,7 +77,16 @@ func InitIndexFetchSpec(
if err != nil {
return err
}
s.FetchedColumns[i] = mkCol(col, colID)
typ := col.GetType()
if colID == invertedColumnID {
typ = index.InvertedColumnKeyType()
}
s.FetchedColumns[i] = descpb.IndexFetchSpec_Column{
Name: col.GetName(),
ColumnID: colID,
Type: typ,
IsNonNullable: !col.IsNullable() && col.Public(),
}
}

// In test builds, verify that we aren't trying to fetch columns that are not
5 changes: 1 addition & 4 deletions pkg/sql/rowexec/joinreader.go
@@ -484,10 +484,7 @@ func newJoinReader(
)
jr.streamerInfo.unlimitedMemMonitor.Start(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon, mon.BoundAccount{})
jr.streamerInfo.budgetAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount()
jr.streamerInfo.maxKeysPerRow, err = jr.desc.KeysPerRow(jr.index.GetID())
if err != nil {
return nil, err
}
jr.streamerInfo.maxKeysPerRow = jr.desc.IndexKeysPerRow(jr.index)
} else {
// When not using the Streamer API, we want to limit the batch size hint
// to at most half of the workmem limit. Note that it is ok if it is set
2 changes: 1 addition & 1 deletion pkg/sql/span/span_splitter.go
@@ -72,7 +72,7 @@ func MakeSplitter(
// * The index either has just 1 family (so we'll make a GetRequest) or we
// need fewer than every column family in the table (otherwise we'd just
// make a big ScanRequest).
// TODO(radu): should we be using index.KeysPerRow() instead?
// TODO(radu): should we be using IndexKeysPerRow() instead?
numFamilies := len(table.GetFamilies())
if numFamilies > 1 && len(neededFamilies) == numFamilies {
return NoopSplitter()