Skip to content

Commit

Permalink
multi-region: Unblock IMPORT into allowable multi-region tables
Browse files Browse the repository at this point in the history
Unblock IMPORTing into allowable multi-region tables.  We only support
importing into GLOBAL and REGIONAL BY TABLE tables as cockroachdb#61133 is blocking
IMPORT into any tables which have columns generated using a user defined
type (which covers REGIONAL BY ROW tables, as they have the crdb_region
column which is generated using the crdb_internal_region type).

This commit includes tests for both IMPORT and IMPORT INTO, as well as
cases which illustrate cockroachdb#61133 for non-multi-region tables.

One thing of note is the removing of the call to
ValidateTableLocalityConfig from NewTableDesc.  This is intentional, and
was required, as the call to NewTableDesc in IMPORT doesn't have the
necessary infrastructure setup (namely, a valid transaction and the
EvalContext Codec) to validate the LocalityConfig at that time. That
being said, IMPORT does perform a proper validation of the full table
descriptor before it's written to disk.

At this point there are no other calls to NewTableDesc which require
validation of the LocalityConfig.

Release note: None

Release justification: Fixes bug in interaction between existing
functionality and new multi-region feature.
  • Loading branch information
ajstorm committed Mar 2, 2021
1 parent 9bf97ac commit 7e80760
Show file tree
Hide file tree
Showing 11 changed files with 201 additions and 25 deletions.
17 changes: 14 additions & 3 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,6 @@ func WriteDescriptors(
table.GetID(), table)
}
}

// If the table descriptor is being written to a multi-region database and
// the table does not have a locality config setup, set one up here. The
// table's locality config will be set to the default locality - REGIONAL
Expand All @@ -377,8 +376,20 @@ func WriteDescriptors(
if err != nil {
return err
}
if dbDesc.GetRegionConfig() != nil && table.GetLocalityConfig() == nil {
table.(*tabledesc.Mutable).SetTableLocalityRegionalByTable(tree.PrimaryRegionLocalityName)
if dbDesc.GetRegionConfig() != nil {
if table.GetLocalityConfig() == nil {
table.(*tabledesc.Mutable).SetTableLocalityRegionalByTable(tree.PrimaryRegionLocalityName)
}
} else {
// If the database is not multi-region enabled, ensure that we don't
// write any multi-region table descriptors into it.
if table.GetLocalityConfig() != nil {
return pgerror.Newf(pgcode.FeatureNotSupported,
"cannot write descriptor for multi-region table %s into non-multi-region database %s",
table.GetName(),
dbDesc.GetName(),
)
}
}

if err := descsCol.WriteDescToBatch(
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/importccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ go_test(
"//pkg/blobs",
"//pkg/ccl/backupccl",
"//pkg/ccl/kvccl",
"//pkg/ccl/multiregionccl",
"//pkg/ccl/multiregionccl/multiregionccltestutils",
"//pkg/ccl/partitionccl",
"//pkg/ccl/storageccl",
"//pkg/ccl/utilccl",
"//pkg/ccl/workloadccl/format",
Expand Down
11 changes: 11 additions & 0 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,14 @@ func importPlanHook(
)
}
}
if create.Locality != nil &&
create.Locality.LocalityLevel == tree.LocalityLevelRow {
return unimplemented.NewWithIssueDetailf(
61133,
"import.regional-by-row",
"IMPORT to REGIONAL BY ROW table not supported",
)
}
tbl, err := MakeSimpleTableDescriptor(
ctx, p.SemaCtx(), p.ExecCfg().Settings, create, parentID, parentSchemaID, defaultCSVTableID, NoFKs, walltime)
if err != nil {
Expand Down Expand Up @@ -1209,6 +1217,9 @@ func prepareExistingTableDescForIngestion(
if len(desc.Mutations) > 0 {
return nil, errors.Errorf("cannot IMPORT INTO a table with schema changes in progress -- try again later (pending mutation %s)", desc.Mutations[0].String())
}
if desc.LocalityConfig != nil && desc.LocalityConfig.GetRegionalByRow() != nil {
return nil, unimplemented.NewWithIssueDetailf(61133, "import.regional-by-row", "IMPORT into REGIONAL BY ROW table not supported")
}

// Note that desc is just used to verify that the version matches.
importing, err := descsCol.GetMutableTableVersionByID(ctx, desc.ID, txn)
Expand Down
140 changes: 135 additions & 5 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/blobs"
"github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl"
"github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl/multiregionccltestutils"
_ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
Expand Down Expand Up @@ -1214,6 +1217,7 @@ func TestImportUserDefinedTypes(t *testing.T) {
intoCols string
verifyQuery string
expected [][]string
errString string
}{
// Test CSV imports.
{
Expand Down Expand Up @@ -1251,6 +1255,14 @@ func TestImportUserDefinedTypes(t *testing.T) {
verifyQuery: "SELECT * FROM t ORDER BY a",
expected: [][]string{{"hello", "hello"}, {"hi", "hi"}},
},
// Test table with default value
{
create: "a greeting, b greeting default 'hi'",
intoCols: "a, b",
typ: "PGCOPY",
contents: "hello\nhi\thi\n",
errString: "type OID 100052 does not exist",
},
}

// Test IMPORT INTO.
Expand All @@ -1263,11 +1275,18 @@ func TestImportUserDefinedTypes(t *testing.T) {
require.Equal(t, len(test.contents), n)
// Run the import statement.
sqlDB.Exec(t, fmt.Sprintf("CREATE TABLE t (%s)", test.create))
sqlDB.Exec(t,
fmt.Sprintf("IMPORT INTO t (%s) %s DATA ($1)", test.intoCols, test.typ),
fmt.Sprintf("nodelocal://0/%s", filepath.Base(f.Name())))
// Ensure that the table data is as we expect.
sqlDB.CheckQueryResults(t, test.verifyQuery, test.expected)

importStmt := fmt.Sprintf("IMPORT INTO t (%s) %s DATA ($1)", test.intoCols, test.typ)
importArgs := fmt.Sprintf("nodelocal://0/%s", filepath.Base(f.Name()))

if test.errString == "" {
sqlDB.Exec(t, importStmt, importArgs)
// Ensure that the table data is as we expect.
sqlDB.CheckQueryResults(t, test.verifyQuery, test.expected)
} else {
sqlDB.ExpectErr(t, test.errString, importStmt, importArgs)
}

// Clean up after the test.
sqlDB.Exec(t, "DROP TABLE t")
}
Expand Down Expand Up @@ -6411,6 +6430,117 @@ func TestImportAvro(t *testing.T) {
})
}

func TestImportMultiRegion(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

baseDir := filepath.Join("testdata", "avro")
_, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 1 /* numServers */, base.TestingKnobs{}, &baseDir,
)
defer cleanup()

_, err := sqlDB.Exec(`SET CLUSTER SETTING kv.bulk_ingest.batch_size = '10KB'`)
require.NoError(t, err)

// Create the databases
_, err = sqlDB.Exec(`CREATE DATABASE foo`)
require.NoError(t, err)

_, err = sqlDB.Exec(`CREATE DATABASE multi_region PRIMARY REGION "us-east1"`)
require.NoError(t, err)

simpleOcf := fmt.Sprintf("nodelocal://0/%s", "simple.ocf")

// Table schemas for USING
tableSchemaMR := fmt.Sprintf("nodelocal://0/%s", "simple-schema-multi-region.sql")
tableSchemaMRRegionalByRow := fmt.Sprintf("nodelocal://0/%s",
"simple-schema-multi-region-regional-by-row.sql")

tests := []struct {
name string
db string
table string
sql string
create string
args []interface{}
errString string
}{
{
name: "import-create-using-multi-region-to-non-multi-region-database",
db: "foo",
table: "simple",
sql: "IMPORT TABLE simple CREATE USING $1 AVRO DATA ($2)",
args: []interface{}{tableSchemaMR, simpleOcf},
errString: "cannot write descriptor for multi-region table",
},
{
name: "import-create-using-multi-region-regional-by-table-to-multi-region-database",
db: "multi_region",
table: "simple",
sql: "IMPORT TABLE simple CREATE USING $1 AVRO DATA ($2)",
args: []interface{}{tableSchemaMR, simpleOcf},
},
{
name: "import-create-using-multi-region-regional-by-row-to-multi-region-database",
db: "multi_region",
table: "simple",
sql: "IMPORT TABLE simple CREATE USING $1 AVRO DATA ($2)",
args: []interface{}{tableSchemaMRRegionalByRow, simpleOcf},
errString: "IMPORT to REGIONAL BY ROW table not supported",
},
{
name: "import-into-multi-region-regional-by-row-to-multi-region-database",
db: "multi_region",
table: "mr_regional_by_row",
create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW",
sql: "IMPORT INTO mr_regional_by_row AVRO DATA ($1)",
args: []interface{}{simpleOcf},
errString: "IMPORT into REGIONAL BY ROW table not supported",
},
{
name: "import-into-using-multi-region-global-to-multi-region-database",
db: "multi_region",
table: "mr_global",
create: "CREATE TABLE mr_global (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY GLOBAL",
sql: "IMPORT INTO mr_global AVRO DATA ($1)",
args: []interface{}{simpleOcf},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, err = sqlDB.Exec(fmt.Sprintf(`SET DATABASE = %q`, test.db))
require.NoError(t, err)

_, err = sqlDB.Exec(fmt.Sprintf("DROP TABLE IF EXISTS %q CASCADE", test.table))
require.NoError(t, err)

if test.create != "" {
_, err = sqlDB.Exec(test.create)
require.NoError(t, err)
}

_, err = sqlDB.ExecContext(context.Background(), test.sql, test.args...)
if test.errString != "" {
testutils.IsError(err, test.errString)
} else {
require.NoError(t, err)
res := sqlDB.QueryRow(fmt.Sprintf("SELECT count(*) FROM %q", test.table))
require.NoError(t, res.Err())

var numRows int
err = res.Scan(&numRows)
require.NoError(t, err)

if numRows == 0 {
t.Error("expected some rows after import")
}
}
})
}
}

// TestImportClientDisconnect ensures that an import job can complete even if
// the client connection which started it closes. This test uses a helper
// subprocess to force a closed client connection without needing to rely
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- noinspection SqlDialectInspectionForFile

-- noinspection SqlNoDataSourceInspectionForFile

CREATE TABLE public.simple (
i integer PRIMARY KEY,
s text,
b bytea
) LOCALITY REGIONAL BY ROW;

10 changes: 10 additions & 0 deletions pkg/ccl/importccl/testdata/avro/simple-schema-multi-region.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- noinspection SqlDialectInspectionForFile

-- noinspection SqlNoDataSourceInspectionForFile

CREATE TABLE public.simple (
i integer PRIMARY KEY,
s text,
b bytea
) LOCALITY REGIONAL BY TABLE;

4 changes: 2 additions & 2 deletions pkg/ccl/multiregionccl/multiregion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestMultiRegionNoLicense(t *testing.T) {
defer utilccl.TestingDisableEnterprise()()

_, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 3 /* numServers */, base.TestingKnobs{},
t, 3 /* numServers */, base.TestingKnobs{}, nil, /* baseDir */
)
defer cleanup()

Expand All @@ -55,7 +55,7 @@ func TestMultiRegionAfterEnterpriseDisabled(t *testing.T) {
skip.UnderRace(t, "#61163")

_, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 3 /* numServers */, base.TestingKnobs{},
t, 3 /* numServers */, base.TestingKnobs{}, nil, /* baseDir */
)
defer cleanup()

Expand Down
10 changes: 8 additions & 2 deletions pkg/ccl/multiregionccl/multiregionccltestutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
// of nodes and the provided testing knobs applied to each of the nodes. Every
// node is placed in its own locality, named "us-east1", "us-east2", and so on.
func TestingCreateMultiRegionCluster(
t *testing.T, numServers int, knobs base.TestingKnobs,
t *testing.T, numServers int, knobs base.TestingKnobs, baseDir *string,
) (serverutils.TestClusterInterface, *gosql.DB, func()) {
serverArgs := make(map[int]base.TestServerArgs)
regionNames := make([]string, numServers)
Expand All @@ -32,9 +32,15 @@ func TestingCreateMultiRegionCluster(
regionNames[i] = fmt.Sprintf("us-east%d", i+1)
}

b := ""
if baseDir != nil {
b = *baseDir
}

for i := 0; i < numServers; i++ {
serverArgs[i] = base.TestServerArgs{
Knobs: knobs,
Knobs: knobs,
ExternalIODir: b,
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "region", Value: regionNames[i]}},
},
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/multiregionccl/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestSettingPrimaryRegionAmidstDrop(t *testing.T) {
}

_, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 2 /* numServers */, knobs,
t, 2 /* numServers */, knobs, nil, /* baseDir */
)
defer cleanup()

Expand Down Expand Up @@ -143,7 +143,7 @@ func TestDroppingPrimaryRegionAsyncJobFailure(t *testing.T) {
}

_, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 1 /* numServers */, knobs,
t, 1 /* numServers */, knobs, nil, /* baseDir */
)
defer cleanup()

Expand Down Expand Up @@ -201,7 +201,7 @@ func TestRollbackDuringAddDropRegionAsyncJobFailure(t *testing.T) {

// Setup.
_, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 3 /* numServers */, knobs,
t, 3 /* numServers */, knobs, nil, /* baseDir */
)
defer cleanup()
_, err := sqlDB.Exec(`CREATE DATABASE db WITH PRIMARY REGION "us-east1" REGIONS "us-east2"`)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/multiregionccl/regional_by_row_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func TestRepartitionFailureRollback(t *testing.T) {
},
}
_, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
t, 3 /* numServers */, knobs,
t, 3 /* numServers */, knobs, nil, /* baseDir */
)
defer cleanup()

Expand Down
13 changes: 4 additions & 9 deletions pkg/sql/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,10 @@ func (n *createTableNode) startExec(params runParams) error {
return err
}

if err := validateDescriptor(params.ctx, params.p, desc); err != nil {
return err
}

if desc.LocalityConfig != nil {
_, dbDesc, err := params.p.Descriptors().GetImmutableDatabaseByID(
params.ctx,
Expand Down Expand Up @@ -446,10 +450,6 @@ func (n *createTableNode) startExec(params runParams) error {
}
}

if err := validateDescriptor(params.ctx, params.p, desc); err != nil {
return err
}

// Log Create Table event. This is an auditable log event and is
// recorded in the same transaction as the table descriptor update.
if err := params.p.logEvent(params.ctx,
Expand Down Expand Up @@ -2319,11 +2319,6 @@ func NewTableDesc(
} else {
return nil, errors.Newf("unknown locality level: %v", n.Locality.LocalityLevel)
}

bdg := catalogkv.NewOneLevelUncachedDescGetter(txn, evalCtx.Codec)
if err := catalog.ValidateSelfAndCrossReferences(ctx, bdg, &desc); err != nil {
return nil, err
}
}

return &desc, nil
Expand Down

0 comments on commit 7e80760

Please sign in to comment.