Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

importccl: fail IMPORT INTO on concurrent type change #69674

Merged
merged 2 commits into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ go_library(

go_test(
name = "importccl_test",
size = "medium",
size = "large",
srcs = [
"bench_test.go",
"client_import_test.go",
Expand Down
130 changes: 114 additions & 16 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,58 @@ func addToFileFormatTelemetry(fileFormat, state string) {
telemetry.Count(fmt.Sprintf("%s.%s.%s", "import", strings.ToLower(fileFormat), state))
}

// resolveUDTsUsedByImportInto resolves all the user defined types that are
// referenced by the table being imported into.
func resolveUDTsUsedByImportInto(
ctx context.Context, p sql.PlanHookState, table *tabledesc.Mutable,
) ([]catalog.TypeDescriptor, error) {
typeDescs := make([]catalog.TypeDescriptor, 0)
var dbDesc catalog.DatabaseDescriptor
err := sql.DescsTxn(ctx, p.ExecCfg(), func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) (err error) {
_, dbDesc, err = descriptors.GetImmutableDatabaseByID(ctx, txn, table.GetParentID(),
tree.DatabaseLookupFlags{
Required: true,
AvoidCached: true,
})
if err != nil {
return err
}
typeIDs, _, err := table.GetAllReferencedTypeIDs(dbDesc,
func(id descpb.ID) (catalog.TypeDescriptor, error) {
immutDesc, err := descriptors.GetImmutableTypeByID(ctx, txn, id, tree.ObjectLookupFlags{
CommonLookupFlags: tree.CommonLookupFlags{
Required: true,
AvoidCached: true,
},
})
if err != nil {
return nil, err
}
return immutDesc, nil
})
if err != nil {
return errors.Wrap(err, "resolving type descriptors")
}

for _, typeID := range typeIDs {
immutDesc, err := descriptors.GetImmutableTypeByID(ctx, txn, typeID, tree.ObjectLookupFlags{
CommonLookupFlags: tree.CommonLookupFlags{
Required: true,
AvoidCached: true,
},
})
if err != nil {
return err
}
typeDescs = append(typeDescs, immutDesc)
}
return err
})
return typeDescs, err
}

// importPlanHook implements sql.PlanHookFn.
func importPlanHook(
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
Expand Down Expand Up @@ -744,28 +796,13 @@ func importPlanHook(

var tableDetails []jobspb.ImportDetails_Table
var tableDescs []*tabledesc.Mutable // parallel with tableDetails
var typeDetails []jobspb.ImportDetails_Type
jobDesc, err := importJobDescription(p, importStmt, nil, filenamePatterns, opts)
if err != nil {
return err
}

if importStmt.Into {
// TODO(dt): this is a prototype for incremental import but there are many
// TODOs remaining before it is ready to graduate to prime-time. Some of
// them are captured in specific TODOs below, but some of the big, scary
// things to do are:
// - review planner vs txn use very carefully. We should try to get to a
// single txn used to plan the job and create it. Using the planner's
// txn today is very wrong since it will not commit until after the job
// has run, so starting a job based on reads it returned is very wrong.
// - audit every place that we resolve/lease/read table descs to be sure
// that the IMPORTING state is handled correctly. SQL lease acquisition
// is probably the easy one here since it has single read path -- the
// things that read directly like the queues or background jobs are the
// ones we'll need to really carefully look though.
// - Look at if/how cleanup/rollback works. Reconsider the cpu from the
// desc version (perhaps we should be re-reading instead?).
// - Write _a lot_ of tests.
if _, ok := allowedIntoFormats[importStmt.FileFormat]; !ok {
return errors.Newf(
"%s file format is currently unsupported by IMPORT INTO",
Expand Down Expand Up @@ -817,6 +854,21 @@ func importPlanHook(
}
}
}

{
// Resolve the UDTs used by the table being imported into.
typeDescs, err := resolveUDTsUsedByImportInto(ctx, p, found)
if err != nil {
return errors.Wrap(err, "resolving UDTs used by table being imported into")
}
if len(typeDescs) > 0 {
typeDetails = make([]jobspb.ImportDetails_Type, 0, len(typeDescs))
}
for _, typeDesc := range typeDescs {
typeDetails = append(typeDetails, jobspb.ImportDetails_Type{Desc: typeDesc.TypeDesc()})
}
}

tableDetails = []jobspb.ImportDetails_Table{{Desc: &found.TableDescriptor, IsNew: false, TargetCols: intoCols}}
} else {
seqVals := make(map[descpb.ID]int64)
Expand Down Expand Up @@ -954,6 +1006,7 @@ func importPlanHook(
Format: format,
ParentID: db.GetID(),
Tables: tableDetails,
Types: typeDetails,
SSTSize: sstSize,
Oversample: oversample,
SkipFKs: skipFKs,
Expand Down Expand Up @@ -2069,6 +2122,13 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
}
}

// If the table being imported into referenced UDTs, ensure that a concurrent
// schema change on any of the types has not modified the type descriptor. If
// it has, it is unsafe to import the data and we fail the import job.
if err := r.checkForUDTModification(ctx, p.ExecCfg()); err != nil {
return err
}

if err := r.publishSchemas(ctx, p.ExecCfg()); err != nil {
return err
}
Expand Down Expand Up @@ -2219,6 +2279,44 @@ func (r *importResumer) publishSchemas(ctx context.Context, execCfg *sql.Executo
})
}

// checkForUDTModification checks whether any of the types referenced by the
// table being imported into have been modified since they were read during
// import planning. If they have, it may be unsafe to continue with the import
// since we could be ingesting data that is no longer valid for the type.
//
// Egs: Renaming an enum value mid import could result in the import ingesting a
// value that is no longer valid.
//
// TODO(SQL Schema): This method might be unnecessarily aggressive in failing
// the import. The semantics of what concurrent type changes are/are not safe
// during an IMPORT still need to be ironed out. Once they are, we can make this
// method more conservative in what it uses to deem a type change dangerous.
func (r *importResumer) checkForUDTModification(
ctx context.Context, execCfg *sql.ExecutorConfig,
) error {
details := r.job.Details().(jobspb.ImportDetails)
if details.Types == nil {
return nil
}
return sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn *kv.Txn,
col *descs.Collection) error {
for _, savedTypeDesc := range details.Types {
typeDesc, err := catalogkv.MustGetTypeDescByID(ctx, txn, execCfg.Codec,
savedTypeDesc.Desc.GetID())
if err != nil {
return errors.Wrap(err, "resolving type descriptor when checking version mismatch")
}
if typeDesc.GetModificationTime() != savedTypeDesc.Desc.GetModificationTime() {
return errors.Newf("type descriptor %d has a different modification time than what"+
" was saved during import planning; unsafe to import since the type"+
" has changed during the course of the import",
typeDesc.GetID())
}
}
return nil
})
}

// publishTables updates the status of imported tables from OFFLINE to PUBLIC.
func (r *importResumer) publishTables(ctx context.Context, execCfg *sql.ExecutorConfig) error {
details := r.job.Details().(jobspb.ImportDetails)
Expand Down
138 changes: 137 additions & 1 deletion pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1302,14 +1302,22 @@ func TestImportUserDefinedTypes(t *testing.T) {
verifyQuery: "SELECT * FROM t ORDER BY a",
expected: [][]string{{"hello", "hello"}, {"hi", "hi"}},
},
// Test table with default value
// Test table with default value.
{
create: "a greeting, b greeting default 'hi'",
intoCols: "a, b",
typ: "PGCOPY",
contents: "hello\nhi\thi\n",
errString: "type OID 100052 does not exist",
},
// Test table with an invalid enum value.
{
create: "a greeting",
intoCols: "a",
typ: "PGCOPY",
contents: "randomvalue\n",
errString: "encountered error invalid input value for enum greeting",
},
}

// Test IMPORT INTO.
Expand Down Expand Up @@ -7085,3 +7093,131 @@ CSV DATA ($1)
}
}
}

func TestUDTChangeDuringImport(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
baseDir, cleanup := testutils.TempDir(t)
defer cleanup()

// Write some data to the test file.
f, err := ioutil.TempFile(baseDir, "data")
require.NoError(t, err)
_, err = f.Write([]byte("1,hello\n2,hi\n"))
require.NoError(t, err)

importStmt := "IMPORT INTO t (a, b) CSV DATA ($1)"
importArgs := fmt.Sprintf("nodelocal://0/%s", filepath.Base(f.Name()))

testCases := []struct {
name string
query string
expectTypeChangeErr string
expectImportErr bool
}{
{
"add-value",
"ALTER TYPE d.greeting ADD VALUE 'cheers'",
"",
true,
},
{
"rename-value",
"ALTER TYPE d.greeting RENAME VALUE 'howdy' TO 'hola';",
"",
true,
},
{
"add-value-in-txn",
"BEGIN; ALTER TYPE d.greeting ADD VALUE 'cheers'; COMMIT;",
"",
true,
},
// Dropping a value does change the modification time on the descriptor,
// even though the enum value removal is forbidden during an import.
// As a result of this, the import is expected to fail.
{
"drop-value",
"ALTER TYPE d.greeting DROP VALUE 'howdy';",
"could not validate enum value removal",
true,
},
// Dropping a type does not change the modification time on the descriptor,
// and so the import is expected to succeed.
{
"drop-type",
"DROP TYPE d.greeting",
"cannot drop type \"greeting\"",
false,
},
}

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
requestReceived := make(chan struct{})
allowResponse := make(chan struct{})
tc := testcluster.StartTestCluster(
t, 1, base.TestClusterArgs{ServerArgs: base.TestServerArgs{
ExternalIODir: baseDir,
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
Store: &kvserver.StoreTestingKnobs{
TestingResponseFilter: jobutils.BulkOpResponseFilter(&allowResponse),
TestingRequestFilter: func(ctx context.Context, br roachpb.BatchRequest) *roachpb.Error {
for _, ru := range br.Requests {
switch ru.GetInner().(type) {
case *roachpb.AddSSTableRequest:
<-requestReceived
}
}
return nil
},
}},
}})
defer tc.Stopper().Stop(ctx)
conn := tc.Conns[0]
sqlDB := sqlutils.MakeSQLRunner(conn)

// Create a database with a type.
sqlDB.Exec(t, `
SET CLUSTER SETTING sql.defaults.drop_enum_value.enabled = true;
SET enable_drop_enum_value = true;
CREATE DATABASE d;
USE d;
CREATE TYPE d.greeting AS ENUM ('hello', 'howdy', 'hi');
CREATE TABLE t (a INT, b greeting);
`)

// Start the import.
errCh := make(chan error)
defer close(errCh)
go func() {
_, err := sqlDB.DB.ExecContext(ctx, importStmt, importArgs)
errCh <- err
}()

// Wait for the import to start.
requestReceived <- struct{}{}

if test.expectTypeChangeErr != "" {
sqlDB.ExpectErr(t, test.expectTypeChangeErr, test.query)
} else {
sqlDB.Exec(t, test.query)
}

// Allow the import to finish.
close(requestReceived)
close(allowResponse)

err := <-errCh
if test.expectImportErr {
testutils.IsError(err,
"unsafe to import since the type has changed during the course of the import")
} else {
require.NoError(t, err)
}
})
}
}
Loading