Skip to content

Commit

Permalink
importccl: add support for IMPORT INTO RBR table
Browse files Browse the repository at this point in the history
This change overrides the `default_to_database_primary_region`
and `gateway_region` to always return the primary region of the
database of the table being imported into. This allows for
IMPORT INTO an RBR table.

To ensure that the import is idempotent across resumptions, we cache
the primary region of the database being imported into, during planning.
This information is store in the job details and flow spec to be used
when evaluating the relevant default expr/computed column

Since IMPORT is a job, it does not have an associated session data
and so it cannot rely on the planners' implementation of the regional
operator. This change also implements the relevant methods in the
`importRegionOperator` to allow resolution of the primary region
of the database being imported into.

Fixes: cockroachdb#69616

Release note (sql change): IMPORT INTO regional by row tables
is supported.

Release justification: fixes for high-priority or high-severity bugs in existing functionality
  • Loading branch information
adityamaru committed Sep 15, 2021
1 parent 51b45b0 commit d20c0e7
Show file tree
Hide file tree
Showing 10 changed files with 749 additions and 520 deletions.
42 changes: 29 additions & 13 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,24 @@ func importPlanHook(
}
}

// Store the primary region of the database being imported into. This is
// used during job execution to evaluate certain default expressions and
// computed columns such as `gateway_region`.
var databasePrimaryRegion descpb.RegionName
if db.IsMultiRegion() {
if err := sql.DescsTxn(ctx, p.ExecCfg(), func(ctx context.Context, txn *kv.Txn,
descsCol *descs.Collection) error {
regionConfig, err := sql.SynthesizeRegionConfig(ctx, txn, db.GetID(), descsCol)
if err != nil {
return err
}
databasePrimaryRegion = regionConfig.PrimaryRegion()
return nil
}); err != nil {
return errors.Wrap(err, "failed to resolve region config for multi region database")
}
}

telemetry.CountBucketed("import.files", int64(len(files)))

// Record telemetry for userfile being used as the import target.
Expand Down Expand Up @@ -1002,16 +1020,17 @@ func importPlanHook(
// StartableJob which we attached to the connExecutor somehow.

importDetails := jobspb.ImportDetails{
URIs: files,
Format: format,
ParentID: db.GetID(),
Tables: tableDetails,
Types: typeDetails,
SSTSize: sstSize,
Oversample: oversample,
SkipFKs: skipFKs,
ParseBundleSchema: importStmt.Bundle,
DefaultIntSize: p.SessionData().DefaultIntSize,
URIs: files,
Format: format,
ParentID: db.GetID(),
Tables: tableDetails,
Types: typeDetails,
SSTSize: sstSize,
Oversample: oversample,
SkipFKs: skipFKs,
ParseBundleSchema: importStmt.Bundle,
DefaultIntSize: p.SessionData().DefaultIntSize,
DatabasePrimaryRegion: databasePrimaryRegion,
}

jr := jobs.Record{
Expand Down Expand Up @@ -1295,9 +1314,6 @@ func prepareExistingTableDescForIngestion(
if len(desc.Mutations) > 0 {
return nil, errors.Errorf("cannot IMPORT INTO a table with schema changes in progress -- try again later (pending mutation %s)", desc.Mutations[0].String())
}
if desc.LocalityConfig != nil && desc.LocalityConfig.GetRegionalByRow() != nil {
return nil, unimplemented.NewWithIssueDetailf(61133, "import.regional-by-row", "IMPORT into REGIONAL BY ROW table not supported")
}

// Note that desc is just used to verify that the version matches.
importing, err := descsCol.GetMutableTableVersionByID(ctx, desc.ID, txn)
Expand Down
43 changes: 37 additions & 6 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6429,6 +6429,14 @@ func TestImportMultiRegion(t *testing.T) {

simpleOcf := fmt.Sprintf("nodelocal://0/avro/%s", "simple.ocf")

var data string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
_, _ = w.Write([]byte(data))
}
}))
defer srv.Close()

// Table schemas for USING
tableSchemaMR := fmt.Sprintf("nodelocal://0/avro/%s", "simple-schema-multi-region.sql")
tableSchemaMRRegionalByRow := fmt.Sprintf("nodelocal://0/avro/%s",
Expand Down Expand Up @@ -6498,6 +6506,7 @@ DROP VIEW IF EXISTS v`,
create string
args []interface{}
errString string
data string
}{
{
name: "import-create-using-multi-region-to-non-multi-region-database",
Expand All @@ -6523,16 +6532,34 @@ DROP VIEW IF EXISTS v`,
errString: "IMPORT to REGIONAL BY ROW table not supported",
},
{
name: "import-into-multi-region-regional-by-row-to-multi-region-database",
name: "import-into-multi-region-regional-by-row-default-col-to-multi-region-database",
db: "multi_region",
table: "mr_regional_by_row",
create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW",
sql: "IMPORT INTO mr_regional_by_row AVRO DATA ($1)",
args: []interface{}{simpleOcf},
},
{
name: "import-into-multi-region-regional-by-row-to-multi-region-database",
db: "multi_region",
table: "mr_regional_by_row",
create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW",
sql: "IMPORT INTO mr_regional_by_row (i, s, b, crdb_region) CSV DATA ($1)",
args: []interface{}{srv.URL},
data: "1,\"foo\",NULL,us-east1\n",
},
{
name: "import-into-multi-region-regional-by-row-to-multi-region-database-wrong-value",
db: "multi_region",
table: "mr_regional_by_row",
create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW",
sql: "IMPORT INTO mr_regional_by_row AVRO DATA ($1)",
args: []interface{}{simpleOcf},
errString: "IMPORT into REGIONAL BY ROW table not supported",
sql: "IMPORT INTO mr_regional_by_row (i, s, b, crdb_region) CSV DATA ($1)",
args: []interface{}{srv.URL},
data: "1,\"foo\",NULL,us-west1\n",
errString: "invalid input value for enum crdb_internal_region",
},
{
name: "import-into-using-multi-region-global-to-multi-region-database",
name: "import-into-multi-region-global-to-multi-region-database",
db: "multi_region",
table: "mr_global",
create: "CREATE TABLE mr_global (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY GLOBAL",
Expand All @@ -6549,14 +6576,18 @@ DROP VIEW IF EXISTS v`,
_, err = sqlDB.Exec(fmt.Sprintf("DROP TABLE IF EXISTS %q CASCADE", test.table))
require.NoError(t, err)

if test.data != "" {
data = test.data
}

if test.create != "" {
_, err = sqlDB.Exec(test.create)
require.NoError(t, err)
}

_, err = sqlDB.ExecContext(context.Background(), test.sql, test.args...)
if test.errString != "" {
testutils.IsError(err, test.errString)
require.True(t, testutils.IsError(err, test.errString))
} else {
require.NoError(t, err)
res := sqlDB.QueryRow(fmt.Sprintf("SELECT count(*) FROM %q", test.table))
Expand Down
31 changes: 28 additions & 3 deletions pkg/ccl/importccl/import_table_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func MakeSimpleTableDescriptor(
evalCtx := tree.EvalContext{
Context: ctx,
Sequence: &importSequenceOperators{},
Regions: &importRegionOperator{},
Regions: makeImportRegionOperator(""),
SessionDataStack: sessiondata.NewStack(&sessiondata.SessionData{}),
ClientNoticeSender: &faketreeeval.DummyClientNoticeSender{},
Settings: st,
Expand Down Expand Up @@ -264,13 +264,38 @@ var (
)

// Implements the tree.RegionOperator interface.
type importRegionOperator struct{}
type importRegionOperator struct {
primaryRegion descpb.RegionName
}

func makeImportRegionOperator(primaryRegion descpb.RegionName) *importRegionOperator {
return &importRegionOperator{primaryRegion: primaryRegion}
}

// importDatabaseRegionConfig is a stripped down version of
// multiregion.RegionConfig that is used by import.
type importDatabaseRegionConfig struct {
primaryRegion descpb.RegionName
}

// IsValidRegionNameString implements the tree.DatabaseRegionConfig interface.
func (i importDatabaseRegionConfig) IsValidRegionNameString(_ string) bool {
// Unimplemented.
return false
}

// PrimaryRegionString implements the tree.DatabaseRegionConfig interface.
func (i importDatabaseRegionConfig) PrimaryRegionString() string {
return string(i.primaryRegion)
}

var _ tree.DatabaseRegionConfig = &importDatabaseRegionConfig{}

// CurrentDatabaseRegionConfig is part of the tree.EvalDatabase interface.
func (so *importRegionOperator) CurrentDatabaseRegionConfig(
_ context.Context,
) (tree.DatabaseRegionConfig, error) {
return nil, errors.WithStack(errRegionOperator)
return importDatabaseRegionConfig{primaryRegion: so.primaryRegion}, nil
}

// ValidateAllMultiRegionZoneConfigsInCurrentDatabase is part of the tree.EvalDatabase interface.
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/importccl/read_import_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func runImport(
// TODO(adityamaru): Should we just plumb the flowCtx instead of this
// assignment.
evalCtx.DB = flowCtx.Cfg.DB
evalCtx.Regions = makeImportRegionOperator(spec.DatabasePrimaryRegion)
semaCtx := tree.MakeSemaContext()
semaCtx.TypeResolver = importResolver
conv, err := makeInputConverter(ctx, &semaCtx, spec, evalCtx, kvCh, seqChunkProvider)
Expand Down
Loading

0 comments on commit d20c0e7

Please sign in to comment.