Skip to content

Commit

Permalink
Merge #63389 #63478
Browse files Browse the repository at this point in the history
63389: storage: remove unused storage.Version and related functions r=irfansharif a=stevendanna

This appears to be unused as of 4970b3d.

Release note: None

63478: importccl: fix bug swallowing txn errors r=adityamaru a=ajwerner

If we hit an error updating the descriptor IDs, we would just swallow it. This
is illegal and causes correctness violations. The bug was introduced in #62658
which added observability regarding descriptor IDs. This change which fixes the
bug performs the update of the descriptors IDs in the same operation which
updates the rest of the import payload.

Omitting a release note because this code has note been released.

Fixes #63476.

Release note: None

Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
3 people committed Apr 13, 2021
3 parents 1a73dc8 + 33ad10b + ccc7c84 commit a0008ec
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 203 deletions.
48 changes: 21 additions & 27 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1900,9 +1900,12 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {
// Skip prepare stage on job resumption, if it has already been completed.
if !details.PrepareComplete {
var schemaMetadata *preparedSchemaMetadata
err := descs.Txn(ctx, p.ExecCfg().Settings, p.ExecCfg().LeaseManager,
p.ExecCfg().InternalExecutor, p.ExecCfg().DB, func(ctx context.Context, txn *kv.Txn,
descsCol *descs.Collection) error {
if err := descs.Txn(
ctx, p.ExecCfg().Settings, p.ExecCfg().LeaseManager,
p.ExecCfg().InternalExecutor, p.ExecCfg().DB,
func(
ctx context.Context, txn *kv.Txn, descsCol *descs.Collection,
) error {
var preparedDetails jobspb.ImportDetails
schemaMetadata = &preparedSchemaMetadata{
newSchemaIDToName: make(map[descpb.ID]string),
Expand Down Expand Up @@ -1938,38 +1941,29 @@ func (r *importResumer) Resume(ctx context.Context, execCtx interface{}) error {

// Update the job details now that the schemas and table descs have
// been "prepared".
err = r.job.SetDetails(ctx, txn, preparedDetails)
if err != nil {
return err
}

// Update the job record with the schema and table IDs we will be
// ingesting into.
err = r.job.SetDescriptorIDs(ctx, txn, func(ctx context.Context,
descIDs []descpb.ID) ([]descpb.ID, error) {
var descriptorIDs []descpb.ID
if descIDs == nil {
return r.job.Update(ctx, txn, func(
txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater,
) error {
pl := md.Payload
*pl.GetImport() = preparedDetails

// Update the set of descriptors for later observability.
// TODO(ajwerner): Do we need this idempotence test?
prev := md.Payload.DescriptorIDs
if prev == nil {
var descriptorIDs []descpb.ID
for _, schema := range preparedDetails.Schemas {
descriptorIDs = append(descriptorIDs, schema.Desc.GetID())
}
for _, table := range preparedDetails.Tables {
descriptorIDs = append(descriptorIDs, table.Desc.GetID())
}
return descriptorIDs, nil
pl.DescriptorIDs = descriptorIDs
}
log.Warningf(ctx, "unexpected descriptor IDs %+v set in import job %d", descIDs,
r.job.ID())
return nil, nil
ju.UpdatePayload(pl)
return nil
})
if err != nil {
// We don't want to fail the import if we fail to update the
// descriptor IDs as this is only for observability.
log.Warningf(ctx, "failed to update import job %d with target descriptor IDs",
r.job.ID())
}
return nil
})
if err != nil {
}); err != nil {
return err
}

Expand Down
20 changes: 0 additions & 20 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,22 +288,6 @@ func (j *Job) SetDescription(ctx context.Context, txn *kv.Txn, updateFn Descript
})
}

// SetDescriptorIDs updates the description of a created job.
func (j *Job) SetDescriptorIDs(
ctx context.Context, txn *kv.Txn, updateFn DescriptorIdsUpdateFn,
) error {
return j.Update(ctx, txn, func(_ *kv.Txn, md JobMetadata, ju *JobUpdater) error {
prev := md.Payload.DescriptorIDs
descIDs, err := updateFn(ctx, prev)
if err != nil {
return err
}
md.Payload.DescriptorIDs = descIDs
ju.UpdatePayload(md.Payload)
return nil
})
}

// SetNonCancelable updates the NonCancelable field of a created job.
func (j *Job) SetNonCancelable(
ctx context.Context, txn *kv.Txn, updateFn NonCancelableUpdateFn,
Expand All @@ -328,10 +312,6 @@ type RunningStatusFn func(ctx context.Context, details jobspb.Details) (RunningS
// given its current one.
type DescriptionUpdateFn func(ctx context.Context, description string) (string, error)

// DescriptorIdsUpdateFn is a callback that computes a job's descriptor IDs given
// its current one.
type DescriptorIdsUpdateFn func(ctx context.Context, descIDs []descpb.ID) ([]descpb.ID, error)

// NonCancelableUpdateFn is a callback that computes a job's non-cancelable
// status given its current one.
type NonCancelableUpdateFn func(ctx context.Context, nonCancelable bool) bool
Expand Down
2 changes: 0 additions & 2 deletions pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ go_library(
"stacks.go",
"temp_dir.go",
"temp_engine.go",
"version.go",
],
# List out all the c-dependencies pkg/storage depends on.
cdeps = [
Expand Down Expand Up @@ -139,7 +138,6 @@ go_test(
"sst_writer_test.go",
"temp_dir_test.go",
"temp_engine_test.go",
"version_test.go",
],
data = glob(["testdata/**"]),
embed = [":storage"],
Expand Down
83 changes: 0 additions & 83 deletions pkg/storage/version.go

This file was deleted.

71 changes: 0 additions & 71 deletions pkg/storage/version_test.go

This file was deleted.

0 comments on commit a0008ec

Please sign in to comment.