From 175725433dde778604a8eacc6049c1b31b3591b5 Mon Sep 17 00:00:00 2001 From: Adam Storm Date: Fri, 26 Feb 2021 10:24:01 -0500 Subject: [PATCH] multi-region: Unblock IMPORT into allowable multi-region tables Unblock IMPORTing into allowable multi-region tables. We only support importing into GLOBAL and REGIONAL BY TABLE tables as #61133 is blocking IMPORT into any tables which have columns generated using a user defined type (which covers REGIONAL BY ROW tables, as they have the crdb_region column which is generated using the crdb_internal_region type). This commit includes tests for both IMPORT and IMPORT INTO, as well as cases which illustrate #61133 for non-multi-region tables. Release note: None Release justification: Fixes bug in interaction between existing functionality and new multi-region feature. --- pkg/ccl/backupccl/restore_job.go | 17 +- pkg/ccl/importccl/BUILD.bazel | 2 + pkg/ccl/importccl/import_stmt.go | 11 ++ pkg/ccl/importccl/import_stmt_test.go | 173 +++++++++++++++++- ...le-schema-multi-region-regional-by-row.sql | 10 + .../avro/simple-schema-multi-region.sql | 10 + pkg/sql/create_table.go | 11 +- 7 files changed, 220 insertions(+), 14 deletions(-) create mode 100644 pkg/ccl/importccl/testdata/avro/simple-schema-multi-region-regional-by-row.sql create mode 100644 pkg/ccl/importccl/testdata/avro/simple-schema-multi-region.sql diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 45ea9db5843d..75911e953b29 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -363,7 +363,6 @@ func WriteDescriptors( table.GetID(), table) } } - // If the table descriptor is being written to a multi-region database and // the table does not have a locality config setup, set one up here. The // table's locality config will be set to the default locality - REGIONAL @@ -377,8 +376,20 @@ func WriteDescriptors( if err != nil { return err } - if dbDesc.GetRegionConfig() != nil && table.GetLocalityConfig() == nil { - table.(*tabledesc.Mutable).SetTableLocalityRegionalByTable(tree.PrimaryRegionLocalityName) + if dbDesc.GetRegionConfig() != nil { + if table.GetLocalityConfig() == nil { + table.(*tabledesc.Mutable).SetTableLocalityRegionalByTable(tree.PrimaryRegionLocalityName) + } + } else { + // If the database is not multi-region enabled, ensure that we don't + // write any multi-region table descriptors into it. + if table.GetLocalityConfig() != nil { + return pgerror.Newf(pgcode.FeatureNotSupported, + "cannot write descriptor for multi-region table %d into non-multi-region database %d", + table.GetID(), + table.GetParentID(), + ) + } } if err := descsCol.WriteDescToBatch( diff --git a/pkg/ccl/importccl/BUILD.bazel b/pkg/ccl/importccl/BUILD.bazel index 4b408614b781..eacba32be9bf 100644 --- a/pkg/ccl/importccl/BUILD.bazel +++ b/pkg/ccl/importccl/BUILD.bazel @@ -120,6 +120,8 @@ go_test( "//pkg/blobs", "//pkg/ccl/backupccl", "//pkg/ccl/kvccl", + "//pkg/ccl/multiregionccl", + "//pkg/ccl/partitionccl", "//pkg/ccl/storageccl", "//pkg/ccl/utilccl", "//pkg/ccl/workloadccl/format", diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index de8a338aa83a..04e1881f289b 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -829,6 +829,14 @@ func importPlanHook( ) } } + if create.Locality != nil && + create.Locality.LocalityLevel == tree.LocalityLevelRow { + return unimplemented.NewWithIssueDetailf( + 61133, + "import.regional-by-row", + "IMPORT to REGIONAL BY ROW table not supported", + ) + } tbl, err := MakeSimpleTableDescriptor( ctx, p.SemaCtx(), p.ExecCfg().Settings, create, parentID, parentSchemaID, defaultCSVTableID, NoFKs, walltime) if err != nil { @@ -1209,6 +1217,9 @@ func prepareExistingTableDescForIngestion( if len(desc.Mutations) > 0 { return nil, errors.Errorf("cannot IMPORT INTO a table with schema changes in progress -- try again later (pending mutation %s)", desc.Mutations[0].String()) } + if desc.LocalityConfig != nil && desc.LocalityConfig.GetRegionalByRow() != nil { + return nil, unimplemented.NewWithIssueDetailf(61133, "import.regional-by-row", "IMPORT into REGIONAL BY ROW table not supported") + } // Note that desc is just used to verify that the version matches. importing, err := descsCol.GetMutableTableVersionByID(ctx, desc.ID, txn) diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go index 004af1726b93..2241f52ec2b5 100644 --- a/pkg/ccl/importccl/import_stmt_test.go +++ b/pkg/ccl/importccl/import_stmt_test.go @@ -31,6 +31,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/blobs" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl" + _ "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl" + _ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" @@ -1214,6 +1216,8 @@ func TestImportUserDefinedTypes(t *testing.T) { intoCols string verifyQuery string expected [][]string + err bool + errString string }{ // Test CSV imports. { @@ -1251,6 +1255,15 @@ func TestImportUserDefinedTypes(t *testing.T) { verifyQuery: "SELECT * FROM t ORDER BY a", expected: [][]string{{"hello", "hello"}, {"hi", "hi"}}, }, + // Test table with default value + { + create: "a greeting, b greeting default 'hi'", + intoCols: "a, b", + typ: "PGCOPY", + contents: "hello\nhi\thi\n", + err: true, + errString: "type OID 100052 does not exist", + }, } // Test IMPORT INTO. @@ -1263,11 +1276,18 @@ func TestImportUserDefinedTypes(t *testing.T) { require.Equal(t, len(test.contents), n) // Run the import statement. sqlDB.Exec(t, fmt.Sprintf("CREATE TABLE t (%s)", test.create)) - sqlDB.Exec(t, - fmt.Sprintf("IMPORT INTO t (%s) %s DATA ($1)", test.intoCols, test.typ), - fmt.Sprintf("nodelocal://0/%s", filepath.Base(f.Name()))) - // Ensure that the table data is as we expect. - sqlDB.CheckQueryResults(t, test.verifyQuery, test.expected) + + importStmt := fmt.Sprintf("IMPORT INTO t (%s) %s DATA ($1)", test.intoCols, test.typ) + importArgs := fmt.Sprintf("nodelocal://0/%s", filepath.Base(f.Name())) + + if !test.err { + sqlDB.Exec(t, importStmt, importArgs) + // Ensure that the table data is as we expect. + sqlDB.CheckQueryResults(t, test.verifyQuery, test.expected) + } else { + sqlDB.ExpectErr(t, test.errString, importStmt, importArgs) + } + // Clean up after the test. sqlDB.Exec(t, "DROP TABLE t") } @@ -6411,6 +6431,149 @@ func TestImportAvro(t *testing.T) { }) } +func TestImportMultiRegion(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + const ( + nodes = 3 + ) + ctx := context.Background() + baseDir := filepath.Join("testdata", "avro") + clusterArgs := base.TestClusterArgs{ + ServerArgsPerNode: map[int]base.TestServerArgs{}, + } + + // Create a multi-region cluster + clusterArgs.ServerArgsPerNode[0] = base.TestServerArgs{ + ExternalIODir: baseDir, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "region", Value: "ap-southeast-2"}, + {Key: "availability-zone", Value: "ap-az1"}, + }, + }, + } + clusterArgs.ServerArgsPerNode[1] = base.TestServerArgs{ + ExternalIODir: baseDir, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "region", Value: "ca-central-1"}, + {Key: "availability-zone", Value: "ca-az1"}, + }, + }, + } + clusterArgs.ServerArgsPerNode[2] = base.TestServerArgs{ + ExternalIODir: baseDir, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "region", Value: "us-east-1"}, + {Key: "availability-zone", Value: "us-az1"}, + }, + }, + } + + tc := testcluster.StartTestCluster(t, nodes, clusterArgs) + defer tc.Stopper().Stop(ctx) + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + sqlDB.Exec(t, `SET CLUSTER SETTING kv.bulk_ingest.batch_size = '10KB'`) + + // Create the databases + sqlDB.Exec(t, `CREATE DATABASE foo`) + sqlDB.Exec(t, `CREATE DATABASE multi_region PRIMARY REGION "ca-central-1" REGIONS + "ap-southeast-2", "us-east-1"`) + + simpleOcf := fmt.Sprintf("nodelocal://0/%s", "simple.ocf") + + // Table schemas for USING + tableSchemaMR := fmt.Sprintf("nodelocal://0/%s", "simple-schema-multi-region.sql") + tableSchemaMRRegionalByRow := fmt.Sprintf("nodelocal://0/%s", + "simple-schema-multi-region-regional-by-row.sql") + + tests := []struct { + name string + db string + table string + sql string + create string + args []interface{} + err bool + errString string + }{ + { + name: "import-create-using-multi-region-to-non-multi-region-database", + db: "foo", + table: "simple", + sql: "IMPORT TABLE simple CREATE USING $1 AVRO DATA ($2)", + args: []interface{}{tableSchemaMR, simpleOcf}, + err: true, + errString: "cannot write descriptor for multi-region table", + }, + { + name: "import-create-using-multi-region-regional-by-table-to-multi-region-database", + db: "multi_region", + table: "simple", + sql: "IMPORT TABLE simple CREATE USING $1 AVRO DATA ($2)", + args: []interface{}{tableSchemaMR, simpleOcf}, + }, + { + name: "import-create-using-multi-region-regional-by-row-to-multi-region-database", + db: "multi_region", + table: "simple", + sql: "IMPORT TABLE simple CREATE USING $1 AVRO DATA ($2)", + args: []interface{}{tableSchemaMRRegionalByRow, simpleOcf}, + err: true, + errString: "IMPORT to REGIONAL BY ROW table not supported", + }, + { + name: "import-into-multi-region-regional-by-row-to-multi-region-database", + db: "multi_region", + table: "mr_regional_by_row", + create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW", + sql: "IMPORT INTO mr_regional_by_row AVRO DATA ($1)", + args: []interface{}{simpleOcf}, + err: true, + errString: "IMPORT into REGIONAL BY ROW table not supported", + }, + { + name: "import-into-using-multi-region-global-to-multi-region-database", + db: "multi_region", + table: "mr_global", + create: "CREATE TABLE mr_global (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY GLOBAL", + sql: "IMPORT INTO mr_global AVRO DATA ($1)", + args: []interface{}{simpleOcf}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + sqlDB.Exec(t, fmt.Sprintf(`SET DATABASE = %q`, test.db)) + sqlDB.Exec(t, fmt.Sprintf("DROP TABLE IF EXISTS %q CASCADE", test.table)) + + if test.create != "" { + sqlDB.Exec(t, test.create) + } + + if test.err { + if test.errString == "" { + t.Error(`invalid test case: expects error, but no error string provided`) + } + sqlDB.ExpectErr(t, test.errString, test.sql, test.args...) + } else { + _, err := sqlDB.DB.ExecContext(context.Background(), test.sql, test.args...) + require.NoError(t, err) + + var numRows int + sqlDB.QueryRow(t, fmt.Sprintf("SELECT count(*) FROM %q", test.table)).Scan(&numRows) + if numRows == 0 { + t.Error("expected some rows after import") + } + } + }) + } +} + // TestImportClientDisconnect ensures that an import job can complete even if // the client connection which started it closes. This test uses a helper // subprocess to force a closed client connection without needing to rely diff --git a/pkg/ccl/importccl/testdata/avro/simple-schema-multi-region-regional-by-row.sql b/pkg/ccl/importccl/testdata/avro/simple-schema-multi-region-regional-by-row.sql new file mode 100644 index 000000000000..d41f27f9e3cc --- /dev/null +++ b/pkg/ccl/importccl/testdata/avro/simple-schema-multi-region-regional-by-row.sql @@ -0,0 +1,10 @@ +-- noinspection SqlDialectInspectionForFile + +-- noinspection SqlNoDataSourceInspectionForFile + +CREATE TABLE public.simple ( + i integer PRIMARY KEY, + s text, + b bytea +) LOCALITY REGIONAL BY ROW; + diff --git a/pkg/ccl/importccl/testdata/avro/simple-schema-multi-region.sql b/pkg/ccl/importccl/testdata/avro/simple-schema-multi-region.sql new file mode 100644 index 000000000000..79c3ee3ece54 --- /dev/null +++ b/pkg/ccl/importccl/testdata/avro/simple-schema-multi-region.sql @@ -0,0 +1,10 @@ +-- noinspection SqlDialectInspectionForFile + +-- noinspection SqlNoDataSourceInspectionForFile + +CREATE TABLE public.simple ( + i integer PRIMARY KEY, + s text, + b bytea +) LOCALITY REGIONAL BY TABLE; + diff --git a/pkg/sql/create_table.go b/pkg/sql/create_table.go index 09eaf63bc2d3..8ead88abf08d 100644 --- a/pkg/sql/create_table.go +++ b/pkg/sql/create_table.go @@ -364,7 +364,12 @@ func (n *createTableNode) startExec(params runParams) error { return err } + dg := catalogkv.NewOneLevelUncachedDescGetter(params.p.txn, params.ExecCfg().Codec) if desc.LocalityConfig != nil { + if err := desc.ValidateTableLocalityConfig(params.ctx, dg); err != nil { + return err + } + _, dbDesc, err := params.p.Descriptors().GetImmutableDatabaseByID( params.ctx, params.p.txn, @@ -404,7 +409,6 @@ func (n *createTableNode) startExec(params runParams) error { } } - dg := catalogkv.NewOneLevelUncachedDescGetter(params.p.txn, params.ExecCfg().Codec) if err := desc.Validate(params.ctx, dg); err != nil { return err } @@ -2278,11 +2282,6 @@ func NewTableDesc( } else { return nil, errors.Newf("unknown locality level: %v", n.Locality.LocalityLevel) } - - dg := catalogkv.NewOneLevelUncachedDescGetter(txn, evalCtx.Codec) - if err := desc.ValidateTableLocalityConfig(ctx, dg); err != nil { - return nil, err - } } return &desc, nil