Skip to content

Commit

Permalink
jobs: refresh job details before removing protected timestamps
Browse files Browse the repository at this point in the history
Fixes: #92493

Previously, we added the protected timestamps manager into the jobs
frame work, which made it easier to automatically add and remove
protected timestamps for jobs. Unfortunately, the Unprotect API
when invoked from a clean up function never properly refreshed
the job. Which could lead to a race condition trying to remove
protected timestamps for schema changes. To address this, the
Unprotect function will take advantage of the job update function
to confirm that a refreshed copy does have protected timestamps
set.

Release note: None
  • Loading branch information
fqazi committed Nov 28, 2022
1 parent 8e4d3df commit 02867ed
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions pkg/jobs/jobsprotectedts/jobs_protected_ts_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func (p *Manager) Protect(
return nil, err
}
return func(ctx context.Context) error {
// Remove the protected timestamp.
return p.Unprotect(ctx, job)
}, nil
}
Expand All @@ -201,19 +202,23 @@ func (p *Manager) Protect(
// record. Note: This should only be used for job cleanup if is not currently,
// executing.
func (p *Manager) Unprotect(ctx context.Context, job *jobs.Job) error {
return p.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Fetch the protected timestamp UUID from the job, if one exists.
protectedtsID := getProtectedTSOnJob(job.Details())
// Fetch the protected timestamp UUID from the job, if one exists.
if getProtectedTSOnJob(job.Details()) == nil {
return nil
}
// If we do find one then we need to clean up the protected timestamp,
// and remove it from the job.
return job.Update(ctx, nil, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
// The job will get refreshed, so check one more time the protected
// timestamp still exists. The callback returned from Protect works
// on a previously cached copy.
protectedtsID := getProtectedTSOnJob(md.Payload.UnwrapDetails())
if protectedtsID == nil {
return nil
}
// If we do find one then we need to clean up the protected timestamp,
// and remove it from the job.
return job.Update(ctx, txn, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
updatedDetails := setProtectedTSOnJob(job.Details(), nil)
md.Payload.Details = jobspb.WrapPayloadDetails(updatedDetails)
ju.UpdatePayload(md.Payload)
return p.protectedTSProvider.Release(ctx, txn, *protectedtsID)
})
updatedDetails := setProtectedTSOnJob(job.Details(), nil)
md.Payload.Details = jobspb.WrapPayloadDetails(updatedDetails)
ju.UpdatePayload(md.Payload)
return p.protectedTSProvider.Release(ctx, txn, *protectedtsID)
})
}

0 comments on commit 02867ed

Please sign in to comment.