Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
69674: importccl: fail IMPORT INTO on concurrent type change r=arulajmani,dt a=adityamaru

This change adds logic to check that the type descriptors
referenced by the table being imported into have not
undergone any changes (adding/dropping values) during the
the course of the import job execution. This is done by
matching the current type desc version against the version
on the type desc that was cached in the import job during
planning. If there is a mismatch, we should fail the job
since the data could be referencing a value that has been
dropped, leading to corrupt kv entries.

Informs: #61133

Release note (bug fix): add protection to import into to
guard against concurrent type changes on UDTs referenced by
the target table.

Release justification: fixes for high-priority or high-severity bugs in existing functionality


69881: Add Stephanie Bodoff to AUTHORS r=jlinder a=stbof

Release note: None

69910: authors: add jameswsj10 to authors r=jameswsj10 a=jameswsj10

Release justification: non-production change
Release note: None

69922: jobs,schedule: Annotate context with schedule ID.  r=miretskiy a=miretskiy

Annotate context passed to job execution with the schedule ID.

Release Justification: Low danger, observability imporovement; category 4.
Release Notes: None



Co-authored-by: Aditya Maru <[email protected]>
Co-authored-by: Stephanie Bodoff <[email protected]>
Co-authored-by: jameswsj10 <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
  • Loading branch information
5 people committed Sep 8, 2021
5 parents a146902 + 3188a26 + 07f5e62 + 4dcfd13 + 6279b9b commit 478a4d8
Show file tree
Hide file tree
Showing 7 changed files with 915 additions and 445 deletions.
2 changes: 2 additions & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ Ivan Kozik <[email protected]>
Jaewan Park <[email protected]>
Jackson Owens <[email protected]> <[email protected]>
James Graves <[email protected]>
James Jung <[email protected]> <[email protected]>
James H. Linder <[email protected]>
Jan Owsiany <[email protected]>
Jane Xing <[email protected]> <[email protected]>
Expand Down Expand Up @@ -357,6 +358,7 @@ Solon Gordon <[email protected]> solongordon <[email protected]>
Song Hao <[email protected]> songhao <[email protected]>
Spas Bojanov <[email protected]> <[email protected]>
Spencer Kimball <[email protected]> <[email protected]>
Stephanie Bodoff <[email protected]>
Stephen Mooney <[email protected]> <[email protected]>
Steven Danna <[email protected]>
Steven Hand <@cockroachlabs.com> hand <@cockroachlabs.com> hand-crdb <@cockroachlabs.com>
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ go_library(

go_test(
name = "importccl_test",
size = "medium",
size = "large",
srcs = [
"bench_test.go",
"client_import_test.go",
Expand Down
130 changes: 114 additions & 16 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,58 @@ func addToFileFormatTelemetry(fileFormat, state string) {
telemetry.Count(fmt.Sprintf("%s.%s.%s", "import", strings.ToLower(fileFormat), state))
}

// resolveUDTsUsedByImportInto resolves all the user defined types that are
// referenced by the table being imported into.
func resolveUDTsUsedByImportInto(
ctx context.Context, p sql.PlanHookState, table *tabledesc.Mutable,
) ([]catalog.TypeDescriptor, error) {
typeDescs := make([]catalog.TypeDescriptor, 0)
var dbDesc catalog.DatabaseDescriptor
err := sql.DescsTxn(ctx, p.ExecCfg(), func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) (err error) {
_, dbDesc, err = descriptors.GetImmutableDatabaseByID(ctx, txn, table.GetParentID(),
tree.DatabaseLookupFlags{
Required: true,
AvoidCached: true,
})
if err != nil {
return err
}
typeIDs, _, err := table.GetAllReferencedTypeIDs(dbDesc,
func(id descpb.ID) (catalog.TypeDescriptor, error) {
immutDesc, err := descriptors.GetImmutableTypeByID(ctx, txn, id, tree.ObjectLookupFlags{
CommonLookupFlags: tree.CommonLookupFlags{
Required: true,
AvoidCached: true,
},
})
if err != nil {
return nil, err
}
return immutDesc, nil
})
if err != nil {
return errors.Wrap(err, "resolving type descriptors")
}

for _, typeID := range typeIDs {
immutDesc, err := descriptors.GetImmutableTypeByID(ctx, txn, typeID, tree.ObjectLookupFlags{
CommonLookupFlags: tree.CommonLookupFlags{
Required: true,
AvoidCached: true,
},
})
if err != nil {
return err
}
typeDescs = append(typeDescs, immutDesc)
}
return err
})
return typeDescs, err
}

// importPlanHook implements sql.PlanHookFn.
func importPlanHook(
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
Expand Down Expand Up @@ -744,28 +796,13 @@ func importPlanHook(

var tableDetails []jobspb.ImportDetails_Table
var tableDescs []*tabledesc.Mutable // parallel with tableDetails
var typeDetails []jobspb.ImportDetails_Type
jobDesc, err := importJobDescription(p, importStmt, nil, filenamePatterns, opts)
if err != nil {
return err
}

if importStmt.Into {
// TODO(dt): this is a prototype for incremental import but there are many
// TODOs remaining before it is ready to graduate to prime-time. Some of
// them are captured in specific TODOs below, but some of the big, scary
// things to do are:
// - review planner vs txn use very carefully. We should try to get to a
// single txn used to plan the job and create it. Using the planner's
// txn today is very wrong since it will not commit until after the job
// has run, so starting a job based on reads it returned is very wrong.
// - audit every place that we resolve/lease/read table descs to be sure
// that the IMPORTING state is handled correctly. SQL lease acquisition
// is probably the easy one here since it has single read path -- the
// things that read directly like the queues or background jobs are the
// ones we'll need to really carefully look though.
// - Look at if/how cleanup/rollback works. Reconsider the cpu from the
// desc version (perhaps we should be re-reading instead?).
// - Write _a lot_ of tests.
if _, ok := allowedIntoFormats[importStmt.FileFormat]; !ok {
return errors.Newf(
"%s file format is currently unsupported by IMPORT INTO",
Expand Down Expand Up @@ -817,6 +854,21 @@ func importPlanHook(
}
}
}

{
// Resolve the UDTs used by the table being imported into.
typeDescs, err := resolveUDTsUsedByImportInto(ctx, p, found)
if err != nil {
return errors.Wrap(err, "resolving UDTs used by table being imported into")
}
if len(typeDescs) > 0 {
typeDetails = make([]jobspb.ImportDetails_Type, 0, len(typeDescs))
}
for _, typeDesc := range typeDescs {
typeDetails = append(typeDetails, jobspb.ImportDetails_Type{Desc: typeDesc.TypeDesc()})
}
}

tableDetails = []jobspb.ImportDetails_Table{{Desc: &found.TableDescriptor, IsNew: false, TargetCols: intoCols}}
} else {
seqVals := make(map[descpb.ID]int64)
Expand Down Expand Up @@ -954,6 +1006,7 @@ func importPlanHook(
Format: format,
ParentID: db.GetID(),
Tables: tableDetails,
Types: typeDetails,
SSTSize: sstSize,
Oversample: oversample,
SkipFKs: skipFKs,
Expand Down Expand Up @@ -2069,6 +2122,13 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
}
}

// If the table being imported into referenced UDTs, ensure that a concurrent
// schema change on any of the types has not modified the type descriptor. If
// it has, it is unsafe to import the data and we fail the import job.
if err := r.checkForUDTModification(ctx, p.ExecCfg()); err != nil {
return err
}

if err := r.publishSchemas(ctx, p.ExecCfg()); err != nil {
return err
}
Expand Down Expand Up @@ -2219,6 +2279,44 @@ func (r *importResumer) publishSchemas(ctx context.Context, execCfg *sql.Executo
})
}

// checkForUDTModification checks whether any of the types referenced by the
// table being imported into have been modified since they were read during
// import planning. If they have, it may be unsafe to continue with the import
// since we could be ingesting data that is no longer valid for the type.
//
// Egs: Renaming an enum value mid import could result in the import ingesting a
// value that is no longer valid.
//
// TODO(SQL Schema): This method might be unnecessarily aggressive in failing
// the import. The semantics of what concurrent type changes are/are not safe
// during an IMPORT still need to be ironed out. Once they are, we can make this
// method more conservative in what it uses to deem a type change dangerous.
func (r *importResumer) checkForUDTModification(
ctx context.Context, execCfg *sql.ExecutorConfig,
) error {
details := r.job.Details().(jobspb.ImportDetails)
if details.Types == nil {
return nil
}
return sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn *kv.Txn,
col *descs.Collection) error {
for _, savedTypeDesc := range details.Types {
typeDesc, err := catalogkv.MustGetTypeDescByID(ctx, txn, execCfg.Codec,
savedTypeDesc.Desc.GetID())
if err != nil {
return errors.Wrap(err, "resolving type descriptor when checking version mismatch")
}
if typeDesc.GetModificationTime() != savedTypeDesc.Desc.GetModificationTime() {
return errors.Newf("type descriptor %d has a different modification time than what"+
" was saved during import planning; unsafe to import since the type"+
" has changed during the course of the import",
typeDesc.GetID())
}
}
return nil
})
}

// publishTables updates the status of imported tables from OFFLINE to PUBLIC.
func (r *importResumer) publishTables(ctx context.Context, execCfg *sql.ExecutorConfig) error {
details := r.job.Details().(jobspb.ImportDetails)
Expand Down
138 changes: 137 additions & 1 deletion pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1302,14 +1302,22 @@ func TestImportUserDefinedTypes(t *testing.T) {
verifyQuery: "SELECT * FROM t ORDER BY a",
expected: [][]string{{"hello", "hello"}, {"hi", "hi"}},
},
// Test table with default value
// Test table with default value.
{
create: "a greeting, b greeting default 'hi'",
intoCols: "a, b",
typ: "PGCOPY",
contents: "hello\nhi\thi\n",
errString: "type OID 100052 does not exist",
},
// Test table with an invalid enum value.
{
create: "a greeting",
intoCols: "a",
typ: "PGCOPY",
contents: "randomvalue\n",
errString: "encountered error invalid input value for enum greeting",
},
}

// Test IMPORT INTO.
Expand Down Expand Up @@ -7085,3 +7093,131 @@ CSV DATA ($1)
}
}
}

func TestUDTChangeDuringImport(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
baseDir, cleanup := testutils.TempDir(t)
defer cleanup()

// Write some data to the test file.
f, err := ioutil.TempFile(baseDir, "data")
require.NoError(t, err)
_, err = f.Write([]byte("1,hello\n2,hi\n"))
require.NoError(t, err)

importStmt := "IMPORT INTO t (a, b) CSV DATA ($1)"
importArgs := fmt.Sprintf("nodelocal://0/%s", filepath.Base(f.Name()))

testCases := []struct {
name string
query string
expectTypeChangeErr string
expectImportErr bool
}{
{
"add-value",
"ALTER TYPE d.greeting ADD VALUE 'cheers'",
"",
true,
},
{
"rename-value",
"ALTER TYPE d.greeting RENAME VALUE 'howdy' TO 'hola';",
"",
true,
},
{
"add-value-in-txn",
"BEGIN; ALTER TYPE d.greeting ADD VALUE 'cheers'; COMMIT;",
"",
true,
},
// Dropping a value does change the modification time on the descriptor,
// even though the enum value removal is forbidden during an import.
// As a result of this, the import is expected to fail.
{
"drop-value",
"ALTER TYPE d.greeting DROP VALUE 'howdy';",
"could not validate enum value removal",
true,
},
// Dropping a type does not change the modification time on the descriptor,
// and so the import is expected to succeed.
{
"drop-type",
"DROP TYPE d.greeting",
"cannot drop type \"greeting\"",
false,
},
}

for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
requestReceived := make(chan struct{})
allowResponse := make(chan struct{})
tc := testcluster.StartTestCluster(
t, 1, base.TestClusterArgs{ServerArgs: base.TestServerArgs{
ExternalIODir: baseDir,
Knobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
Store: &kvserver.StoreTestingKnobs{
TestingResponseFilter: jobutils.BulkOpResponseFilter(&allowResponse),
TestingRequestFilter: func(ctx context.Context, br roachpb.BatchRequest) *roachpb.Error {
for _, ru := range br.Requests {
switch ru.GetInner().(type) {
case *roachpb.AddSSTableRequest:
<-requestReceived
}
}
return nil
},
}},
}})
defer tc.Stopper().Stop(ctx)
conn := tc.Conns[0]
sqlDB := sqlutils.MakeSQLRunner(conn)

// Create a database with a type.
sqlDB.Exec(t, `
SET CLUSTER SETTING sql.defaults.drop_enum_value.enabled = true;
SET enable_drop_enum_value = true;
CREATE DATABASE d;
USE d;
CREATE TYPE d.greeting AS ENUM ('hello', 'howdy', 'hi');
CREATE TABLE t (a INT, b greeting);
`)

// Start the import.
errCh := make(chan error)
defer close(errCh)
go func() {
_, err := sqlDB.DB.ExecContext(ctx, importStmt, importArgs)
errCh <- err
}()

// Wait for the import to start.
requestReceived <- struct{}{}

if test.expectTypeChangeErr != "" {
sqlDB.ExpectErr(t, test.expectTypeChangeErr, test.query)
} else {
sqlDB.Exec(t, test.query)
}

// Allow the import to finish.
close(requestReceived)
close(allowResponse)

err := <-errCh
if test.expectImportErr {
testutils.IsError(err,
"unsafe to import since the type has changed during the course of the import")
} else {
require.NoError(t, err)
}
})
}
}
4 changes: 3 additions & 1 deletion pkg/jobs/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)

// CreatedByScheduledJobs identifies the job that was created
Expand Down Expand Up @@ -180,7 +181,8 @@ func (s *jobScheduler) processSchedule(
schedule.ScheduleID(), schedule.ScheduleLabel(),
schedule.ScheduledRunTime(), schedule.NextRun())

if err := executor.ExecuteJob(ctx, s.JobExecutionConfig, s.env, schedule, txn); err != nil {
execCtx := logtags.AddTag(ctx, "schedule", schedule.ScheduleID())
if err := executor.ExecuteJob(execCtx, s.JobExecutionConfig, s.env, schedule, txn); err != nil {
return errors.Wrapf(err, "executing schedule %d", schedule.ScheduleID())
}

Expand Down
Loading

0 comments on commit 478a4d8

Please sign in to comment.