[WIP] sql: partition REGIONAL BY ROW on all indexes during region changes #63968
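As a rough SQL-level sketch of the behavior this change targets (the table and region names mirror the tests below; the commentary is an interpretation of the PR title, not text from the PR): a REGIONAL BY ROW table is partitioned by region on every one of its indexes, so a region change has to repartition each index, including indexes that are currently being added or dropped.

CREATE DATABASE db WITH PRIMARY REGION "us-east1" REGIONS "us-east2";
CREATE TABLE db.rbr(k INT PRIMARY KEY, v INT NOT NULL, INDEX v_idx(v)) LOCALITY REGIONAL BY ROW;
-- Every index of db.rbr carries one list partition per database region, so the
-- region change below must add a matching "us-east3" partition to rbr@primary,
-- rbr@v_idx, and any index that is still mid-drop when the change finalizes.
ALTER DATABASE db ADD REGION "us-east3";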

@@ -2625,7 +2625,7 @@ CREATE TABLE hash_sharded_idx_table (
pk INT PRIMARY KEY USING HASH WITH BUCKET_COUNT = 8
)

statement error cannot convert a table to REGIONAL BY ROW if table table contains hash sharded indexes
statement error cannot convert hash_sharded_idx_table to REGIONAL BY ROW as the table contains hash sharded indexes
ALTER TABLE hash_sharded_idx_table SET LOCALITY REGIONAL BY ROW

statement ok
201 changes: 156 additions & 45 deletions pkg/ccl/multiregionccl/region_test.go
@@ -9,7 +9,10 @@
package multiregionccl_test

import (
gosql "database/sql"
"fmt"
"strings"
"sync"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
@@ -349,6 +352,92 @@ CREATE DATABASE db WITH PRIMARY REGION "us-east1" REGIONS "us-east2", "us-east3"
}
}

// TestDropIndexFailureDuringAddOrDropRegion tests a DROP INDEX failing during
// an ADD/DROP REGION, in which case the index being dropped should still
// contain the new partitions.
func TestDropIndexFailureDuringAddOrDropRegion(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

defer sqltestutils.SetTestJobsAdoptInterval()()

for _, tc := range []struct {
regionChangeSQL string
expectedPartitions []string
}{
{
regionChangeSQL: `ALTER DATABASE db DROP REGION "us-east2"`,
expectedPartitions: []string{"us-east1"},
},
{
regionChangeSQL: `ALTER DATABASE db ADD REGION "us-east3"`,
expectedPartitions: []string{"us-east1", "us-east2", "us-east3"},
},
} {
t.Run(tc.regionChangeSQL, func(t *testing.T) {
block := false
typeChangeFinished := make(chan struct{})
knobs := base.TestingKnobs{
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
RunBeforeBackfill: func() error {
if block {
block = false
<-typeChangeFinished
return errors.New("crikey!")
}
return nil
},
},
}

_, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 3 /* numServers */, knobs, nil, /* baseDir */
)
defer cleanup()

_, err := sqlDB.Exec(`
DROP DATABASE IF EXISTS db;
CREATE DATABASE db WITH PRIMARY REGION "us-east1" REGIONS "us-east2";
CREATE TABLE db.rbr(k INT PRIMARY KEY, v INT NOT NULL, INDEX v_idx(v)) LOCALITY REGIONAL BY ROW;
`)
require.NoError(t, err)

block = true
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

_, err := sqlDB.Exec(`DROP INDEX db.rbr@v_idx`)
if err != nil {
if !strings.Contains(err.Error(), "crikey!") {
t.Errorf("failed to run drop index for unknown reason: %+v", err)
}
} else {
t.Errorf("expected error during DROP INDEX")
}
}()

_, err = sqlDB.Exec(tc.regionChangeSQL)
close(typeChangeFinished)
require.NoError(t, err)

// Wait for DROP INDEX to complete.
wg.Wait()

require.NoError(
t,
validateIndexAndPartitions(
sqlDB,
"db.rbr",
[]string{"rbr@primary", "rbr@v_idx"},
tc.expectedPartitions,
),
)
})
}
}
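After the failed DROP INDEX, the test asserts that both rbr@primary and rbr@v_idx still carry the expected partitions. The same state can be inspected by hand with the query that the validateIndexAndPartitions helper (added further down in this diff) issues:

SELECT index_name, partition_name
FROM [SHOW PARTITIONS FROM TABLE db.rbr]
ORDER BY partition_name;
-- For the ADD REGION case, every index of db.rbr, including the one whose drop
-- failed, should list a partition for each of us-east1, us-east2, and us-east3.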

// TestRegionAddDropEnclosingRegionalByRowOps tests adding/dropping regions
// (which may or may not succeed) with a concurrent operation on a regional by
// row table. The sketch of the test is as follows:
@@ -400,9 +489,10 @@ func TestRegionAddDropEnclosingRegionalByRowOps(t *testing.T) {
}

testCases := []struct {
name string
op string
expectedIndexes []string
name string
op string
expectedIndexes []string
additionalSetupQuery string
}{
{
name: "create-rbr-table",
@@ -434,11 +524,17 @@ func TestRegionAddDropEnclosingRegionalByRowOps(t *testing.T) {
op: `CREATE UNIQUE INDEX uniq ON db.rbr(v)`,
expectedIndexes: []string{"rbr@primary", "rbr@uniq"},
},
{
name: "drop-index",
op: "DROP INDEX db.rbr@v_idx",
additionalSetupQuery: "CREATE INDEX v_idx ON db.rbr(v)",
expectedIndexes: []string{"rbr@primary"},
},
}

for _, tc := range testCases {
for _, regionAlterCmd := range regionAlterCmds {
t.Run(regionAlterCmd.name+"-"+tc.name, func(t *testing.T) {
t.Run(fmt.Sprintf("%s/%s", regionAlterCmd.name, tc.name), func(t *testing.T) {
var mu syncutil.Mutex
typeChangeStarted := make(chan struct{})
typeChangeFinished := make(chan struct{})
@@ -470,7 +566,7 @@ func TestRegionAddDropEnclosingRegionalByRowOps(t *testing.T) {
DROP DATABASE IF EXISTS db;
CREATE DATABASE db WITH PRIMARY REGION "us-east1" REGIONS "us-east2", "us-east3";
CREATE TABLE db.rbr(k INT PRIMARY KEY, v INT NOT NULL) LOCALITY REGIONAL BY ROW;
`)
` + tc.additionalSetupQuery)
require.NoError(t, err)

go func() {
@@ -496,52 +592,67 @@ CREATE TABLE db.rbr(k INT PRIMARY KEY, v INT NOT NULL) LOCALITY REGIONAL BY ROW;
require.NoError(t, err)

testutils.SucceedsSoon(t, func() error {
rows, err := sqlDB.Query("SELECT index_name, partition_name FROM [SHOW PARTITIONS FROM TABLE db.rbr] ORDER BY partition_name")
if err != nil {
return err
}
defer rows.Close()
return validateIndexAndPartitions(
sqlDB,
"db.rbr",
tc.expectedIndexes,
regionAlterCmd.expectedPartitions,
)
})
<-typeChangeFinished
})
}
}
}

indexPartitions := make(map[string][]string)
for rows.Next() {
var indexName string
var partitionName string
if err := rows.Scan(&indexName, &partitionName); err != nil {
return err
}
func validateIndexAndPartitions(
sqlDB *gosql.DB, tableName string, expectedIndexes []string, expectedPartitions []string,
) error {
rows, err := sqlDB.Query(fmt.Sprintf(
"SELECT index_name, partition_name FROM [SHOW PARTITIONS FROM TABLE %s] ORDER BY partition_name",
tableName,
))
if err != nil {
return err
}
defer rows.Close()

indexPartitions[indexName] = append(indexPartitions[indexName], partitionName)
}
indexPartitions := make(map[string][]string)
for rows.Next() {
var indexName string
var partitionName string
if err := rows.Scan(&indexName, &partitionName); err != nil {
return err
}

for _, expectedIndex := range tc.expectedIndexes {
partitions, found := indexPartitions[expectedIndex]
if !found {
return errors.AssertionFailedf("did not find index %s", expectedIndex)
}
indexPartitions[indexName] = append(indexPartitions[indexName], partitionName)
}

if len(partitions) != len(regionAlterCmd.expectedPartitions) {
return errors.AssertionFailedf(
"unexpected number of partitions; expected %d, found %d",
len(partitions),
len(regionAlterCmd.expectedPartitions),
)
}
for i, expectedPartition := range regionAlterCmd.expectedPartitions {
if expectedPartition != partitions[i] {
return errors.AssertionFailedf(
"unexpected partitions; expected %v, found %v",
regionAlterCmd.expectedPartitions,
partitions,
)
}
}
}
return nil
})
<-typeChangeFinished
})
for _, expectedIndex := range expectedIndexes {
partitions, found := indexPartitions[expectedIndex]
if !found {
return errors.AssertionFailedf("did not find index %s", expectedIndex)
}

if len(partitions) != len(expectedPartitions) {
return errors.AssertionFailedf(
"unexpected number of partitions; expected %d, found %d",
len(partitions),
len(expectedPartitions),
)
}
for i, expectedPartition := range expectedPartitions {
if expectedPartition != partitions[i] {
return errors.AssertionFailedf(
"unexpected partitions; expected %v, found %v",
expectedPartitions,
partitions,
)
}
}
}
return nil

}

func TestSettingPrimaryRegionAmidstDrop(t *testing.T) {
2 changes: 1 addition & 1 deletion pkg/sql/alter_database.go
@@ -746,7 +746,7 @@ func checkCanConvertTableToMultiRegion(
tableDesc.GetName(),
)
}
for _, idx := range tableDesc.NonDropIndexes() {
for _, idx := range tableDesc.AllIndexes() {
if idx.GetPartitioning().NumColumns > 0 {
return errors.WithDetailf(
pgerror.Newf(
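A hypothetical SQL-level illustration of the check above (the table, index, and database names are made up; the exact error text is not shown in this hunk): any index that defines its own partitioning, whether or not it is being dropped, blocks converting its table to a multi-region form.

CREATE TABLE t (k INT PRIMARY KEY, region STRING NOT NULL);
CREATE INDEX region_idx ON t (region) PARTITION BY LIST (region) (
    PARTITION p_east VALUES IN ('east')
);
-- Converting the database containing t to multi-region would be rejected here,
-- because region_idx carries explicit partitioning, even if region_idx is in
-- the middle of being dropped.
ALTER DATABASE defaultdb SET PRIMARY REGION "us-east1";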
8 changes: 6 additions & 2 deletions pkg/sql/alter_table_locality.go
@@ -240,9 +240,13 @@ func (n *alterTableSetLocalityNode) alterTableLocalityToRegionalByRow(
return interleaveOnRegionalByRowError()
}

for _, idx := range n.tableDesc.NonDropIndexes() {
for _, idx := range n.tableDesc.AllIndexes() {
if idx.IsSharded() {
return pgerror.New(pgcode.FeatureNotSupported, "cannot convert a table to REGIONAL BY ROW if table table contains hash sharded indexes")
return pgerror.Newf(
pgcode.FeatureNotSupported,
"cannot convert %s to REGIONAL BY ROW as the table contains hash sharded indexes",
tree.Name(n.tableDesc.GetName()),
)
}
}

7 changes: 3 additions & 4 deletions pkg/sql/database_region_change_finalizer.go
@@ -183,9 +183,8 @@ func (r *databaseRegionChangeFinalizer) repartitionRegionalByRowTables(
// configurations from any partitions that are removed.
oldPartitioningDescs := make(map[descpb.IndexID]descpb.PartitioningDescriptor)

// Update the partitioning on all indexes of the table that aren't being
// dropped.
for _, index := range tableDesc.NonDropIndexes() {
// Update the partitioning on all indexes of the table.
for _, index := range tableDesc.AllIndexes() {
newIdx, err := CreatePartitioning(
ctx,
r.localPlanner.extendedEvalCtx.Settings,
@@ -215,7 +214,7 @@ func (r *databaseRegionChangeFinalizer) repartitionRegionalByRowTables(
// the type descriptor (removed enum values obviously aren't), so we must
// remove the partition from all indexes before trying to delete zone
// configurations.
for _, index := range tableDesc.NonDropIndexes() {
for _, index := range tableDesc.AllIndexes() {
oldPartitioning := oldPartitioningDescs[index.GetID()]

// Remove zone configurations that reference partition values we removed
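For orientation only (reusing db.rbr and the region names from the tests above; exact syntax and output may vary by version), the per-partition zone configurations that this cleanup removes can be inspected at the SQL level:

-- Each database region corresponds to a list partition on every index of a
-- REGIONAL BY ROW table, and each of those partitions has a zone configuration
-- pinning it to its region. Dropping "us-east2" therefore requires removing
-- that partition, and its zone configuration, from every index of the table,
-- including indexes that are still being dropped.
SHOW ZONE CONFIGURATION FROM PARTITION "us-east2" OF INDEX db.rbr@v_idx;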