From 3b30f98281a708f9ace6f3a89d5bfb2ff2e7145e Mon Sep 17 00:00:00 2001 From: arulajmani Date: Mon, 12 Apr 2021 21:00:41 -0400 Subject: [PATCH] sql: pull out multi-region finalization into its own struct This thing was getting slightly unwieldy, so it makes sense to encapsulate all the finalization logic in one place and a separate file. Inspired by otan's draft PR #63459. Release note: None --- pkg/sql/multiregion_finalizer.go | 246 +++++++++++++++++++++++++++++++ pkg/sql/type_change.go | 206 +------------------------- 2 files changed, 251 insertions(+), 201 deletions(-) create mode 100644 pkg/sql/multiregion_finalizer.go diff --git a/pkg/sql/multiregion_finalizer.go b/pkg/sql/multiregion_finalizer.go new file mode 100644 index 000000000000..5d0115bc3e85 --- /dev/null +++ b/pkg/sql/multiregion_finalizer.go @@ -0,0 +1,246 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package sql + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/security" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" +) + +type multiRegionFinalizer struct { + typeID descpb.ID + regionalByRowTables []descpb.ID +} + +// NewMultiRegionFinalizer returns a multiRegionFinalizer. +func NewMultiRegionFinalizer(typeID descpb.ID) *multiRegionFinalizer { + return &multiRegionFinalizer{ + typeID: typeID, + } +} + +// finalize updates the zone configurations of the database and all enclosed +// REGIONAL BY ROW tables once the region promotion/demotion is complete. The +// caller must call waitToUpdateLeases once the provided transaction commits. +func (r *multiRegionFinalizer) finalize( + ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, execCfg *ExecutorConfig, +) error { + if err := r.updateDatabaseZoneConfig(ctx, txn, descsCol, execCfg); err != nil { + return err + } + return r.repartitionRegionalByRowTables(ctx, txn, descsCol, execCfg) +} + +// updateDatabaseZoneConfig updates the zone config of the database that +// encloses the multi-region enum such that there is an entry for all PUBLIC +// region values. 
+func (r *multiRegionFinalizer) updateDatabaseZoneConfig( + ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, execCfg *ExecutorConfig, +) error { + typeDesc, err := descsCol.GetImmutableTypeByID(ctx, txn, r.typeID, tree.ObjectLookupFlags{}) + if err != nil { + return err + } + + regionConfig, err := SynthesizeRegionConfig(ctx, txn, typeDesc.GetParentID(), descsCol) + if err != nil { + return err + } + return ApplyZoneConfigFromDatabaseRegionConfig( + ctx, + typeDesc.GetParentID(), + regionConfig, + txn, + execCfg, + ) +} + +// repartitionRegionalByRowTables re-partitions all REGIONAL BY ROW tables +// enclosed inside the database of its multi-region enum. There is a partition +// and corresponding zone configuration for all PUBLIC enum members (regions) +// after repartitioning. +func (r *multiRegionFinalizer) repartitionRegionalByRowTables( + ctx context.Context, txn *kv.Txn, descsCol *descs.Collection, execCfg *ExecutorConfig, +) error { + var repartitionedTableIDs []descpb.ID + + typeDesc, err := descsCol.GetImmutableTypeByID(ctx, txn, r.typeID, tree.ObjectLookupFlags{}) + if err != nil { + return err + } + + if typeDesc.GetKind() != descpb.TypeDescriptor_MULTIREGION_ENUM { + return errors.AssertionFailedf( + "expected multi-region enum, but found type descriptor of kind: %v", typeDesc.GetKind(), + ) + } + + p, cleanup := NewInternalPlanner( + "repartition-regional-by-row-tables", + txn, + security.RootUserName(), + &MemoryMetrics{}, + execCfg, + sessiondatapb.SessionData{}, + WithDescCollection(descsCol), + ) + defer cleanup() + localPlanner := p.(*planner) + + _, dbDesc, err := descsCol.GetImmutableDatabaseByID( + ctx, txn, typeDesc.GetParentID(), tree.DatabaseLookupFlags{ + Required: true, + }) + if err != nil { + return err + } + + b := txn.NewBatch() + regionConfig, err := SynthesizeRegionConfig(ctx, txn, typeDesc.GetParentID(), descsCol) + if err != nil { + return err + } + + err = localPlanner.forEachMutableTableInDatabase(ctx, dbDesc, + 
func(ctx context.Context, tableDesc *tabledesc.Mutable) error { + if !tableDesc.IsLocalityRegionalByRow() || tableDesc.Dropped() { + // We only need to re-partition REGIONAL BY ROW tables. Even then, we + // don't need to (can't) repartition a REGIONAL BY ROW table if it has + // been dropped. + return nil + } + + colName, err := tableDesc.GetRegionalByRowTableRegionColumnName() + if err != nil { + return err + } + partitionAllBy := partitionByForRegionalByRow(regionConfig, colName) + + // oldPartitioningDescs saves the old partitioning descriptors for each + // index that is repartitioned. This is later used to remove zone + // configurations from any partitions that are removed. + oldPartitioningDescs := make(map[descpb.IndexID]descpb.PartitioningDescriptor) + + // Update the partitioning on all indexes of the table that aren't being + // dropped. + for _, index := range tableDesc.NonDropIndexes() { + newIdx, err := CreatePartitioning( + ctx, + localPlanner.extendedEvalCtx.Settings, + localPlanner.EvalContext(), + tableDesc, + *index.IndexDesc(), + partitionAllBy, + nil, /* allowedNewColumnName*/ + true, /* allowImplicitPartitioning */ + ) + if err != nil { + return err + } + + oldPartitioningDescs[index.GetID()] = index.IndexDesc().Partitioning + + // Update the index descriptor proto's partitioning. + index.IndexDesc().Partitioning = newIdx.Partitioning + } + + // Remove zone configurations that applied to partitions that were removed + // in the previous step. This requires all indexes to have been + // repartitioned such that there is no partitioning on the removed enum + // value. This is because `deleteRemovedPartitionZoneConfigs` generates + // subzone spans for the entire table (all indexes) downstream for each + // index. Spans can only be generated if partitioning values are present on + // the type descriptor (removed enum values obviously aren't), so we must + // remove the partition from all indexes before trying to delete zone + // configurations. 
+ for _, index := range tableDesc.NonDropIndexes() { + oldPartitioning := oldPartitioningDescs[index.GetID()] + + // Remove zone configurations that reference partition values we removed + // in the previous step. + if err = deleteRemovedPartitionZoneConfigs( + ctx, + txn, + tableDesc, + index.IndexDesc(), + &oldPartitioning, + &index.IndexDesc().Partitioning, + execCfg, + ); err != nil { + return err + } + } + + // Update the zone configurations now that the partition's been added. + if err := ApplyZoneConfigForMultiRegionTable( + ctx, + txn, + localPlanner.ExecCfg(), + regionConfig, + tableDesc, + ApplyZoneConfigForMultiRegionTableOptionTableAndIndexes, + ); err != nil { + return err + } + + if err := localPlanner.Descriptors().WriteDescToBatch(ctx, false /* kvTrace */, tableDesc, b); err != nil { + return err + } + + repartitionedTableIDs = append(repartitionedTableIDs, tableDesc.GetID()) + return nil + }) + if err != nil { + return err + } + + if err := txn.Run(ctx, b); err != nil { + return err + } + + r.regionalByRowTables = repartitionedTableIDs + + return nil +} + +// waitToUpdateLeases waits until the entire cluster has been updated to the +// latest descriptor version of every regional by row table repartitioned by +// finalize. Tables whose descriptor is no longer found are logged and skipped. +func (r *multiRegionFinalizer) waitToUpdateLeases( + ctx context.Context, leaseMgr *lease.Manager, +) error { + for _, tbID := range r.regionalByRowTables { + if err := WaitToUpdateLeases(ctx, leaseMgr, tbID); err != nil { + if !errors.Is(err, catalog.ErrDescriptorNotFound) { + return err + } + // Swallow: a missing descriptor is assumed to mean the table was dropped. 
+ log.Infof(ctx, + "could not find table %d to be repartitioned when adding/removing regions on "+ + "enum %d, assuming it was dropped and moving on", + tbID, + r.typeID, + ) + } + } + return nil +} diff --git a/pkg/sql/type_change.go b/pkg/sql/type_change.go index b1cbcddc000b..793971bce2a5 100644 --- a/pkg/sql/type_change.go +++ b/pkg/sql/type_change.go @@ -29,7 +29,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/multiregion" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" "github.com/cockroachdb/cockroach/pkg/sql/parser" @@ -37,7 +36,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/errors" @@ -310,10 +308,7 @@ func (t *typeSchemaChanger) exec(ctx context.Context) error { return err } - // A list of multi-region tables that were repartitioned as a result of - // promotion/demotion of enum values. This is used to track tables whose - // leases need to be invalidated. - var repartitionedTables []descpb.ID + var multiRegionFinalizer *multiRegionFinalizer // Now that we've ascertained that the enum values can be removed, we can // actually go about modifying the type descriptor. @@ -365,23 +360,13 @@ func (t *typeSchemaChanger) exec(ctx context.Context) error { // REGIONAL BY ROW tables must be updated to reflect the new region values // available. 
if typeDesc.Kind == descpb.TypeDescriptor_MULTIREGION_ENUM { - immut, err := descsCol.GetImmutableTypeByID(ctx, txn, t.typeID, tree.ObjectLookupFlags{}) - if err != nil { - return err - } if fn := t.execCfg.TypeSchemaChangerTestingKnobs.RunBeforeMultiRegionUpdates; fn != nil { if err := fn(); err != nil { return err } } - repartitionedTables, err = performMultiRegionFinalization( - ctx, - immut, - txn, - t.execCfg, - descsCol, - ) - if err != nil { + multiRegionFinalizer = NewMultiRegionFinalizer(t.typeID) + if err := multiRegionFinalizer.finalize(ctx, txn, descsCol, t.execCfg); err != nil { return err } } @@ -397,17 +382,8 @@ func (t *typeSchemaChanger) exec(ctx context.Context) error { // If any tables were repartitioned, make sure their leases are updated as // well. - for _, tbID := range repartitionedTables { - if err := WaitToUpdateLeases(ctx, leaseMgr, tbID); err != nil { - if errors.Is(err, catalog.ErrDescriptorNotFound) { - // Swallow. - log.Infof(ctx, - "could not find table %d to be repartitioned when adding/removing regions on "+ - "enum %d, assuming it was dropped and moving on", - tbID, - t.typeID, - ) - } + if multiRegionFinalizer != nil { + if err := multiRegionFinalizer.waitToUpdateLeases(ctx, leaseMgr); err != nil { return err } } @@ -431,178 +407,6 @@ func (t *typeSchemaChanger) exec(ctx context.Context) error { return nil } -// performMultiRegionFinalization updates the zone configurations on the -// database and re-partitions all REGIONAL BY ROW tables after REGION ADD/DROP -// has completed. A list of re-partitioned tables, if any, is returned. 
-func performMultiRegionFinalization( - ctx context.Context, - typeDesc catalog.TypeDescriptor, - txn *kv.Txn, - execCfg *ExecutorConfig, - descsCol *descs.Collection, -) ([]descpb.ID, error) { - regionConfig, err := SynthesizeRegionConfig(ctx, txn, typeDesc.GetParentID(), descsCol) - if err != nil { - return nil, err - } - // Once the region promotion/demotion is complete, we update the - // zone configuration on the database. - if err := ApplyZoneConfigFromDatabaseRegionConfig( - ctx, - typeDesc.GetParentID(), - regionConfig, - txn, - execCfg, - ); err != nil { - return nil, err - } - - return repartitionRegionalByRowTables(ctx, typeDesc, txn, execCfg, descsCol, regionConfig) -} - -// repartitionRegionalByRowTables takes a multi-region enum and re-partitions -// all REGIONAL BY ROW tables in the enclosing database such that there is a -// partition and corresponding zone configuration for all PUBLIC enum members -// (regions). -func repartitionRegionalByRowTables( - ctx context.Context, - typeDesc catalog.TypeDescriptor, - txn *kv.Txn, - execCfg *ExecutorConfig, - descsCol *descs.Collection, - regionConfig multiregion.RegionConfig, -) ([]descpb.ID, error) { - var repartitionedTableIDs []descpb.ID - if typeDesc.GetKind() != descpb.TypeDescriptor_MULTIREGION_ENUM { - return repartitionedTableIDs, errors.AssertionFailedf( - "expected multi-region enum, but found type descriptor of kind: %v", typeDesc.GetKind(), - ) - } - p, cleanup := NewInternalPlanner( - "repartition-regional-by-row-tables", - txn, - security.RootUserName(), - &MemoryMetrics{}, - execCfg, - sessiondatapb.SessionData{}, - WithDescCollection(descsCol), - ) - defer cleanup() - localPlanner := p.(*planner) - - _, dbDesc, err := descsCol.GetImmutableDatabaseByID( - ctx, txn, typeDesc.GetParentID(), tree.DatabaseLookupFlags{ - Required: true, - }) - if err != nil { - return nil, err - } - - b := txn.NewBatch() - err = localPlanner.forEachMutableTableInDatabase(ctx, dbDesc, - func(ctx context.Context, 
tableDesc *tabledesc.Mutable) error { - if !tableDesc.IsLocalityRegionalByRow() || tableDesc.Dropped() { - // We only need to re-partition REGIONAL BY ROW tables. Even then, we - // don't need to (can't) repartition a REGIONAL BY ROW table if it has - // been dropped. - return nil - } - - colName, err := tableDesc.GetRegionalByRowTableRegionColumnName() - if err != nil { - return err - } - partitionAllBy := partitionByForRegionalByRow(regionConfig, colName) - - // oldPartitioningDescs saves the old partitioning descriptors for each - // index that is repartitioned. This is later used to remove zone - // configurations from any partitions that are removed. - oldPartitioningDescs := make(map[descpb.IndexID]descpb.PartitioningDescriptor) - - // Update the partitioning on all indexes of the table that aren't being - // dropped. - for _, index := range tableDesc.NonDropIndexes() { - newIdx, err := CreatePartitioning( - ctx, - localPlanner.extendedEvalCtx.Settings, - localPlanner.EvalContext(), - tableDesc, - *index.IndexDesc(), - partitionAllBy, - nil, /* allowedNewColumnName*/ - true, /* allowImplicitPartitioning */ - ) - if err != nil { - return err - } - - oldPartitioningDescs[index.GetID()] = index.IndexDesc().Partitioning - - // Update the index descriptor proto's partitioning. - index.IndexDesc().Partitioning = newIdx.Partitioning - } - - // Remove zone configurations that applied to partitions that were removed - // in the previous step. This requires all indexes to have been - // repartitioned such that there is no partitioning on the removed enum - // value. This is because `deleteRemovedPartitionZoneConfigs` generates - // subzone spans for the entire table (all indexes) downstream for each - // index. Spans can only be generated if partitioning values are present on - // the type descriptor (removed enum values obviously aren't), so we must - // remove the partition from all indexes before trying to delete zone - // configurations. 
- for _, index := range tableDesc.NonDropIndexes() { - oldPartitioning := oldPartitioningDescs[index.GetID()] - - // Remove zone configurations that reference partition values we removed - // in the previous step. - if err = deleteRemovedPartitionZoneConfigs( - ctx, - txn, - tableDesc, - index.IndexDesc(), - &oldPartitioning, - &index.IndexDesc().Partitioning, - execCfg, - ); err != nil { - return err - } - } - - // Update the zone configurations now that the partition's been added. - regionConfig, err := SynthesizeRegionConfig(ctx, txn, typeDesc.GetParentID(), descsCol) - if err != nil { - return err - } - if err := ApplyZoneConfigForMultiRegionTable( - ctx, - txn, - localPlanner.ExecCfg(), - regionConfig, - tableDesc, - ApplyZoneConfigForMultiRegionTableOptionTableAndIndexes, - ); err != nil { - return err - } - - if err := localPlanner.Descriptors().WriteDescToBatch(ctx, false /* kvTrace */, tableDesc, b); err != nil { - return err - } - - repartitionedTableIDs = append(repartitionedTableIDs, tableDesc.GetID()) - return nil - }) - if err != nil { - return nil, err - } - - if err := txn.Run(ctx, b); err != nil { - return nil, err - } - - return repartitionedTableIDs, nil -} - // isTransitioningInCurrentJob returns true if the given member is either being // added or removed in the current job. func (t *typeSchemaChanger) isTransitioningInCurrentJob(