Skip to content

Commit

Permalink
sql: partition REGIONAL BY ROW on all indexes during region changes
Browse files Browse the repository at this point in the history
Only doing REGIONAL BY ROW repartitions on non-drop indexes leads us to
a failure mode where if the DROP INDEX fails during an ADD/DROP REGION,
we do not reset the partitions appropriately.

This is now resolved.

Release note (bug fix): Fixed a bug where a concurrent DROP INDEX that
fails while an ADD/DROP REGION is in progress leaves the table with an
invalid partitioning setup.
  • Loading branch information
otan committed Apr 21, 2021
1 parent 3ed3826 commit faa2e64
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 49 deletions.
201 changes: 156 additions & 45 deletions pkg/ccl/multiregionccl/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@
package multiregionccl_test

import (
gosql "database/sql"
"fmt"
"strings"
"sync"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
Expand Down Expand Up @@ -349,6 +352,92 @@ CREATE DATABASE db WITH PRIMARY REGION "us-east1" REGIONS "us-east2", "us-east3"
}
}

// TestDropIndexFailureDuringAddOrDropRegion tests a DROP INDEX failing during
// an ADD/DROP REGION. Because the drop is rolled back, the index survives and
// must still carry the partitions that match the post-change region set.
//
// Sketch of each subtest:
//   - Create a multi-region database with a REGIONAL BY ROW table that has a
//     secondary index.
//   - Start DROP INDEX in a goroutine; the RunBeforeBackfill hook parks it
//     until the region change has returned and then fails it with an
//     injected error.
//   - Run the ADD/DROP REGION statement, release the hook, wait for the
//     DROP INDEX to fail, and validate partitions on both indexes.
func TestDropIndexFailureDuringAddOrDropRegion(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	defer sqltestutils.SetTestJobsAdoptInterval()()

	for _, tc := range []struct {
		regionChangeSQL    string
		expectedPartitions []string
	}{
		{
			regionChangeSQL:    `ALTER DATABASE db DROP REGION "us-east2"`,
			expectedPartitions: []string{"us-east1"},
		},
		{
			regionChangeSQL:    `ALTER DATABASE db ADD REGION "us-east3"`,
			expectedPartitions: []string{"us-east1", "us-east2", "us-east3"},
		},
	} {
		t.Run(tc.regionChangeSQL, func(t *testing.T) {
			// mu guards block: it is set by the test goroutine below and
			// consumed inside the RunBeforeBackfill hook, which may run on a
			// different goroutine, so unsynchronized access would be a data
			// race.
			var mu sync.Mutex
			block := false
			typeChangeFinished := make(chan struct{})
			knobs := base.TestingKnobs{
				SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
					RunBeforeBackfill: func() error {
						// Consume the flag atomically so that only the first
						// hook invocation after arming blocks and fails.
						mu.Lock()
						shouldBlock := block
						block = false
						mu.Unlock()

						if !shouldBlock {
							return nil
						}
						// Hold the schema change until the region change has
						// returned, then fail it to force a rollback.
						<-typeChangeFinished
						return errors.New("crikey!")
					},
				},
			}

			_, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
				t, 3 /* numServers */, knobs, nil, /* baseDir */
			)
			defer cleanup()

			_, err := sqlDB.Exec(`
DROP DATABASE IF EXISTS db;
CREATE DATABASE db WITH PRIMARY REGION "us-east1" REGIONS "us-east2";
CREATE TABLE db.rbr(k INT PRIMARY KEY, v INT NOT NULL, INDEX v_idx(v)) LOCALITY REGIONAL BY ROW;
`)
			require.NoError(t, err)

			// Arm the hook only after setup so the CREATE statements above are
			// not affected.
			mu.Lock()
			block = true
			mu.Unlock()

			// Add must happen before the goroutine starts; otherwise Done could
			// run first and drive the counter negative (panic), or Wait could
			// return before the goroutine has been accounted for.
			var wg sync.WaitGroup
			wg.Add(1)
			go func() {
				defer wg.Done()

				// t.Errorf (not require/FailNow) because this is not the test
				// goroutine.
				_, err := sqlDB.Exec(`DROP INDEX db.rbr@v_idx`)
				switch {
				case err == nil:
					t.Errorf("expected error during DROP INDEX")
				case !strings.Contains(err.Error(), "crikey!"):
					t.Errorf("failed to run drop index for unknown reason: %+v", err)
				}
			}()

			_, err = sqlDB.Exec(tc.regionChangeSQL)
			// Release the hook before asserting so the DROP INDEX goroutine is
			// never left blocked if the region change itself failed.
			close(typeChangeFinished)
			require.NoError(t, err)

			// Wait for DROP INDEX to complete.
			wg.Wait()

			require.NoError(
				t,
				validateIndexAndPartitions(
					sqlDB,
					"db.rbr",
					[]string{"rbr@primary", "rbr@v_idx"},
					tc.expectedPartitions,
				),
			)
		})
	}
}

// TestRegionAddDropEnclosingRegionalByRowOps tests adding/dropping regions
// (which may or may not succeed) with a concurrent operation on a regional by
// row table. The sketch of the test is as follows:
Expand Down Expand Up @@ -400,9 +489,10 @@ func TestRegionAddDropEnclosingRegionalByRowOps(t *testing.T) {
}

testCases := []struct {
name string
op string
expectedIndexes []string
name string
op string
expectedIndexes []string
additionalSetupQuery string
}{
{
name: "create-rbr-table",
Expand Down Expand Up @@ -434,11 +524,17 @@ func TestRegionAddDropEnclosingRegionalByRowOps(t *testing.T) {
op: `CREATE UNIQUE INDEX uniq ON db.rbr(v)`,
expectedIndexes: []string{"rbr@primary", "rbr@uniq"},
},
{
name: "drop-index",
op: "DROP INDEX db.rbr@v_idx",
additionalSetupQuery: "CREATE INDEX v_idx ON db.rbr(v)",
expectedIndexes: []string{"rbr@primary"},
},
}

for _, tc := range testCases {
for _, regionAlterCmd := range regionAlterCmds {
t.Run(regionAlterCmd.name+"-"+tc.name, func(t *testing.T) {
t.Run(fmt.Sprintf("%s/%s", regionAlterCmd.name, tc.name), func(t *testing.T) {
var mu syncutil.Mutex
typeChangeStarted := make(chan struct{})
typeChangeFinished := make(chan struct{})
Expand Down Expand Up @@ -470,7 +566,7 @@ func TestRegionAddDropEnclosingRegionalByRowOps(t *testing.T) {
DROP DATABASE IF EXISTS db;
CREATE DATABASE db WITH PRIMARY REGION "us-east1" REGIONS "us-east2", "us-east3";
CREATE TABLE db.rbr(k INT PRIMARY KEY, v INT NOT NULL) LOCALITY REGIONAL BY ROW;
`)
` + tc.additionalSetupQuery)
require.NoError(t, err)

go func() {
Expand All @@ -496,52 +592,67 @@ CREATE TABLE db.rbr(k INT PRIMARY KEY, v INT NOT NULL) LOCALITY REGIONAL BY ROW;
require.NoError(t, err)

testutils.SucceedsSoon(t, func() error {
rows, err := sqlDB.Query("SELECT index_name, partition_name FROM [SHOW PARTITIONS FROM TABLE db.rbr] ORDER BY partition_name")
if err != nil {
return err
}
defer rows.Close()
return validateIndexAndPartitions(
sqlDB,
"db.rbr",
tc.expectedIndexes,
regionAlterCmd.expectedPartitions,
)
})
<-typeChangeFinished
})
}
}
}

indexPartitions := make(map[string][]string)
for rows.Next() {
var indexName string
var partitionName string
if err := rows.Scan(&indexName, &partitionName); err != nil {
return err
}
func validateIndexAndPartitions(
sqlDB *gosql.DB, tableName string, expectedIndexes []string, expectedPartitions []string,
) error {
rows, err := sqlDB.Query(fmt.Sprintf(
"SELECT index_name, partition_name FROM [SHOW PARTITIONS FROM TABLE %s] ORDER BY partition_name",
tableName,
))
if err != nil {
return err
}
defer rows.Close()

indexPartitions[indexName] = append(indexPartitions[indexName], partitionName)
}
indexPartitions := make(map[string][]string)
for rows.Next() {
var indexName string
var partitionName string
if err := rows.Scan(&indexName, &partitionName); err != nil {
return err
}

for _, expectedIndex := range tc.expectedIndexes {
partitions, found := indexPartitions[expectedIndex]
if !found {
return errors.AssertionFailedf("did not find index %s", expectedIndex)
}
indexPartitions[indexName] = append(indexPartitions[indexName], partitionName)
}

if len(partitions) != len(regionAlterCmd.expectedPartitions) {
return errors.AssertionFailedf(
"unexpected number of partitions; expected %d, found %d",
len(partitions),
len(regionAlterCmd.expectedPartitions),
)
}
for i, expectedPartition := range regionAlterCmd.expectedPartitions {
if expectedPartition != partitions[i] {
return errors.AssertionFailedf(
"unexpected partitions; expected %v, found %v",
regionAlterCmd.expectedPartitions,
partitions,
)
}
}
}
return nil
})
<-typeChangeFinished
})
for _, expectedIndex := range expectedIndexes {
partitions, found := indexPartitions[expectedIndex]
if !found {
return errors.AssertionFailedf("did not find index %s", expectedIndex)
}

if len(partitions) != len(expectedPartitions) {
// Argument order must match the message: expected count first, then
// the count actually found for this index.
return errors.AssertionFailedf(
"unexpected number of partitions; expected %d, found %d",
len(expectedPartitions),
len(partitions),
)
}
for i, expectedPartition := range expectedPartitions {
if expectedPartition != partitions[i] {
return errors.AssertionFailedf(
"unexpected partitions; expected %v, found %v",
expectedPartitions,
partitions,
)
}
}
}
return nil

}

func TestSettingPrimaryRegionAmidstDrop(t *testing.T) {
Expand Down
7 changes: 3 additions & 4 deletions pkg/sql/database_region_change_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,8 @@ func (r *databaseRegionChangeFinalizer) repartitionRegionalByRowTables(
// configurations from any partitions that are removed.
oldPartitioningDescs := make(map[descpb.IndexID]descpb.PartitioningDescriptor)

// Update the partitioning on all indexes of the table that aren't being
// dropped.
for _, index := range tableDesc.NonDropIndexes() {
// Update the partitioning on all indexes of the table.
for _, index := range tableDesc.AllIndexes() {
newIdx, err := CreatePartitioning(
ctx,
r.localPlanner.extendedEvalCtx.Settings,
Expand Down Expand Up @@ -215,7 +214,7 @@ func (r *databaseRegionChangeFinalizer) repartitionRegionalByRowTables(
// the type descriptor (removed enum values obviously aren't), so we must
// remove the partition from all indexes before trying to delete zone
// configurations.
for _, index := range tableDesc.NonDropIndexes() {
for _, index := range tableDesc.AllIndexes() {
oldPartitioning := oldPartitioningDescs[index.GetID()]

// Remove zone configurations that reference partition values we removed
Expand Down

0 comments on commit faa2e64

Please sign in to comment.