Skip to content

Commit

Permalink
jobs,*: make job clients responsible for generating IDs
Browse files Browse the repository at this point in the history
Job IDs used to be randomly generated in the job registry when creating
a job, which meant that they were not stable across restarts when jobs
were created in a txn closure meant to be idempotent. For
`StartableJobs`, we also created a tracing span and registered the "new"
job each time `CreateStartableJobWithTxn` was called, so these would
leak in the presence of restarts.

This commit adds a job ID argument to registry methods that create jobs,
so that callers can generate stable IDs. It also modifies the
`StartableJob` API to help ensure jobs (identified by a stable ID) are
only registered once in the presence of restarts:
`CreateStartableJobWithTxn` now takes a `*StartableJob`, and will not
create a tracing span and register the job again if the reference is
non-nil. This API is not ideal because it's probably easy to use it
incorrectly, but it at least makes the correct behavior on txn restarts
possible.

Release note: None
  • Loading branch information
thoszhang committed Feb 23, 2021
1 parent bdcdf88 commit fe6377c
Show file tree
Hide file tree
Showing 30 changed files with 323 additions and 209 deletions.
19 changes: 10 additions & 9 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -1224,41 +1224,42 @@ func backupPlanHook(
if backupStmt.Options.Detached {
// When running inside an explicit transaction, we simply create the job
// record. We do not wait for the job to finish.
aj, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
ctx, jr, p.ExtendedEvalContext().Txn)
jobID := p.ExecCfg().JobRegistry.MakeJobID()
_, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
ctx, jr, jobID, p.ExtendedEvalContext().Txn)
if err != nil {
return err
}

if err := doWriteBackupManifestCheckpoint(ctx, *aj.ID()); err != nil {
if err := doWriteBackupManifestCheckpoint(ctx, jobID); err != nil {
return err
}

// The protect timestamp logic for a DETACHED BACKUP can be run within the
// same txn as the BACKUP is being planned in, because we do not wait for
// the BACKUP job to complete.
err = protectTimestampForBackup(ctx, p, p.ExtendedEvalContext().Txn, *aj.ID(), spans,
err = protectTimestampForBackup(ctx, p, p.ExtendedEvalContext().Txn, jobID, spans,
startTime, endTime, backupDetails)
if err != nil {
return err
}

resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(*aj.ID()))}
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jobID))}
collectTelemetry()
return nil
}

var sj *jobs.StartableJob
jobID := p.ExecCfg().JobRegistry.MakeJobID()
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn)
if err != nil {
if err := p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr); err != nil {
return err
}
if err := doWriteBackupManifestCheckpoint(ctx, *sj.ID()); err != nil {
if err := doWriteBackupManifestCheckpoint(ctx, jobID); err != nil {
return err
}

return protectTimestampForBackup(ctx, p, txn, *sj.ID(), spans, startTime, endTime,
return protectTimestampForBackup(ctx, p, txn, jobID, spans, startTime, endTime,
backupDetails)
}); err != nil {
if sj != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1766,7 +1766,7 @@ func (r *restoreResumer) dropDescriptors(
Progress: jobspb.SchemaChangeGCProgress{},
NonCancelable: true,
}
if _, err := jr.CreateJobWithTxn(ctx, gcJobRecord, txn); err != nil {
if _, err := jr.CreateJobWithTxn(ctx, gcJobRecord, jr.MakeJobID(), txn); err != nil {
return err
}

Expand Down
14 changes: 6 additions & 8 deletions pkg/ccl/backupccl/restore_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -1755,23 +1755,21 @@ func doRestorePlan(
if restoreStmt.Options.Detached {
// When running in detached mode, we simply create the job record.
// We do not wait for the job to finish.
aj, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
ctx, jr, p.ExtendedEvalContext().Txn)
jobID := p.ExecCfg().JobRegistry.MakeJobID()
_, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
ctx, jr, jobID, p.ExtendedEvalContext().Txn)
if err != nil {
return err
}
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(*aj.ID()))}
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jobID))}
collectTelemetry()
return nil
}

var sj *jobs.StartableJob
jobID := p.ExecCfg().JobRegistry.MakeJobID()
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn)
if err != nil {
return err
}
return nil
return p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr)
}); err != nil {
if sj != nil {
if cleanupErr := sj.CleanupOnRollback(ctx); cleanupErr != nil {
Expand Down
14 changes: 7 additions & 7 deletions pkg/ccl/backupccl/restore_schema_change_creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ func createTypeChangeJobFromDesc(
// Type change jobs are not cancellable.
NonCancelable: true,
}
job, err := jr.CreateJobWithTxn(ctx, record, txn)
if err != nil {
jobID := jr.MakeJobID()
if _, err := jr.CreateJobWithTxn(ctx, record, jobID, txn); err != nil {
return err
}
log.Infof(ctx, "queued new type schema change job %d for type %d", *job.ID(), typ.GetID())
log.Infof(ctx, "queued new type schema change job %d for type %d", jobID, typ.GetID())
return nil
}

Expand Down Expand Up @@ -201,18 +201,18 @@ func createSchemaChangeJobsFromMutations(
},
Progress: jobspb.SchemaChangeProgress{},
}
newJob, err := jr.CreateJobWithTxn(ctx, jobRecord, txn)
if err != nil {
jobID := jr.MakeJobID()
if _, err := jr.CreateJobWithTxn(ctx, jobRecord, jobID, txn); err != nil {
return err
}
newMutationJob := descpb.TableDescriptor_MutationJob{
MutationID: mutationID,
JobID: *newJob.ID(),
JobID: jobID,
}
mutationJobs = append(mutationJobs, newMutationJob)

log.Infof(ctx, "queued new schema change job %d for table %d, mutation %d",
*newJob.ID(), tableDesc.ID, mutationID)
jobID, tableDesc.ID, mutationID)
}
tableDesc.MutationJobs = mutationJobs
return nil
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ go_library(
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb:ptpb_go_proto",
"//pkg/roachpb",
"//pkg/security",
"//pkg/server/telemetry",
Expand Down
23 changes: 12 additions & 11 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
Expand Down Expand Up @@ -327,13 +328,17 @@ func changefeedPlanHook(
// changeFrontier.manageProtectedTimestamps for more details on the handling of
// protected timestamps.
var sj *jobs.StartableJob
jobID := p.ExecCfg().JobRegistry.MakeJobID()
{
var protectedTimestampID uuid.UUID
var spansToProtect []roachpb.Span
var ptr *ptpb.Record
if hasInitialScan := initialScanFromOptions(details.Opts); hasInitialScan {
protectedTimestampID = uuid.MakeV4()
spansToProtect = makeSpansToProtect(p.ExecCfg().Codec, details.Targets)
progress.GetChangefeed().ProtectedTimestampRecord = protectedTimestampID
ptr = jobsprotectedts.MakeRecord(protectedTimestampID, jobID,
statementTime, spansToProtect)
}

jr := jobs.Record{
Expand All @@ -348,19 +353,15 @@ func changefeedPlanHook(
Details: details,
Progress: *progress.GetChangefeed(),
}
createJobAndProtectedTS := func(ctx context.Context, txn *kv.Txn) (err error) {
sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn)
if err != nil {
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr); err != nil {
return err
}
if protectedTimestampID == uuid.Nil {
return nil
if ptr != nil {
return p.ExecCfg().ProtectedTimestampProvider.Protect(ctx, txn, ptr)
}
ptr := jobsprotectedts.MakeRecord(protectedTimestampID, *sj.ID(),
statementTime, spansToProtect)
return p.ExecCfg().ProtectedTimestampProvider.Protect(ctx, txn, ptr)
}
if err := p.ExecCfg().DB.Txn(ctx, createJobAndProtectedTS); err != nil {
return nil
}); err != nil {
if sj != nil {
if err := sj.CleanupOnRollback(ctx); err != nil {
log.Warningf(ctx, "failed to cleanup aborted job: %v", err)
Expand Down Expand Up @@ -392,7 +393,7 @@ func changefeedPlanHook(
case <-ctx.Done():
return ctx.Err()
case resultsCh <- tree.Datums{
tree.NewDInt(tree.DInt(*sj.ID())),
tree.NewDInt(tree.DInt(jobID)),
}:
return nil
}
Expand Down
18 changes: 10 additions & 8 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,30 +914,31 @@ func importPlanHook(
if isDetached {
// When running inside an explicit transaction, we simply create the job
// record. We do not wait for the job to finish.
aj, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
ctx, jr, p.ExtendedEvalContext().Txn)
jobID := p.ExecCfg().JobRegistry.MakeJobID()
_, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
ctx, jr, jobID, p.ExtendedEvalContext().Txn)
if err != nil {
return err
}

if err = protectTimestampForImport(ctx, p, p.ExtendedEvalContext().Txn, *aj.ID(), spansToProtect,
if err = protectTimestampForImport(ctx, p, p.ExtendedEvalContext().Txn, jobID, spansToProtect,
walltime, importDetails); err != nil {
return err
}

addToFileFormatTelemetry(format.Format.String(), "started")
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(*aj.ID()))}
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jobID))}
return nil
}

var sj *jobs.StartableJob
jobID := p.ExecCfg().JobRegistry.MakeJobID()
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn)
if err != nil {
if err := p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr); err != nil {
return err
}

return protectTimestampForImport(ctx, p, txn, *sj.ID(), spansToProtect, walltime, importDetails)
return protectTimestampForImport(ctx, p, txn, jobID, spansToProtect, walltime, importDetails)
}); err != nil {
if sj != nil {
if cleanupErr := sj.CleanupOnRollback(ctx); cleanupErr != nil {
Expand Down Expand Up @@ -1915,7 +1916,8 @@ func (r *importResumer) dropTables(
Progress: jobspb.SchemaChangeGCProgress{},
NonCancelable: true,
}
if _, err := execCfg.JobRegistry.CreateJobWithTxn(ctx, gcJobRecord, txn); err != nil {
if _, err := execCfg.JobRegistry.CreateJobWithTxn(
ctx, gcJobRecord, execCfg.JobRegistry.MakeJobID(), txn); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/import_stmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4716,7 +4716,7 @@ func TestImportControlJobRBAC(t *testing.T) {
})

startLeasedJob := func(t *testing.T, record jobs.Record) *jobs.StartableJob {
job, err := registry.CreateAndStartJob(ctx, nil, record)
job, err := registry.TestingCreateAndStartJob(ctx, nil, record)
require.NoError(t, err)
return job
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestStreamIngestionJobRollBack(t *testing.T) {
},
Progress: jobspb.StreamIngestionProgress{},
}
j, err := registry.CreateAndStartJob(ctx, nil, streamIngestJobRecord)
j, err := registry.TestingCreateAndStartJob(ctx, nil, streamIngestJobRecord)
require.NoError(t, err)

// Insert more data in the table. These changes should be rollback during job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ func ingestionPlanHook(
}

var sj *jobs.StartableJob
jobID := p.ExecCfg().JobRegistry.MakeJobID()
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn)
return err
return p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr)
}); err != nil {
if sj != nil {
if cleanupErr := sj.CleanupOnRollback(ctx); cleanupErr != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingutils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestCutoverBuiltin(t *testing.T) {
},
Progress: jobspb.StreamIngestionProgress{},
}
job, err := registry.CreateAndStartJob(ctx, nil, streamIngestJobRecord)
job, err := registry.TestingCreateAndStartJob(ctx, nil, streamIngestJobRecord)
require.NoError(t, err)

// Check that sentinel is not set.
Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ go_test(
"//pkg/jobs/jobstest",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/security",
Expand Down
5 changes: 0 additions & 5 deletions pkg/jobs/deprecated.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,11 +377,6 @@ func (r *Registry) deprecatedResume(ctx context.Context, resumer Resumer, job *J
func (j *Job) deprecatedInsert(
ctx context.Context, txn *kv.Txn, id int64, lease *jobspb.Lease, session sqlliveness.Session,
) error {
if j.id != nil {
// Already created - do nothing.
return nil
}

j.mu.payload.Lease = lease

if err := j.runInTxn(ctx, txn, func(ctx context.Context, txn *kv.Txn) error {
Expand Down
6 changes: 1 addition & 5 deletions pkg/jobs/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/errors"
)

// FakeResumer calls optional callbacks during the job lifecycle.
Expand Down Expand Up @@ -69,10 +68,7 @@ func (j *Job) Started(ctx context.Context) error {

// Created is a test only function that inserts a new jobs table row.
func (j *Job) Created(ctx context.Context) error {
if j.ID() != nil {
return errors.Errorf("job already created with ID %v", *j.ID())
}
return j.deprecatedInsert(ctx, nil /* txn */, j.registry.makeJobID(), nil /* lease */, nil /* session */)
return j.deprecatedInsert(ctx, nil /* txn */, *j.ID(), nil /* lease */, nil /* session */)
}

// Paused is a wrapper around the internal function that moves a job to the
Expand Down
Loading

0 comments on commit fe6377c

Please sign in to comment.