Skip to content

Commit

Permalink
sql: fix bad interraction with DROP region rollback and alter to RBR
Browse files Browse the repository at this point in the history
When altering to a regional by row table we only add partitions for
PUBLIC enum values (regions). The type schema change job that
negotiates promotion/demotion is responsible for re-partitioning
regional by row tables. Previously, this re-partitioning only happened
if the type schema change job succeeded.

This behavior missed an edge case, which looks like:
- Client 1 drops region A from the database. The user txn commits.
- Client 2 alters table t to a RBR.
- The drop region fails, and therefore rolls back.
- table t is now missing a partition for region A.

This patch fixes this problem by trigerring a re-partition even when
the type schema change job is rolling back.

Informs cockroachdb#63462

Release note (bug fix): Previously if a user altered a table to
REGIONAL BY ROW when a region was being dropped, and the drop failed
and had to be rolled back, it could have resulted in the Regional By
Row table missing a partition for this region. This is now fixed and
region drop failure rollbacks are sane.
  • Loading branch information
arulajmani committed Apr 13, 2021
1 parent 3b30f98 commit a05c4db
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 1 deletion.
133 changes: 133 additions & 0 deletions pkg/ccl/multiregionccl/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,139 @@ CREATE TABLE db.rbr () LOCALITY REGIONAL BY ROW`)
}
}

// TestRegionAddDropEnclosingRegionalByRowAlters tests adding/dropping regions
// (expected to fail) with a concurrent alter to a regional by row table. The
// sketch of the test is as follows:
// - Client 1 performs an ALTER ADD / DROP REGION. Let the user txn commit.
// - Block in the type schema changer.
// - Client 2 alters a REGIONAL / GLOBAL table to a REGIONAL BY ROW table. Let
// this operation finish.
// - Force a rollback on the REGION ADD / DROP by injecting an error.
// - Ensure the partitions on the REGIONAL BY ROW table are sane.
func TestRegionAddDropFailureEnclosingRegionalByRowAlters(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t, "times out under race")

// Decrease the adopt loop interval so that retries happen quickly.
defer sqltestutils.SetTestJobsAdoptInterval()()

testCases := []struct {
name string
setup string
regionAlterCmd string
}{
{
"region-drop-alter-from-global",
`CREATE TABLE db.t () LOCALITY GLOBAL`,
`ALTER DATABASE db DROP REGION "us-east3"`,
},
{
"region-drop-alter-from-regional",
`CREATE TABLE db.t () LOCALITY REGIONAL`,
`ALTER DATABASE db DROP REGION "us-east3"`,
},
{
"region-add-alter-from-global",
`CREATE TABLE db.t () LOCALITY GLOBAL`,
`ALTER DATABASE db ADD REGION "us-east4"`,
},
{
"region-add-alter-from-regional",
`CREATE TABLE db.t () LOCALITY REGIONAL`,
`ALTER DATABASE db ADD REGION "us-east4"`,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
var mu syncutil.Mutex
typeChangeStarted := make(chan struct{})
typeChangeFinished := make(chan struct{})
rbrOpFinished := make(chan struct{})

knobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeExec: func() error {
mu.Lock()
defer mu.Unlock()
close(typeChangeStarted)
<-rbrOpFinished
// Trigger a roll-back.
return errors.New("boom")
},
},
}

_, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 4 /* numServers */, knobs, nil, /* baseDir */
)
defer cleanup()

_, err := sqlDB.Exec(`
CREATE DATABASE db WITH PRIMARY REGION "us-east1" REGIONS "us-east2", "us-east3";
`)
require.NoError(t, err)

_, err = sqlDB.Exec(tc.setup)
require.NoError(t, err)

go func() {
_, err := sqlDB.Exec(tc.regionAlterCmd)
if !testutils.IsError(err, "boom") {
t.Errorf("expected error boom, found %v", err)
}
close(typeChangeFinished)
}()

<-typeChangeStarted

_, err = sqlDB.Exec(`ALTER TABLE db.t SET LOCALITY REGIONAL BY ROW`)
close(rbrOpFinished)
require.NoError(t, err)

testutils.SucceedsSoon(t, func() error {
rows, err := sqlDB.Query("SELECT partition_name FROM [SHOW PARTITIONS FROM TABLE db.t] ORDER BY partition_name")
if err != nil {
return err
}
defer rows.Close()

var partitionNames []string
for rows.Next() {
var partitionName string
if err := rows.Scan(&partitionName); err != nil {
return err
}

partitionNames = append(partitionNames, partitionName)
}

expectedPartitions := []string{"us-east1", "us-east2", "us-east3"}
if len(partitionNames) != len(expectedPartitions) {
return errors.AssertionFailedf(
"unexpected number of partitions; expected %d, found %d",
len(expectedPartitions),
len(partitionNames),
)
}
for i := range expectedPartitions {
if expectedPartitions[i] != partitionNames[i] {
return errors.AssertionFailedf(
"unexpected partitions; expected %v, found %v",
expectedPartitions,
partitionNames,
)
}
}
return nil
})
<-typeChangeFinished
})
}
}

func TestSettingPrimaryRegionAmidstDrop(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
24 changes: 23 additions & 1 deletion pkg/sql/type_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,9 @@ func applyFilterOnEnumMembers(
// 2. If an enum value was being removed as part of this txn, we promote
// it back to writable.
func (t *typeSchemaChanger) cleanupEnumValues(ctx context.Context) error {
var multiRegionFinalizer *multiRegionFinalizer
// Cleanup:

cleanup := func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error {
typeDesc, err := descsCol.GetMutableTypeVersionByID(ctx, txn, t.typeID)
if err != nil {
Expand Down Expand Up @@ -477,7 +479,20 @@ func (t *typeSchemaChanger) cleanupEnumValues(ctx context.Context) error {
); err != nil {
return err
}
return txn.Run(ctx, b)

if err := txn.Run(ctx, b); err != nil {
return err
}

if typeDesc.Kind == descpb.TypeDescriptor_MULTIREGION_ENUM {
multiRegionFinalizer = NewMultiRegionFinalizer(t.typeID)

if err := multiRegionFinalizer.finalize(ctx, txn, descsCol, t.execCfg); err != nil {
return err
}
}

return nil
}
if err := descs.Txn(ctx, t.execCfg.Settings, t.execCfg.LeaseManager, t.execCfg.InternalExecutor,
t.execCfg.DB, cleanup); err != nil {
Expand All @@ -491,6 +506,13 @@ func (t *typeSchemaChanger) cleanupEnumValues(ctx context.Context) error {
}
return err
}

if multiRegionFinalizer != nil {
if err := multiRegionFinalizer.waitToUpdateLeases(ctx, t.execCfg.LeaseManager); err != nil {
return nil
}
}

return nil
}

Expand Down

0 comments on commit a05c4db

Please sign in to comment.