Skip to content

Commit

Permalink
importccl: fail IMPORT INTO on concurrent type change
Browse files Browse the repository at this point in the history
This change adds logic to check that the type descriptors
referenced by the table being imported into have not
undergone any changes (adding/dropping values) during the
course of the import job execution. This is done by
matching the current type desc modification time against
the modification time on the type desc that was cached
in the import job during planning. If there is a mismatch,
we should fail the job since the data could be referencing
a value that has been renamed, leading to corrupt kv entries.

Release note (bug fix): add protection to IMPORT INTO to
guard against concurrent type changes on UDTs referenced by
the target table.

Release justification: fixes for high-priority or high-severity bugs in existing functionality
  • Loading branch information
adityamaru committed Sep 2, 2021
1 parent de35d3d commit 597ec62
Show file tree
Hide file tree
Showing 2 changed files with 182 additions and 1 deletion.
45 changes: 45 additions & 0 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2123,6 +2123,13 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
}
}

// If the table being imported into referenced UDTs, ensure that a concurrent
// schema change on any of the types has not modified the type descriptor. If
// it has, it is unsafe to import the data and we fail the import job.
if err := r.checkForUDTModification(ctx, p.ExecCfg()); err != nil {
return err
}

if err := r.publishSchemas(ctx, p.ExecCfg()); err != nil {
return err
}
Expand Down Expand Up @@ -2273,6 +2280,44 @@ func (r *importResumer) publishSchemas(ctx context.Context, execCfg *sql.Executo
})
}

// checkForUDTModification verifies that none of the type descriptors recorded
// in the import job details have been modified since import planning saved
// them. For each saved type it re-reads the descriptor by ID inside a
// descriptor transaction and compares the current modification time against
// the saved one; any difference fails the import, because the data being
// ingested may no longer be valid for the type.
//
// E.g.: renaming an enum value mid-import could result in the import ingesting
// a value that is no longer valid.
//
// It returns nil when the job details carry no types or when every type is
// unchanged, a wrapped error when a descriptor cannot be resolved, and a new
// error naming the offending descriptor ID on a modification time mismatch.
//
// TODO(SQL Schema): This method might be unnecessarily aggressive in failing
// the import. The semantics of what concurrent type changes are/are not safe
// during an IMPORT still need to be ironed out. Once they are, we can make this
// method more conservative in what it uses to deem a type change dangerous.
func (r *importResumer) checkForUDTModification(
	ctx context.Context, execCfg *sql.ExecutorConfig,
) error {
	importDetails := r.job.Details().(jobspb.ImportDetails)
	if importDetails.Types == nil {
		// No types were saved at planning time, so there is nothing to check.
		return nil
	}

	// verifySavedTypes walks every type saved at planning time and compares it
	// with the descriptor currently stored for the same ID.
	verifySavedTypes := func(ctx context.Context, txn *kv.Txn, _ *descs.Collection) error {
		for _, planned := range importDetails.Types {
			current, err := catalogkv.MustGetTypeDescByID(
				ctx, txn, execCfg.Codec, planned.Desc.GetID(),
			)
			if err != nil {
				return errors.Wrap(err, "resolving type descriptor when checking version mismatch")
			}
			if current.GetModificationTime() == planned.Desc.GetModificationTime() {
				continue
			}
			return errors.Newf("type descriptor %d has a different modification time than what"+
				" was saved during import planning; unsafe to import since the type"+
				" has changed during the course of the import",
				current.GetID())
		}
		return nil
	}

	return sql.DescsTxn(ctx, execCfg, verifySavedTypes)
}

// publishTables updates the status of imported tables from OFFLINE to PUBLIC.
func (r *importResumer) publishTables(ctx context.Context, execCfg *sql.ExecutorConfig) error {
details := r.job.Details().(jobspb.ImportDetails)
Expand Down
138 changes: 137 additions & 1 deletion pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1302,14 +1302,22 @@ func TestImportUserDefinedTypes(t *testing.T) {
verifyQuery: "SELECT * FROM t ORDER BY a",
expected: [][]string{{"hello", "hello"}, {"hi", "hi"}},
},
// Test table with default value
// Test table with default value.
{
create: "a greeting, b greeting default 'hi'",
intoCols: "a, b",
typ: "PGCOPY",
contents: "hello\nhi\thi\n",
errString: "type OID 100052 does not exist",
},
// Test table with an invalid enum value.
{
create: "a greeting",
intoCols: "a",
typ: "PGCOPY",
contents: "randomvalue\n",
errString: "encountered error invalid input value for enum greeting",
},
}

// Test IMPORT INTO.
Expand Down Expand Up @@ -7085,3 +7093,131 @@ CSV DATA ($1)
}
}
}

// TestUDTChangeDuringImport checks how IMPORT INTO behaves when a schema
// change is attempted on a user-defined type referenced by the target table
// while the import is in flight.
//
// Each subtest starts a fresh single-node cluster, begins an IMPORT INTO in a
// goroutine, blocks the import at its first AddSSTable request, runs the type
// change (asserting on its own success or failure), and then lets the import
// proceed. The import is expected to fail with the "type has changed" error
// for every case flagged with expectImportErr, and to succeed otherwise.
func TestUDTChangeDuringImport(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	baseDir, cleanup := testutils.TempDir(t)
	defer cleanup()

	// Write some data to the test file and close it so that the file handle is
	// not leaked for the remainder of the test.
	f, err := ioutil.TempFile(baseDir, "data")
	require.NoError(t, err)
	_, err = f.Write([]byte("1,hello\n2,hi\n"))
	require.NoError(t, err)
	require.NoError(t, f.Close())

	importStmt := "IMPORT INTO t (a, b) CSV DATA ($1)"
	importArgs := fmt.Sprintf("nodelocal://0/%s", filepath.Base(f.Name()))

	// typeChangedErr is the error the import job must report when it detects
	// that a referenced type was modified during the import.
	const typeChangedErr = "unsafe to import since the type has changed during the course of the import"

	testCases := []struct {
		name string
		// query is the type change executed while the import is blocked.
		query string
		// expectTypeChangeErr, when non-empty, is the error that query itself
		// must fail with.
		expectTypeChangeErr string
		// expectImportErr indicates whether the import must fail with
		// typeChangedErr.
		expectImportErr bool
	}{
		{
			name:                "add-value",
			query:               "ALTER TYPE d.greeting ADD VALUE 'cheers'",
			expectTypeChangeErr: "",
			expectImportErr:     true,
		},
		{
			name:                "rename-value",
			query:               "ALTER TYPE d.greeting RENAME VALUE 'howdy' TO 'hola';",
			expectTypeChangeErr: "",
			expectImportErr:     true,
		},
		{
			name:                "add-value-in-txn",
			query:               "BEGIN; ALTER TYPE d.greeting ADD VALUE 'cheers'; COMMIT;",
			expectTypeChangeErr: "",
			expectImportErr:     true,
		},
		// Dropping a value does change the modification time on the descriptor,
		// even though the enum value removal is forbidden during an import.
		// As a result of this, the import is expected to fail.
		{
			name:                "drop-value",
			query:               "ALTER TYPE d.greeting DROP VALUE 'howdy';",
			expectTypeChangeErr: "could not validate enum value removal",
			expectImportErr:     true,
		},
		// Dropping a type does not change the modification time on the descriptor,
		// and so the import is expected to succeed.
		{
			name:                "drop-type",
			query:               "DROP TYPE d.greeting",
			expectTypeChangeErr: "cannot drop type \"greeting\"",
			expectImportErr:     false,
		},
	}

	for _, test := range testCases {
		t.Run(test.name, func(t *testing.T) {
			// requestReceived gates AddSSTable requests: the filter below blocks
			// on it until the test either sends a value or closes the channel.
			requestReceived := make(chan struct{})
			allowResponse := make(chan struct{})
			tc := testcluster.StartTestCluster(
				t, 1, base.TestClusterArgs{ServerArgs: base.TestServerArgs{
					ExternalIODir: baseDir,
					Knobs: base.TestingKnobs{
						JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
						Store: &kvserver.StoreTestingKnobs{
							TestingResponseFilter: jobutils.BulkOpResponseFilter(&allowResponse),
							TestingRequestFilter: func(ctx context.Context, br roachpb.BatchRequest) *roachpb.Error {
								for _, ru := range br.Requests {
									switch ru.GetInner().(type) {
									case *roachpb.AddSSTableRequest:
										<-requestReceived
									}
								}
								return nil
							},
						}},
				}})
			defer tc.Stopper().Stop(ctx)
			conn := tc.Conns[0]
			sqlDB := sqlutils.MakeSQLRunner(conn)

			// Create a database with a type.
			sqlDB.Exec(t, `
SET CLUSTER SETTING sql.defaults.drop_enum_value.enabled = true;
SET enable_drop_enum_value = true;
CREATE DATABASE d;
USE d;
CREATE TYPE d.greeting AS ENUM ('hello', 'howdy', 'hi');
CREATE TABLE t (a INT, b greeting);
`)

			// Start the import. The channel is buffered and never closed so that
			// the importing goroutine can always deliver its result without
			// blocking, and can never send on a closed channel if the test
			// aborts before the result is received.
			errCh := make(chan error, 1)
			go func() {
				_, err := sqlDB.DB.ExecContext(ctx, importStmt, importArgs)
				errCh <- err
			}()

			// Wait for the import to start.
			requestReceived <- struct{}{}

			if test.expectTypeChangeErr != "" {
				sqlDB.ExpectErr(t, test.expectTypeChangeErr, test.query)
			} else {
				sqlDB.Exec(t, test.query)
			}

			// Allow the import to finish.
			close(requestReceived)
			close(allowResponse)

			err := <-errCh
			if test.expectImportErr {
				// The result of IsError must be asserted; calling it on its own
				// checks nothing.
				require.True(t, testutils.IsError(err, typeChangedErr),
					"expected import to fail with %q, got: %v", typeChangedErr, err)
			} else {
				require.NoError(t, err)
			}
		})
	}
}

0 comments on commit 597ec62

Please sign in to comment.