diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 47424f2e42d0..df50b98af385 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -363,7 +363,6 @@ func WriteDescriptors( table.GetID(), table) } } - // If the table descriptor is being written to a multi-region database and // the table does not have a locality config setup, set one up here. The // table's locality config will be set to the default locality - REGIONAL @@ -377,8 +376,20 @@ func WriteDescriptors( if err != nil { return err } - if dbDesc.GetRegionConfig() != nil && table.GetLocalityConfig() == nil { - table.(*tabledesc.Mutable).SetTableLocalityRegionalByTable(tree.PrimaryRegionNotSpecifiedName) + if dbDesc.GetRegionConfig() != nil { + if table.GetLocalityConfig() == nil { + table.(*tabledesc.Mutable).SetTableLocalityRegionalByTable(tree.PrimaryRegionNotSpecifiedName) + } + } else { + // If the database is not multi-region enabled, ensure that we don't + // write any multi-region table descriptors into it. + if table.GetLocalityConfig() != nil { + return pgerror.Newf(pgcode.FeatureNotSupported, + "cannot write descriptor for multi-region table %s into non-multi-region database %s", + table.GetName(), + dbDesc.GetName(), + ) + } } if err := descsCol.WriteDescToBatch( diff --git a/pkg/ccl/importccl/BUILD.bazel b/pkg/ccl/importccl/BUILD.bazel index 4b408614b781..f62c6884da05 100644 --- a/pkg/ccl/importccl/BUILD.bazel +++ b/pkg/ccl/importccl/BUILD.bazel @@ -120,6 +120,9 @@ go_test( "//pkg/blobs", "//pkg/ccl/backupccl", "//pkg/ccl/kvccl", + "//pkg/ccl/multiregionccl", + "//pkg/ccl/multiregionccl/multiregionccltestutils", + "//pkg/ccl/partitionccl", "//pkg/ccl/storageccl", "//pkg/ccl/utilccl", "//pkg/ccl/workloadccl/format", diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index 712621fc02ec..a5b6b0a2b836 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -829,6 +829,14 @@ func importPlanHook( ) } } + if create.Locality != nil && + create.Locality.LocalityLevel == tree.LocalityLevelRow { + return unimplemented.NewWithIssueDetailf( + 61133, + "import.regional-by-row", + "IMPORT to REGIONAL BY ROW table not supported", + ) + } tbl, err := MakeSimpleTableDescriptor( ctx, p.SemaCtx(), p.ExecCfg().Settings, create, parentID, parentSchemaID, defaultCSVTableID, NoFKs, walltime) if err != nil { @@ -1209,6 +1217,9 @@ func prepareExistingTableDescForIngestion( if len(desc.Mutations) > 0 { return nil, errors.Errorf("cannot IMPORT INTO a table with schema changes in progress -- try again later (pending mutation %s)", desc.Mutations[0].String()) } + if desc.LocalityConfig != nil && desc.LocalityConfig.GetRegionalByRow() != nil { + return nil, unimplemented.NewWithIssueDetailf(61133, "import.regional-by-row", "IMPORT into REGIONAL BY ROW table not supported") + } // Note that desc is just used to verify that the version matches. importing, err := descsCol.GetMutableTableVersionByID(ctx, desc.ID, txn) diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go index 3ea368a95a9e..762923518c47 100644 --- a/pkg/ccl/importccl/import_stmt_test.go +++ b/pkg/ccl/importccl/import_stmt_test.go @@ -31,6 +31,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/blobs" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl" + _ "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl" + "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl/multiregionccltestutils" + _ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" @@ -1214,6 +1217,7 @@ func TestImportUserDefinedTypes(t *testing.T) { intoCols string verifyQuery string expected [][]string + errString string }{ // Test CSV imports. { @@ -1251,6 +1255,14 @@ func TestImportUserDefinedTypes(t *testing.T) { verifyQuery: "SELECT * FROM t ORDER BY a", expected: [][]string{{"hello", "hello"}, {"hi", "hi"}}, }, + // Test table with default value + { + create: "a greeting, b greeting default 'hi'", + intoCols: "a, b", + typ: "PGCOPY", + contents: "hello\nhi\thi\n", + errString: "type OID 100052 does not exist", + }, } // Test IMPORT INTO. @@ -1263,11 +1275,18 @@ func TestImportUserDefinedTypes(t *testing.T) { require.Equal(t, len(test.contents), n) // Run the import statement. sqlDB.Exec(t, fmt.Sprintf("CREATE TABLE t (%s)", test.create)) - sqlDB.Exec(t, - fmt.Sprintf("IMPORT INTO t (%s) %s DATA ($1)", test.intoCols, test.typ), - fmt.Sprintf("nodelocal://0/%s", filepath.Base(f.Name()))) - // Ensure that the table data is as we expect. - sqlDB.CheckQueryResults(t, test.verifyQuery, test.expected) + + importStmt := fmt.Sprintf("IMPORT INTO t (%s) %s DATA ($1)", test.intoCols, test.typ) + importArgs := fmt.Sprintf("nodelocal://0/%s", filepath.Base(f.Name())) + + if test.errString == "" { + sqlDB.Exec(t, importStmt, importArgs) + // Ensure that the table data is as we expect. + sqlDB.CheckQueryResults(t, test.verifyQuery, test.expected) + } else { + sqlDB.ExpectErr(t, test.errString, importStmt, importArgs) + } + // Clean up after the test. sqlDB.Exec(t, "DROP TABLE t") } @@ -6455,6 +6474,117 @@ func TestImportAvro(t *testing.T) { }) } +func TestImportMultiRegion(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + baseDir := filepath.Join("testdata", "avro") + _, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster( + t, 1 /* numServers */, base.TestingKnobs{}, &baseDir, + ) + defer cleanup() + + _, err := sqlDB.Exec(`SET CLUSTER SETTING kv.bulk_ingest.batch_size = '10KB'`) + require.NoError(t, err) + + // Create the databases + _, err = sqlDB.Exec(`CREATE DATABASE foo`) + require.NoError(t, err) + + _, err = sqlDB.Exec(`CREATE DATABASE multi_region PRIMARY REGION "us-east1"`) + require.NoError(t, err) + + simpleOcf := fmt.Sprintf("nodelocal://0/%s", "simple.ocf") + + // Table schemas for USING + tableSchemaMR := fmt.Sprintf("nodelocal://0/%s", "simple-schema-multi-region.sql") + tableSchemaMRRegionalByRow := fmt.Sprintf("nodelocal://0/%s", + "simple-schema-multi-region-regional-by-row.sql") + + tests := []struct { + name string + db string + table string + sql string + create string + args []interface{} + errString string + }{ + { + name: "import-create-using-multi-region-to-non-multi-region-database", + db: "foo", + table: "simple", + sql: "IMPORT TABLE simple CREATE USING $1 AVRO DATA ($2)", + args: []interface{}{tableSchemaMR, simpleOcf}, + errString: "cannot write descriptor for multi-region table", + }, + { + name: "import-create-using-multi-region-regional-by-table-to-multi-region-database", + db: "multi_region", + table: "simple", + sql: "IMPORT TABLE simple CREATE USING $1 AVRO DATA ($2)", + args: []interface{}{tableSchemaMR, simpleOcf}, + }, + { + name: "import-create-using-multi-region-regional-by-row-to-multi-region-database", + db: "multi_region", + table: "simple", + sql: "IMPORT TABLE simple CREATE USING $1 AVRO DATA ($2)", + args: []interface{}{tableSchemaMRRegionalByRow, simpleOcf}, + errString: "IMPORT to REGIONAL BY ROW table not supported", + }, + { + name: "import-into-multi-region-regional-by-row-to-multi-region-database", + db: "multi_region", + table: "mr_regional_by_row", + create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW", + sql: "IMPORT INTO mr_regional_by_row AVRO DATA ($1)", + args: []interface{}{simpleOcf}, + errString: "IMPORT into REGIONAL BY ROW table not supported", + }, + { + name: "import-into-using-multi-region-global-to-multi-region-database", + db: "multi_region", + table: "mr_global", + create: "CREATE TABLE mr_global (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY GLOBAL", + sql: "IMPORT INTO mr_global AVRO DATA ($1)", + args: []interface{}{simpleOcf}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + _, err = sqlDB.Exec(fmt.Sprintf(`SET DATABASE = %q`, test.db)) + require.NoError(t, err) + + _, err = sqlDB.Exec(fmt.Sprintf("DROP TABLE IF EXISTS %q CASCADE", test.table)) + require.NoError(t, err) + + if test.create != "" { + _, err = sqlDB.Exec(test.create) + require.NoError(t, err) + } + + _, err = sqlDB.ExecContext(context.Background(), test.sql, test.args...) + if test.errString != "" { + testutils.IsError(err, test.errString) + } else { + require.NoError(t, err) + res := sqlDB.QueryRow(fmt.Sprintf("SELECT count(*) FROM %q", test.table)) + require.NoError(t, res.Err()) + + var numRows int + err = res.Scan(&numRows) + require.NoError(t, err) + + if numRows == 0 { + t.Error("expected some rows after import") + } + } + }) + } +} + // TestImportClientDisconnect ensures that an import job can complete even if // the client connection which started it closes. This test uses a helper // subprocess to force a closed client connection without needing to rely diff --git a/pkg/ccl/importccl/testdata/avro/simple-schema-multi-region-regional-by-row.sql b/pkg/ccl/importccl/testdata/avro/simple-schema-multi-region-regional-by-row.sql new file mode 100644 index 000000000000..d41f27f9e3cc --- /dev/null +++ b/pkg/ccl/importccl/testdata/avro/simple-schema-multi-region-regional-by-row.sql @@ -0,0 +1,10 @@ +-- noinspection SqlDialectInspectionForFile + +-- noinspection SqlNoDataSourceInspectionForFile + +CREATE TABLE public.simple ( + i integer PRIMARY KEY, + s text, + b bytea +) LOCALITY REGIONAL BY ROW; + diff --git a/pkg/ccl/importccl/testdata/avro/simple-schema-multi-region.sql b/pkg/ccl/importccl/testdata/avro/simple-schema-multi-region.sql new file mode 100644 index 000000000000..79c3ee3ece54 --- /dev/null +++ b/pkg/ccl/importccl/testdata/avro/simple-schema-multi-region.sql @@ -0,0 +1,10 @@ +-- noinspection SqlDialectInspectionForFile + +-- noinspection SqlNoDataSourceInspectionForFile + +CREATE TABLE public.simple ( + i integer PRIMARY KEY, + s text, + b bytea +) LOCALITY REGIONAL BY TABLE; + diff --git a/pkg/ccl/multiregionccl/multiregion_test.go b/pkg/ccl/multiregionccl/multiregion_test.go index 8f46f7648380..acf0e3570c45 100644 --- a/pkg/ccl/multiregionccl/multiregion_test.go +++ b/pkg/ccl/multiregionccl/multiregion_test.go @@ -28,7 +28,7 @@ func TestMultiRegionNoLicense(t *testing.T) { defer utilccl.TestingDisableEnterprise()() _, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster( - t, 3 /* numServers */, base.TestingKnobs{}, + t, 3 /* numServers */, base.TestingKnobs{}, nil, /* baseDir */ ) defer cleanup() @@ -55,7 +55,7 @@ func TestMultiRegionAfterEnterpriseDisabled(t *testing.T) { skip.UnderRace(t, "#61163") _, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster( - t, 3 /* numServers */, base.TestingKnobs{}, + t, 3 /* numServers */, base.TestingKnobs{}, nil, /* baseDir */ ) defer cleanup() diff --git a/pkg/ccl/multiregionccl/multiregionccltestutils/testutils.go b/pkg/ccl/multiregionccl/multiregionccltestutils/testutils.go index fad1ebb16d9b..58116f8ae902 100644 --- a/pkg/ccl/multiregionccl/multiregionccltestutils/testutils.go +++ b/pkg/ccl/multiregionccl/multiregionccltestutils/testutils.go @@ -23,7 +23,7 @@ import ( // of nodes and the provided testing knobs applied to each of the nodes. Every // node is placed in its own locality, named "us-east1", "us-east2", and so on. func TestingCreateMultiRegionCluster( - t *testing.T, numServers int, knobs base.TestingKnobs, + t *testing.T, numServers int, knobs base.TestingKnobs, baseDir *string, ) (serverutils.TestClusterInterface, *gosql.DB, func()) { serverArgs := make(map[int]base.TestServerArgs) regionNames := make([]string, numServers) @@ -32,9 +32,15 @@ func TestingCreateMultiRegionCluster( regionNames[i] = fmt.Sprintf("us-east%d", i+1) } + b := "" + if baseDir != nil { + b = *baseDir + } + for i := 0; i < numServers; i++ { serverArgs[i] = base.TestServerArgs{ - Knobs: knobs, + Knobs: knobs, + ExternalIODir: b, Locality: roachpb.Locality{ Tiers: []roachpb.Tier{{Key: "region", Value: regionNames[i]}}, }, diff --git a/pkg/ccl/multiregionccl/region_test.go b/pkg/ccl/multiregionccl/region_test.go index 8ffc352727a0..5c129e95778f 100644 --- a/pkg/ccl/multiregionccl/region_test.go +++ b/pkg/ccl/multiregionccl/region_test.go @@ -47,7 +47,7 @@ func TestSettingPrimaryRegionAmidstDrop(t *testing.T) { } _, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster( - t, 2 /* numServers */, knobs, + t, 2 /* numServers */, knobs, nil, /* baseDir */ ) defer cleanup() @@ -143,7 +143,7 @@ func TestDroppingPrimaryRegionAsyncJobFailure(t *testing.T) { } _, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster( - t, 1 /* numServers */, knobs, + t, 1 /* numServers */, knobs, nil, /* baseDir */ ) defer cleanup() @@ -201,7 +201,7 @@ func TestRollbackDuringAddDropRegionAsyncJobFailure(t *testing.T) { // Setup. _, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster( - t, 3 /* numServers */, knobs, + t, 3 /* numServers */, knobs, nil, /* baseDir */ ) defer cleanup() _, err := sqlDB.Exec(`CREATE DATABASE db WITH PRIMARY REGION "us-east1" REGIONS "us-east2"`) diff --git a/pkg/ccl/multiregionccl/regional_by_row_test.go b/pkg/ccl/multiregionccl/regional_by_row_test.go index 024bfdea4fe4..9015f1d39003 100644 --- a/pkg/ccl/multiregionccl/regional_by_row_test.go +++ b/pkg/ccl/multiregionccl/regional_by_row_test.go @@ -504,7 +504,7 @@ func TestRepartitionFailureRollback(t *testing.T) { }, } _, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster( - t, 3 /* numServers */, knobs, + t, 3 /* numServers */, knobs, nil, /* baseDir */ ) defer cleanup() diff --git a/pkg/sql/create_table.go b/pkg/sql/create_table.go index c721697f128c..9ae07509154b 100644 --- a/pkg/sql/create_table.go +++ b/pkg/sql/create_table.go @@ -406,6 +406,10 @@ func (n *createTableNode) startExec(params runParams) error { return err } + if err := validateDescriptor(params.ctx, params.p, desc); err != nil { + return err + } + if desc.LocalityConfig != nil { _, dbDesc, err := params.p.Descriptors().GetImmutableDatabaseByID( params.ctx, @@ -446,10 +450,6 @@ func (n *createTableNode) startExec(params runParams) error { } } - if err := validateDescriptor(params.ctx, params.p, desc); err != nil { - return err - } - // Log Create Table event. This is an auditable log event and is // recorded in the same transaction as the table descriptor update. if err := params.p.logEvent(params.ctx, @@ -2335,11 +2335,6 @@ func NewTableDesc( } else { return nil, errors.Newf("unknown locality level: %v", n.Locality.LocalityLevel) } - - bdg := catalogkv.NewOneLevelUncachedDescGetter(txn, evalCtx.Codec) - if err := catalog.ValidateSelfAndCrossReferences(ctx, bdg, &desc); err != nil { - return nil, err - } } return &desc, nil