From 8374c0fc4bd8b57b4cb6ba6322efae095dc02c94 Mon Sep 17 00:00:00 2001 From: arulajmani Date: Mon, 12 Apr 2021 23:04:09 -0400 Subject: [PATCH] sql: fix bad interaction with DROP region rollback and alter to RBR When altering to a regional by row table, we only add partitions for PUBLIC enum values (regions). The type schema change job that negotiates promotion/demotion is responsible for re-partitioning regional by row tables. Previously, this re-partitioning only happened if the type schema change job succeeded. This behavior missed an edge case, which looks like: - Client 1 drops region A from the database. The user txn commits. - Client 2 alters table t to RBR. - The drop region fails, and therefore rolls back. - table t is now missing a partition for region A. This patch fixes this problem by triggering a re-partition even when the type schema change job is rolling back. Informs #63462 Release note (bug fix): Previously, if a user altered a table to REGIONAL BY ROW when a region was being dropped, and the drop failed and had to be rolled back, it could have resulted in the REGIONAL BY ROW table missing a partition for this region. This is now fixed: the rollback of a failed region drop re-partitions REGIONAL BY ROW tables. 
--- pkg/ccl/backupccl/backup_test.go | 3 +- pkg/ccl/multiregionccl/region_test.go | 150 +++++++++++++++++++++++++- pkg/sql/type_change.go | 30 ++++-- pkg/sql/type_change_test.go | 3 +- 4 files changed, 175 insertions(+), 11 deletions(-) diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 854406fe810e..c127aa803c6a 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -2739,7 +2739,7 @@ func TestBackupRestoreDuringUserDefinedTypeChange(t *testing.T) { ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{ - RunBeforeEnumMemberPromotion: func() { + RunBeforeEnumMemberPromotion: func() error { mu.Lock() if numTypeChangesStarted < len(tc.queries) { numTypeChangesStarted++ @@ -2751,6 +2751,7 @@ func TestBackupRestoreDuringUserDefinedTypeChange(t *testing.T) { } else { mu.Unlock() } + return nil }, }, }, diff --git a/pkg/ccl/multiregionccl/region_test.go b/pkg/ccl/multiregionccl/region_test.go index 65fa83c78413..53206208facb 100644 --- a/pkg/ccl/multiregionccl/region_test.go +++ b/pkg/ccl/multiregionccl/region_test.go @@ -117,7 +117,7 @@ func TestConcurrentAddDropRegions(t *testing.T) { knobs := base.TestingKnobs{ SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{ - RunBeforeEnumMemberPromotion: func() { + RunBeforeEnumMemberPromotion: func() error { mu.Lock() if firstOp { firstOp = false @@ -129,6 +129,7 @@ func TestConcurrentAddDropRegions(t *testing.T) { } else { mu.Unlock() } + return nil }, }, } @@ -201,6 +202,150 @@ CREATE TABLE db.rbr () LOCALITY REGIONAL BY ROW`) } } +// TestRegionAddDropFailureEnclosingRegionalByRowAlters tests adding/dropping +// regions (expected to fail) with a concurrent alter to a regional by row +// table. The sketch of the test is as follows: +// - Client 1 performs an ALTER ADD / DROP REGION. Let the user txn commit. +// - Block in the type schema changer. 
+// - Client 2 alters a REGIONAL / GLOBAL table to a REGIONAL BY ROW table. Let +// this operation finish. +// - Force a rollback on the REGION ADD / DROP by injecting an error. +// - Ensure the partitions on the REGIONAL BY ROW table are sane. +func TestRegionAddDropFailureEnclosingRegionalByRowAlters(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Decrease the adopt loop interval so that retries happen quickly. + defer sqltestutils.SetTestJobsAdoptInterval()() + + regionAlterCmds := []struct { + name string + cmd string + }{ + { + name: "drop-region", + cmd: `ALTER DATABASE db DROP REGION "us-east3"`, + }, + { + name: "add-region", + cmd: `ALTER DATABASE db ADD REGION "us-east4"`, + }, + } + + testCases := []struct { + name string + setup string + }{ + { + name: "alter-from-global", + setup: `CREATE TABLE db.t () LOCALITY GLOBAL`, + }, + { + name: "alter-from-explicit-regional", + setup: `CREATE TABLE db.t () LOCALITY REGIONAL IN "us-east2"`, + }, + { + name: "alter-from-regional", + setup: `CREATE TABLE db.t () LOCALITY REGIONAL IN PRIMARY REGION`, + }, + { + name: "alter-from-rbr", + setup: `CREATE TABLE db.t (reg db.crdb_internal_region) LOCALITY REGIONAL BY ROW AS reg`, + }, + } + + var mu syncutil.Mutex + typeChangeStarted := make(chan struct{}, 1) + typeChangeFinished := make(chan struct{}, 1) + rbrOpFinished := make(chan struct{}, 1) + + knobs := base.TestingKnobs{ + SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{ + RunBeforeEnumMemberPromotion: func() error { + mu.Lock() + defer mu.Unlock() + typeChangeStarted <- struct{}{} + <-rbrOpFinished + // Trigger a roll-back. 
+ return errors.New("boom") + }, + }, + } + + _, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster( + t, 4 /* numServers */, knobs, nil, /* baseDir */ + ) + defer cleanup() + + for _, tc := range testCases { + for _, regionAlterCmd := range regionAlterCmds { + t.Run(regionAlterCmd.name+tc.name, func(t *testing.T) { + + _, err := sqlDB.Exec(` +DROP DATABASE IF EXISTS db; +CREATE DATABASE db WITH PRIMARY REGION "us-east1" REGIONS "us-east2", "us-east3"; +`) + require.NoError(t, err) + + _, err = sqlDB.Exec(tc.setup) + require.NoError(t, err) + + go func() { + _, err := sqlDB.Exec(regionAlterCmd.cmd) + if !testutils.IsError(err, "boom") { + t.Errorf("expected error boom, found %v", err) + } + typeChangeFinished <- struct{}{} + }() + + <-typeChangeStarted + + _, err = sqlDB.Exec(`BEGIN; SET TRANSACTION PRIORITY HIGH; ALTER TABLE db.t SET LOCALITY REGIONAL BY ROW; COMMIT;`) + rbrOpFinished <- struct{}{} + require.NoError(t, err) + + testutils.SucceedsSoon(t, func() error { + rows, err := sqlDB.Query("SELECT partition_name FROM [SHOW PARTITIONS FROM TABLE db.t] ORDER BY partition_name") + if err != nil { + return err + } + defer rows.Close() + + var partitionNames []string + for rows.Next() { + var partitionName string + if err := rows.Scan(&partitionName); err != nil { + return err + } + + partitionNames = append(partitionNames, partitionName) + } + + expectedPartitions := []string{"us-east1", "us-east2", "us-east3"} + if len(partitionNames) != len(expectedPartitions) { + return errors.AssertionFailedf( + "unexpected number of partitions; expected %d, found %d", + len(expectedPartitions), + len(partitionNames), + ) + } + for i := range expectedPartitions { + if expectedPartitions[i] != partitionNames[i] { + return errors.AssertionFailedf( + "unexpected partitions; expected %v, found %v", + expectedPartitions, + partitionNames, + ) + } + } + return nil + }) + <-typeChangeFinished + }) + } + } +} + func TestSettingPrimaryRegionAmidstDrop(t 
*testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -211,7 +356,7 @@ func TestSettingPrimaryRegionAmidstDrop(t *testing.T) { dropRegionFinished := make(chan struct{}) knobs := base.TestingKnobs{ SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{ - RunBeforeEnumMemberPromotion: func() { + RunBeforeEnumMemberPromotion: func() error { mu.Lock() defer mu.Unlock() if dropRegionStarted != nil { @@ -219,6 +364,7 @@ func TestSettingPrimaryRegionAmidstDrop(t *testing.T) { <-waitForPrimaryRegionSwitch dropRegionStarted = nil } + return nil }, }, } diff --git a/pkg/sql/type_change.go b/pkg/sql/type_change.go index 76899fd01c38..f33589a1806d 100644 --- a/pkg/sql/type_change.go +++ b/pkg/sql/type_change.go @@ -189,7 +189,7 @@ type TypeSchemaChangerTestingKnobs struct { RunBeforeExec func() error // RunBeforeEnumMemberPromotion runs before enum members are promoted from // readable to all permissions in the typeSchemaChanger. - RunBeforeEnumMemberPromotion func() + RunBeforeEnumMemberPromotion func() error // RunAfterOnFailOrCancel runs after OnFailOrCancel completes, if // OnFailOrCancel is triggered. RunAfterOnFailOrCancel func() error @@ -281,7 +281,9 @@ func (t *typeSchemaChanger) exec(ctx context.Context) error { typeDesc.GetKind() == descpb.TypeDescriptor_MULTIREGION_ENUM) && len(t.transitioningMembers) != 0 { if fn := t.execCfg.TypeSchemaChangerTestingKnobs.RunBeforeEnumMemberPromotion; fn != nil { - fn() + if err := fn(); err != nil { + return err + } } // First, we check if any of the enum values that are being removed are in @@ -445,13 +447,13 @@ func applyFilterOnEnumMembers( // 2. If an enum value was being removed as part of this txn, we promote // it back to writable. 
func (t *typeSchemaChanger) cleanupEnumValues(ctx context.Context) error { + var regionChangeFinalizer *databaseRegionChangeFinalizer // Cleanup: cleanup := func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error { typeDesc, err := descsCol.GetMutableTypeVersionByID(ctx, txn, t.typeID) if err != nil { return err } - b := txn.NewBatch() // No cleanup required. if !enumHasNonPublic(typeDesc) { return nil @@ -472,12 +474,19 @@ func (t *typeSchemaChanger) cleanupEnumValues(ctx context.Context) error { return t.isTransitioningInCurrentJob(member) && enumMemberIsAdding(member) }) - if err := descsCol.WriteDescToBatch( - ctx, true /* kvTrace */, typeDesc, b, - ); err != nil { + if err := descsCol.WriteDesc(ctx, true /* kvTrace */, typeDesc, txn); err != nil { return err } - return txn.Run(ctx, b) + + if typeDesc.Kind == descpb.TypeDescriptor_MULTIREGION_ENUM { + regionChangeFinalizer = newDatabaseRegionChangeFinalizer(typeDesc.GetParentID(), typeDesc.GetID()) + + if err := regionChangeFinalizer.finalize(ctx, txn, descsCol, t.execCfg); err != nil { + return err + } + } + + return nil } if err := descs.Txn(ctx, t.execCfg.Settings, t.execCfg.LeaseManager, t.execCfg.InternalExecutor, t.execCfg.DB, cleanup); err != nil { @@ -491,6 +500,13 @@ func (t *typeSchemaChanger) cleanupEnumValues(ctx context.Context) error { } return err } + + if regionChangeFinalizer != nil { + if err := regionChangeFinalizer.waitToUpdateLeases(ctx, t.execCfg.LeaseManager); err != nil { + return err + } + } + return nil } diff --git a/pkg/sql/type_change_test.go b/pkg/sql/type_change_test.go index 37a032aaf224..eacfc529baa9 100644 --- a/pkg/sql/type_change_test.go +++ b/pkg/sql/type_change_test.go @@ -528,9 +528,10 @@ func TestTypeChangeJobCancelSemantics(t *testing.T) { finishedSchemaChange.Add(1) params.Knobs.SQLTypeSchemaChanger = &sql.TypeSchemaChangerTestingKnobs{ - RunBeforeEnumMemberPromotion: func() { + RunBeforeEnumMemberPromotion: func() error { typeSchemaChangeStarted.Done() 
blockTypeSchemaChange.Wait() + return nil }, }