Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-21.1: sql: Repartition tables before dropping regions #65984

Merged
merged 1 commit into from
Jun 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 209 additions & 0 deletions pkg/ccl/multiregionccl/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,3 +570,212 @@ func TestRollbackDuringAddDropRegionAsyncJobFailure(t *testing.T) {
})
}
}

// TestRegionAddDropEnclosingBackupOps tests adding/dropping regions
// (which may or may not succeed) with a concurrent backup operation
// The sketch of the test is as follows:
// - Client 1 performs an ALTER ADD / DROP REGION. Let the user txn commit.
// - Block in the type schema changer.
// - Client 2 performs a backup operation.
// - Resume blocked schema change job.
// - Startup a new cluster.
// - Restore the database, block in the schema changer.
// - Fail or succeed the schema change job.
// - Validate that the database and its tables look as expected.
func TestRegionAddDropWithConcurrentBackupOps(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t, "times out under race")

// Decrease the adopt loop interval so that retries happen quickly.
defer sqltestutils.SetTestJobsAdoptInterval()()

regionAlterCmds := []struct {
name string
cmd string
shouldSucceed bool
expectedPartitions []string
}{
{
name: "drop-region-fail",
cmd: `ALTER DATABASE db DROP REGION "us-east3"`,
shouldSucceed: false,
expectedPartitions: []string{"us-east1", "us-east2", "us-east3"},
},
{
name: "drop-region-succeed",
cmd: `ALTER DATABASE db DROP REGION "us-east3"`,
shouldSucceed: true,
expectedPartitions: []string{"us-east1", "us-east2"},
},
{
name: "add-region-fail",
cmd: `ALTER DATABASE db ADD REGION "us-east4"`,
shouldSucceed: false,
expectedPartitions: []string{"us-east1", "us-east2", "us-east3"},
},
{
name: "add-region-succeed",
cmd: `ALTER DATABASE db ADD REGION "us-east4"`,
shouldSucceed: true,
expectedPartitions: []string{"us-east1", "us-east2", "us-east3", "us-east4"},
},
}

testCases := []struct {
name string
backupOp string
restoreOp string
}{
{
name: "backup-database",
backupOp: `BACKUP DATABASE db TO 'nodelocal://0/db_backup'`,
restoreOp: `RESTORE DATABASE db FROM 'nodelocal://0/db_backup'`,
},
}

for _, tc := range testCases {
for _, regionAlterCmd := range regionAlterCmds {
t.Run(regionAlterCmd.name+"-"+tc.name, func(t *testing.T) {
var mu syncutil.Mutex
typeChangeStarted := make(chan struct{})
typeChangeFinished := make(chan struct{})
backupOpFinished := make(chan struct{})
waitInTypeSchemaChangerDuringBackup := true

backupKnobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() error {
mu.Lock()
defer mu.Unlock()
if waitInTypeSchemaChangerDuringBackup {
waitInTypeSchemaChangerDuringBackup = false
close(typeChangeStarted)
<-backupOpFinished
}
// Always return success here. The goal of this test isn't to
// fail during the backup, but to do so during the restore.
return nil
},
},
}

tempExternalIODir, tempDirCleanup := testutils.TempDir(t)
defer tempDirCleanup()

_, sqlDBBackup, cleanupBackup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 4 /* numServers */, backupKnobs, &tempExternalIODir,
)
defer cleanupBackup()

_, err := sqlDBBackup.Exec(`
DROP DATABASE IF EXISTS db;
CREATE DATABASE db WITH PRIMARY REGION "us-east1" REGIONS "us-east2", "us-east3";
USE db;
CREATE TABLE db.rbr(k INT PRIMARY KEY, v INT NOT NULL) LOCALITY REGIONAL BY ROW;
INSERT INTO db.rbr VALUES (1,1),(2,2),(3,3);
`)
require.NoError(t, err)

go func() {
defer func() {
close(typeChangeFinished)
}()
_, err := sqlDBBackup.Exec(regionAlterCmd.cmd)
if err != nil {
t.Errorf("expected success, got %v when executing %s", err, regionAlterCmd.cmd)
}
}()

<-typeChangeStarted

_, err = sqlDBBackup.Exec(tc.backupOp)
close(backupOpFinished)
require.NoError(t, err)

<-typeChangeFinished

restoreKnobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeEnumMemberPromotion: func() error {
mu.Lock()
defer mu.Unlock()
if !regionAlterCmd.shouldSucceed {
// Trigger a roll-back.
return errors.New("nope")
}
// Trod on.
return nil
},
},
}

// Start a new cluster (with new testing knobs) for restore.
_, sqlDBRestore, cleanupRestore := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 4 /* numServers */, restoreKnobs, &tempExternalIODir,
)
defer cleanupRestore()

_, err = sqlDBRestore.Exec(tc.restoreOp)
require.NoError(t, err)

// First ensure that the data was restored correctly.
numRows := sqlDBRestore.QueryRow(`SELECT count(*) from db.rbr`)
require.NoError(t, numRows.Err())
var count int
err = numRows.Scan(&count)
require.NoError(t, err)
if count != 3 {
t.Logf("unexpected number of rows after restore: expected 3, found %d", count)
}

// Now validate that the background job has completed and the
// regions are in the expected state.
testutils.SucceedsSoon(t, func() error {
dbRegions := make([]string, 0, len(regionAlterCmd.expectedPartitions))
rowsRegions, err := sqlDBRestore.Query("SELECT region FROM [SHOW REGIONS FROM DATABASE db]")
require.NoError(t, err)
defer rowsRegions.Close()
for {
done := rowsRegions.Next()
if !done {
require.NoError(t, rowsRegions.Err())
break
}
var region string
err := rowsRegions.Scan(&region)
require.NoError(t, err)
dbRegions = append(dbRegions, region)
}
if len(dbRegions) != len(regionAlterCmd.expectedPartitions) {
return errors.Newf("unexpected number of regions, expected: %v found %v",
regionAlterCmd.expectedPartitions,
dbRegions,
)
}
for i, expectedRegion := range regionAlterCmd.expectedPartitions {
if expectedRegion != dbRegions[i] {
return errors.Newf("unexpected regions, expected: %v found %v",
regionAlterCmd.expectedPartitions,
dbRegions,
)
}
}
return nil
})

// Finally, confirm that all of the tables were repartitioned
// correctly by the above ADD/DROP region job.
testutils.SucceedsSoon(t, func() error {
return multiregionccltestutils.TestingEnsureCorrectPartitioning(
sqlDBRestore,
"db",
"rbr",
[]string{"rbr@primary"},
)
})
})
}
}
}
9 changes: 9 additions & 0 deletions pkg/sql/database_region_change_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ func (r *databaseRegionChangeFinalizer) finalize(ctx context.Context, txn *kv.Tx
return r.repartitionRegionalByRowTables(ctx, txn)
}

// preDrop is called in advance of dropping regions from a multi-region
// database. This function just re-partitions the REGIONAL BY ROW tables in
// advance of the type descriptor change, to ensure that the table and type
// descriptors never become incorrect (from a query perspective). For more info,
// see the caller.
func (r *databaseRegionChangeFinalizer) preDrop(ctx context.Context, txn *kv.Txn) error {
return r.repartitionRegionalByRowTables(ctx, txn)
}

// updateDatabaseZoneConfig updates the zone config of the database that
// encloses the multi-region enum such that there is an entry for all PUBLIC
// region values.
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/opt_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1205,7 +1205,8 @@ func (oi *optIndex) init(
valueEncBuf, nil, /* prefixDatums */
)
if err != nil {
panic(errors.NewAssertionErrorWithWrappedErrf(err, "while decoding partition tuple"))
panic(errors.NewAssertionErrorWithWrappedErrf(err,
"while decoding partition tuple: %+v", oi.tab.desc))
}
oi.partitions[i].datums = append(oi.partitions[i].datums, t.Datums)
// TODO(radu): split into multiple prefixes if Subpartition is also by list.
Expand Down
73 changes: 71 additions & 2 deletions pkg/sql/type_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,8 @@ func (t *typeSchemaChanger) exec(ctx context.Context) error {
}
}

multiRegionPreDropIsNecessary := false

// First, we check if any of the enum values that are being removed are in
// use and fail. This is done in a separate txn to the one that mutates the
// descriptor, as this validation can take arbitrarily long.
Expand All @@ -296,6 +298,9 @@ func (t *typeSchemaChanger) exec(ctx context.Context) error {
if err := t.canRemoveEnumValue(ctx, typeDesc, txn, &member, descsCol); err != nil {
return err
}
if typeDesc.Kind == descpb.TypeDescriptor_MULTIREGION_ENUM {
multiRegionPreDropIsNecessary = true
}
}
}
return nil
Expand All @@ -309,9 +314,73 @@ func (t *typeSchemaChanger) exec(ctx context.Context) error {

var regionChangeFinalizer *databaseRegionChangeFinalizer

// Now that we've ascertained that the enum values can be removed, we can
// actually go about modifying the type descriptor.
// In the case where we're dropping elements from a multi-region enum,
// we first re-partition all REGIONAL BY ROW tables. This is to handle
// the dependency which exist between the partitioning and the enum.
//
// There are places in the query path (specifically, when we decode
// the partitioning tuple) where we validate that for a given partition,
// that it's respective value exists in the multi-region enum. In cases
// where we're in the process of a DROP REGION however, if we don't
// repartition the table first, we can get into a situation where the
// query holds the new version of the enum type descriptor (in which
// the partition has already been dropped) and the old version of the
// table descriptor (in which the partition still exists). This
// situation causes a panic, and the query fails.
//
// To address this issue, and only in the DROP REGION case, we
// repartition the tables first, and drop the value from the enum in a
// separate transaction. Note that we must refresh the table descriptors
// before we proceed to the drop enum portion, so that we ensure that
// any concurrent queries see the descriptor updates in the correct
// order.
//
// It's also worth noting that we don't need to be concerned about
// exposing things in the right order in OnFailOrCancel. This is because
// OnFailOrCancel doesn't expose any new state in the type descriptor
// (it just cleans up non-public states).
if multiRegionPreDropIsNecessary {
preDrop := func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error {
typeDesc, err := descsCol.GetMutableTypeVersionByID(ctx, txn, t.typeID)
if err != nil {
return err
}

regionChangeFinalizer, err = newDatabaseRegionChangeFinalizer(
ctx,
txn,
t.execCfg,
descsCol,
typeDesc.GetParentID(),
typeDesc.GetID(),
)
if err != nil {
return err
}
defer regionChangeFinalizer.cleanup()

if err := regionChangeFinalizer.preDrop(ctx, txn); err != nil {
return err
}
return nil
}
if err := descs.Txn(
ctx, t.execCfg.Settings, t.execCfg.LeaseManager,
t.execCfg.InternalExecutor, t.execCfg.DB, preDrop,
); err != nil {
return err
}

// Now update the leases to ensure the that new table descriptor is
// visible to all nodes.
if err := regionChangeFinalizer.waitToUpdateLeases(ctx, leaseMgr); err != nil {
return err
}
}

// Now that we've ascertained that the enum values can be removed, and
// have performed any necessary pre-drop work, we can actually go about
// modifying the type descriptor.
run := func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error {
typeDesc, err := descsCol.GetMutableTypeVersionByID(ctx, txn, t.typeID)
if err != nil {
Expand Down