Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

importccl: add support for IMPORT INTO RBR table #69903

Merged
merged 1 commit into from
Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 29 additions & 13 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,24 @@ func importPlanHook(
}
}

// Store the primary region of the database being imported into. This is
// used during job execution to evaluate certain default expressions and
// computed columns such as `gateway_region`.
var databasePrimaryRegion descpb.RegionName
if db.IsMultiRegion() {
if err := sql.DescsTxn(ctx, p.ExecCfg(), func(ctx context.Context, txn *kv.Txn,
descsCol *descs.Collection) error {
regionConfig, err := sql.SynthesizeRegionConfig(ctx, txn, db.GetID(), descsCol)
if err != nil {
return err
}
databasePrimaryRegion = regionConfig.PrimaryRegion()
return nil
}); err != nil {
return errors.Wrap(err, "failed to resolve region config for multi region database")
}
}

telemetry.CountBucketed("import.files", int64(len(files)))

// Record telemetry for userfile being used as the import target.
Expand Down Expand Up @@ -1002,16 +1020,17 @@ func importPlanHook(
// StartableJob which we attached to the connExecutor somehow.

importDetails := jobspb.ImportDetails{
URIs: files,
Format: format,
ParentID: db.GetID(),
Tables: tableDetails,
Types: typeDetails,
SSTSize: sstSize,
Oversample: oversample,
SkipFKs: skipFKs,
ParseBundleSchema: importStmt.Bundle,
DefaultIntSize: p.SessionData().DefaultIntSize,
URIs: files,
Format: format,
ParentID: db.GetID(),
Tables: tableDetails,
Types: typeDetails,
SSTSize: sstSize,
Oversample: oversample,
SkipFKs: skipFKs,
ParseBundleSchema: importStmt.Bundle,
DefaultIntSize: p.SessionData().DefaultIntSize,
DatabasePrimaryRegion: databasePrimaryRegion,
}

jr := jobs.Record{
Expand Down Expand Up @@ -1295,9 +1314,6 @@ func prepareExistingTableDescForIngestion(
if len(desc.Mutations) > 0 {
return nil, errors.Errorf("cannot IMPORT INTO a table with schema changes in progress -- try again later (pending mutation %s)", desc.Mutations[0].String())
}
if desc.LocalityConfig != nil && desc.LocalityConfig.GetRegionalByRow() != nil {
return nil, unimplemented.NewWithIssueDetailf(61133, "import.regional-by-row", "IMPORT into REGIONAL BY ROW table not supported")
}

// Note that desc is just used to verify that the version matches.
importing, err := descsCol.GetMutableTableVersionByID(ctx, desc.ID, txn)
Expand Down
43 changes: 37 additions & 6 deletions pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6429,6 +6429,14 @@ func TestImportMultiRegion(t *testing.T) {

simpleOcf := fmt.Sprintf("nodelocal://0/avro/%s", "simple.ocf")

var data string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
_, _ = w.Write([]byte(data))
}
}))
defer srv.Close()

// Table schemas for USING
tableSchemaMR := fmt.Sprintf("nodelocal://0/avro/%s", "simple-schema-multi-region.sql")
tableSchemaMRRegionalByRow := fmt.Sprintf("nodelocal://0/avro/%s",
Expand Down Expand Up @@ -6498,6 +6506,7 @@ DROP VIEW IF EXISTS v`,
create string
args []interface{}
errString string
data string
}{
{
name: "import-create-using-multi-region-to-non-multi-region-database",
Expand All @@ -6523,16 +6532,34 @@ DROP VIEW IF EXISTS v`,
errString: "IMPORT to REGIONAL BY ROW table not supported",
},
{
name: "import-into-multi-region-regional-by-row-to-multi-region-database",
name: "import-into-multi-region-regional-by-row-default-col-to-multi-region-database",
db: "multi_region",
table: "mr_regional_by_row",
create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW",
sql: "IMPORT INTO mr_regional_by_row AVRO DATA ($1)",
args: []interface{}{simpleOcf},
},
{
name: "import-into-multi-region-regional-by-row-to-multi-region-database",
db: "multi_region",
table: "mr_regional_by_row",
create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW",
sql: "IMPORT INTO mr_regional_by_row (i, s, b, crdb_region) CSV DATA ($1)",
args: []interface{}{srv.URL},
data: "1,\"foo\",NULL,us-east1\n",
},
{
name: "import-into-multi-region-regional-by-row-to-multi-region-database-wrong-value",
db: "multi_region",
table: "mr_regional_by_row",
create: "CREATE TABLE mr_regional_by_row (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY REGIONAL BY ROW",
sql: "IMPORT INTO mr_regional_by_row AVRO DATA ($1)",
args: []interface{}{simpleOcf},
errString: "IMPORT into REGIONAL BY ROW table not supported",
sql: "IMPORT INTO mr_regional_by_row (i, s, b, crdb_region) CSV DATA ($1)",
args: []interface{}{srv.URL},
data: "1,\"foo\",NULL,us-west1\n",
errString: "invalid input value for enum crdb_internal_region",
},
{
name: "import-into-using-multi-region-global-to-multi-region-database",
name: "import-into-multi-region-global-to-multi-region-database",
db: "multi_region",
table: "mr_global",
create: "CREATE TABLE mr_global (i INT8 PRIMARY KEY, s text, b bytea) LOCALITY GLOBAL",
Expand All @@ -6549,14 +6576,18 @@ DROP VIEW IF EXISTS v`,
_, err = sqlDB.Exec(fmt.Sprintf("DROP TABLE IF EXISTS %q CASCADE", test.table))
require.NoError(t, err)

if test.data != "" {
data = test.data
}

if test.create != "" {
_, err = sqlDB.Exec(test.create)
require.NoError(t, err)
}

_, err = sqlDB.ExecContext(context.Background(), test.sql, test.args...)
if test.errString != "" {
testutils.IsError(err, test.errString)
require.True(t, testutils.IsError(err, test.errString))
} else {
require.NoError(t, err)
res := sqlDB.QueryRow(fmt.Sprintf("SELECT count(*) FROM %q", test.table))
Expand Down
31 changes: 28 additions & 3 deletions pkg/ccl/importccl/import_table_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func MakeSimpleTableDescriptor(
evalCtx := tree.EvalContext{
Context: ctx,
Sequence: &importSequenceOperators{},
Regions: &importRegionOperator{},
Regions: makeImportRegionOperator(""),
SessionDataStack: sessiondata.NewStack(&sessiondata.SessionData{}),
ClientNoticeSender: &faketreeeval.DummyClientNoticeSender{},
Settings: st,
Expand Down Expand Up @@ -264,13 +264,38 @@ var (
)

// Implements the tree.RegionOperator interface.
type importRegionOperator struct{}
type importRegionOperator struct {
primaryRegion descpb.RegionName
}

func makeImportRegionOperator(primaryRegion descpb.RegionName) *importRegionOperator {
return &importRegionOperator{primaryRegion: primaryRegion}
}

// importDatabaseRegionConfig is a stripped down version of
// multiregion.RegionConfig that is used by import.
type importDatabaseRegionConfig struct {
primaryRegion descpb.RegionName
}

// IsValidRegionNameString implements the tree.DatabaseRegionConfig interface.
func (i importDatabaseRegionConfig) IsValidRegionNameString(_ string) bool {
// Unimplemented.
return false
}

// PrimaryRegionString implements the tree.DatabaseRegionConfig interface.
func (i importDatabaseRegionConfig) PrimaryRegionString() string {
return string(i.primaryRegion)
}

var _ tree.DatabaseRegionConfig = &importDatabaseRegionConfig{}

// CurrentDatabaseRegionConfig is part of the tree.EvalDatabase interface.
func (so *importRegionOperator) CurrentDatabaseRegionConfig(
_ context.Context,
) (tree.DatabaseRegionConfig, error) {
return nil, errors.WithStack(errRegionOperator)
return importDatabaseRegionConfig{primaryRegion: so.primaryRegion}, nil
}

// ValidateAllMultiRegionZoneConfigsInCurrentDatabase is part of the tree.EvalDatabase interface.
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/importccl/read_import_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func runImport(
// TODO(adityamaru): Should we just plumb the flowCtx instead of this
// assignment.
evalCtx.DB = flowCtx.Cfg.DB
evalCtx.Regions = makeImportRegionOperator(spec.DatabasePrimaryRegion)
semaCtx := tree.MakeSemaContext()
semaCtx.TypeResolver = importResolver
conv, err := makeInputConverter(ctx, &semaCtx, spec, evalCtx, kvCh, seqChunkProvider)
Expand Down
Loading