From 2a3ee17d58a2802c67b2bac4c38e9e53b89de9ea Mon Sep 17 00:00:00 2001 From: Aditya Maru Date: Tue, 31 Aug 2021 20:12:51 -0400 Subject: [PATCH] importccl: fail IMPORT INTO on concurrent type change This change adds logic to check that the type descriptors referenced by the table being imported into have not undergone any changes (adding/dropping values) during the course of the import job execution. This is done by matching the current type desc modification time against the modification time on the type desc that was cached in the import job during planning. If there is a mismatch, we should fail the job since the data could be referencing a value that has been renamed, leading to corrupt kv entries. Release note (bug fix): add protection to import into to guard against concurrent type changes on UDTs referenced by the target table. Release justification: fixes for high-priority or high-severity bugs in existing functionality --- pkg/ccl/importccl/import_stmt.go | 48 ++++++++- pkg/ccl/importccl/import_stmt_test.go | 138 +++++++++++++++++++++++++- 2 files changed, 183 insertions(+), 3 deletions(-) diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index 70192ee9d198..0f98a9ac5e66 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -865,8 +865,7 @@ func importPlanHook( typeDetails = make([]jobspb.ImportDetails_Type, 0, len(typeDescs)) } for _, typeDesc := range typeDescs { - typeDetails = append(typeDetails, jobspb.ImportDetails_Type{ - Desc: typeDesc.DescriptorProto().GetType()}) + typeDetails = append(typeDetails, jobspb.ImportDetails_Type{Desc: typeDesc.TypeDesc()}) } } @@ -2123,6 +2122,13 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error { } } + // If the table being imported into referenced UDTs, ensure that a concurrent + // schema change on any of the types has not modified the type descriptor. 
If + // it has, it is unsafe to import the data and we fail the import job. + if err := r.checkForUDTModification(ctx, p.ExecCfg()); err != nil { + return err + } + if err := r.publishSchemas(ctx, p.ExecCfg()); err != nil { return err }
@@ -2273,6 +2279,44 @@ func (r *importResumer) publishSchemas(ctx context.Context, execCfg *sql.Executo
 	})
 }
 
+// checkForUDTModification checks whether any of the type descriptors saved in
+// the job's import details have a different modification time than the current
+// descriptor with the same ID, and returns an error on the first mismatch. It
+// returns nil without opening a transaction if no types were saved.
+//
+// E.g.: renaming an enum value mid-import could result in the import ingesting
+// a value that is no longer valid for the type.
+//
+// TODO(SQL Schema): This method might be unnecessarily aggressive in failing
+// the import. The semantics of what concurrent type changes are/are not safe
+// during an IMPORT still need to be ironed out. Once they are, we can narrow
+// down what this method uses to deem a type change dangerous.
+func (r *importResumer) checkForUDTModification(
+	ctx context.Context, execCfg *sql.ExecutorConfig,
+) error {
+	details := r.job.Details().(jobspb.ImportDetails)
+	if details.Types == nil {
+		return nil
+	}
+	return sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn *kv.Txn,
+		col *descs.Collection) error {
+		for _, savedTypeDesc := range details.Types {
+			typeDesc, err := catalogkv.MustGetTypeDescByID(ctx, txn, execCfg.Codec,
+				savedTypeDesc.Desc.GetID())
+			if err != nil {
+				return errors.Wrap(err, "resolving type descriptor when checking version mismatch")
+			}
+			if typeDesc.GetModificationTime() != savedTypeDesc.Desc.GetModificationTime() {
+				return errors.Newf("type descriptor %d has a different modification time than what"+
+					" was saved during import planning; unsafe to import since the type"+
+					" has changed during the course of the import",
+					typeDesc.GetID())
+			}
+		}
+		return nil
+	})
+}
+
 // publishTables updates the status of imported tables from OFFLINE to PUBLIC. func (r *importResumer) publishTables(ctx context.Context, execCfg *sql.ExecutorConfig) error { details := r.job.Details().(jobspb.ImportDetails) diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go index da2f106b1af5..f25086d5e7c6 100644 --- a/pkg/ccl/importccl/import_stmt_test.go +++ b/pkg/ccl/importccl/import_stmt_test.go @@ -1302,7 +1302,7 @@ func TestImportUserDefinedTypes(t *testing.T) { verifyQuery: "SELECT * FROM t ORDER BY a", expected: [][]string{{"hello", "hello"}, {"hi", "hi"}}, }, - // Test table with default value + // Test table with default value. { create: "a greeting, b greeting default 'hi'", intoCols: "a, b", @@ -1310,6 +1310,14 @@ func TestImportUserDefinedTypes(t *testing.T) { contents: "hello\nhi\thi\n", errString: "type OID 100052 does not exist", }, + // Test table with an invalid enum value.
+ { create: "a greeting", intoCols: "a", typ: "PGCOPY", contents: "randomvalue\n", errString: "encountered error invalid input value for enum greeting", }, } // Test IMPORT INTO.
@@ -7085,3 +7093,139 @@ CSV DATA ($1)
 		}
 	}
 }
+
+// TestUDTChangeDuringImport verifies how an in-flight IMPORT INTO reacts to a
+// concurrent schema change on a user-defined type referenced by the target
+// table. The import is held at its AddSSTable request, the type change is
+// attempted, and the import is then released and its result is checked.
+func TestUDTChangeDuringImport(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	baseDir, cleanup := testutils.TempDir(t)
+	defer cleanup()
+
+	// Write some data to the test file.
+	f, err := ioutil.TempFile(baseDir, "data")
+	require.NoError(t, err)
+	_, err = f.Write([]byte("1,hello\n2,hi\n"))
+	require.NoError(t, err)
+	require.NoError(t, f.Close())
+
+	importStmt := "IMPORT INTO t (a, b) CSV DATA ($1)"
+	importArgs := fmt.Sprintf("nodelocal://0/%s", filepath.Base(f.Name()))
+
+	testCases := []struct {
+		name                string
+		query               string
+		expectTypeChangeErr string
+		expectImportErr     bool
+	}{
+		{
+			"add-value",
+			"ALTER TYPE d.greeting ADD VALUE 'cheers'",
+			"",
+			true,
+		},
+		{
+			"rename-value",
+			"ALTER TYPE d.greeting RENAME VALUE 'howdy' TO 'hola';",
+			"",
+			true,
+		},
+		{
+			"add-value-in-txn",
+			"BEGIN; ALTER TYPE d.greeting ADD VALUE 'cheers'; COMMIT;",
+			"",
+			true,
+		},
+		// Dropping a value does change the modification time on the descriptor,
+		// even though the enum value removal is forbidden during an import.
+		// As a result of this, the import is expected to fail.
+		{
+			"drop-value",
+			"ALTER TYPE d.greeting DROP VALUE 'howdy';",
+			"could not validate enum value removal",
+			true,
+		},
+		// Dropping a type does not change the modification time on the descriptor,
+		// and so the import is expected to succeed.
+		{
+			"drop-type",
+			"DROP TYPE d.greeting",
+			"cannot drop type \"greeting\"",
+			false,
+		},
+	}
+
+	for _, test := range testCases {
+		t.Run(test.name, func(t *testing.T) {
+			requestReceived := make(chan struct{})
+			allowResponse := make(chan struct{})
+			tc := testcluster.StartTestCluster(
+				t, 1, base.TestClusterArgs{ServerArgs: base.TestServerArgs{
+					ExternalIODir: baseDir,
+					Knobs: base.TestingKnobs{
+						JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
+						Store: &kvserver.StoreTestingKnobs{
+							TestingResponseFilter: jobutils.BulkOpResponseFilter(&allowResponse),
+							TestingRequestFilter: func(ctx context.Context, br roachpb.BatchRequest) *roachpb.Error {
+								for _, ru := range br.Requests {
+									switch ru.GetInner().(type) {
+									case *roachpb.AddSSTableRequest:
+										// Hold the request until the test sends on, or closes, requestReceived.
+										<-requestReceived
+									}
+								}
+								return nil
+							},
+						}},
+				}})
+			defer tc.Stopper().Stop(ctx)
+			conn := tc.Conns[0]
+			sqlDB := sqlutils.MakeSQLRunner(conn)
+
+			// Create a database with a type.
+			sqlDB.Exec(t, `
+SET CLUSTER SETTING sql.defaults.drop_enum_value.enabled = true;
+SET enable_drop_enum_value = true;
+CREATE DATABASE d;
+USE d;
+CREATE TYPE d.greeting AS ENUM ('hello', 'howdy', 'hi');
+CREATE TABLE t (a INT, b greeting);
+`)
+
+			// Start the import.
+			errCh := make(chan error)
+			defer close(errCh)
+			go func() {
+				_, err := sqlDB.DB.ExecContext(ctx, importStmt, importArgs)
+				errCh <- err
+			}()
+
+			// Wait for the import to start.
+			requestReceived <- struct{}{}
+
+			if test.expectTypeChangeErr != "" {
+				sqlDB.ExpectErr(t, test.expectTypeChangeErr, test.query)
+			} else {
+				sqlDB.Exec(t, test.query)
+			}
+
+			// Allow the import to finish.
+			close(requestReceived)
+			close(allowResponse)
+
+			err := <-errCh
+			if test.expectImportErr {
+				// IsError only reports a match, so its result must be asserted.
+				require.True(t, testutils.IsError(err,
+					"unsafe to import since the type has changed during the course of the import"),
+					"unexpected import result: %v", err)
+			} else {
+				require.NoError(t, err)
+			}
+		})
+	}
+}