sql: Repartition tables before dropping regions
Previously we could get into a situation where, on dropping a region,
concurrent queries on REGIONAL BY ROW tables could fail. This was
because, when resolving the partition tuple in the optimizer, we'd
encounter a partition without a corresponding enum value. The issue was
timing dependent, and would only be hit if the query had leased a type
descriptor from after the drop region along with a table descriptor
from before the drop region.

To get around this problem, we introduce a new transaction in the drop
region schema changer which performs a pre-drop action: repartitioning
all REGIONAL BY ROW tables and updating their leases. This ensures that
the modified table descriptors are seen _before_ the modified type
descriptors.

Note that this is only required on drop region. In the add region case,
having this mismatch occur and seeing an extra region (with no
corresponding partition) is not a problem for the query engine.

Release note (sql change): Fix a bug where queries on REGIONAL BY ROW tables
could fail in the brief window in which a DROP REGION operation is in
progress.
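
As an illustration (not part of the original commit message), here is a
minimal Go sketch of the interleaving this change fixes. It assumes a
*sql.DB connected to a multi-region cluster with a database db spanning
regions us-east1 through us-east3 and a REGIONAL BY ROW table db.rbr;
the names are borrowed from the test below, and the function itself is
hypothetical.

package main

import "database/sql"

// raceDropRegionSketch queries a REGIONAL BY ROW table while a DROP
// REGION is in flight. Before this commit, an iteration could fail once
// the query leased the new type descriptor while still holding the old
// table descriptor; after it, every iteration should succeed.
func raceDropRegionSketch(db *sql.DB) error {
	errCh := make(chan error, 1)
	go func() {
		_, err := db.Exec(`ALTER DATABASE db DROP REGION "us-east3"`)
		errCh <- err
	}()
	// Hammer the table while the region drop is in flight.
	for i := 0; i < 100; i++ {
		if _, err := db.Exec(`SELECT k, v FROM db.rbr WHERE k = 1`); err != nil {
			return err // pre-fix failure: partition without a corresponding enum value
		}
	}
	return <-errCh
}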
ajstorm committed Apr 29, 2021
1 parent 97955d4 commit 4a5db31
Showing 4 changed files with 291 additions and 3 deletions.
209 changes: 209 additions & 0 deletions pkg/ccl/multiregionccl/region_test.go
@@ -921,3 +921,212 @@ func TestRollbackDuringAddDropRegionAsyncJobFailure(t *testing.T) {
})
}
}

// TestRegionAddDropWithConcurrentBackupOps tests adding/dropping regions
// (which may or may not succeed) with a concurrent backup operation.
// The sketch of the test is as follows:
// - Client 1 performs an ALTER ADD / DROP REGION. Let the user txn commit.
// - Block in the type schema changer.
// - Client 2 performs a backup operation.
// - Resume blocked schema change job.
// - Start up a new cluster.
// - Restore the database, block in the schema changer.
// - Fail or succeed the schema change job.
// - Validate that the database and its tables look as expected.
func TestRegionAddDropWithConcurrentBackupOps(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t, "times out under race")

// Decrease the adopt loop interval so that retries happen quickly.
defer sqltestutils.SetTestJobsAdoptInterval()()

regionAlterCmds := []struct {
name string
cmd string
shouldSucceed bool
expectedPartitions []string
}{
{
name: "drop-region-fail",
cmd: `ALTER DATABASE db DROP REGION "us-east3"`,
shouldSucceed: false,
expectedPartitions: []string{"us-east1", "us-east2", "us-east3"},
},
{
name: "drop-region-succeed",
cmd: `ALTER DATABASE db DROP REGION "us-east3"`,
shouldSucceed: true,
expectedPartitions: []string{"us-east1", "us-east2"},
},
{
name: "add-region-fail",
cmd: `ALTER DATABASE db ADD REGION "us-east4"`,
shouldSucceed: false,
expectedPartitions: []string{"us-east1", "us-east2", "us-east3"},
},
{
name: "add-region-succeed",
cmd: `ALTER DATABASE db ADD REGION "us-east4"`,
shouldSucceed: true,
expectedPartitions: []string{"us-east1", "us-east2", "us-east3", "us-east4"},
},
}

testCases := []struct {
name string
backupOp string
restoreOp string
}{
{
name: "backup-database",
backupOp: `BACKUP DATABASE db TO 'nodelocal://0/db_backup'`,
restoreOp: `RESTORE DATABASE db FROM 'nodelocal://0/db_backup'`,
},
}

for _, tc := range testCases {
for _, regionAlterCmd := range regionAlterCmds {
t.Run(regionAlterCmd.name+"-"+tc.name, func(t *testing.T) {
var mu syncutil.Mutex
typeChangeStarted := make(chan struct{})
typeChangeFinished := make(chan struct{})
backupOpFinished := make(chan struct{})
waitInTypeSchemaChangerDuringBackup := true

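// Pause the ALTER's type schema change just before enum member
// promotion, so that the backup below captures the database while the
// region change is still in progress.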
backupKnobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() error {
mu.Lock()
defer mu.Unlock()
if waitInTypeSchemaChangerDuringBackup {
waitInTypeSchemaChangerDuringBackup = false
close(typeChangeStarted)
<-backupOpFinished
}
// Always return success here. The goal of this test isn't to
// fail during the backup, but to do so during the restore.
return nil
},
},
}

tempExternalIODir, tempDirCleanup := testutils.TempDir(t)
defer tempDirCleanup()

_, sqlDBBackup, cleanupBackup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 4 /* numServers */, backupKnobs, &tempExternalIODir,
)
defer cleanupBackup()

_, err := sqlDBBackup.Exec(`
DROP DATABASE IF EXISTS db;
CREATE DATABASE db WITH PRIMARY REGION "us-east1" REGIONS "us-east2", "us-east3";
USE db;
CREATE TABLE db.rbr(k INT PRIMARY KEY, v INT NOT NULL) LOCALITY REGIONAL BY ROW;
INSERT INTO db.rbr VALUES (1,1),(2,2),(3,3);
`)
require.NoError(t, err)

go func() {
defer func() {
close(typeChangeFinished)
}()
_, err := sqlDBBackup.Exec(regionAlterCmd.cmd)
if err != nil {
t.Errorf("expected success, got %v when executing %s", err, regionAlterCmd.cmd)
}
}()

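// Wait for the schema change to block in the type schema changer
// before kicking off the backup.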
<-typeChangeStarted

_, err = sqlDBBackup.Exec(tc.backupOp)
close(backupOpFinished)
require.NoError(t, err)

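// With the backup complete and the schema changer unblocked, wait for
// the ALTER to finish.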
<-typeChangeFinished

restoreKnobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() error {
mu.Lock()
defer mu.Unlock()
if !regionAlterCmd.shouldSucceed {
// Trigger a rollback.
return errors.New("nope")
}
// Otherwise, carry on.
return nil
},
},
}

// Start a new cluster (with new testing knobs) for restore.
_, sqlDBRestore, cleanupRestore := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 4 /* numServers */, restoreKnobs, &tempExternalIODir,
)
defer cleanupRestore()

_, err = sqlDBRestore.Exec(tc.restoreOp)
require.NoError(t, err)

// First, ensure that the data was restored correctly.
numRows := sqlDBRestore.QueryRow(`SELECT count(*) FROM db.rbr`)
require.NoError(t, numRows.Err())
var count int
err = numRows.Scan(&count)
require.NoError(t, err)
if count != 3 {
t.Errorf("unexpected number of rows after restore: expected 3, found %d", count)
}

// Now validate that the background job has completed and the
// regions are in the expected state.
testutils.SucceedsSoon(t, func() error {
dbRegions := make([]string, 0, len(regionAlterCmd.expectedPartitions))
rowsRegions, err := sqlDBRestore.Query("SELECT region FROM [SHOW REGIONS FROM DATABASE db]")
require.NoError(t, err)
defer rowsRegions.Close()
for rowsRegions.Next() {
var region string
require.NoError(t, rowsRegions.Scan(&region))
dbRegions = append(dbRegions, region)
}
require.NoError(t, rowsRegions.Err())
if len(dbRegions) != len(regionAlterCmd.expectedPartitions) {
return errors.Newf("unexpected number of regions, expected: %v found %v",
regionAlterCmd.expectedPartitions,
dbRegions,
)
}
for i, expectedRegion := range regionAlterCmd.expectedPartitions {
if expectedRegion != dbRegions[i] {
return errors.Newf("unexpected regions, expected: %v found %v",
regionAlterCmd.expectedPartitions,
dbRegions,
)
}
}
return nil
})

// Finally, confirm that all of the tables were repartitioned
// correctly by the above ADD/DROP region job.
testutils.SucceedsSoon(t, func() error {
return multiregionccltestutils.TestingEnsureCorrectPartitioning(
sqlDBRestore,
"db",
"rbr",
[]string{"rbr@primary"},
)
})
})
}
}
}
9 changes: 9 additions & 0 deletions pkg/sql/database_region_change_finalizer.go
@@ -122,6 +122,15 @@ func (r *databaseRegionChangeFinalizer) finalize(ctx context.Context, txn *kv.Tx
return r.repartitionRegionalByRowTables(ctx, txn)
}

// preDrop is called in advance of dropping regions from a multi-region
// database. It re-partitions the REGIONAL BY ROW tables ahead of the type
// descriptor change, to ensure that the table and type descriptors never
// become inconsistent (from a query perspective). For more details, see
// the caller.
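//
// Ordering sketch (an illustration, not new API): txn 1 repartitions
// the REGIONAL BY ROW tables and bumps their versions; the caller then
// waits for every node to lease the new table descriptors; only then
// does txn 2 drop the region from the enum. Concurrent queries may see
// (new table, old type), an extra enum value with no partition, which
// is harmless, but never (old table, new type), the combination that
// panicked.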
func (r *databaseRegionChangeFinalizer) preDrop(ctx context.Context, txn *kv.Txn) error {
return r.repartitionRegionalByRowTables(ctx, txn)
}

// updateDatabaseZoneConfig updates the zone config of the database that
// encloses the multi-region enum such that there is an entry for all PUBLIC
// region values.
3 changes: 2 additions & 1 deletion pkg/sql/opt_catalog.go
@@ -1228,7 +1228,8 @@ func (oi *optIndex) init(
valueEncBuf, nil, /* prefixDatums */
)
if err != nil {
panic(errors.NewAssertionErrorWithWrappedErrf(err, "while decoding partition tuple"))
panic(errors.NewAssertionErrorWithWrappedErrf(err,
"while decoding partition tuple: %+v %+v", oi.tab.desc, oi.tab.desc.GetDependsOnTypes()))
}
oi.partitions[i].datums = append(oi.partitions[i].datums, t.Datums)
// TODO(radu): split into multiple prefixes if Subpartition is also by list.
73 changes: 71 additions & 2 deletions pkg/sql/type_change.go
@@ -286,6 +286,8 @@ func (t *typeSchemaChanger) exec(ctx context.Context) error {
}
}

multiRegionPreDropIsNecessary := false

// First, we check whether any of the enum values being removed are in
// use, and fail if so. This is done in a separate txn from the one that
// mutates the descriptor, as this validation can take arbitrarily long.
@@ -299,6 +301,9 @@
if err := t.canRemoveEnumValue(ctx, typeDesc, txn, &member, descsCol); err != nil {
return err
}
if typeDesc.Kind == descpb.TypeDescriptor_MULTIREGION_ENUM {
multiRegionPreDropIsNecessary = true
}
}
}
return nil
@@ -312,9 +317,73 @@

var regionChangeFinalizer *databaseRegionChangeFinalizer

// Now that we've ascertained that the enum values can be removed, we can
// actually go about modifying the type descriptor.
// In the case where we're dropping elements from a multi-region enum,
// we first re-partition all REGIONAL BY ROW tables. This handles the
// dependency that exists between the partitioning and the enum.
//
// There are places in the query path (specifically, when we decode
// the partitioning tuple) where we validate that, for a given
// partition, its respective value exists in the multi-region enum. If
// we don't repartition the table first during a DROP REGION, we can
// get into a situation where the query holds the new version of the
// enum type descriptor (in which the region value has already been
// dropped) and the old version of the table descriptor (in which the
// partition for that region still exists). This situation causes a
// panic, and the query fails.
//
// To address this issue, and only in the DROP REGION case, we
// repartition the tables first, and drop the value from the enum in a
// separate transaction. Note that we must refresh the table descriptors
// before we proceed to the drop enum portion, so that we ensure that
// any concurrent queries see the descriptor updates in the correct
// order.
//
// It's also worth noting that we don't need to be concerned about
// exposing things in the right order in OnFailOrCancel. This is because
// OnFailOrCancel doesn't expose any new state in the type descriptor
// (it just cleans up non-public states).
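//
// Concretely: with regions {us-east1, us-east2, us-east3} and a DROP
// REGION "us-east3", the pre-drop txn repartitions a REGIONAL BY ROW
// table down to {us-east1, us-east2} while the enum still contains
// us-east3. Any descriptor pair a query can then hold resolves every
// partition to an enum value.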
if multiRegionPreDropIsNecessary {
preDrop := func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error {
typeDesc, err := descsCol.GetMutableTypeVersionByID(ctx, txn, t.typeID)
if err != nil {
return err
}

regionChangeFinalizer, err = newDatabaseRegionChangeFinalizer(
ctx,
txn,
t.execCfg,
descsCol,
typeDesc.GetParentID(),
typeDesc.GetID(),
)
if err != nil {
return err
}
defer regionChangeFinalizer.cleanup()

if err := regionChangeFinalizer.preDrop(ctx, txn); err != nil {
return err
}
return nil
}
if err := descs.Txn(
ctx, t.execCfg.Settings, t.execCfg.LeaseManager,
t.execCfg.InternalExecutor, t.execCfg.DB, preDrop,
); err != nil {
return err
}

// Now update the leases to ensure that the new table descriptors are
// visible to all nodes.
if err := regionChangeFinalizer.waitToUpdateLeases(ctx, leaseMgr); err != nil {
return err
}
}

// Now that we've ascertained that the enum values can be removed, and
// have performed any necessary pre-drop work, we can actually go about
// modifying the type descriptor.
run := func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error {
typeDesc, err := descsCol.GetMutableTypeVersionByID(ctx, txn, t.typeID)
if err != nil {
