From d6210fb760c798f84b87bffefc9c10f4fbf16109 Mon Sep 17 00:00:00 2001 From: Adam Storm Date: Fri, 26 Feb 2021 10:24:01 -0500 Subject: [PATCH] multi-region: Unblock IMPORT into allowable multi-region tables Unblock IMPORTing into allowable multi-region tables. We only support importing into GLOBAL and REGIONAL BY TABLE tables, as #61133 is blocking IMPORT into any tables which have columns generated using a user-defined type (which covers REGIONAL BY ROW tables, as they have the crdb_region column which is generated using the crdb_internal_region type). This commit includes tests for both IMPORT and IMPORT INTO, as well as cases which illustrate #61133 for non-multi-region tables. One thing of note is that the call to ValidateTableLocalityConfig has moved from NewTableDesc to the startExec of createTableNode. This is intentional, and was required, as the call to NewTableDesc in IMPORT doesn't have the necessary infrastructure set up (namely, a valid transaction and the EvalContext Codec) to validate the LocalityConfig at that time. That being said, IMPORT does perform a proper validation of the full table descriptor before it's written to disk. Once it was deemed safe to remove the validation from NewTableDesc, the validation had to be added to startExec of createTableNode, as the LocalityConfig is used there to generate a zone configuration, and we want to ensure that the LocalityConfig is valid before we do that. At this point there are no other calls to NewTableDesc which require validation of the LocalityConfig. Release note: None Release justification: Fixes bug in interaction between existing functionality and new multi-region feature. 
--- pkg/ccl/backupccl/restore_job.go | 17 +- pkg/ccl/importccl/BUILD.bazel | 2 + pkg/ccl/importccl/import_stmt.go | 11 ++ pkg/ccl/importccl/import_stmt_test.go | 173 +++++++++++++++++- ...le-schema-multi-region-regional-by-row.sql | 10 + .../avro/simple-schema-multi-region.sql | 10 + pkg/sql/create_table.go | 11 +- 7 files changed, 220 insertions(+), 14 deletions(-) create mode 100644 pkg/ccl/importccl/testdata/avro/simple-schema-multi-region-regional-by-row.sql create mode 100644 pkg/ccl/importccl/testdata/avro/simple-schema-multi-region.sql diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 45ea9db5843d..75911e953b29 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -363,7 +363,6 @@ func WriteDescriptors( table.GetID(), table) } } - // If the table descriptor is being written to a multi-region database and // the table does not have a locality config setup, set one up here. The // table's locality config will be set to the default locality - REGIONAL @@ -377,8 +376,20 @@ func WriteDescriptors( if err != nil { return err } - if dbDesc.GetRegionConfig() != nil && table.GetLocalityConfig() == nil { - table.(*tabledesc.Mutable).SetTableLocalityRegionalByTable(tree.PrimaryRegionLocalityName) + if dbDesc.GetRegionConfig() != nil { + if table.GetLocalityConfig() == nil { + table.(*tabledesc.Mutable).SetTableLocalityRegionalByTable(tree.PrimaryRegionLocalityName) + } + } else { + // If the database is not multi-region enabled, ensure that we don't + // write any multi-region table descriptors into it. 
+ if table.GetLocalityConfig() != nil { + return pgerror.Newf(pgcode.FeatureNotSupported, + "cannot write descriptor for multi-region table %d into non-multi-region database %d", + table.GetID(), + table.GetParentID(), + ) + } } if err := descsCol.WriteDescToBatch( diff --git a/pkg/ccl/importccl/BUILD.bazel b/pkg/ccl/importccl/BUILD.bazel index 4b408614b781..eacba32be9bf 100644 --- a/pkg/ccl/importccl/BUILD.bazel +++ b/pkg/ccl/importccl/BUILD.bazel @@ -120,6 +120,8 @@ go_test( "//pkg/blobs", "//pkg/ccl/backupccl", "//pkg/ccl/kvccl", + "//pkg/ccl/multiregionccl", + "//pkg/ccl/partitionccl", "//pkg/ccl/storageccl", "//pkg/ccl/utilccl", "//pkg/ccl/workloadccl/format", diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index de8a338aa83a..04e1881f289b 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -829,6 +829,14 @@ func importPlanHook( ) } } + if create.Locality != nil && + create.Locality.LocalityLevel == tree.LocalityLevelRow { + return unimplemented.NewWithIssueDetailf( + 61133, + "import.regional-by-row", + "IMPORT to REGIONAL BY ROW table not supported", + ) + } tbl, err := MakeSimpleTableDescriptor( ctx, p.SemaCtx(), p.ExecCfg().Settings, create, parentID, parentSchemaID, defaultCSVTableID, NoFKs, walltime) if err != nil { @@ -1209,6 +1217,9 @@ func prepareExistingTableDescForIngestion( if len(desc.Mutations) > 0 { return nil, errors.Errorf("cannot IMPORT INTO a table with schema changes in progress -- try again later (pending mutation %s)", desc.Mutations[0].String()) } + if desc.LocalityConfig != nil && desc.LocalityConfig.GetRegionalByRow() != nil { + return nil, unimplemented.NewWithIssueDetailf(61133, "import.regional-by-row", "IMPORT into REGIONAL BY ROW table not supported") + } // Note that desc is just used to verify that the version matches. 
importing, err := descsCol.GetMutableTableVersionByID(ctx, desc.ID, txn) diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go index 004af1726b93..2241f52ec2b5 100644 --- a/pkg/ccl/importccl/import_stmt_test.go +++ b/pkg/ccl/importccl/import_stmt_test.go @@ -31,6 +31,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/blobs" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl" + _ "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl" + _ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" @@ -1214,6 +1216,8 @@ func TestImportUserDefinedTypes(t *testing.T) { intoCols string verifyQuery string expected [][]string + err bool + errString string }{ // Test CSV imports. { @@ -1251,6 +1255,15 @@ func TestImportUserDefinedTypes(t *testing.T) { verifyQuery: "SELECT * FROM t ORDER BY a", expected: [][]string{{"hello", "hello"}, {"hi", "hi"}}, }, + // Test table with default value + { + create: "a greeting, b greeting default 'hi'", + intoCols: "a, b", + typ: "PGCOPY", + contents: "hello\nhi\thi\n", + err: true, + errString: "type OID 100052 does not exist", + }, } // Test IMPORT INTO. @@ -1263,11 +1276,18 @@ func TestImportUserDefinedTypes(t *testing.T) { require.Equal(t, len(test.contents), n) // Run the import statement. sqlDB.Exec(t, fmt.Sprintf("CREATE TABLE t (%s)", test.create)) - sqlDB.Exec(t, - fmt.Sprintf("IMPORT INTO t (%s) %s DATA ($1)", test.intoCols, test.typ), - fmt.Sprintf("nodelocal://0/%s", filepath.Base(f.Name()))) - // Ensure that the table data is as we expect. 
- sqlDB.CheckQueryResults(t, test.verifyQuery, test.expected) + + importStmt := fmt.Sprintf("IMPORT INTO t (%s) %s DATA ($1)", test.intoCols, test.typ) + importArgs := fmt.Sprintf("nodelocal://0/%s", filepath.Base(f.Name())) + + if !test.err { + sqlDB.Exec(t, importStmt, importArgs) + // Ensure that the table data is as we expect. + sqlDB.CheckQueryResults(t, test.verifyQuery, test.expected) + } else { + sqlDB.ExpectErr(t, test.errString, importStmt, importArgs) + } + // Clean up after the test. sqlDB.Exec(t, "DROP TABLE t") } @@ -6411,6 +6431,149 @@ func TestImportAvro(t *testing.T) { }) } +func TestImportMultiRegion(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + const ( + nodes = 3 + ) + ctx := context.Background() + baseDir := filepath.Join("testdata", "avro") + clusterArgs := base.TestClusterArgs{ + ServerArgsPerNode: map[int]base.TestServerArgs{}, + } + + // Create a multi-region cluster + clusterArgs.ServerArgsPerNode[0] = base.TestServerArgs{ + ExternalIODir: baseDir, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "region", Value: "ap-southeast-2"}, + {Key: "availability-zone", Value: "ap-az1"}, + }, + }, + } + clusterArgs.ServerArgsPerNode[1] = base.TestServerArgs{ + ExternalIODir: baseDir, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "region", Value: "ca-central-1"}, + {Key: "availability-zone", Value: "ca-az1"}, + }, + }, + } + clusterArgs.ServerArgsPerNode[2] = base.TestServerArgs{ + ExternalIODir: baseDir, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "region", Value: "us-east-1"}, + {Key: "availability-zone", Value: "us-az1"}, + }, + }, + } + + tc := testcluster.StartTestCluster(t, nodes, clusterArgs) + defer tc.Stopper().Stop(ctx) + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + sqlDB.Exec(t, `SET CLUSTER SETTING kv.bulk_ingest.batch_size = '10KB'`) + + // Create the databases + sqlDB.Exec(t, `CREATE DATABASE foo`) + sqlDB.Exec(t, `CREATE DATABASE multi_region 
PRIMARY REGION "ca-central-1" REGIONS + "ap-southeast-2", "us-east-1"`) + + simpleOcf := fmt.Sprintf("nodelocal://0/%s", "simple.ocf") + + // Table schemas for USING + tableSchemaMR := fmt.Sprintf("nodelocal://0/%s", "simple-schema-multi-region.sql") + tableSchemaMRRegionalByRow := fmt.Sprintf("nodelocal://0/%s", + "simple-schema-multi-region-regional-by-row.sql") + + tests := []struct { + name string + db string + table string + sql string + create string + args []interface{} + err bool + errString string + }{ + { + name: "import-create-using-multi-region-to-non-multi-region-database", + db: "foo", + table: "simple", + sql: "IMPORT TABLE simple CREATE USING $1 AVRO DATA ($2)", + args: []interface{}{tableSchemaMR, simpleOcf}, + err: true, + errString: "cannot write descriptor for multi-region table", + }, + { + name: "import-create-using-multi-region-regional-by-table-to-multi-region-database", + db: "multi_region", + table: "simple", + sql: "IMPORT TABLE simple CREATE USING $1 AVRO DATA ($2)", + args: []interface{}{tableSchemaMR, simpleOcf}, + }, + { + name: "import-create-using-multi-region-regional-by-row-to-multi-region-database", + db: "multi_region", + table: "simple", + sql: "IMPORT TABLE simple CREATE USING $1 AVRO DATA ($2)", + args: []interface{}{tableSchemaMRRegionalByRow, simpleOcf}, + err: true, + errString: "IMPORT to REGIONAL BY ROW table not supported", + }, + { + name: "import-into-multi-region-regional-by-row-to-multi-region-database", + db: "multi_region", + table: "mr_regional_by_row", + create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW", + sql: "IMPORT INTO mr_regional_by_row AVRO DATA ($1)", + args: []interface{}{simpleOcf}, + err: true, + errString: "IMPORT into REGIONAL BY ROW table not supported", + }, + { + name: "import-into-using-multi-region-global-to-multi-region-database", + db: "multi_region", + table: "mr_global", + create: "CREATE TABLE mr_global (i INT8 PRIMARY KEY, s text, b 
bytea) LOCALITY GLOBAL", + sql: "IMPORT INTO mr_global AVRO DATA ($1)", + args: []interface{}{simpleOcf}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + sqlDB.Exec(t, fmt.Sprintf(`SET DATABASE = %q`, test.db)) + sqlDB.Exec(t, fmt.Sprintf("DROP TABLE IF EXISTS %q CASCADE", test.table)) + + if test.create != "" { + sqlDB.Exec(t, test.create) + } + + if test.err { + if test.errString == "" { + t.Error(`invalid test case: expects error, but no error string provided`) + } + sqlDB.ExpectErr(t, test.errString, test.sql, test.args...) + } else { + _, err := sqlDB.DB.ExecContext(context.Background(), test.sql, test.args...) + require.NoError(t, err) + + var numRows int + sqlDB.QueryRow(t, fmt.Sprintf("SELECT count(*) FROM %q", test.table)).Scan(&numRows) + if numRows == 0 { + t.Error("expected some rows after import") + } + } + }) + } +} + // TestImportClientDisconnect ensures that an import job can complete even if // the client connection which started it closes. 
This test uses a helper // subprocess to force a closed client connection without needing to rely diff --git a/pkg/ccl/importccl/testdata/avro/simple-schema-multi-region-regional-by-row.sql b/pkg/ccl/importccl/testdata/avro/simple-schema-multi-region-regional-by-row.sql new file mode 100644 index 000000000000..d41f27f9e3cc --- /dev/null +++ b/pkg/ccl/importccl/testdata/avro/simple-schema-multi-region-regional-by-row.sql @@ -0,0 +1,10 @@ +-- noinspection SqlDialectInspectionForFile + +-- noinspection SqlNoDataSourceInspectionForFile + +CREATE TABLE public.simple ( + i integer PRIMARY KEY, + s text, + b bytea +) LOCALITY REGIONAL BY ROW; + diff --git a/pkg/ccl/importccl/testdata/avro/simple-schema-multi-region.sql b/pkg/ccl/importccl/testdata/avro/simple-schema-multi-region.sql new file mode 100644 index 000000000000..79c3ee3ece54 --- /dev/null +++ b/pkg/ccl/importccl/testdata/avro/simple-schema-multi-region.sql @@ -0,0 +1,10 @@ +-- noinspection SqlDialectInspectionForFile + +-- noinspection SqlNoDataSourceInspectionForFile + +CREATE TABLE public.simple ( + i integer PRIMARY KEY, + s text, + b bytea +) LOCALITY REGIONAL BY TABLE; + diff --git a/pkg/sql/create_table.go b/pkg/sql/create_table.go index 09eaf63bc2d3..8ead88abf08d 100644 --- a/pkg/sql/create_table.go +++ b/pkg/sql/create_table.go @@ -364,7 +364,12 @@ func (n *createTableNode) startExec(params runParams) error { return err } + dg := catalogkv.NewOneLevelUncachedDescGetter(params.p.txn, params.ExecCfg().Codec) if desc.LocalityConfig != nil { + if err := desc.ValidateTableLocalityConfig(params.ctx, dg); err != nil { + return err + } + _, dbDesc, err := params.p.Descriptors().GetImmutableDatabaseByID( params.ctx, params.p.txn, @@ -404,7 +409,6 @@ func (n *createTableNode) startExec(params runParams) error { } } - dg := catalogkv.NewOneLevelUncachedDescGetter(params.p.txn, params.ExecCfg().Codec) if err := desc.Validate(params.ctx, dg); err != nil { return err } @@ -2278,11 +2282,6 @@ func NewTableDesc( } else 
{ return nil, errors.Newf("unknown locality level: %v", n.Locality.LocalityLevel) } - - dg := catalogkv.NewOneLevelUncachedDescGetter(txn, evalCtx.Codec) - if err := desc.ValidateTableLocalityConfig(ctx, dg); err != nil { - return nil, err - } } return &desc, nil