Skip to content

Commit

Permalink
schemachanger: Refactor element result set filters
Browse files Browse the repository at this point in the history
This commit cleaned our element results set filters to be more
principled.
  • Loading branch information
Xiang-Gu committed May 25, 2023
1 parent a2545b4 commit fef97d9
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ func getNextStoredIndexColumnOrdinal(allTargets ElementResultSet, idx *scpb.Prim
func getImplicitSecondaryIndexName(
b BuildCtx, descID descpb.ID, indexID descpb.IndexID, numImplicitColumns int,
) string {
elts := b.QueryByID(descID).Filter(notAbsentTargetFilter)
elts := b.QueryByID(descID).Filter(notFilter(absentTargetFilter))
var idx *scpb.Index
scpb.ForEachSecondaryIndex(elts, func(current scpb.Status, target scpb.TargetStatus, e *scpb.SecondaryIndex) {
if e.IndexID == indexID {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func fallBackIfConcurrentSchemaChange(b BuildCtx, t alterPrimaryKeySpec, tableID
// fallBackIfPartitionedIndexExists panics with an unimplemented error
// if there exists partitioned indexes on the table.
func fallBackIfPartitionedIndexExists(b BuildCtx, t alterPrimaryKeySpec, tableID catid.DescID) {
tableElts := b.QueryByID(tableID).Filter(notAbsentTargetFilter)
tableElts := b.QueryByID(tableID).Filter(notFilter(absentTargetFilter))
scpb.ForEachIndexPartitioning(tableElts, func(_ scpb.Status, _ scpb.TargetStatus, _ *scpb.IndexPartitioning) {
panic(scerrors.NotImplementedErrorf(t.n,
"ALTER PRIMARY KEY on a table with index partitioning is not yet supported"))
Expand All @@ -373,7 +373,7 @@ func fallBackIfPartitionedIndexExists(b BuildCtx, t alterPrimaryKeySpec, tableID
// fallBackIfShardedIndexExists panics with an unimplemented
// error if there exists sharded indexes on the table.
func fallBackIfShardedIndexExists(b BuildCtx, t alterPrimaryKeySpec, tableID catid.DescID) {
tableElts := b.QueryByID(tableID).Filter(notAbsentTargetFilter)
tableElts := b.QueryByID(tableID).Filter(notFilter(absentTargetFilter))
var hasSecondary bool
scpb.ForEachSecondaryIndex(tableElts, func(_ scpb.Status, _ scpb.TargetStatus, idx *scpb.SecondaryIndex) {
hasSecondary = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func isIndexUniqueAndCanServeFK(
func hasColsUniquenessConstraintOtherThan(
b BuildCtx, tableID descpb.ID, columnIDs []descpb.ColumnID, otherThan descpb.ConstraintID,
) (ret bool) {
b.QueryByID(tableID).Filter(publicTargetFilter).Filter(statusPublicFilter).
b.QueryByID(tableID).Filter(publicTargetFilter).Filter(publicStatusFilter).
ForEachElementStatus(func(
current scpb.Status, target scpb.TargetStatus, e scpb.Element,
) {
Expand Down
41 changes: 29 additions & 12 deletions pkg/sql/schemachanger/scbuild/internal/scbuildstmt/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,25 @@ func isColumnFilter(_ scpb.Status, _ scpb.TargetStatus, e scpb.Element) bool {
return isColumn
}

func orFilter(
fs ...func(_ scpb.Status, _ scpb.TargetStatus, _ scpb.Element) bool,
) func(_ scpb.Status, _ scpb.TargetStatus, _ scpb.Element) bool {
return func(status scpb.Status, target scpb.TargetStatus, e scpb.Element) (ret bool) {
for _, f := range fs {
ret = ret || f(status, target, e)
}
return ret
}
}

func notFilter(
f func(_ scpb.Status, _ scpb.TargetStatus, _ scpb.Element) bool,
) func(_ scpb.Status, _ scpb.TargetStatus, _ scpb.Element) bool {
return func(status scpb.Status, target scpb.TargetStatus, e scpb.Element) bool {
return !f(status, target, e)
}
}

func publicTargetFilter(_ scpb.Status, target scpb.TargetStatus, _ scpb.Element) bool {
return target == scpb.ToPublic
}
Expand All @@ -400,18 +419,16 @@ func absentTargetFilter(_ scpb.Status, target scpb.TargetStatus, _ scpb.Element)
return target == scpb.ToAbsent
}

func notAbsentTargetFilter(_ scpb.Status, target scpb.TargetStatus, _ scpb.Element) bool {
return target != scpb.ToAbsent
func publicStatusFilter(status scpb.Status, _ scpb.TargetStatus, _ scpb.Element) bool {
return status == scpb.Status_PUBLIC
}

func statusAbsentOrBackfillOnlyFilter(
status scpb.Status, _ scpb.TargetStatus, _ scpb.Element,
) bool {
return status == scpb.Status_ABSENT || status == scpb.Status_BACKFILL_ONLY
func absentStatusFilter(status scpb.Status, _ scpb.TargetStatus, _ scpb.Element) bool {
return status == scpb.Status_ABSENT
}

func statusPublicFilter(status scpb.Status, _ scpb.TargetStatus, _ scpb.Element) bool {
return status == scpb.Status_PUBLIC
func backfillOnlyStatusFilter(status scpb.Status, _ scpb.TargetStatus, _ scpb.Element) bool {
return status == scpb.Status_BACKFILL_ONLY
}

func hasIndexIDAttrFilter(
Expand Down Expand Up @@ -470,8 +487,8 @@ func getPrimaryIndexes(
allTargets := b.QueryByID(tableID)
_, _, freshlyAdded = scpb.FindPrimaryIndex(allTargets.
Filter(publicTargetFilter).
Filter(statusAbsentOrBackfillOnlyFilter))
_, _, existing = scpb.FindPrimaryIndex(allTargets.Filter(statusPublicFilter))
Filter(orFilter(absentStatusFilter, backfillOnlyStatusFilter)))
_, _, existing = scpb.FindPrimaryIndex(allTargets.Filter(publicStatusFilter))
if existing == nil {
// TODO(postamar): can this even be possible?
panic(pgerror.Newf(pgcode.NoPrimaryKey, "missing active primary key"))
Expand Down Expand Up @@ -578,7 +595,7 @@ func (s indexSpec) clone() (c indexSpec) {

// makeIndexSpec constructs an indexSpec based on an existing index element.
func makeIndexSpec(b BuildCtx, tableID catid.DescID, indexID catid.IndexID) (s indexSpec) {
tableElts := b.QueryByID(tableID).Filter(notAbsentTargetFilter)
tableElts := b.QueryByID(tableID).Filter(notFilter(absentTargetFilter))
idxElts := tableElts.Filter(hasIndexIDAttrFilter(indexID))
var constraintID catid.ConstraintID
var n int
Expand Down Expand Up @@ -718,7 +735,7 @@ func makeSwapIndexSpec(
var inID, tempID catid.IndexID
var inConstraintID catid.ConstraintID
{
_, _, tbl := scpb.FindTable(b.QueryByID(tableID).Filter(notAbsentTargetFilter))
_, _, tbl := scpb.FindTable(b.QueryByID(tableID).Filter(notFilter(absentTargetFilter)))
inID = b.NextTableIndexID(tbl)
inConstraintID = b.NextTableConstraintID(tbl.TableID)
tempID = inID + 1
Expand Down

0 comments on commit fef97d9

Please sign in to comment.