Skip to content

Commit

Permalink
importccl: add support for IMPORT INTO RBR table
Browse files Browse the repository at this point in the history
This change overrides the `default_to_database_primary_region`
and `gateway_region` to always return the primary region of the
database of the table being imported into. This allows for
IMPORT INTO an RBR table.

To ensure that the import is idempotent across resumptions, we cache
the primary region of the database being imported into, during planning.
This information is store in the job details and flow spec to be used
when evaluating the relevant default expr/computed column

Since IMPORT is a job, it does not have an associated session data
and so it cannot rely on the planners' implementation of the regional
operator. This change also implements the relevant methods in the
`importRegionOperator` to allow resolution of the primary region
of the database being imported into.

Fixes: cockroachdb#69616

Release note (sql change): IMPORT INTO regional by row tables
is supported.

Release justification: fixes for high-priority or high-severity bugs in existing functionality
  • Loading branch information
adityamaru committed Sep 10, 2021
1 parent c1a9c8b commit 7e152cb
Show file tree
Hide file tree
Showing 10 changed files with 734 additions and 523 deletions.
42 changes: 29 additions & 13 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,24 @@ func importPlanHook(
}
}

// Store the primary region of the database being imported into. This is
// used during job execution to evaluate certain default expressions and
// computed columns such as `gateway_region`.
var databasePrimaryRegion descpb.RegionName
if db.IsMultiRegion() {
if err := sql.DescsTxn(ctx, p.ExecCfg(), func(ctx context.Context, txn *kv.Txn,
descsCol *descs.Collection) error {
regionConfig, err := sql.SynthesizeRegionConfig(ctx, txn, db.GetID(), descsCol)
if err != nil {
return err
}
databasePrimaryRegion = regionConfig.PrimaryRegion()
return nil
}); err != nil {
return errors.Wrap(err, "failed to resolve region config for multi region database")
}
}

telemetry.CountBucketed("import.files", int64(len(files)))

// Record telemetry for userfile being used as the import target.
Expand Down Expand Up @@ -1002,16 +1020,17 @@ func importPlanHook(
// StartableJob which we attached to the connExecutor somehow.

importDetails := jobspb.ImportDetails{
URIs: files,
Format: format,
ParentID: db.GetID(),
Tables: tableDetails,
Types: typeDetails,
SSTSize: sstSize,
Oversample: oversample,
SkipFKs: skipFKs,
ParseBundleSchema: importStmt.Bundle,
DefaultIntSize: p.SessionData().DefaultIntSize,
URIs: files,
Format: format,
ParentID: db.GetID(),
Tables: tableDetails,
Types: typeDetails,
SSTSize: sstSize,
Oversample: oversample,
SkipFKs: skipFKs,
ParseBundleSchema: importStmt.Bundle,
DefaultIntSize: p.SessionData().DefaultIntSize,
DatabasePrimaryRegion: databasePrimaryRegion,
}

jr := jobs.Record{
Expand Down Expand Up @@ -1295,9 +1314,6 @@ func prepareExistingTableDescForIngestion(
if len(desc.Mutations) > 0 {
return nil, errors.Errorf("cannot IMPORT INTO a table with schema changes in progress -- try again later (pending mutation %s)", desc.Mutations[0].String())
}
if desc.LocalityConfig != nil && desc.LocalityConfig.GetRegionalByRow() != nil {
return nil, unimplemented.NewWithIssueDetailf(61133, "import.regional-by-row", "IMPORT into REGIONAL BY ROW table not supported")
}

// Note that desc is just used to verify that the version matches.
importing, err := descsCol.GetMutableTableVersionByID(ctx, desc.ID, txn)
Expand Down
33 changes: 24 additions & 9 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6429,6 +6429,14 @@ func TestImportMultiRegion(t *testing.T) {

simpleOcf := fmt.Sprintf("nodelocal://0/avro/%s", "simple.ocf")

data := "1,\"foo\",NULL,us-east1\n"
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
_, _ = w.Write([]byte(data))
}
}))
defer srv.Close()

// Table schemas for USING
tableSchemaMR := fmt.Sprintf("nodelocal://0/avro/%s", "simple-schema-multi-region.sql")
tableSchemaMRRegionalByRow := fmt.Sprintf("nodelocal://0/avro/%s",
Expand Down Expand Up @@ -6523,16 +6531,23 @@ DROP VIEW IF EXISTS v`,
errString: "IMPORT to REGIONAL BY ROW table not supported",
},
{
name: "import-into-multi-region-regional-by-row-to-multi-region-database",
db: "multi_region",
table: "mr_regional_by_row",
create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW",
sql: "IMPORT INTO mr_regional_by_row AVRO DATA ($1)",
args: []interface{}{simpleOcf},
errString: "IMPORT into REGIONAL BY ROW table not supported",
name: "import-into-multi-region-regional-by-row-default-col-to-multi-region-database",
db: "multi_region",
table: "mr_regional_by_row",
create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW",
sql: "IMPORT INTO mr_regional_by_row AVRO DATA ($1)",
args: []interface{}{simpleOcf},
},
{
name: "import-into-multi-region-regional-by-row-to-multi-region-database",
db: "multi_region",
table: "mr_regional_by_row",
create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW",
sql: "IMPORT INTO mr_regional_by_row (i, s, b, crdb_region) CSV DATA ($1)",
args: []interface{}{srv.URL},
},
{
name: "import-into-using-multi-region-global-to-multi-region-database",
name: "import-into-multi-region-global-to-multi-region-database",
db: "multi_region",
table: "mr_global",
create: "CREATE TABLE mr_global (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY GLOBAL",
Expand All @@ -6556,7 +6571,7 @@ DROP VIEW IF EXISTS v`,

_, err = sqlDB.ExecContext(context.Background(), test.sql, test.args...)
if test.errString != "" {
testutils.IsError(err, test.errString)
require.True(t, testutils.IsError(err, test.errString))
} else {
require.NoError(t, err)
res := sqlDB.QueryRow(fmt.Sprintf("SELECT count(*) FROM %q", test.table))
Expand Down
29 changes: 26 additions & 3 deletions pkg/ccl/importccl/import_table_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func MakeSimpleTableDescriptor(
evalCtx := tree.EvalContext{
Context: ctx,
Sequence: &importSequenceOperators{},
Regions: &importRegionOperator{},
Regions: makeImportRegionOperator(""),
SessionDataStack: sessiondata.NewStack(&sessiondata.SessionData{}),
ClientNoticeSender: &faketreeeval.DummyClientNoticeSender{},
Settings: st,
Expand Down Expand Up @@ -264,13 +264,36 @@ var (
)

// Implements the tree.RegionOperator interface.
type importRegionOperator struct{}
type importRegionOperator struct {
primaryRegion descpb.RegionName
}

func makeImportRegionOperator(primaryRegion descpb.RegionName) *importRegionOperator {
return &importRegionOperator{primaryRegion: primaryRegion}
}

type importDatabaseRegionConfig struct {
primaryRegion descpb.RegionName
}

// IsValidRegionNameString implements the tree.DatabaseRegionConfig interface.
func (i importDatabaseRegionConfig) IsValidRegionNameString(_ string) bool {
// Unimplemented.
return false
}

// PrimaryRegionString implements the tree.DatabaseRegionConfig interface.
func (i importDatabaseRegionConfig) PrimaryRegionString() string {
return string(i.primaryRegion)
}

var _ tree.DatabaseRegionConfig = &importDatabaseRegionConfig{}

// CurrentDatabaseRegionConfig is part of the tree.EvalDatabase interface.
func (so *importRegionOperator) CurrentDatabaseRegionConfig(
_ context.Context,
) (tree.DatabaseRegionConfig, error) {
return nil, errors.WithStack(errRegionOperator)
return importDatabaseRegionConfig{primaryRegion: so.primaryRegion}, nil
}

// ValidateAllMultiRegionZoneConfigsInCurrentDatabase is part of the tree.EvalDatabase interface.
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/importccl/read_import_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func runImport(
// TODO(adityamaru): Should we just plumb the flowCtx instead of this
// assignment.
evalCtx.DB = flowCtx.Cfg.DB
evalCtx.Regions = makeImportRegionOperator(spec.DatabasePrimaryRegion)
semaCtx := tree.MakeSemaContext()
semaCtx.TypeResolver = importResolver
conv, err := makeInputConverter(ctx, &semaCtx, spec, evalCtx, kvCh, seqChunkProvider)
Expand Down
Loading

0 comments on commit 7e152cb

Please sign in to comment.