Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
103476: backfill: Fixed a bug where we ignore indexes to backfill r=Xiang-Gu a=Xiang-Gu

This PR contains three misc bug fixes/refactorings:
- backfill: index backfill previously ignored the IndexesToBackfill field and instead backfilled all new indexes on the table
- node formatting storage parameter: storage parameters were previously omitted when formatting certain nodes.
- element result set filter: refactor to make it cleaner

They were discovered while working on cockroachdb#99526 but they are tangential enough to merit their own separate PR.
Epic: None 

103863: kv: remove stale TODO about commit triggers r=miraradeva a=nvanbenschoten

Closes cockroachdb#18170.

As of c00ea84, `Txn.PrepareForRetry` is called for all transaction retries, regardless of whether they're automatically performed by `Txn.exec` or manually performed by the client. As a result, the TODO describing the need to reset commit triggers separately for user-directed retry attempts is no longer relevant.

Release note: None

Co-authored-by: Xiang Gu <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
3 people committed May 25, 2023
3 parents 0ab47ef + fef97d9 + fea0a90 commit 2060b07
Show file tree
Hide file tree
Showing 11 changed files with 129 additions and 25 deletions.
3 changes: 2 additions & 1 deletion pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,8 @@ func (txn *Txn) exec(ctx context.Context, fn func(context.Context, *Txn) error)
// PrepareForRetry needs to be called before a retry to perform some
// book-keeping and clear errors when possible.
func (txn *Txn) PrepareForRetry(ctx context.Context) {
// TODO(andrei): I think commit triggers are reset in the wrong place. See #18170.
// Reset commit triggers. These must be reconfigured by the client during the
// next retry.
txn.commitTriggers = nil

txn.mu.Lock()
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/backfill/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_library(
"//pkg/sql/row",
"//pkg/sql/rowenc",
"//pkg/sql/rowinfra",
"//pkg/sql/sem/catid",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/transform",
"//pkg/sql/sem/tree",
Expand All @@ -58,6 +59,8 @@ go_test(
"//pkg/sql/catalog",
"//pkg/sql/catalog/catenumpb",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/sem/catid",
"@com_github_stretchr_testify//require",
],
)
Expand Down
22 changes: 15 additions & 7 deletions pkg/sql/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/transform"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -505,7 +506,7 @@ func (ib *IndexBackfiller) InitForLocalUse(
) error {

// Initialize ib.added.
ib.initIndexes(desc)
ib.initIndexes(desc, nil /* allowList */)

// Initialize ib.cols and ib.colIdxMap.
if err := ib.initCols(desc); err != nil {
Expand Down Expand Up @@ -640,11 +641,12 @@ func (ib *IndexBackfiller) InitForDistributedUse(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
desc catalog.TableDescriptor,
allowList []catid.IndexID,
mon *mon.BytesMonitor,
) error {

// Initialize ib.added.
ib.initIndexes(desc)
ib.initIndexes(desc, allowList)

// Initialize ib.indexBackfillerCols.
if err := ib.initCols(desc); err != nil {
Expand Down Expand Up @@ -728,19 +730,25 @@ func (ib *IndexBackfiller) initCols(desc catalog.TableDescriptor) (err error) {
}

// initIndexes is a helper to populate index metadata of an IndexBackfiller. It
// populates the added field. It returns a set of column ordinals that must be
// fetched in order to backfill the added indexes.
func (ib *IndexBackfiller) initIndexes(desc catalog.TableDescriptor) {
// populates the added field to be all adding index mutations.
// If `allowList` is non-empty, only the adding index mutations whose IDs are
// in the list are added; if `allowList` is nil or empty, all adding index
// mutations are added.
func (ib *IndexBackfiller) initIndexes(desc catalog.TableDescriptor, allowList []catid.IndexID) {
var allowListAsSet catid.IndexSet
if len(allowList) > 0 {
allowListAsSet = catid.MakeIndexIDSet(allowList...)
}

mutations := desc.AllMutations()
mutationID := mutations[0].MutationID()

// Mutations in the same transaction have the same ID. Loop through the
// mutations and collect all index mutations.
for _, m := range mutations {
if m.MutationID() != mutationID {
break
}
if IndexMutationFilter(m) {
if IndexMutationFilter(m) &&
(allowListAsSet.Empty() || allowListAsSet.Contains(m.AsIndex().GetID())) {
idx := m.AsIndex()
ib.added = append(ib.added, idx)
}
Expand Down
63 changes: 63 additions & 0 deletions pkg/sql/backfill/index_backfiller_cols_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catenumpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -367,6 +369,67 @@ func TestIndexBackfillerColumns(t *testing.T) {
}
}

// TestInitIndexesAllowList tests that initIndexes works correctly with
// "allowList" to populate the "added" field of the index backfiller.
// TestInitIndexesAllowList tests that initIndexes populates the "added" field
// of the index backfiller correctly, honoring the optional "allowList": an
// empty/nil allowList means all adding, non-temporary index mutations are
// included, while a non-empty allowList restricts the result to the listed
// index IDs.
func TestInitIndexesAllowList(t *testing.T) {
	desc := &tabledesc.Mutable{}
	desc.TableDescriptor = descpb.TableDescriptor{
		Mutations: []descpb.DescriptorMutation{
			{
				// Candidate 1: an adding index.
				Descriptor_: &descpb.DescriptorMutation_Index{
					Index: &descpb.IndexDescriptor{ID: 2},
				},
				Direction: descpb.DescriptorMutation_ADD,
			},
			{
				// Candidate 2: another adding index.
				Descriptor_: &descpb.DescriptorMutation_Index{
					Index: &descpb.IndexDescriptor{ID: 3},
				},
				Direction: descpb.DescriptorMutation_ADD,
			},
			{
				// Non-candidate: index is being dropped.
				Descriptor_: &descpb.DescriptorMutation_Index{
					Index: &descpb.IndexDescriptor{ID: 4},
				},
				Direction: descpb.DescriptorMutation_DROP,
			},
			{
				// Non-candidate: temporary index (delete-preserving encoding).
				// Uses a distinct ID (5) so it does not collide with the
				// dropped index above; two indexes sharing an ID would be an
				// invalid descriptor.
				Descriptor_: &descpb.DescriptorMutation_Index{
					Index: &descpb.IndexDescriptor{ID: 5, UseDeletePreservingEncoding: true},
				},
				Direction: descpb.DescriptorMutation_ADD,
			},
			{
				// Non-candidate: not an index mutation at all.
				Descriptor_: &descpb.DescriptorMutation_Column{
					Column: &descpb.ColumnDescriptor{ID: 2},
				},
				Direction: descpb.DescriptorMutation_ADD,
			},
		},
	}

	t.Run("nil allowList", func(t *testing.T) {
		// A nil allowList means no filtering: both candidates are added.
		ib := &IndexBackfiller{}
		ib.initIndexes(desc, nil /* allowList */)
		require.Equal(t, 2, len(ib.added))
		require.Equal(t, catid.IndexID(2), ib.added[0].GetID())
		require.Equal(t, catid.IndexID(3), ib.added[1].GetID())
	})

	t.Run("non-nil allowList", func(t *testing.T) {
		// Only the listed candidate (3) is added; candidate 2 is filtered out.
		ib := &IndexBackfiller{}
		ib.initIndexes(desc, []catid.IndexID{3} /* allowList */)
		require.Equal(t, 1, len(ib.added))
		require.Equal(t, catid.IndexID(3), ib.added[0].GetID())
	})
}

type fakeColumn struct {
catalog.Column
id descpb.ColumnID
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/indexbackfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func newIndexBackfiller(
}

if err := ib.IndexBackfiller.InitForDistributedUse(ctx, flowCtx, ib.desc,
indexBackfillerMon); err != nil {
ib.spec.IndexesToBackfill, indexBackfillerMon); err != nil {
return nil, err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ func getNextStoredIndexColumnOrdinal(allTargets ElementResultSet, idx *scpb.Prim
func getImplicitSecondaryIndexName(
b BuildCtx, descID descpb.ID, indexID descpb.IndexID, numImplicitColumns int,
) string {
elts := b.QueryByID(descID).Filter(notAbsentTargetFilter)
elts := b.QueryByID(descID).Filter(notFilter(absentTargetFilter))
var idx *scpb.Index
scpb.ForEachSecondaryIndex(elts, func(current scpb.Status, target scpb.TargetStatus, e *scpb.SecondaryIndex) {
if e.IndexID == indexID {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func fallBackIfConcurrentSchemaChange(b BuildCtx, t alterPrimaryKeySpec, tableID
// fallBackIfPartitionedIndexExists panics with an unimplemented error
// if there exists partitioned indexes on the table.
func fallBackIfPartitionedIndexExists(b BuildCtx, t alterPrimaryKeySpec, tableID catid.DescID) {
tableElts := b.QueryByID(tableID).Filter(notAbsentTargetFilter)
tableElts := b.QueryByID(tableID).Filter(notFilter(absentTargetFilter))
scpb.ForEachIndexPartitioning(tableElts, func(_ scpb.Status, _ scpb.TargetStatus, _ *scpb.IndexPartitioning) {
panic(scerrors.NotImplementedErrorf(t.n,
"ALTER PRIMARY KEY on a table with index partitioning is not yet supported"))
Expand All @@ -373,7 +373,7 @@ func fallBackIfPartitionedIndexExists(b BuildCtx, t alterPrimaryKeySpec, tableID
// fallBackIfShardedIndexExists panics with an unimplemented
// error if there exists sharded indexes on the table.
func fallBackIfShardedIndexExists(b BuildCtx, t alterPrimaryKeySpec, tableID catid.DescID) {
tableElts := b.QueryByID(tableID).Filter(notAbsentTargetFilter)
tableElts := b.QueryByID(tableID).Filter(notFilter(absentTargetFilter))
var hasSecondary bool
scpb.ForEachSecondaryIndex(tableElts, func(_ scpb.Status, _ scpb.TargetStatus, idx *scpb.SecondaryIndex) {
hasSecondary = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func isIndexUniqueAndCanServeFK(
func hasColsUniquenessConstraintOtherThan(
b BuildCtx, tableID descpb.ID, columnIDs []descpb.ColumnID, otherThan descpb.ConstraintID,
) (ret bool) {
b.QueryByID(tableID).Filter(publicTargetFilter).Filter(statusPublicFilter).
b.QueryByID(tableID).Filter(publicTargetFilter).Filter(publicStatusFilter).
ForEachElementStatus(func(
current scpb.Status, target scpb.TargetStatus, e scpb.Element,
) {
Expand Down
41 changes: 29 additions & 12 deletions pkg/sql/schemachanger/scbuild/internal/scbuildstmt/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,25 @@ func isColumnFilter(_ scpb.Status, _ scpb.TargetStatus, e scpb.Element) bool {
return isColumn
}

// orFilter returns a filter that accepts an element iff at least one of the
// given filters accepts it — the logical OR of fs.
//
// Unlike the naive fold, it short-circuits: once a filter accepts, the
// remaining filters are not evaluated. All filters in this file are pure
// predicates, so short-circuiting cannot change the result, only avoid
// wasted work.
func orFilter(
	fs ...func(_ scpb.Status, _ scpb.TargetStatus, _ scpb.Element) bool,
) func(_ scpb.Status, _ scpb.TargetStatus, _ scpb.Element) bool {
	return func(status scpb.Status, target scpb.TargetStatus, e scpb.Element) bool {
		for _, f := range fs {
			if f(status, target, e) {
				return true
			}
		}
		return false
	}
}

// notFilter returns a filter that negates the given filter: it accepts an
// element exactly when f rejects it.
func notFilter(
	f func(_ scpb.Status, _ scpb.TargetStatus, _ scpb.Element) bool,
) func(_ scpb.Status, _ scpb.TargetStatus, _ scpb.Element) bool {
	return func(s scpb.Status, ts scpb.TargetStatus, el scpb.Element) bool {
		return !f(s, ts, el)
	}
}

// publicTargetFilter accepts elements whose target status is ToPublic.
func publicTargetFilter(_ scpb.Status, target scpb.TargetStatus, _ scpb.Element) bool {
return target == scpb.ToPublic
}
Expand All @@ -400,18 +419,16 @@ func absentTargetFilter(_ scpb.Status, target scpb.TargetStatus, _ scpb.Element)
return target == scpb.ToAbsent
}

func notAbsentTargetFilter(_ scpb.Status, target scpb.TargetStatus, _ scpb.Element) bool {
return target != scpb.ToAbsent
// publicStatusFilter accepts elements whose current status is PUBLIC.
func publicStatusFilter(status scpb.Status, _ scpb.TargetStatus, _ scpb.Element) bool {
return status == scpb.Status_PUBLIC
}

func statusAbsentOrBackfillOnlyFilter(
status scpb.Status, _ scpb.TargetStatus, _ scpb.Element,
) bool {
return status == scpb.Status_ABSENT || status == scpb.Status_BACKFILL_ONLY
// absentStatusFilter accepts elements whose current status is ABSENT.
func absentStatusFilter(status scpb.Status, _ scpb.TargetStatus, _ scpb.Element) bool {
return status == scpb.Status_ABSENT
}

func statusPublicFilter(status scpb.Status, _ scpb.TargetStatus, _ scpb.Element) bool {
return status == scpb.Status_PUBLIC
// backfillOnlyStatusFilter accepts elements whose current status is
// BACKFILL_ONLY.
func backfillOnlyStatusFilter(status scpb.Status, _ scpb.TargetStatus, _ scpb.Element) bool {
return status == scpb.Status_BACKFILL_ONLY
}

func hasIndexIDAttrFilter(
Expand Down Expand Up @@ -470,8 +487,8 @@ func getPrimaryIndexes(
allTargets := b.QueryByID(tableID)
_, _, freshlyAdded = scpb.FindPrimaryIndex(allTargets.
Filter(publicTargetFilter).
Filter(statusAbsentOrBackfillOnlyFilter))
_, _, existing = scpb.FindPrimaryIndex(allTargets.Filter(statusPublicFilter))
Filter(orFilter(absentStatusFilter, backfillOnlyStatusFilter)))
_, _, existing = scpb.FindPrimaryIndex(allTargets.Filter(publicStatusFilter))
if existing == nil {
// TODO(postamar): can this even be possible?
panic(pgerror.Newf(pgcode.NoPrimaryKey, "missing active primary key"))
Expand Down Expand Up @@ -578,7 +595,7 @@ func (s indexSpec) clone() (c indexSpec) {

// makeIndexSpec constructs an indexSpec based on an existing index element.
func makeIndexSpec(b BuildCtx, tableID catid.DescID, indexID catid.IndexID) (s indexSpec) {
tableElts := b.QueryByID(tableID).Filter(notAbsentTargetFilter)
tableElts := b.QueryByID(tableID).Filter(notFilter(absentTargetFilter))
idxElts := tableElts.Filter(hasIndexIDAttrFilter(indexID))
var constraintID catid.ConstraintID
var n int
Expand Down Expand Up @@ -718,7 +735,7 @@ func makeSwapIndexSpec(
var inID, tempID catid.IndexID
var inConstraintID catid.ConstraintID
{
_, _, tbl := scpb.FindTable(b.QueryByID(tableID).Filter(notAbsentTargetFilter))
_, _, tbl := scpb.FindTable(b.QueryByID(tableID).Filter(notFilter(absentTargetFilter)))
inID = b.NextTableIndexID(tbl)
inConstraintID = b.NextTableConstraintID(tbl.TableID)
tempID = inID + 1
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/sem/tree/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -1161,6 +1161,11 @@ func (node *UniqueConstraintTableDef) Format(ctx *FmtCtx) {
ctx.WriteString(" VISIBILITY " + fmt.Sprintf("%.2f", 1-invisibility))
}
}
if node.StorageParams != nil {
ctx.WriteString(" WITH (")
ctx.FormatNode(&node.StorageParams)
ctx.WriteString(")")
}
}

// ForeignKeyConstraintTableDef represents a FOREIGN KEY constraint in the AST.
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/sem/tree/pretty.go
Original file line number Diff line number Diff line change
Expand Up @@ -1826,6 +1826,13 @@ func (node *UniqueConstraintTableDef) doc(p *PrettyCfg) pretty.Doc {
}
}

if node.StorageParams != nil {
clauses = append(clauses, p.bracketKeyword(
"WITH", "(",
p.Doc(&node.StorageParams),
")", ""))
}

if len(clauses) == 0 {
return title
}
Expand Down

0 comments on commit 2060b07

Please sign in to comment.