Skip to content

Commit

Permalink
multi-region: Unblock IMPORT into allowable multi-region tables
Browse files Browse the repository at this point in the history
Unblock IMPORTing into allowable multi-region tables.  We only support
importing into GLOBAL and REGIONAL BY TABLE tables as #61133 is blocking
IMPORT into any tables which have columns generated using a user defined
type (which covers REGIONAL BY ROW tables, as they have the crdb_region
column which is generated using the crdb_internal_region type).

This commit includes tests for both IMPORT and IMPORT INTO, as well as
cases which illustrate #61133 for non-multi-region tables.

One thing of note is the moving of the call to
ValidateTableLocalityConfig from NewTableDesc to the startExec of
createTableNode. This is intentional, and was required, as the call to
NewTableDesc in IMPORT doesn't have the necessary infrastructure setup
(namely, a valid transaction and the EvalContext Codec) to validate the
LocalityConfig at that time. That being said, IMPORT does perform a proper
validation of the full table descriptor before it's written to disk.
Once it was deemed safe to remove the validation from NewTableDesc, the
validate had to be added to startExec of createTableNode, as the
LocalityConfig is used in there to generate a zone configuration,
and we want to ensure that the LocalityConfig is valid before we do that.

At this point there are no other calls to NewTableDesc which require
validation of the LocalityConfig.

Release note: None

Release justification: Fixes bug in interaction between existing
functionality and new multi-region feature.
  • Loading branch information
ajstorm committed Mar 1, 2021
1 parent dac79a2 commit a185494
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 14 deletions.
17 changes: 14 additions & 3 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,6 @@ func WriteDescriptors(
table.GetID(), table)
}
}

// If the table descriptor is being written to a multi-region database and
// the table does not have a locality config setup, set one up here. The
// table's locality config will be set to the default locality - REGIONAL
Expand All @@ -377,8 +376,20 @@ func WriteDescriptors(
if err != nil {
return err
}
if dbDesc.GetRegionConfig() != nil && table.GetLocalityConfig() == nil {
table.(*tabledesc.Mutable).SetTableLocalityRegionalByTable(tree.PrimaryRegionLocalityName)
if dbDesc.GetRegionConfig() != nil {
if table.GetLocalityConfig() == nil {
table.(*tabledesc.Mutable).SetTableLocalityRegionalByTable(tree.PrimaryRegionLocalityName)
}
} else {
// If the database is not multi-region enabled, ensure that we don't
// write any multi-region table descriptors into it.
if table.GetLocalityConfig() != nil {
return pgerror.Newf(pgcode.FeatureNotSupported,
"cannot write descriptor for multi-region table %s into non-multi-region database %s",
table.GetName(),
dbDesc.GetName(),
)
}
}

if err := descsCol.WriteDescToBatch(
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/importccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ go_test(
"//pkg/blobs",
"//pkg/ccl/backupccl",
"//pkg/ccl/kvccl",
"//pkg/ccl/multiregionccl",
"//pkg/ccl/partitionccl",
"//pkg/ccl/storageccl",
"//pkg/ccl/utilccl",
"//pkg/ccl/workloadccl/format",
Expand Down
11 changes: 11 additions & 0 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,14 @@ func importPlanHook(
)
}
}
if create.Locality != nil &&
create.Locality.LocalityLevel == tree.LocalityLevelRow {
return unimplemented.NewWithIssueDetailf(
61133,
"import.regional-by-row",
"IMPORT to REGIONAL BY ROW table not supported",
)
}
tbl, err := MakeSimpleTableDescriptor(
ctx, p.SemaCtx(), p.ExecCfg().Settings, create, parentID, parentSchemaID, defaultCSVTableID, NoFKs, walltime)
if err != nil {
Expand Down Expand Up @@ -1209,6 +1217,9 @@ func prepareExistingTableDescForIngestion(
if len(desc.Mutations) > 0 {
return nil, errors.Errorf("cannot IMPORT INTO a table with schema changes in progress -- try again later (pending mutation %s)", desc.Mutations[0].String())
}
if desc.LocalityConfig != nil && desc.LocalityConfig.GetRegionalByRow() != nil {
return nil, unimplemented.NewWithIssueDetailf(61133, "import.regional-by-row", "IMPORT into REGIONAL BY ROW table not supported")
}

// Note that desc is just used to verify that the version matches.
importing, err := descsCol.GetMutableTableVersionByID(ctx, desc.ID, txn)
Expand Down
173 changes: 168 additions & 5 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/blobs"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand Down Expand Up @@ -1214,6 +1216,8 @@ func TestImportUserDefinedTypes(t *testing.T) {
intoCols string
verifyQuery string
expected [][]string
err bool
errString string
}{
// Test CSV imports.
{
Expand Down Expand Up @@ -1251,6 +1255,15 @@ func TestImportUserDefinedTypes(t *testing.T) {
verifyQuery: "SELECT * FROM t ORDER BY a",
expected: [][]string{{"hello", "hello"}, {"hi", "hi"}},
},
// Test table with default value
{
create: "a greeting, b greeting default 'hi'",
intoCols: "a, b",
typ: "PGCOPY",
contents: "hello\nhi\thi\n",
err: true,
errString: "type OID 100052 does not exist",
},
}

// Test IMPORT INTO.
Expand All @@ -1263,11 +1276,18 @@ func TestImportUserDefinedTypes(t *testing.T) {
require.Equal(t, len(test.contents), n)
// Run the import statement.
sqlDB.Exec(t, fmt.Sprintf("CREATE TABLE t (%s)", test.create))
sqlDB.Exec(t,
fmt.Sprintf("IMPORT INTO t (%s) %s DATA ($1)", test.intoCols, test.typ),
fmt.Sprintf("nodelocal://0/%s", filepath.Base(f.Name())))
// Ensure that the table data is as we expect.
sqlDB.CheckQueryResults(t, test.verifyQuery, test.expected)

importStmt := fmt.Sprintf("IMPORT INTO t (%s) %s DATA ($1)", test.intoCols, test.typ)
importArgs := fmt.Sprintf("nodelocal://0/%s", filepath.Base(f.Name()))

if !test.err {
sqlDB.Exec(t, importStmt, importArgs)
// Ensure that the table data is as we expect.
sqlDB.CheckQueryResults(t, test.verifyQuery, test.expected)
} else {
sqlDB.ExpectErr(t, test.errString, importStmt, importArgs)
}

// Clean up after the test.
sqlDB.Exec(t, "DROP TABLE t")
}
Expand Down Expand Up @@ -6411,6 +6431,149 @@ func TestImportAvro(t *testing.T) {
})
}

func TestImportMultiRegion(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const (
nodes = 3
)
ctx := context.Background()
baseDir := filepath.Join("testdata", "avro")
clusterArgs := base.TestClusterArgs{
ServerArgsPerNode: map[int]base.TestServerArgs{},
}

// Create a multi-region cluster
clusterArgs.ServerArgsPerNode[0] = base.TestServerArgs{
ExternalIODir: baseDir,
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "region", Value: "ap-southeast-2"},
{Key: "availability-zone", Value: "ap-az1"},
},
},
}
clusterArgs.ServerArgsPerNode[1] = base.TestServerArgs{
ExternalIODir: baseDir,
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "region", Value: "ca-central-1"},
{Key: "availability-zone", Value: "ca-az1"},
},
},
}
clusterArgs.ServerArgsPerNode[2] = base.TestServerArgs{
ExternalIODir: baseDir,
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "region", Value: "us-east-1"},
{Key: "availability-zone", Value: "us-az1"},
},
},
}

tc := testcluster.StartTestCluster(t, nodes, clusterArgs)
defer tc.Stopper().Stop(ctx)
sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))

sqlDB.Exec(t, `SET CLUSTER SETTING kv.bulk_ingest.batch_size = '10KB'`)

// Create the databases
sqlDB.Exec(t, `CREATE DATABASE foo`)
sqlDB.Exec(t, `CREATE DATABASE multi_region PRIMARY REGION "ca-central-1" REGIONS
"ap-southeast-2", "us-east-1"`)

simpleOcf := fmt.Sprintf("nodelocal://0/%s", "simple.ocf")

// Table schemas for USING
tableSchemaMR := fmt.Sprintf("nodelocal://0/%s", "simple-schema-multi-region.sql")
tableSchemaMRRegionalByRow := fmt.Sprintf("nodelocal://0/%s",
"simple-schema-multi-region-regional-by-row.sql")

tests := []struct {
name string
db string
table string
sql string
create string
args []interface{}
err bool
errString string
}{
{
name: "import-create-using-multi-region-to-non-multi-region-database",
db: "foo",
table: "simple",
sql: "IMPORT TABLE simple CREATE USING $1 AVRO DATA ($2)",
args: []interface{}{tableSchemaMR, simpleOcf},
err: true,
errString: "cannot write descriptor for multi-region table",
},
{
name: "import-create-using-multi-region-regional-by-table-to-multi-region-database",
db: "multi_region",
table: "simple",
sql: "IMPORT TABLE simple CREATE USING $1 AVRO DATA ($2)",
args: []interface{}{tableSchemaMR, simpleOcf},
},
{
name: "import-create-using-multi-region-regional-by-row-to-multi-region-database",
db: "multi_region",
table: "simple",
sql: "IMPORT TABLE simple CREATE USING $1 AVRO DATA ($2)",
args: []interface{}{tableSchemaMRRegionalByRow, simpleOcf},
err: true,
errString: "IMPORT to REGIONAL BY ROW table not supported",
},
{
name: "import-into-multi-region-regional-by-row-to-multi-region-database",
db: "multi_region",
table: "mr_regional_by_row",
create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW",
sql: "IMPORT INTO mr_regional_by_row AVRO DATA ($1)",
args: []interface{}{simpleOcf},
err: true,
errString: "IMPORT into REGIONAL BY ROW table not supported",
},
{
name: "import-into-using-multi-region-global-to-multi-region-database",
db: "multi_region",
table: "mr_global",
create: "CREATE TABLE mr_global (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY GLOBAL",
sql: "IMPORT INTO mr_global AVRO DATA ($1)",
args: []interface{}{simpleOcf},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
sqlDB.Exec(t, fmt.Sprintf(`SET DATABASE = %q`, test.db))
sqlDB.Exec(t, fmt.Sprintf("DROP TABLE IF EXISTS %q CASCADE", test.table))

if test.create != "" {
sqlDB.Exec(t, test.create)
}

if test.err {
if test.errString == "" {
t.Error(`invalid test case: expects error, but no error string provided`)
}
sqlDB.ExpectErr(t, test.errString, test.sql, test.args...)
} else {
_, err := sqlDB.DB.ExecContext(context.Background(), test.sql, test.args...)
require.NoError(t, err)

var numRows int
sqlDB.QueryRow(t, fmt.Sprintf("SELECT count(*) FROM %q", test.table)).Scan(&numRows)
if numRows == 0 {
t.Error("expected some rows after import")
}
}
})
}
}

// TestImportClientDisconnect ensures that an import job can complete even if
// the client connection which started it closes. This test uses a helper
// subprocess to force a closed client connection without needing to rely
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- noinspection SqlDialectInspectionForFile

-- noinspection SqlNoDataSourceInspectionForFile

CREATE TABLE public.simple (
i integer PRIMARY KEY,
s text,
b bytea
) LOCALITY REGIONAL BY ROW;

10 changes: 10 additions & 0 deletions pkg/ccl/importccl/testdata/avro/simple-schema-multi-region.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- noinspection SqlDialectInspectionForFile

-- noinspection SqlNoDataSourceInspectionForFile

CREATE TABLE public.simple (
i integer PRIMARY KEY,
s text,
b bytea
) LOCALITY REGIONAL BY TABLE;

11 changes: 5 additions & 6 deletions pkg/sql/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,12 @@ func (n *createTableNode) startExec(params runParams) error {
return err
}

dg := catalogkv.NewOneLevelUncachedDescGetter(params.p.txn, params.ExecCfg().Codec)
if desc.LocalityConfig != nil {
if err := desc.ValidateTableLocalityConfig(params.ctx, dg); err != nil {
return err
}

_, dbDesc, err := params.p.Descriptors().GetImmutableDatabaseByID(
params.ctx,
params.p.txn,
Expand Down Expand Up @@ -404,7 +409,6 @@ func (n *createTableNode) startExec(params runParams) error {
}
}

dg := catalogkv.NewOneLevelUncachedDescGetter(params.p.txn, params.ExecCfg().Codec)
if err := desc.Validate(params.ctx, dg); err != nil {
return err
}
Expand Down Expand Up @@ -2278,11 +2282,6 @@ func NewTableDesc(
} else {
return nil, errors.Newf("unknown locality level: %v", n.Locality.LocalityLevel)
}

dg := catalogkv.NewOneLevelUncachedDescGetter(txn, evalCtx.Codec)
if err := desc.ValidateTableLocalityConfig(ctx, dg); err != nil {
return nil, err
}
}

return &desc, nil
Expand Down

0 comments on commit a185494

Please sign in to comment.