Skip to content

Commit

Permalink
importccl: add support for IMPORT INTO RBR table
Browse files Browse the repository at this point in the history
This change overrides the `default_to_database_primary_region`
and `gateway_region` to always return the primary region of the
database of the table being imported into. This allows for
IMPORT INTO an RBR table.

Since IMPORT is a job, it does not have an associated session data
and so it cannot rely on the planners' implementation of the regional
operator. This change also implements the relevant methods in the
`importRegionOperator` to allow resolution of the primary region
of the database being imported into.

Fixes: cockroachdb#69616

Release note (sql change): IMPORT INTO regional by row tables
is supported.

Release justification: fixes for high-priority or high-severity bugs in existing functionality
  • Loading branch information
adityamaru committed Sep 10, 2021
1 parent c1a9c8b commit d0d70dc
Show file tree
Hide file tree
Showing 10 changed files with 736 additions and 523 deletions.
42 changes: 29 additions & 13 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,24 @@ func importPlanHook(
}
}

// Store the primary region of the database being imported into. This is
// used during job execution to evaluate certain default expressions and
// computed columns such as `gateway_region`.
var databasePrimaryRegion descpb.RegionName
if db.IsMultiRegion() {
if err := p.ExecCfg().CollectionFactory.Txn(ctx, p.ExecCfg().InternalExecutor, p.ExecCfg().DB,
func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error {
regionConfig, err := sql.SynthesizeRegionConfig(ctx, txn, db.GetID(), descsCol)
if err != nil {
return err
}
databasePrimaryRegion = regionConfig.PrimaryRegion()
return nil
}); err != nil {
return errors.Wrap(err, "failed to resolve region config for multi region database")
}
}

telemetry.CountBucketed("import.files", int64(len(files)))

// Record telemetry for userfile being used as the import target.
Expand Down Expand Up @@ -1002,16 +1020,17 @@ func importPlanHook(
// StartableJob which we attached to the connExecutor somehow.

importDetails := jobspb.ImportDetails{
URIs: files,
Format: format,
ParentID: db.GetID(),
Tables: tableDetails,
Types: typeDetails,
SSTSize: sstSize,
Oversample: oversample,
SkipFKs: skipFKs,
ParseBundleSchema: importStmt.Bundle,
DefaultIntSize: p.SessionData().DefaultIntSize,
URIs: files,
Format: format,
ParentID: db.GetID(),
Tables: tableDetails,
Types: typeDetails,
SSTSize: sstSize,
Oversample: oversample,
SkipFKs: skipFKs,
ParseBundleSchema: importStmt.Bundle,
DefaultIntSize: p.SessionData().DefaultIntSize,
DatabasePrimaryRegion: databasePrimaryRegion,
}

jr := jobs.Record{
Expand Down Expand Up @@ -1295,9 +1314,6 @@ func prepareExistingTableDescForIngestion(
if len(desc.Mutations) > 0 {
return nil, errors.Errorf("cannot IMPORT INTO a table with schema changes in progress -- try again later (pending mutation %s)", desc.Mutations[0].String())
}
if desc.LocalityConfig != nil && desc.LocalityConfig.GetRegionalByRow() != nil {
return nil, unimplemented.NewWithIssueDetailf(61133, "import.regional-by-row", "IMPORT into REGIONAL BY ROW table not supported")
}

// Note that desc is just used to verify that the version matches.
importing, err := descsCol.GetMutableTableVersionByID(ctx, desc.ID, txn)
Expand Down
33 changes: 24 additions & 9 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6429,6 +6429,14 @@ func TestImportMultiRegion(t *testing.T) {

simpleOcf := fmt.Sprintf("nodelocal://0/avro/%s", "simple.ocf")

data := "1,\"foo\",NULL,us-east1\n"
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
_, _ = w.Write([]byte(data))
}
}))
defer srv.Close()

// Table schemas for USING
tableSchemaMR := fmt.Sprintf("nodelocal://0/avro/%s", "simple-schema-multi-region.sql")
tableSchemaMRRegionalByRow := fmt.Sprintf("nodelocal://0/avro/%s",
Expand Down Expand Up @@ -6523,16 +6531,23 @@ DROP VIEW IF EXISTS v`,
errString: "IMPORT to REGIONAL BY ROW table not supported",
},
{
name: "import-into-multi-region-regional-by-row-to-multi-region-database",
db: "multi_region",
table: "mr_regional_by_row",
create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW",
sql: "IMPORT INTO mr_regional_by_row AVRO DATA ($1)",
args: []interface{}{simpleOcf},
errString: "IMPORT into REGIONAL BY ROW table not supported",
name: "import-into-multi-region-regional-by-row-default-col-to-multi-region-database",
db: "multi_region",
table: "mr_regional_by_row",
create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW",
sql: "IMPORT INTO mr_regional_by_row AVRO DATA ($1)",
args: []interface{}{simpleOcf},
},
{
name: "import-into-multi-region-regional-by-row-to-multi-region-database",
db: "multi_region",
table: "mr_regional_by_row",
create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW",
sql: "IMPORT INTO mr_regional_by_row (i, s, b, crdb_region) CSV DATA ($1)",
args: []interface{}{srv.URL},
},
{
name: "import-into-using-multi-region-global-to-multi-region-database",
name: "import-into-multi-region-global-to-multi-region-database",
db: "multi_region",
table: "mr_global",
create: "CREATE TABLE mr_global (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY GLOBAL",
Expand All @@ -6556,7 +6571,7 @@ DROP VIEW IF EXISTS v`,

_, err = sqlDB.ExecContext(context.Background(), test.sql, test.args...)
if test.errString != "" {
testutils.IsError(err, test.errString)
require.True(t, testutils.IsError(err, test.errString))
} else {
require.NoError(t, err)
res := sqlDB.QueryRow(fmt.Sprintf("SELECT count(*) FROM %q", test.table))
Expand Down
29 changes: 26 additions & 3 deletions pkg/ccl/importccl/import_table_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func MakeSimpleTableDescriptor(
evalCtx := tree.EvalContext{
Context: ctx,
Sequence: &importSequenceOperators{},
Regions: &importRegionOperator{},
Regions: makeImportRegionOperator(""),
SessionDataStack: sessiondata.NewStack(&sessiondata.SessionData{}),
ClientNoticeSender: &faketreeeval.DummyClientNoticeSender{},
Settings: st,
Expand Down Expand Up @@ -264,13 +264,36 @@ var (
)

// Implements the tree.RegionOperator interface.
type importRegionOperator struct{}
type importRegionOperator struct {
primaryRegion descpb.RegionName
}

func makeImportRegionOperator(primaryRegion descpb.RegionName) *importRegionOperator {
return &importRegionOperator{primaryRegion: primaryRegion}
}

type importDatabaseRegionConfig struct {
primaryRegion descpb.RegionName
}

// IsValidRegionNameString implements the tree.DatabaseRegionConfig interface.
func (i importDatabaseRegionConfig) IsValidRegionNameString(_ string) bool {
// Unimplemented.
return false
}

// PrimaryRegionString implements the tree.DatabaseRegionConfig interface.
func (i importDatabaseRegionConfig) PrimaryRegionString() string {
return string(i.primaryRegion)
}

var _ tree.DatabaseRegionConfig = &importDatabaseRegionConfig{}

// CurrentDatabaseRegionConfig is part of the tree.EvalDatabase interface.
func (so *importRegionOperator) CurrentDatabaseRegionConfig(
_ context.Context,
) (tree.DatabaseRegionConfig, error) {
return nil, errors.WithStack(errRegionOperator)
return importDatabaseRegionConfig{primaryRegion: so.primaryRegion}, nil
}

// ValidateAllMultiRegionZoneConfigsInCurrentDatabase is part of the tree.EvalDatabase interface.
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/importccl/read_import_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func runImport(
// TODO(adityamaru): Should we just plumb the flowCtx instead of this
// assignment.
evalCtx.DB = flowCtx.Cfg.DB
evalCtx.Regions = makeImportRegionOperator(*spec.DatabasePrimaryRegion)
semaCtx := tree.MakeSemaContext()
semaCtx.TypeResolver = importResolver
conv, err := makeInputConverter(ctx, &semaCtx, spec, evalCtx, kvCh, seqChunkProvider)
Expand Down
Loading

0 comments on commit d0d70dc

Please sign in to comment.