multi-region: Unblock IMPORT into allowable multi-region tables
Unblock IMPORTing into allowable multi-region tables. We only support
importing into GLOBAL and REGIONAL BY TABLE tables, as #61133 blocks
IMPORT into any table with columns generated using a user-defined type
(which covers REGIONAL BY ROW tables, since they have the crdb_region
column, which is generated using the crdb_internal_region type).

This commit includes tests for both IMPORT and IMPORT INTO, as well as
cases which illustrate #61133 for non-multi-region tables.
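
As a hedged sketch of the resulting behavior, mirroring the new tests
below (the database, table, and file names are illustrative, not
prescriptive):

  CREATE DATABASE multi_region PRIMARY REGION "ca-central-1" REGIONS "ap-southeast-2", "us-east-1";
  SET DATABASE = multi_region;

  -- GLOBAL and REGIONAL BY TABLE tables can now be imported into.
  CREATE TABLE mr_global (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY GLOBAL;
  IMPORT INTO mr_global AVRO DATA ('nodelocal://0/simple.ocf');  -- succeeds

  -- REGIONAL BY ROW tables remain blocked until #61133 is resolved.
  CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW;
  IMPORT INTO mr_regional_by_row AVRO DATA ('nodelocal://0/simple.ocf');
  -- ERROR: IMPORT into REGIONAL BY ROW table not supported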

Release note: None

Release justification: Fixes a bug in the interaction between existing
functionality and the new multi-region feature.
ajstorm committed Feb 28, 2021
1 parent dac79a2 commit 1757254
Showing 7 changed files with 220 additions and 14 deletions.
17 changes: 14 additions & 3 deletions pkg/ccl/backupccl/restore_job.go
@@ -363,7 +363,6 @@ func WriteDescriptors(
table.GetID(), table)
}
}

// If the table descriptor is being written to a multi-region database and
// the table does not have a locality config setup, set one up here. The
// table's locality config will be set to the default locality - REGIONAL
@@ -377,8 +376,20 @@ func WriteDescriptors(
if err != nil {
return err
}
if dbDesc.GetRegionConfig() != nil && table.GetLocalityConfig() == nil {
table.(*tabledesc.Mutable).SetTableLocalityRegionalByTable(tree.PrimaryRegionLocalityName)
if dbDesc.GetRegionConfig() != nil {
if table.GetLocalityConfig() == nil {
table.(*tabledesc.Mutable).SetTableLocalityRegionalByTable(tree.PrimaryRegionLocalityName)
}
} else {
// If the database is not multi-region enabled, ensure that we don't
// write any multi-region table descriptors into it.
if table.GetLocalityConfig() != nil {
return pgerror.Newf(pgcode.FeatureNotSupported,
"cannot write descriptor for multi-region table %d into non-multi-region database %d",
table.GetID(),
table.GetParentID(),
)
}
}

if err := descsCol.WriteDescToBatch(
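At the SQL level, the new WriteDescriptors check surfaces roughly as
follows (a hedged sketch mirroring the
import-create-using-multi-region-to-non-multi-region-database test case
added below; the real error message includes descriptor IDs):

  CREATE DATABASE foo;  -- no PRIMARY REGION, so no region config
  SET DATABASE = foo;
  IMPORT TABLE simple CREATE USING 'nodelocal://0/simple-schema-multi-region.sql'
      AVRO DATA ('nodelocal://0/simple.ocf');
  -- ERROR: cannot write descriptor for multi-region table ... into non-multi-region database ...
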
2 changes: 2 additions & 0 deletions pkg/ccl/importccl/BUILD.bazel
@@ -120,6 +120,8 @@ go_test(
"//pkg/blobs",
"//pkg/ccl/backupccl",
"//pkg/ccl/kvccl",
"//pkg/ccl/multiregionccl",
"//pkg/ccl/partitionccl",
"//pkg/ccl/storageccl",
"//pkg/ccl/utilccl",
"//pkg/ccl/workloadccl/format",
11 changes: 11 additions & 0 deletions pkg/ccl/importccl/import_stmt.go
@@ -829,6 +829,14 @@ func importPlanHook(
)
}
}
if create.Locality != nil &&
create.Locality.LocalityLevel == tree.LocalityLevelRow {
return unimplemented.NewWithIssueDetailf(
61133,
"import.regional-by-row",
"IMPORT to REGIONAL BY ROW table not supported",
)
}
tbl, err := MakeSimpleTableDescriptor(
ctx, p.SemaCtx(), p.ExecCfg().Settings, create, parentID, parentSchemaID, defaultCSVTableID, NoFKs, walltime)
if err != nil {
@@ -1209,6 +1217,9 @@ func prepareExistingTableDescForIngestion(
if len(desc.Mutations) > 0 {
return nil, errors.Errorf("cannot IMPORT INTO a table with schema changes in progress -- try again later (pending mutation %s)", desc.Mutations[0].String())
}
if desc.LocalityConfig != nil && desc.LocalityConfig.GetRegionalByRow() != nil {
return nil, unimplemented.NewWithIssueDetailf(61133, "import.regional-by-row", "IMPORT into REGIONAL BY ROW table not supported")
}

// Note that desc is just used to verify that the version matches.
importing, err := descsCol.GetMutableTableVersionByID(ctx, desc.ID, txn)
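The importPlanHook guard covers the IMPORT ... CREATE USING path; as a
hedged sketch (mirroring the regional-by-row test case added below), it
rejects a schema file that declares LOCALITY REGIONAL BY ROW:

  SET DATABASE = multi_region;
  IMPORT TABLE simple
      CREATE USING 'nodelocal://0/simple-schema-multi-region-regional-by-row.sql'
      AVRO DATA ('nodelocal://0/simple.ocf');
  -- ERROR: IMPORT to REGIONAL BY ROW table not supported
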
173 changes: 168 additions & 5 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -31,6 +31,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/blobs"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
@@ -1214,6 +1216,8 @@ func TestImportUserDefinedTypes(t *testing.T) {
intoCols string
verifyQuery string
expected [][]string
err bool
errString string
}{
// Test CSV imports.
{
Expand Down Expand Up @@ -1251,6 +1255,15 @@ func TestImportUserDefinedTypes(t *testing.T) {
verifyQuery: "SELECT * FROM t ORDER BY a",
expected: [][]string{{"hello", "hello"}, {"hi", "hi"}},
},
// Test table with default value
{
create: "a greeting, b greeting default 'hi'",
intoCols: "a, b",
typ: "PGCOPY",
contents: "hello\nhi\thi\n",
err: true,
errString: "type OID 100052 does not exist",
},
}

// Test IMPORT INTO.
@@ -1263,11 +1276,18 @@ func TestImportUserDefinedTypes(t *testing.T) {
require.Equal(t, len(test.contents), n)
// Run the import statement.
sqlDB.Exec(t, fmt.Sprintf("CREATE TABLE t (%s)", test.create))
sqlDB.Exec(t,
fmt.Sprintf("IMPORT INTO t (%s) %s DATA ($1)", test.intoCols, test.typ),
fmt.Sprintf("nodelocal://0/%s", filepath.Base(f.Name())))
// Ensure that the table data is as we expect.
sqlDB.CheckQueryResults(t, test.verifyQuery, test.expected)

importStmt := fmt.Sprintf("IMPORT INTO t (%s) %s DATA ($1)", test.intoCols, test.typ)
importArgs := fmt.Sprintf("nodelocal://0/%s", filepath.Base(f.Name()))

if !test.err {
sqlDB.Exec(t, importStmt, importArgs)
// Ensure that the table data is as we expect.
sqlDB.CheckQueryResults(t, test.verifyQuery, test.expected)
} else {
sqlDB.ExpectErr(t, test.errString, importStmt, importArgs)
}

// Clean up after the test.
sqlDB.Exec(t, "DROP TABLE t")
}
@@ -6411,6 +6431,149 @@ func TestImportAvro(t *testing.T) {
})
}

func TestImportMultiRegion(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const (
nodes = 3
)
ctx := context.Background()
baseDir := filepath.Join("testdata", "avro")
clusterArgs := base.TestClusterArgs{
ServerArgsPerNode: map[int]base.TestServerArgs{},
}

// Create a multi-region cluster
clusterArgs.ServerArgsPerNode[0] = base.TestServerArgs{
ExternalIODir: baseDir,
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "region", Value: "ap-southeast-2"},
{Key: "availability-zone", Value: "ap-az1"},
},
},
}
clusterArgs.ServerArgsPerNode[1] = base.TestServerArgs{
ExternalIODir: baseDir,
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "region", Value: "ca-central-1"},
{Key: "availability-zone", Value: "ca-az1"},
},
},
}
clusterArgs.ServerArgsPerNode[2] = base.TestServerArgs{
ExternalIODir: baseDir,
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "region", Value: "us-east-1"},
{Key: "availability-zone", Value: "us-az1"},
},
},
}

tc := testcluster.StartTestCluster(t, nodes, clusterArgs)
defer tc.Stopper().Stop(ctx)
sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))

sqlDB.Exec(t, `SET CLUSTER SETTING kv.bulk_ingest.batch_size = '10KB'`)

// Create the databases
sqlDB.Exec(t, `CREATE DATABASE foo`)
sqlDB.Exec(t, `CREATE DATABASE multi_region PRIMARY REGION "ca-central-1" REGIONS
"ap-southeast-2", "us-east-1"`)

simpleOcf := fmt.Sprintf("nodelocal://0/%s", "simple.ocf")

// Table schemas for USING
tableSchemaMR := fmt.Sprintf("nodelocal://0/%s", "simple-schema-multi-region.sql")
tableSchemaMRRegionalByRow := fmt.Sprintf("nodelocal://0/%s",
"simple-schema-multi-region-regional-by-row.sql")

tests := []struct {
name string
db string
table string
sql string
create string
args []interface{}
err bool
errString string
}{
{
name: "import-create-using-multi-region-to-non-multi-region-database",
db: "foo",
table: "simple",
sql: "IMPORT TABLE simple CREATE USING $1 AVRO DATA ($2)",
args: []interface{}{tableSchemaMR, simpleOcf},
err: true,
errString: "cannot write descriptor for multi-region table",
},
{
name: "import-create-using-multi-region-regional-by-table-to-multi-region-database",
db: "multi_region",
table: "simple",
sql: "IMPORT TABLE simple CREATE USING $1 AVRO DATA ($2)",
args: []interface{}{tableSchemaMR, simpleOcf},
},
{
name: "import-create-using-multi-region-regional-by-row-to-multi-region-database",
db: "multi_region",
table: "simple",
sql: "IMPORT TABLE simple CREATE USING $1 AVRO DATA ($2)",
args: []interface{}{tableSchemaMRRegionalByRow, simpleOcf},
err: true,
errString: "IMPORT to REGIONAL BY ROW table not supported",
},
{
name: "import-into-multi-region-regional-by-row-to-multi-region-database",
db: "multi_region",
table: "mr_regional_by_row",
create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW",
sql: "IMPORT INTO mr_regional_by_row AVRO DATA ($1)",
args: []interface{}{simpleOcf},
err: true,
errString: "IMPORT into REGIONAL BY ROW table not supported",
},
{
name: "import-into-using-multi-region-global-to-multi-region-database",
db: "multi_region",
table: "mr_global",
create: "CREATE TABLE mr_global (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY GLOBAL",
sql: "IMPORT INTO mr_global AVRO DATA ($1)",
args: []interface{}{simpleOcf},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
sqlDB.Exec(t, fmt.Sprintf(`SET DATABASE = %q`, test.db))
sqlDB.Exec(t, fmt.Sprintf("DROP TABLE IF EXISTS %q CASCADE", test.table))

if test.create != "" {
sqlDB.Exec(t, test.create)
}

if test.err {
if test.errString == "" {
t.Error(`invalid test case: expects error, but no error string provided`)
}
sqlDB.ExpectErr(t, test.errString, test.sql, test.args...)
} else {
_, err := sqlDB.DB.ExecContext(context.Background(), test.sql, test.args...)
require.NoError(t, err)

var numRows int
sqlDB.QueryRow(t, fmt.Sprintf("SELECT count(*) FROM %q", test.table)).Scan(&numRows)
if numRows == 0 {
t.Error("expected some rows after import")
}
}
})
}
}

// TestImportClientDisconnect ensures that an import job can complete even if
// the client connection which started it closes. This test uses a helper
// subprocess to force a closed client connection without needing to rely
10 changes: 10 additions & 0 deletions pkg/ccl/importccl/testdata/avro/simple-schema-multi-region-regional-by-row.sql
@@ -0,0 +1,10 @@
-- noinspection SqlDialectInspectionForFile

-- noinspection SqlNoDataSourceInspectionForFile

CREATE TABLE public.simple (
i integer PRIMARY KEY,
s text,
b bytea
) LOCALITY REGIONAL BY ROW;

10 changes: 10 additions & 0 deletions pkg/ccl/importccl/testdata/avro/simple-schema-multi-region.sql
@@ -0,0 +1,10 @@
-- noinspection SqlDialectInspectionForFile

-- noinspection SqlNoDataSourceInspectionForFile

CREATE TABLE public.simple (
i integer PRIMARY KEY,
s text,
b bytea
) LOCALITY REGIONAL BY TABLE;

11 changes: 5 additions & 6 deletions pkg/sql/create_table.go
@@ -364,7 +364,12 @@ func (n *createTableNode) startExec(params runParams) error {
return err
}

dg := catalogkv.NewOneLevelUncachedDescGetter(params.p.txn, params.ExecCfg().Codec)
if desc.LocalityConfig != nil {
if err := desc.ValidateTableLocalityConfig(params.ctx, dg); err != nil {
return err
}

_, dbDesc, err := params.p.Descriptors().GetImmutableDatabaseByID(
params.ctx,
params.p.txn,
@@ -404,7 +409,6 @@
}
}

dg := catalogkv.NewOneLevelUncachedDescGetter(params.p.txn, params.ExecCfg().Codec)
if err := desc.Validate(params.ctx, dg); err != nil {
return err
}
@@ -2278,11 +2282,6 @@ func NewTableDesc(
} else {
return nil, errors.Newf("unknown locality level: %v", n.Locality.LocalityLevel)
}

dg := catalogkv.NewOneLevelUncachedDescGetter(txn, evalCtx.Codec)
if err := desc.ValidateTableLocalityConfig(ctx, dg); err != nil {
return nil, err
}
}

return &desc, nil
