Skip to content

Commit

Permalink
sql: fix bad interraction with DROP region rollback and alter to RBR
Browse files Browse the repository at this point in the history
When altering to a regional by row table we only add partitions for
PUBLIC enum values (regions). The type schema change job that
negotiates promotion/demotion is responsible for re-partitioning
regional by row tables. Previously, this re-partitioning only happened
if the type schema change job succeeded.

This behavior missed an edge case, which looks like:
- Client 1 drops region A from the database. The user txn commits.
- Client 2 alters table t to a RBR.
- The drop region fails, and therefore rolls back.
- table t is now missing a partition for region A.

This patch fixes this problem by trigerring a re-partition even when
the type schema change job is rolling back.

Informs #63462

Release note (bug fix): Previously if a user altered a table to
REGIONAL BY ROW when a region was being dropped, and the drop failed
and had to be rolled back, it could have resulted in the Regional By
Row table missing a partition for this region. This is now fixed and
region drop failure rollbacks are sane.
  • Loading branch information
arulajmani committed Apr 15, 2021
1 parent 670c096 commit 8374c0f
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 11 deletions.
3 changes: 2 additions & 1 deletion pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2739,7 +2739,7 @@ func TestBackupRestoreDuringUserDefinedTypeChange(t *testing.T) {
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() {
RunBeforeEnumMemberPromotion: func() error {
mu.Lock()
if numTypeChangesStarted < len(tc.queries) {
numTypeChangesStarted++
Expand All @@ -2751,6 +2751,7 @@ func TestBackupRestoreDuringUserDefinedTypeChange(t *testing.T) {
} else {
mu.Unlock()
}
return nil
},
},
},
Expand Down
150 changes: 148 additions & 2 deletions pkg/ccl/multiregionccl/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestConcurrentAddDropRegions(t *testing.T) {

knobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() {
RunBeforeEnumMemberPromotion: func() error {
mu.Lock()
if firstOp {
firstOp = false
Expand All @@ -129,6 +129,7 @@ func TestConcurrentAddDropRegions(t *testing.T) {
} else {
mu.Unlock()
}
return nil
},
},
}
Expand Down Expand Up @@ -201,6 +202,150 @@ CREATE TABLE db.rbr () LOCALITY REGIONAL BY ROW`)
}
}

// TestRegionAddDropEnclosingRegionalByRowAlters tests adding/dropping regions
// (expected to fail) with a concurrent alter to a regional by row table. The
// sketch of the test is as follows:
// - Client 1 performs an ALTER ADD / DROP REGION. Let the user txn commit.
// - Block in the type schema changer.
// - Client 2 alters a REGIONAL / GLOBAL table to a REGIONAL BY ROW table. Let
// this operation finish.
// - Force a rollback on the REGION ADD / DROP by injecting an error.
// - Ensure the partitions on the REGIONAL BY ROW table are sane.
func TestRegionAddDropFailureEnclosingRegionalByRowAlters(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Decrease the adopt loop interval so that retries happen quickly.
defer sqltestutils.SetTestJobsAdoptInterval()()

regionAlterCmds := []struct {
name string
cmd string
}{
{
name: "drop-region",
cmd: `ALTER DATABASE db DROP REGION "us-east3"`,
},
{
name: "add-region",
cmd: `ALTER DATABASE db ADD REGION "us-east4"`,
},
}

testCases := []struct {
name string
setup string
}{
{
name: "alter-from-global",
setup: `CREATE TABLE db.t () LOCALITY GLOBAL`,
},
{
name: "alter-from-explicit-regional",
setup: `CREATE TABLE db.t () LOCALITY REGIONAL IN "us-east2"`,
},
{
name: "alter-from-regional",
setup: `CREATE TABLE db.t () LOCALITY REGIONAL IN PRIMARY REGION`,
},
{
name: "alter-from-rbr",
setup: `CREATE TABLE db.t (reg db.crdb_internal_region) LOCALITY REGIONAL BY ROW AS reg`,
},
}

var mu syncutil.Mutex
typeChangeStarted := make(chan struct{}, 1)
typeChangeFinished := make(chan struct{}, 1)
rbrOpFinished := make(chan struct{}, 1)

knobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() error {
mu.Lock()
defer mu.Unlock()
typeChangeStarted <- struct{}{}
<-rbrOpFinished
// Trigger a roll-back.
return errors.New("boom")
},
},
}

_, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 4 /* numServers */, knobs, nil, /* baseDir */
)
defer cleanup()

for _, tc := range testCases {
for _, regionAlterCmd := range regionAlterCmds {
t.Run(regionAlterCmd.name+tc.name, func(t *testing.T) {

_, err := sqlDB.Exec(`
DROP DATABASE IF EXISTS db;
CREATE DATABASE db WITH PRIMARY REGION "us-east1" REGIONS "us-east2", "us-east3";
`)
require.NoError(t, err)

_, err = sqlDB.Exec(tc.setup)
require.NoError(t, err)

go func() {
_, err := sqlDB.Exec(regionAlterCmd.cmd)
if !testutils.IsError(err, "boom") {
t.Errorf("expected error boom, found %v", err)
}
typeChangeFinished <- struct{}{}
}()

<-typeChangeStarted

_, err = sqlDB.Exec(`BEGIN; SET TRANSACTION PRIORITY HIGH; ALTER TABLE db.t SET LOCALITY REGIONAL BY ROW; COMMIT;`)
rbrOpFinished <- struct{}{}
require.NoError(t, err)

testutils.SucceedsSoon(t, func() error {
rows, err := sqlDB.Query("SELECT partition_name FROM [SHOW PARTITIONS FROM TABLE db.t] ORDER BY partition_name")
if err != nil {
return err
}
defer rows.Close()

var partitionNames []string
for rows.Next() {
var partitionName string
if err := rows.Scan(&partitionName); err != nil {
return err
}

partitionNames = append(partitionNames, partitionName)
}

expectedPartitions := []string{"us-east1", "us-east2", "us-east3"}
if len(partitionNames) != len(expectedPartitions) {
return errors.AssertionFailedf(
"unexpected number of partitions; expected %d, found %d",
len(expectedPartitions),
len(partitionNames),
)
}
for i := range expectedPartitions {
if expectedPartitions[i] != partitionNames[i] {
return errors.AssertionFailedf(
"unexpected partitions; expected %v, found %v",
expectedPartitions,
partitionNames,
)
}
}
return nil
})
<-typeChangeFinished
})
}
}
}

func TestSettingPrimaryRegionAmidstDrop(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand All @@ -211,14 +356,15 @@ func TestSettingPrimaryRegionAmidstDrop(t *testing.T) {
dropRegionFinished := make(chan struct{})
knobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() {
RunBeforeEnumMemberPromotion: func() error {
mu.Lock()
defer mu.Unlock()
if dropRegionStarted != nil {
close(dropRegionStarted)
<-waitForPrimaryRegionSwitch
dropRegionStarted = nil
}
return nil
},
},
}
Expand Down
30 changes: 23 additions & 7 deletions pkg/sql/type_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ type TypeSchemaChangerTestingKnobs struct {
RunBeforeExec func() error
// RunBeforeEnumMemberPromotion runs before enum members are promoted from
// readable to all permissions in the typeSchemaChanger.
RunBeforeEnumMemberPromotion func()
RunBeforeEnumMemberPromotion func() error
// RunAfterOnFailOrCancel runs after OnFailOrCancel completes, if
// OnFailOrCancel is triggered.
RunAfterOnFailOrCancel func() error
Expand Down Expand Up @@ -281,7 +281,9 @@ func (t *typeSchemaChanger) exec(ctx context.Context) error {
typeDesc.GetKind() == descpb.TypeDescriptor_MULTIREGION_ENUM) &&
len(t.transitioningMembers) != 0 {
if fn := t.execCfg.TypeSchemaChangerTestingKnobs.RunBeforeEnumMemberPromotion; fn != nil {
fn()
if err := fn(); err != nil {
return err
}
}

// First, we check if any of the enum values that are being removed are in
Expand Down Expand Up @@ -445,13 +447,13 @@ func applyFilterOnEnumMembers(
// 2. If an enum value was being removed as part of this txn, we promote
// it back to writable.
func (t *typeSchemaChanger) cleanupEnumValues(ctx context.Context) error {
var regionChangeFinalizer *databaseRegionChangeFinalizer
// Cleanup:
cleanup := func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error {
typeDesc, err := descsCol.GetMutableTypeVersionByID(ctx, txn, t.typeID)
if err != nil {
return err
}
b := txn.NewBatch()
// No cleanup required.
if !enumHasNonPublic(typeDesc) {
return nil
Expand All @@ -472,12 +474,19 @@ func (t *typeSchemaChanger) cleanupEnumValues(ctx context.Context) error {
return t.isTransitioningInCurrentJob(member) && enumMemberIsAdding(member)
})

if err := descsCol.WriteDescToBatch(
ctx, true /* kvTrace */, typeDesc, b,
); err != nil {
if err := descsCol.WriteDesc(ctx, true /* kvTrace */, typeDesc, txn); err != nil {
return err
}
return txn.Run(ctx, b)

if typeDesc.Kind == descpb.TypeDescriptor_MULTIREGION_ENUM {
regionChangeFinalizer = newDatabaseRegionChangeFinalizer(typeDesc.GetParentID(), typeDesc.GetID())

if err := regionChangeFinalizer.finalize(ctx, txn, descsCol, t.execCfg); err != nil {
return err
}
}

return nil
}
if err := descs.Txn(ctx, t.execCfg.Settings, t.execCfg.LeaseManager, t.execCfg.InternalExecutor,
t.execCfg.DB, cleanup); err != nil {
Expand All @@ -491,6 +500,13 @@ func (t *typeSchemaChanger) cleanupEnumValues(ctx context.Context) error {
}
return err
}

if regionChangeFinalizer != nil {
if err := regionChangeFinalizer.waitToUpdateLeases(ctx, t.execCfg.LeaseManager); err != nil {
return err
}
}

return nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/type_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,9 +528,10 @@ func TestTypeChangeJobCancelSemantics(t *testing.T) {
finishedSchemaChange.Add(1)

params.Knobs.SQLTypeSchemaChanger = &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() {
RunBeforeEnumMemberPromotion: func() error {
typeSchemaChangeStarted.Done()
blockTypeSchemaChange.Wait()
return nil
},
}

Expand Down

0 comments on commit 8374c0f

Please sign in to comment.