From fe6377c4f1b6e822bb436f64808762e02a815c11 Mon Sep 17 00:00:00 2001
From: Lucy Zhang
Date: Wed, 17 Feb 2021 22:24:05 -0500
Subject: [PATCH] jobs,*: make job clients responsible for generating IDs

Job IDs used to be randomly generated in the job registry when a job was
created, which meant that they were not stable across restarts when jobs
were created in a txn closure meant to be idempotent. For `StartableJobs`,
we also created a tracing span and registered the "new" job each time
`CreateStartableJobWithTxn` was called, so these would leak in the
presence of restarts.

This commit adds a job ID argument to the registry methods that create
jobs, so that callers can generate stable IDs. It also modifies the
`StartableJob` API to help ensure that jobs (identified by a stable ID)
are registered only once in the presence of restarts:
`CreateStartableJobWithTxn` now takes a `*StartableJob`, and will not
create a tracing span and register the job again if the reference is
non-nil. This API is not ideal, because it is easy to misuse, but it at
least makes the correct behavior on txn restarts possible.

Release note: None
---
 pkg/ccl/backupccl/backup_planning.go | 19 ++-
 pkg/ccl/backupccl/restore_job.go | 2 +-
 pkg/ccl/backupccl/restore_planning.go | 14 +-
 .../restore_schema_change_creation.go | 14 +-
 pkg/ccl/changefeedccl/BUILD.bazel | 1 +
 pkg/ccl/changefeedccl/changefeed_stmt.go | 23 +--
 pkg/ccl/importccl/import_stmt.go | 18 +-
 pkg/ccl/importccl/import_stmt_test.go | 2 +-
 .../streamingest/stream_ingestion_job_test.go | 2 +-
 .../streamingest/stream_ingestion_planning.go | 4 +-
 .../streamingccl/streamingutils/utils_test.go | 2 +-
 pkg/jobs/BUILD.bazel | 1 +
 pkg/jobs/deprecated.go | 5 -
 pkg/jobs/helpers_test.go | 6 +-
 pkg/jobs/jobs_test.go | 158 ++++++++++++------
 .../jobsprotectedts/jobs_protected_ts_test.go | 5 +-
 pkg/jobs/registry.go | 152 ++++++++++-------
 pkg/jobs/registry_external_test.go | 7 +-
 pkg/jobs/schedule_control_test.go | 12 +-
 pkg/migration/migrationmanager/manager.go | 11 +-
 pkg/server/status_test.go | 4 +-
 pkg/sql/create_stats.go | 17 +-
 pkg/sql/distsql_physical_planner.go | 2 +-
 pkg/sql/gcjob_test/gc_job_test.go | 8 +-
 pkg/sql/indexbackfiller_test.go | 7 +-
 pkg/sql/planner.go | 4 +-
 pkg/sql/row/expr_walker_test.go | 2 +-
 pkg/sql/schema_changer.go | 24 +--
 pkg/sql/tenant.go | 3 +-
 pkg/sql/truncate.go | 3 +-
 30 files changed, 323 insertions(+), 209 deletions(-)

diff --git a/pkg/ccl/backupccl/backup_planning.go b/pkg/ccl/backupccl/backup_planning.go
index 00dbc5ceb5d2..d0d4faf9e2d3 100644
--- a/pkg/ccl/backupccl/backup_planning.go
+++ b/pkg/ccl/backupccl/backup_planning.go
@@ -1224,41 +1224,42 @@ func backupPlanHook(
 if backupStmt.Options.Detached {
 // When running inside an explicit transaction, we simply create the job
 // record. We do not wait for the job to finish.
- aj, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
- ctx, jr, p.ExtendedEvalContext().Txn)
+ jobID := p.ExecCfg().JobRegistry.MakeJobID()
+ _, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
+ ctx, jr, jobID, p.ExtendedEvalContext().Txn)
 if err != nil {
 return err
 }
- if err := doWriteBackupManifestCheckpoint(ctx, *aj.ID()); err != nil {
+ if err := doWriteBackupManifestCheckpoint(ctx, jobID); err != nil {
 return err
 }
 // The protect timestamp logic for a DETACHED BACKUP can be run within the
 // same txn as the BACKUP is being planned in, because we do not wait for
 // the BACKUP job to complete.
- err = protectTimestampForBackup(ctx, p, p.ExtendedEvalContext().Txn, *aj.ID(), spans, + err = protectTimestampForBackup(ctx, p, p.ExtendedEvalContext().Txn, jobID, spans, startTime, endTime, backupDetails) if err != nil { return err } - resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(*aj.ID()))} + resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jobID))} collectTelemetry() return nil } var sj *jobs.StartableJob + jobID := p.ExecCfg().JobRegistry.MakeJobID() if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { - sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn) - if err != nil { + if err := p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr); err != nil { return err } - if err := doWriteBackupManifestCheckpoint(ctx, *sj.ID()); err != nil { + if err := doWriteBackupManifestCheckpoint(ctx, jobID); err != nil { return err } - return protectTimestampForBackup(ctx, p, txn, *sj.ID(), spans, startTime, endTime, + return protectTimestampForBackup(ctx, p, txn, jobID, spans, startTime, endTime, backupDetails) }); err != nil { if sj != nil { diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 955459c130d4..0e7f6e67bba4 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -1766,7 +1766,7 @@ func (r *restoreResumer) dropDescriptors( Progress: jobspb.SchemaChangeGCProgress{}, NonCancelable: true, } - if _, err := jr.CreateJobWithTxn(ctx, gcJobRecord, txn); err != nil { + if _, err := jr.CreateJobWithTxn(ctx, gcJobRecord, jr.MakeJobID(), txn); err != nil { return err } diff --git a/pkg/ccl/backupccl/restore_planning.go b/pkg/ccl/backupccl/restore_planning.go index 6a2896f4c6a9..ce73f7877264 100644 --- a/pkg/ccl/backupccl/restore_planning.go +++ b/pkg/ccl/backupccl/restore_planning.go @@ -1755,23 +1755,21 @@ func doRestorePlan( if restoreStmt.Options.Detached { // When running in detached mode, we simply create the job record. // We do not wait for the job to finish. - aj, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn( - ctx, jr, p.ExtendedEvalContext().Txn) + jobID := p.ExecCfg().JobRegistry.MakeJobID() + _, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn( + ctx, jr, jobID, p.ExtendedEvalContext().Txn) if err != nil { return err } - resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(*aj.ID()))} + resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jobID))} collectTelemetry() return nil } var sj *jobs.StartableJob + jobID := p.ExecCfg().JobRegistry.MakeJobID() if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { - sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn) - if err != nil { - return err - } - return nil + return p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr) }); err != nil { if sj != nil { if cleanupErr := sj.CleanupOnRollback(ctx); cleanupErr != nil { diff --git a/pkg/ccl/backupccl/restore_schema_change_creation.go b/pkg/ccl/backupccl/restore_schema_change_creation.go index e9523a6428f7..17f542349a8c 100644 --- a/pkg/ccl/backupccl/restore_schema_change_creation.go +++ b/pkg/ccl/backupccl/restore_schema_change_creation.go @@ -146,11 +146,11 @@ func createTypeChangeJobFromDesc( // Type change jobs are not cancellable. 
NonCancelable: true, } - job, err := jr.CreateJobWithTxn(ctx, record, txn) - if err != nil { + jobID := jr.MakeJobID() + if _, err := jr.CreateJobWithTxn(ctx, record, jobID, txn); err != nil { return err } - log.Infof(ctx, "queued new type schema change job %d for type %d", *job.ID(), typ.GetID()) + log.Infof(ctx, "queued new type schema change job %d for type %d", jobID, typ.GetID()) return nil } @@ -201,18 +201,18 @@ func createSchemaChangeJobsFromMutations( }, Progress: jobspb.SchemaChangeProgress{}, } - newJob, err := jr.CreateJobWithTxn(ctx, jobRecord, txn) - if err != nil { + jobID := jr.MakeJobID() + if _, err := jr.CreateJobWithTxn(ctx, jobRecord, jobID, txn); err != nil { return err } newMutationJob := descpb.TableDescriptor_MutationJob{ MutationID: mutationID, - JobID: *newJob.ID(), + JobID: jobID, } mutationJobs = append(mutationJobs, newMutationJob) log.Infof(ctx, "queued new schema change job %d for table %d, mutation %d", - *newJob.ID(), tableDesc.ID, mutationID) + jobID, tableDesc.ID, mutationID) } tableDesc.MutationJobs = mutationJobs return nil diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 02f04b0b17fd..8f6b00538874 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -40,6 +40,7 @@ go_library( "//pkg/kv/kvserver", "//pkg/kv/kvserver/closedts", "//pkg/kv/kvserver/protectedts", + "//pkg/kv/kvserver/protectedts/ptpb:ptpb_go_proto", "//pkg/roachpb", "//pkg/security", "//pkg/server/telemetry", diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 5d516ca91705..bbd77a51885d 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings" @@ -327,13 +328,17 @@ func changefeedPlanHook( // changeFrontier.manageProtectedTimestamps for more details on the handling of // protected timestamps. 
var sj *jobs.StartableJob + jobID := p.ExecCfg().JobRegistry.MakeJobID() { var protectedTimestampID uuid.UUID var spansToProtect []roachpb.Span + var ptr *ptpb.Record if hasInitialScan := initialScanFromOptions(details.Opts); hasInitialScan { protectedTimestampID = uuid.MakeV4() spansToProtect = makeSpansToProtect(p.ExecCfg().Codec, details.Targets) progress.GetChangefeed().ProtectedTimestampRecord = protectedTimestampID + ptr = jobsprotectedts.MakeRecord(protectedTimestampID, jobID, + statementTime, spansToProtect) } jr := jobs.Record{ @@ -348,19 +353,15 @@ func changefeedPlanHook( Details: details, Progress: *progress.GetChangefeed(), } - createJobAndProtectedTS := func(ctx context.Context, txn *kv.Txn) (err error) { - sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn) - if err != nil { + if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + if err := p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr); err != nil { return err } - if protectedTimestampID == uuid.Nil { - return nil + if ptr != nil { + return p.ExecCfg().ProtectedTimestampProvider.Protect(ctx, txn, ptr) } - ptr := jobsprotectedts.MakeRecord(protectedTimestampID, *sj.ID(), - statementTime, spansToProtect) - return p.ExecCfg().ProtectedTimestampProvider.Protect(ctx, txn, ptr) - } - if err := p.ExecCfg().DB.Txn(ctx, createJobAndProtectedTS); err != nil { + return nil + }); err != nil { if sj != nil { if err := sj.CleanupOnRollback(ctx); err != nil { log.Warningf(ctx, "failed to cleanup aborted job: %v", err) @@ -392,7 +393,7 @@ func changefeedPlanHook( case <-ctx.Done(): return ctx.Err() case resultsCh <- tree.Datums{ - tree.NewDInt(tree.DInt(*sj.ID())), + tree.NewDInt(tree.DInt(jobID)), }: return nil } diff --git a/pkg/ccl/importccl/import_stmt.go b/pkg/ccl/importccl/import_stmt.go index 70e333b31231..ce6cd2d8aa6c 100644 --- a/pkg/ccl/importccl/import_stmt.go +++ b/pkg/ccl/importccl/import_stmt.go @@ -914,30 +914,31 @@ func importPlanHook( if isDetached { // When running inside an explicit transaction, we simply create the job // record. We do not wait for the job to finish. 
- aj, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn( - ctx, jr, p.ExtendedEvalContext().Txn) + jobID := p.ExecCfg().JobRegistry.MakeJobID() + _, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn( + ctx, jr, jobID, p.ExtendedEvalContext().Txn) if err != nil { return err } - if err = protectTimestampForImport(ctx, p, p.ExtendedEvalContext().Txn, *aj.ID(), spansToProtect, + if err = protectTimestampForImport(ctx, p, p.ExtendedEvalContext().Txn, jobID, spansToProtect, walltime, importDetails); err != nil { return err } addToFileFormatTelemetry(format.Format.String(), "started") - resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(*aj.ID()))} + resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jobID))} return nil } var sj *jobs.StartableJob + jobID := p.ExecCfg().JobRegistry.MakeJobID() if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { - sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn) - if err != nil { + if err := p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr); err != nil { return err } - return protectTimestampForImport(ctx, p, txn, *sj.ID(), spansToProtect, walltime, importDetails) + return protectTimestampForImport(ctx, p, txn, jobID, spansToProtect, walltime, importDetails) }); err != nil { if sj != nil { if cleanupErr := sj.CleanupOnRollback(ctx); cleanupErr != nil { @@ -1915,7 +1916,8 @@ func (r *importResumer) dropTables( Progress: jobspb.SchemaChangeGCProgress{}, NonCancelable: true, } - if _, err := execCfg.JobRegistry.CreateJobWithTxn(ctx, gcJobRecord, txn); err != nil { + if _, err := execCfg.JobRegistry.CreateJobWithTxn( + ctx, gcJobRecord, execCfg.JobRegistry.MakeJobID(), txn); err != nil { return err } diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go index 52b7a595ed9d..dc8dd5a6dcf6 100644 --- a/pkg/ccl/importccl/import_stmt_test.go +++ b/pkg/ccl/importccl/import_stmt_test.go @@ -4716,7 +4716,7 @@ func TestImportControlJobRBAC(t *testing.T) { }) startLeasedJob := func(t *testing.T, record jobs.Record) *jobs.StartableJob { - job, err := registry.CreateAndStartJob(ctx, nil, record) + job, err := registry.TestingCreateAndStartJob(ctx, nil, record) require.NoError(t, err) return job } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go index 9560d657859c..cc44f5c622a0 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go @@ -63,7 +63,7 @@ func TestStreamIngestionJobRollBack(t *testing.T) { }, Progress: jobspb.StreamIngestionProgress{}, } - j, err := registry.CreateAndStartJob(ctx, nil, streamIngestJobRecord) + j, err := registry.TestingCreateAndStartJob(ctx, nil, streamIngestJobRecord) require.NoError(t, err) // Insert more data in the table. 
 // These changes should be rolled back during job cancellation.
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
index 2aaed0cf2b88..7641ae4cd221 100644
--- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
@@ -98,9 +98,9 @@ func ingestionPlanHook(
 }
 
 var sj *jobs.StartableJob
+ jobID := p.ExecCfg().JobRegistry.MakeJobID()
 if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
- sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn)
- return err
+ return p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr)
 }); err != nil {
 if sj != nil {
 if cleanupErr := sj.CleanupOnRollback(ctx); cleanupErr != nil {
diff --git a/pkg/ccl/streamingccl/streamingutils/utils_test.go b/pkg/ccl/streamingccl/streamingutils/utils_test.go
index f4b89fd1100e..d33fb3cbf89b 100644
--- a/pkg/ccl/streamingccl/streamingutils/utils_test.go
+++ b/pkg/ccl/streamingccl/streamingutils/utils_test.go
@@ -50,7 +50,7 @@ func TestCutoverBuiltin(t *testing.T) {
 },
 Progress: jobspb.StreamIngestionProgress{},
 }
- job, err := registry.CreateAndStartJob(ctx, nil, streamIngestJobRecord)
+ job, err := registry.TestingCreateAndStartJob(ctx, nil, streamIngestJobRecord)
 require.NoError(t, err)
 
 // Check that sentinel is not set.
diff --git a/pkg/jobs/BUILD.bazel b/pkg/jobs/BUILD.bazel
index 5d87bf825a19..598f95505bfd 100644
--- a/pkg/jobs/BUILD.bazel
+++ b/pkg/jobs/BUILD.bazel
@@ -88,6 +88,7 @@ go_test(
 "//pkg/jobs/jobstest",
 "//pkg/keys",
 "//pkg/kv",
+ "//pkg/kv/kvserver",
 "//pkg/roachpb",
 "//pkg/scheduledjobs",
 "//pkg/security",
diff --git a/pkg/jobs/deprecated.go b/pkg/jobs/deprecated.go
index 286821c025a9..c998cafa85cb 100644
--- a/pkg/jobs/deprecated.go
+++ b/pkg/jobs/deprecated.go
@@ -377,11 +377,6 @@ func (r *Registry) deprecatedResume(ctx context.Context, resumer Resumer, job *J
 func (j *Job) deprecatedInsert(
 ctx context.Context, txn *kv.Txn, id int64, lease *jobspb.Lease, session sqlliveness.Session,
 ) error {
- if j.id != nil {
- // Already created - do nothing.
- return nil
- }
-
 j.mu.payload.Lease = lease
 
 if err := j.runInTxn(ctx, txn, func(ctx context.Context, txn *kv.Txn) error {
diff --git a/pkg/jobs/helpers_test.go b/pkg/jobs/helpers_test.go
index f0fc7b58b76d..0c720946fe80 100644
--- a/pkg/jobs/helpers_test.go
+++ b/pkg/jobs/helpers_test.go
@@ -15,7 +15,6 @@ import (
 
 "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 "github.com/cockroachdb/cockroach/pkg/kv"
- "github.com/cockroachdb/errors"
 )
 
 // FakeResumer calls optional callbacks during the job lifecycle.
@@ -69,10 +68,7 @@ func (j *Job) Started(ctx context.Context) error {
 
 // Created is a test-only function that inserts a new jobs table row.
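+// The job must carry an ID assigned at construction (via NewJob(record, id));
+// Created inserts the row using that preassigned ID.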
func (j *Job) Created(ctx context.Context) error { - if j.ID() != nil { - return errors.Errorf("job already created with ID %v", *j.ID()) - } - return j.deprecatedInsert(ctx, nil /* txn */, j.registry.makeJobID(), nil /* lease */, nil /* session */) + return j.deprecatedInsert(ctx, nil /* txn */, *j.ID(), nil /* lease */, nil /* session */) } // Paused is a wrapper around the internal function that moves a job to the diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 22961cc1c40a..44d6878061e3 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server" @@ -315,7 +316,7 @@ func TestRegistryLifecycle(t *testing.T) { rts.setUp(t) defer rts.tearDown() - j, err := rts.registry.CreateAndStartJob(rts.ctx, nil, rts.mockJob) + j, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob) if err != nil { t.Fatal(err) } @@ -337,7 +338,7 @@ func TestRegistryLifecycle(t *testing.T) { rts.setUp(t) defer rts.tearDown() - j, err := rts.registry.CreateAndStartJob(rts.ctx, nil, rts.mockJob) + j, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob) if err != nil { t.Fatal(err) } @@ -358,7 +359,7 @@ func TestRegistryLifecycle(t *testing.T) { rts.setUp(t) defer rts.tearDown() - j, err := rts.registry.CreateAndStartJob(rts.ctx, nil, rts.mockJob) + j, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob) if err != nil { t.Fatal(err) } @@ -392,7 +393,7 @@ func TestRegistryLifecycle(t *testing.T) { rts.setUp(t) defer rts.tearDown() - j, err := rts.registry.CreateAndStartJob(rts.ctx, nil, rts.mockJob) + j, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob) if err != nil { t.Fatal(err) } @@ -430,7 +431,7 @@ func TestRegistryLifecycle(t *testing.T) { rts := registryTestSuite{} rts.setUp(t) defer rts.tearDown() - j, err := rts.registry.CreateAndStartJob(rts.ctx, nil, rts.mockJob) + j, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob) if err != nil { t.Fatal(err) } @@ -457,7 +458,7 @@ func TestRegistryLifecycle(t *testing.T) { rts := registryTestSuite{} rts.setUp(t) defer rts.tearDown() - j, err := rts.registry.CreateAndStartJob(rts.ctx, nil, rts.mockJob) + j, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob) if err != nil { t.Fatal(err) } @@ -483,7 +484,7 @@ func TestRegistryLifecycle(t *testing.T) { rts.setUp(t) defer rts.tearDown() - j, err := rts.registry.CreateAndStartJob(rts.ctx, nil, rts.mockJob) + j, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob) if err != nil { t.Fatal(err) } @@ -513,7 +514,7 @@ func TestRegistryLifecycle(t *testing.T) { rts.setUp(t) defer rts.tearDown() - j, err := rts.registry.CreateAndStartJob(rts.ctx, nil, rts.mockJob) + j, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob) if err != nil { t.Fatal(err) } @@ -552,7 +553,7 @@ func TestRegistryLifecycle(t *testing.T) { rts := registryTestSuite{} rts.setUp(t) defer rts.tearDown() - job, err := rts.registry.CreateAndStartJob(rts.ctx, nil, rts.mockJob) + job, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob) if err != nil { t.Fatal(err) } @@ -658,7 +659,7 @@ func TestRegistryLifecycle(t *testing.T) 
{ rts.setUp(t) defer rts.tearDown() - j, err := rts.registry.CreateAndStartJob(rts.ctx, nil, rts.mockJob) + j, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob) if err != nil { t.Fatal(err) } @@ -689,7 +690,7 @@ func TestRegistryLifecycle(t *testing.T) { // Make marking success fail. rts.successErr = errors.New("injected failure at marking as succeeded") - j, err := rts.registry.CreateAndStartJob(rts.ctx, nil, rts.mockJob) + j, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob) if err != nil { t.Fatal(err) } @@ -722,7 +723,7 @@ func TestRegistryLifecycle(t *testing.T) { // Make marking success fail. rts.successErr = errors.New("resume failed") - j, err := rts.registry.CreateAndStartJob(rts.ctx, nil, rts.mockJob) + j, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob) if err != nil { t.Fatal(err) } @@ -758,7 +759,7 @@ func TestRegistryLifecycle(t *testing.T) { return nil } - job, err := rts.registry.CreateAndStartJob(rts.ctx, nil, rts.mockJob) + job, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob) require.NoError(t, err) rts.job = job @@ -793,7 +794,7 @@ func TestRegistryLifecycle(t *testing.T) { return errors.New("boom") } - job, err := rts.registry.CreateAndStartJob(rts.ctx, nil, rts.mockJob) + job, err := rts.registry.TestingCreateAndStartJob(rts.ctx, nil, rts.mockJob) require.NoError(t, err) rts.job = job @@ -829,7 +830,7 @@ func TestJobLifecycle(t *testing.T) { createJob := func(record jobs.Record) (*jobs.Job, expectation) { beforeTime := timeutil.Now() - job := registry.NewJob(record) + job := registry.NewJob(record, registry.MakeJobID()) if err := job.Created(ctx); err != nil { t.Fatal(err) } @@ -871,7 +872,7 @@ func TestJobLifecycle(t *testing.T) { startLeasedJob := func(t *testing.T, record jobs.Record) (*jobs.StartableJob, expectation) { beforeTime := timeutil.Now() - job, err := registry.CreateAndStartJob(ctx, nil, record) + job, err := registry.TestingCreateAndStartJob(ctx, nil, record) if err != nil { t.Fatal(err) } @@ -960,7 +961,7 @@ func TestJobLifecycle(t *testing.T) { Before: timeutil.Now(), Error: "Buzz Lightyear can't fly", } - buzzJob := registry.NewJob(buzzRecord) + buzzJob := registry.NewJob(buzzRecord, registry.MakeJobID()) if err := buzzJob.Created(ctx); err != nil { t.Fatal(err) @@ -1195,7 +1196,7 @@ func TestJobLifecycle(t *testing.T) { job := registry.NewJob(jobs.Record{ Details: 42, - }) + }, registry.MakeJobID()) _ = job.Created(ctx) }) @@ -1203,9 +1204,9 @@ func TestJobLifecycle(t *testing.T) { job := registry.NewJob(jobs.Record{ Details: jobspb.RestoreDetails{}, Progress: jobspb.RestoreProgress{}, - }) - if err := job.Started(ctx); !testutils.IsError(err, "job not created") { - t.Fatalf("expected 'job not created' error, but got %v", err) + }, registry.MakeJobID()) + if err := job.Started(ctx); !testutils.IsError(err, "not found in system.jobs table") { + t.Fatalf("unexpected error %v", err) } }) @@ -1386,14 +1387,15 @@ func TestJobLifecycle(t *testing.T) { t.Run("job with created by fields", func(t *testing.T) { createdByType := "internal_test" + jobID := registry.MakeJobID() job := registry.NewJob(jobs.Record{ Details: jobspb.RestoreDetails{}, Progress: jobspb.RestoreProgress{}, CreatedBy: &jobs.CreatedByInfo{Name: createdByType, ID: 123}, - }) + }, jobID) require.NoError(t, job.Created(ctx)) - loadedJob, err := registry.LoadJob(ctx, *job.ID()) + loadedJob, err := registry.LoadJob(ctx, jobID) require.NoError(t, err) require.NotNil(t, loadedJob.CreatedBy()) 
require.Equal(t, job.CreatedBy(), loadedJob.CreatedBy()) @@ -1836,7 +1838,7 @@ func TestShowJobWhenComplete(t *testing.T) { t.Run("show job", func(t *testing.T) { // Start a job and cancel it so it is in state finished and then query it with // SHOW JOB WHEN COMPLETE. - job, err := registry.CreateAndStartJob(ctx, nil, mockJob) + job, err := registry.TestingCreateAndStartJob(ctx, nil, mockJob) if err != nil { t.Fatal(err) } @@ -1874,7 +1876,7 @@ func TestShowJobWhenComplete(t *testing.T) { // query still blocks until the second job is also canceled. var jobs [2]*jobs.StartableJob for i := range jobs { - job, err := registry.CreateAndStartJob(ctx, nil, mockJob) + job, err := registry.TestingCreateAndStartJob(ctx, nil, mockJob) if err != nil { t.Fatal(err) } @@ -2111,11 +2113,12 @@ func TestStartableJobMixedVersion(t *testing.T) { return jobs.FakeResumer{} }) var j *jobs.StartableJob + jobID := jr.MakeJobID() require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { - j, err = jr.CreateStartableJobWithTxn(ctx, jobs.Record{ + err = jr.CreateStartableJobWithTxn(ctx, &j, jobID, txn, jobs.Record{ Details: jobspb.ImportDetails{}, Progress: jobspb.ImportProgress{}, - }, txn) + }) return err })) _, err = sqlDB.Exec("SET CLUSTER SETTING version = crdb_internal.node_executable_version()") @@ -2161,9 +2164,9 @@ func TestStartableJob(t *testing.T) { Progress: jobspb.RestoreProgress{}, } createStartableJob := func(t *testing.T) (sj *jobs.StartableJob) { + jobID := jr.MakeJobID() require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { - sj, err = jr.CreateStartableJobWithTxn(ctx, rec, txn) - return err + return jr.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, rec) })) return sj } @@ -2180,14 +2183,16 @@ func TestStartableJob(t *testing.T) { defer func() { require.NoError(t, txn.Rollback(ctx)) }() - sj, err := jr.CreateStartableJobWithTxn(ctx, rec, txn) + var sj *jobs.StartableJob + err := jr.CreateStartableJobWithTxn(ctx, &sj, jr.MakeJobID(), txn, rec) require.NoError(t, err) err = sj.Start(ctx) require.Regexp(t, `cannot resume .* job which is not committed`, err) }) t.Run("Start called with aborted txn", func(t *testing.T) { txn := db.NewTxn(ctx, "test") - sj, err := jr.CreateStartableJobWithTxn(ctx, rec, txn) + var sj *jobs.StartableJob + err := jr.CreateStartableJobWithTxn(ctx, &sj, jr.MakeJobID(), txn, rec) require.NoError(t, err) require.NoError(t, txn.Rollback(ctx)) err = sj.Start(ctx) @@ -2198,7 +2203,8 @@ func TestStartableJob(t *testing.T) { defer func() { require.NoError(t, txn.Rollback(ctx)) }() - sj, err := jr.CreateStartableJobWithTxn(ctx, rec, txn) + var sj *jobs.StartableJob + err := jr.CreateStartableJobWithTxn(ctx, &sj, jr.MakeJobID(), txn, rec) require.NoError(t, err) err = sj.CleanupOnRollback(ctx) require.Regexp(t, `cannot call CleanupOnRollback for a StartableJob with a non-finalized transaction`, err) @@ -2210,7 +2216,8 @@ func TestStartableJob(t *testing.T) { }) t.Run("CleanupOnRollback positive case", func(t *testing.T) { txn := db.NewTxn(ctx, "test") - sj, err := jr.CreateStartableJobWithTxn(ctx, rec, txn) + var sj *jobs.StartableJob + err := jr.CreateStartableJobWithTxn(ctx, &sj, jr.MakeJobID(), txn, rec) require.NoError(t, err) require.NoError(t, txn.Rollback(ctx)) require.NoError(t, sj.CleanupOnRollback(ctx)) @@ -2220,7 +2227,8 @@ func TestStartableJob(t *testing.T) { }) t.Run("Cancel", func(t *testing.T) { txn := db.NewTxn(ctx, "test") - sj, err := jr.CreateStartableJobWithTxn(ctx, rec, txn) + var sj *jobs.StartableJob + 
err := jr.CreateStartableJobWithTxn(ctx, &sj, jr.MakeJobID(), txn, rec) require.NoError(t, err) require.NoError(t, txn.Commit(ctx)) require.NoError(t, sj.Cancel(ctx)) @@ -2268,9 +2276,9 @@ func TestStartableJob(t *testing.T) { return nil }) clientResults := make(chan tree.Datums) + jobID := jr.MakeJobID() require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { - sj, err = jr.CreateStartableJobWithTxn(ctx, rec, txn) - return err + return jr.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, rec) })) return sj, clientResults, blockResume, cleanup } @@ -2310,6 +2318,55 @@ func TestStartableJob(t *testing.T) { }) } +// TestStartableJobTxnRetry tests that in the presence of transaction retries, +// StartableJobs created in the transaction are correctly registered exactly +// once. +func TestStartableJobTxnRetry(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + defer jobs.ResetConstructors()() + + ctx := context.Background() + + const txnName = "create job" + haveInjectedRetry := false + params := base.TestServerArgs{} + params.Knobs.Store = &kvserver.StoreTestingKnobs{ + TestingRequestFilter: func(ctx context.Context, r roachpb.BatchRequest) *roachpb.Error { + if r.Txn == nil || r.Txn.Name != txnName { + return nil + } + if _, ok := r.GetArg(roachpb.EndTxn); ok { + if !haveInjectedRetry { + haveInjectedRetry = true + // Force a retry error the first time. + return roachpb.NewError(roachpb.NewTransactionRetryError(roachpb.RETRY_REASON_UNKNOWN, "injected error")) + } + } + return nil + }, + } + s, _, db := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + jr := s.JobRegistry().(*jobs.Registry) + jobs.RegisterConstructor(jobspb.TypeRestore, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { + return jobs.FakeResumer{} + }) + rec := jobs.Record{ + Details: jobspb.RestoreDetails{}, + Progress: jobspb.RestoreProgress{}, + } + + jobID := jr.MakeJobID() + var sj *jobs.StartableJob + require.NoError(t, db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetDebugName(txnName) + return jr.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, rec) + })) + require.True(t, haveInjectedRetry) + require.NoError(t, sj.Start(ctx)) +} + // TestUnmigratedSchemaChangeJobs tests that schema change jobs created in 19.2 // that have not undergone a migration cannot be adopted, canceled, or paused. func TestUnmigratedSchemaChangeJobs(t *testing.T) { @@ -2355,7 +2412,7 @@ func TestUnmigratedSchemaChangeJobs(t *testing.T) { }) t.Run("pause not supported", func(t *testing.T) { - job, err := registry.CreateJobWithTxn(ctx, rec, nil /* txn */) + job, err := registry.CreateJobWithTxn(ctx, rec, registry.MakeJobID(), nil) if err != nil { t.Fatal(err) } @@ -2365,7 +2422,7 @@ func TestUnmigratedSchemaChangeJobs(t *testing.T) { }) t.Run("cancel not supported", func(t *testing.T) { - job, err := registry.CreateJobWithTxn(ctx, rec, nil /* txn */) + job, err := registry.CreateJobWithTxn(ctx, rec, registry.MakeJobID(), nil) if err != nil { t.Fatal(err) } @@ -2405,7 +2462,8 @@ func TestRegistryTestingNudgeAdoptionQueue(t *testing.T) { } }) before := timeutil.Now() - j, err := registry.CreateAdoptableJobWithTxn(ctx, rec, nil /* txn */) + jobID := registry.MakeJobID() + _, err := registry.CreateAdoptableJobWithTxn(ctx, rec, jobID, nil /* txn */) require.NoError(t, err) registry.TestingNudgeAdoptionQueue() // We want the job to be resumed very rapidly. 
We set this long timeout of 2s @@ -2417,7 +2475,7 @@ func TestRegistryTestingNudgeAdoptionQueue(t *testing.T) { case <-time.After(aLongTime): t.Fatal("job was not adopted") } - loaded, err := registry.LoadJob(ctx, *j.ID()) + loaded, err := registry.LoadJob(ctx, jobID) require.NoError(t, err) started := timeutil.Unix(0, loaded.Payload().StartedMicros*1000) require.True(t, started.After(before), @@ -2494,7 +2552,7 @@ func TestMetrics(t *testing.T) { Details: jobspb.BackupDetails{}, Progress: jobspb.BackupProgress{}, } - _, err := registry.CreateAdoptableJobWithTxn(ctx, rec, nil /* txn */) + _, err := registry.CreateAdoptableJobWithTxn(ctx, rec, registry.MakeJobID(), nil /* txn */) require.NoError(t, err) errCh := <-resuming backupMetrics := registry.MetricsStruct().JobMetrics[jobspb.TypeBackup] @@ -2512,7 +2570,8 @@ func TestMetrics(t *testing.T) { } importMetrics := registry.MetricsStruct().JobMetrics[jobspb.TypeImport] - j, err := registry.CreateAdoptableJobWithTxn(ctx, rec, nil /* txn */) + jobID := registry.MakeJobID() + _, err := registry.CreateAdoptableJobWithTxn(ctx, rec, jobID, nil /* txn */) require.NoError(t, err) { // Fail the Resume with a retriable error. @@ -2527,7 +2586,7 @@ func TestMetrics(t *testing.T) { // We'll pause the job this time around and make sure it stops running. <-resuming require.Equal(t, int64(1), importMetrics.CurrentlyRunning.Value()) - require.NoError(t, registry.PauseRequested(ctx, nil, *j.ID())) + require.NoError(t, registry.PauseRequested(ctx, nil, jobID)) int64EqSoon(t, importMetrics.ResumeRetryError.Count, 2) require.Equal(t, int64(0), importMetrics.ResumeFailed.Count()) require.Equal(t, int64(0), importMetrics.ResumeCompleted.Count()) @@ -2535,7 +2594,7 @@ func TestMetrics(t *testing.T) { } { // Now resume the job and let it succeed. - require.NoError(t, registry.Unpause(ctx, nil, *j.ID())) + require.NoError(t, registry.Unpause(ctx, nil, jobID)) errCh := <-resuming require.Equal(t, int64(1), importMetrics.CurrentlyRunning.Value()) errCh <- nil @@ -2552,7 +2611,7 @@ func TestMetrics(t *testing.T) { } importMetrics := registry.MetricsStruct().JobMetrics[jobspb.TypeImport] - _, err := registry.CreateAdoptableJobWithTxn(ctx, rec, nil /* txn */) + _, err := registry.CreateAdoptableJobWithTxn(ctx, rec, registry.MakeJobID(), nil /* txn */) require.NoError(t, err) { // Fail the Resume with a permanent error. @@ -2587,7 +2646,8 @@ func TestMetrics(t *testing.T) { } importMetrics := registry.MetricsStruct().JobMetrics[jobspb.TypeImport] - j, err := registry.CreateAdoptableJobWithTxn(ctx, rec, nil /* txn */) + jobID := registry.MakeJobID() + _, err := registry.CreateAdoptableJobWithTxn(ctx, rec, jobID, nil /* txn */) require.NoError(t, err) { // Fail the Resume with a retriable error. @@ -2602,7 +2662,7 @@ func TestMetrics(t *testing.T) { // We'll pause the job this time around and make sure it stops running. <-resuming require.Equal(t, int64(1), importMetrics.CurrentlyRunning.Value()) - require.NoError(t, registry.PauseRequested(ctx, nil, *j.ID())) + require.NoError(t, registry.PauseRequested(ctx, nil, jobID)) int64EqSoon(t, importMetrics.FailOrCancelRetryError.Count, 1) require.Equal(t, int64(1), importMetrics.ResumeFailed.Count()) require.Equal(t, int64(0), importMetrics.ResumeCompleted.Count()) @@ -2610,7 +2670,7 @@ func TestMetrics(t *testing.T) { } { // Now resume the job and let it succeed. 
- require.NoError(t, registry.Unpause(ctx, nil, *j.ID())) + require.NoError(t, registry.Unpause(ctx, nil, jobID)) errCh := <-resuming require.Equal(t, int64(1), importMetrics.CurrentlyRunning.Value()) errCh <- nil @@ -2667,7 +2727,7 @@ func TestLoseLeaseDuringExecution(t *testing.T) { } }) - _, err := registry.CreateJobWithTxn(ctx, rec, nil /* txn */) + _, err := registry.CreateJobWithTxn(ctx, rec, registry.MakeJobID(), nil) require.NoError(t, err) registry.TestingNudgeAdoptionQueue() require.Regexp(t, `expected session '\w+' but found NULL`, <-resumed) diff --git a/pkg/jobs/jobsprotectedts/jobs_protected_ts_test.go b/pkg/jobs/jobsprotectedts/jobs_protected_ts_test.go index 20e56b37fa8e..27b8d570dada 100644 --- a/pkg/jobs/jobsprotectedts/jobs_protected_ts_test.go +++ b/pkg/jobs/jobsprotectedts/jobs_protected_ts_test.go @@ -72,11 +72,12 @@ func TestJobsProtectedTimestamp(t *testing.T) { } mkJobAndRecord := func() (j *jobs.Job, rec *ptpb.Record) { ts := s0.Clock().Now() + jobID := jr.MakeJobID() require.NoError(t, s0.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { - if j, err = jr.CreateJobWithTxn(ctx, mkJobRec(), txn); err != nil { + if j, err = jr.CreateJobWithTxn(ctx, mkJobRec(), jobID, txn); err != nil { return err } - rec = jobsprotectedts.MakeRecord(uuid.MakeV4(), *j.ID(), ts, []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}) + rec = jobsprotectedts.MakeRecord(uuid.MakeV4(), jobID, ts, []roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}) return ptp.Protect(ctx, txn, rec) })) return j, rec diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 1f22558588c0..14bf61b81ae0 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -281,22 +281,28 @@ func (r *Registry) makeCtx() (context.Context, func()) { return context.WithCancel(r.ac.AnnotateCtx(context.Background())) } -func (r *Registry) makeJobID() int64 { +// MakeJobID generates a new job ID. +func (r *Registry) MakeJobID() int64 { return int64(builtins.GenerateUniqueInt(r.nodeID.SQLInstanceID())) } -// CreateAndStartJob creates and asynchronously starts a job from record. An -// error is returned if the job type has not been registered with +// TestingCreateAndStartJob creates and asynchronously starts a job from record. +// An error is returned if the job type has not been registered with // RegisterConstructor. The ctx passed to this function is not the context the // job will be started with (canceling ctx will not cause the job to cancel). -func (r *Registry) CreateAndStartJob( +func (r *Registry) TestingCreateAndStartJob( ctx context.Context, resultsCh chan<- tree.Datums, record Record, ) (*StartableJob, error) { var rj *StartableJob + jobID := r.MakeJobID() if err := r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { - rj, err = r.CreateStartableJobWithTxn(ctx, record, txn) - return err + return r.CreateStartableJobWithTxn(ctx, &rj, jobID, txn, record) }); err != nil { + if rj != nil { + if cleanupErr := rj.CleanupOnRollback(ctx); cleanupErr != nil { + log.Warningf(ctx, "failed to cleanup StartableJob: %v", cleanupErr) + } + } return nil, err } err := rj.Start(ctx) @@ -390,8 +396,9 @@ func (r *Registry) Run(ctx context.Context, ex sqlutil.InternalExecutor, jobs [] } // NewJob creates a new Job. 
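+// The caller supplies the job ID instead of having the registry generate one;
+// a typical (illustrative) call is:
+//
+//   job := r.NewJob(record, r.MakeJobID())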
-func (r *Registry) NewJob(record Record) *Job { +func (r *Registry) NewJob(record Record, jobID int64) *Job { job := &Job{ + id: &jobID, registry: r, createdBy: record.CreatedBy, } @@ -413,8 +420,10 @@ func (r *Registry) NewJob(record Record) *Job { // CreateJobWithTxn creates a job to be started later with StartJob. It stores // the job in the jobs table, marks it pending and gives the current node a // lease. -func (r *Registry) CreateJobWithTxn(ctx context.Context, record Record, txn *kv.Txn) (*Job, error) { - j := r.NewJob(record) +func (r *Registry) CreateJobWithTxn( + ctx context.Context, record Record, jobID int64, txn *kv.Txn, +) (*Job, error) { + j := r.NewJob(record, jobID) s, err := r.sqlInstance.Session(ctx) if errors.Is(err, sqlliveness.NotStartedError) { @@ -431,14 +440,13 @@ func (r *Registry) CreateJobWithTxn(ctx context.Context, record Record, txn *kv. // TODO(spaskob): remove in 20.2 as this code path is only needed while // migrating to 20.2 cluster. if err := j.deprecatedInsert( - ctx, txn, r.makeJobID(), r.deprecatedNewLease(), s, + ctx, txn, jobID, r.deprecatedNewLease(), s, ); err != nil { return nil, err } return j, nil } j.sessionID = s.ID() - jobID := r.makeJobID() start := timeutil.Now() if txn != nil { start = txn.ReadTimestamp().GoTime() @@ -459,7 +467,6 @@ VALUES ($1, $2, $3, $4, $5, $6)`, jobID, StatusRunning, payloadBytes, progressBy return nil, err } - j.id = &jobID return j, nil } @@ -468,16 +475,16 @@ const invalidNodeID = 0 // CreateAdoptableJobWithTxn creates a job which will be adopted for execution // at a later time by some node in the cluster. func (r *Registry) CreateAdoptableJobWithTxn( - ctx context.Context, record Record, txn *kv.Txn, + ctx context.Context, record Record, jobID int64, txn *kv.Txn, ) (*Job, error) { - j := r.NewJob(record) + j := r.NewJob(record, jobID) // We create a job record with an invalid lease to force the registry (on some node // in the cluster) to adopt this job at a later time. lease := &jobspb.Lease{NodeID: invalidNodeID} if err := j.deprecatedInsert( - ctx, txn, r.makeJobID(), lease, nil, + ctx, txn, jobID, lease, nil, ); err != nil { return nil, err } @@ -497,59 +504,90 @@ func (r *Registry) CreateAdoptableJobWithTxn( // committed, the caller must explicitly Start it. If the transaction is rolled // back then the caller must call CleanupOnRollback to unregister the job from // the Registry. +// +// When used in a closure that is retryable in the presence of transaction +// restarts, the job ID must be stable across retries to avoid leaking tracing +// spans and registry entries. The intended usage is to define the ID and +// *StartableJob outside the closure. The StartableJob referred to will remain +// the same if the method is called with the same job ID and has already been +// initialized with a tracing span and registered; otherwise, a new one will be +// allocated, and sj will point to it. The point is to ensure that the tracing +// span is created and the job registered exactly once, if and only if the +// transaction commits. This is a fragile API. 
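+//
+// A sketch of the intended calling pattern (the caller-side names db and
+// record are illustrative):
+//
+//   jobID := r.MakeJobID()
+//   var sj *StartableJob
+//   if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+//     return r.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, record)
+//   }); err != nil {
+//     // If sj is non-nil, call sj.CleanupOnRollback(ctx) before returning.
+//   }
+//   // After the txn commits, call sj.Start(ctx).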
func (r *Registry) CreateStartableJobWithTxn( - ctx context.Context, record Record, txn *kv.Txn, -) (*StartableJob, error) { - j, err := r.CreateJobWithTxn(ctx, record, txn) + ctx context.Context, sj **StartableJob, jobID int64, txn *kv.Txn, record Record, +) error { + alreadyInitialized := *sj != nil + if alreadyInitialized { + if jobID != *(*sj).Job.ID() { + log.Fatalf(ctx, + "attempted to rewrite startable job for ID %d with unexpected ID %d", + *(*sj).Job.ID(), jobID, + ) + } + } + + j, err := r.CreateJobWithTxn(ctx, record, jobID, txn) if err != nil { - return nil, err + return err } resumer, err := r.createResumer(j, r.settings) if err != nil { - return nil, err - } - // Construct a context which contains a tracing span that follows from the - // span in the parent context. We don't directly use the parent span because - // we want independent lifetimes and cancellation. For the same reason, we - // don't use the Context returned by ForkSpan. - resumerCtx, cancel := r.makeCtx() - _, span := tracing.ForkSpan(ctx, "job") - if span != nil { - resumerCtx = tracing.ContextWithSpan(resumerCtx, span) - - // This trace span unfortunately is sometimes never finished. - // As a hack/workaround, finish it now so that it leaves the - // tracer registry. - // - // Remove this when this issue is fixed: - // https://github.com/cockroachdb/cockroach/issues/60671 - span.Finish() + return err } - if r.startUsingSQLLivenessAdoption(ctx) { - r.mu.Lock() - defer r.mu.Unlock() - if _, alreadyRegistered := r.mu.adoptedJobs[*j.ID()]; alreadyRegistered { - log.Fatalf(ctx, "job %d: was just created but found in registered adopted jobs", *j.ID()) + var resumerCtx context.Context + var cancel func() + var span *tracing.Span + var execDone chan struct{} + if !alreadyInitialized { + // Construct a context which contains a tracing span that follows from the + // span in the parent context. We don't directly use the parent span because + // we want independent lifetimes and cancellation. For the same reason, we + // don't use the Context returned by ForkSpan. + resumerCtx, cancel = r.makeCtx() + _, span = tracing.ForkSpan(ctx, "job") + if span != nil { + resumerCtx = tracing.ContextWithSpan(resumerCtx, span) + + // This trace span unfortunately is sometimes never finished. + // As a hack/workaround, finish it now so that it leaves the + // tracer registry. + // + // Remove this when this issue is fixed: + // https://github.com/cockroachdb/cockroach/issues/60671 + span.Finish() } - r.mu.adoptedJobs[*j.ID()] = &adoptedJob{sid: j.sessionID, cancel: cancel} - } else { - // TODO(spaskob): remove in 20.2 as this code path is only needed while - // migrating to 20.2 cluster. - if err := r.deprecatedRegister(*j.ID(), cancel); err != nil { - return nil, err + + if r.startUsingSQLLivenessAdoption(ctx) { + r.mu.Lock() + defer r.mu.Unlock() + if _, alreadyRegistered := r.mu.adoptedJobs[jobID]; alreadyRegistered { + log.Fatalf(ctx, "job %d: was just created but found in registered adopted jobs", jobID) + } + r.mu.adoptedJobs[jobID] = &adoptedJob{sid: j.sessionID, cancel: cancel} + } else { + // TODO(spaskob): remove in 20.2 as this code path is only needed while + // migrating to 20.2 cluster. 
+ if err := r.deprecatedRegister(jobID, cancel); err != nil {
+ return err
+ }
 }
+
+ execDone = make(chan struct{})
 }
- return &StartableJob{
- Job: j,
- txn: txn,
- resumer: resumer,
- resumerCtx: resumerCtx,
- cancel: cancel,
- span: span,
- execDone: make(chan struct{}),
- }, nil
+ if !alreadyInitialized {
+ *sj = &StartableJob{}
+ (*sj).resumerCtx = resumerCtx
+ (*sj).cancel = cancel
+ (*sj).span = span
+ (*sj).execDone = execDone
+ }
+ (*sj).Job = j
+ (*sj).resumer = resumer
+ (*sj).txn = txn
+ return nil
 }
 
 // LoadJob loads an existing job with the given jobID from the system.jobs
diff --git a/pkg/jobs/registry_external_test.go b/pkg/jobs/registry_external_test.go
index a507fc6f2ad0..b64d5c9c6e25 100644
--- a/pkg/jobs/registry_external_test.go
+++ b/pkg/jobs/registry_external_test.go
@@ -54,17 +54,18 @@ func TestRoundtripJob(t *testing.T) {
 registry := s.JobRegistry().(*jobs.Registry)
 defer s.Stopper().Stop(ctx)
 
+ jobID := registry.MakeJobID()
 storedJob := registry.NewJob(jobs.Record{
 Description: "beep boop",
 Username: security.MakeSQLUsernameFromPreNormalizedString("robot"),
 DescriptorIDs: descpb.IDs{42},
 Details: jobspb.RestoreDetails{},
 Progress: jobspb.RestoreProgress{},
- })
+ }, jobID)
 if err := storedJob.Created(ctx); err != nil {
 t.Fatal(err)
 }
- retrievedJob, err := registry.LoadJob(ctx, *storedJob.ID())
+ retrievedJob, err := registry.LoadJob(ctx, jobID)
 if err != nil {
 t.Fatal(err)
 }
@@ -177,7 +178,7 @@ func TestRegistryResumeExpiredLease(t *testing.T) {
 Details: jobspb.BackupDetails{},
 Progress: jobspb.BackupProgress{},
 }
- job, err := newRegistry(nodeid).CreateAndStartJob(ctx, nil, rec)
+ job, err := newRegistry(nodeid).TestingCreateAndStartJob(ctx, nil, rec)
 if err != nil {
 t.Fatal(err)
 }
diff --git a/pkg/jobs/schedule_control_test.go b/pkg/jobs/schedule_control_test.go
index c04daa17a3f6..7848ac252b08 100644
--- a/pkg/jobs/schedule_control_test.go
+++ b/pkg/jobs/schedule_control_test.go
@@ -150,7 +150,7 @@ func TestJobsControlForSchedules(t *testing.T) {
 
 // Create a few jobs not started by any schedule.
 for i := 0; i < numJobs; i++ {
- require.NoError(t, registry.NewJob(record).Created(context.Background()))
+ require.NoError(t, registry.NewJob(record, registry.MakeJobID()).Created(context.Background()))
 }
 
 var scheduleID int64 = 123
@@ -179,7 +179,8 @@
 Name: CreatedByScheduledJobs,
 ID: scheduleID,
 }
- newJob := registry.NewJob(record)
+ jobID := registry.MakeJobID()
+ newJob := registry.NewJob(record, jobID)
 require.NoError(t, newJob.Created(context.Background()))
 
 if tc.command == "resume" {
 // We can't just pause the job (since it will stay in pause-requested state forever).
 // So, just force-set the job status to paused.
th.sqlDB.Exec(t, "UPDATE system.jobs SET status=$1 WHERE id=$2", StatusPaused, - *newJob.ID()) + jobID) } } } @@ -269,9 +270,10 @@ func TestFilterJobsControlForSchedules(t *testing.T) { Name: CreatedByScheduledJobs, ID: scheduleID, } - newJob := registry.NewJob(record) + jobID := registry.MakeJobID() + newJob := registry.NewJob(record, jobID) require.NoError(t, newJob.Created(context.Background())) - th.sqlDB.Exec(t, "UPDATE system.jobs SET status=$1 WHERE id=$2", status, *newJob.ID()) + th.sqlDB.Exec(t, "UPDATE system.jobs SET status=$1 WHERE id=$2", status, jobID) } jobControl := fmt.Sprintf(tc.command+" JOBS FOR SCHEDULE %d", scheduleID) diff --git a/pkg/migration/migrationmanager/manager.go b/pkg/migration/migrationmanager/manager.go index af23abca15df..63bebc64e429 100644 --- a/pkg/migration/migrationmanager/manager.go +++ b/pkg/migration/migrationmanager/manager.go @@ -265,6 +265,7 @@ func (m *Manager) runMigration( func (m *Manager) getOrCreateMigrationJob( ctx context.Context, user security.SQLUsername, version clusterversion.ClusterVersion, ) (alreadyCompleted bool, jobID int64, _ error) { + newJobID := m.jr.MakeJobID() if err := m.c.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { alreadyCompleted, err = migrationjob.CheckIfMigrationCompleted(ctx, txn, m.ie, version) if alreadyCompleted || err != nil { @@ -278,13 +279,9 @@ func (m *Manager) getOrCreateMigrationJob( if found { return nil } - var j *jobs.Job - j, err = m.jr.CreateJobWithTxn(ctx, migrationjob.NewRecord(version, user), txn) - if err != nil { - return err - } - jobID = *j.ID() - return nil + jobID = newJobID + _, err = m.jr.CreateJobWithTxn(ctx, migrationjob.NewRecord(version, user), jobID, txn) + return err }); err != nil { return false, 0, err } diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go index c1fd1e409e2e..ede44234f709 100644 --- a/pkg/server/status_test.go +++ b/pkg/server/status_test.go @@ -2116,7 +2116,8 @@ func TestJobStatusResponse(t *testing.T) { require.Nil(t, response) ctx := context.Background() - job, err := ts.JobRegistry().(*jobs.Registry).CreateJobWithTxn( + jr := ts.JobRegistry().(*jobs.Registry) + job, err := jr.CreateJobWithTxn( ctx, jobs.Record{ Description: "testing", @@ -2140,6 +2141,7 @@ func TestJobStatusResponse(t *testing.T) { Progress: jobspb.ImportProgress{}, DescriptorIDs: []descpb.ID{1, 2, 3}, }, + jr.MakeJobID(), nil) if err != nil { t.Fatal(err) diff --git a/pkg/sql/create_stats.go b/pkg/sql/create_stats.go index 3c5dffb6512d..5df143c04664 100644 --- a/pkg/sql/create_stats.go +++ b/pkg/sql/create_stats.go @@ -136,8 +136,19 @@ func (n *createStatsNode) startJob(ctx context.Context, resultsCh chan<- tree.Da telemetry.Inc(sqltelemetry.CreateStatisticsUseCounter) } - job, err := n.p.ExecCfg().JobRegistry.CreateAndStartJob(ctx, resultsCh, *record) - if err != nil { + var job *jobs.StartableJob + jobID := n.p.ExecCfg().JobRegistry.MakeJobID() + if err := n.p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + return n.p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &job, jobID, txn, *record) + }); err != nil { + if job != nil { + if cleanupErr := job.CleanupOnRollback(ctx); cleanupErr != nil { + log.Warningf(ctx, "failed to cleanup StartableJob: %v", cleanupErr) + } + } + return err + } + if err := job.Start(ctx); err != nil { return err } @@ -146,7 +157,7 @@ func (n *createStatsNode) startJob(ctx context.Context, resultsCh chan<- tree.Da // Delete the job so users don't see it and get confused by the error. 
const stmt = `DELETE FROM system.jobs WHERE id = $1` if _ /* cols */, delErr := n.p.ExecCfg().InternalExecutor.Exec( - ctx, "delete-job", nil /* txn */, stmt, *job.ID(), + ctx, "delete-job", nil /* txn */, stmt, jobID, ); delErr != nil { log.Warningf(ctx, "failed to delete job: %v", delErr) } diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 1f0ad1228e44..8b52dbd96e5d 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -2728,7 +2728,7 @@ func (dsp *DistSQLPlanner) createPhysPlanForPlanNode( if err != nil { return nil, err } - job := n.p.ExecCfg().JobRegistry.NewJob(*record) + job := n.p.ExecCfg().JobRegistry.NewJob(*record, 0) plan, err = dsp.createPlanForCreateStats(planCtx, job) } diff --git a/pkg/sql/gcjob_test/gc_job_test.go b/pkg/sql/gcjob_test/gc_job_test.go index 710883a2fb65..07dd341370ac 100644 --- a/pkg/sql/gcjob_test/gc_job_test.go +++ b/pkg/sql/gcjob_test/gc_job_test.go @@ -199,7 +199,7 @@ func TestSchemaChangeGCJob(t *testing.T) { } resultsCh := make(chan tree.Datums) - job, err := jobRegistry.CreateAndStartJob(ctx, resultsCh, jobRecord) + job, err := jobRegistry.TestingCreateAndStartJob(ctx, resultsCh, jobRecord) if err != nil { t.Fatal(err) } @@ -367,7 +367,7 @@ func TestGCResumer(t *testing.T) { } resultsCh := make(chan tree.Datums) - sj, err := jobRegistry.CreateAndStartJob(ctx, resultsCh, record) + sj, err := jobRegistry.TestingCreateAndStartJob(ctx, resultsCh, record) require.NoError(t, err) require.NoError(t, sj.AwaitCompletion(ctx)) job, err := jobRegistry.LoadJob(ctx, *sj.ID()) @@ -394,7 +394,7 @@ func TestGCResumer(t *testing.T) { } resultsCh := make(chan tree.Datums) - sj, err := jobRegistry.CreateAndStartJob(ctx, resultsCh, record) + sj, err := jobRegistry.TestingCreateAndStartJob(ctx, resultsCh, record) require.NoError(t, err) _, err = sqlDB.Exec("ALTER RANGE tenants CONFIGURE ZONE USING gc.ttlseconds = 1;") @@ -429,7 +429,7 @@ func TestGCResumer(t *testing.T) { } resultsCh := make(chan tree.Datums) - sj, err := jobRegistry.CreateAndStartJob(ctx, resultsCh, record) + sj, err := jobRegistry.TestingCreateAndStartJob(ctx, resultsCh, record) require.NoError(t, err) require.Error(t, sj.AwaitCompletion(ctx)) }) diff --git a/pkg/sql/indexbackfiller_test.go b/pkg/sql/indexbackfiller_test.go index 640abe66433e..02c6271e6d96 100644 --- a/pkg/sql/indexbackfiller_test.go +++ b/pkg/sql/indexbackfiller_test.go @@ -488,6 +488,7 @@ INSERT INTO foo VALUES (1), (10), (100); ResumeSpans: []roachpb.Span{span}, } } + jobID := jr.MakeJobID() j, err = jr.CreateAdoptableJobWithTxn(ctx, jobs.Record{ Description: "testing", Statement: "testing", @@ -500,15 +501,15 @@ INSERT INTO foo VALUES (1), (10), (100); ResumeSpanList: resumeSpanList, }, Progress: jobspb.SchemaChangeGCProgress{}, - }, txn) + }, jobID, txn) if err != nil { return err } mut.MutationJobs = append(mut.MutationJobs, descpb.TableDescriptor_MutationJob{ - JobID: *j.ID(), + JobID: jobID, MutationID: 1, }) - jobToBlock.Store(*j.ID()) + jobToBlock.Store(jobID) mut.MaybeIncrementVersion() table = mut.ImmutableCopy().(catalog.TableDescriptor) return descriptors.WriteDesc(ctx, false /* kvTrace */, mut, txn) diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index e5369ee367ec..13d5aab26ad2 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -112,15 +112,17 @@ func (evalCtx *extendedEvalContext) copy() *extendedEvalContext { func (evalCtx *extendedEvalContext) QueueJob( ctx context.Context, record jobs.Record, ) (*jobs.Job, error) { + 
jobID := evalCtx.ExecCfg.JobRegistry.MakeJobID() job, err := evalCtx.ExecCfg.JobRegistry.CreateJobWithTxn( ctx, record, + jobID, evalCtx.Txn, ) if err != nil { return nil, err } - *evalCtx.Jobs = append(*evalCtx.Jobs, *job.ID()) + *evalCtx.Jobs = append(*evalCtx.Jobs, jobID) return job, nil } diff --git a/pkg/sql/row/expr_walker_test.go b/pkg/sql/row/expr_walker_test.go index 9cc6876161a3..15a24e0042e5 100644 --- a/pkg/sql/row/expr_walker_test.go +++ b/pkg/sql/row/expr_walker_test.go @@ -65,7 +65,7 @@ func createMockImportJob( ResumePos: []int64{resumePos}, }, } - mockImportJob, err := registry.CreateJobWithTxn(ctx, mockImportRecord, nil /* txn */) + mockImportJob, err := registry.CreateJobWithTxn(ctx, mockImportRecord, registry.MakeJobID(), nil) require.NoError(t, err) return mockImportJob } diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index b777081ff07f..33e8445364f0 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -480,13 +480,14 @@ func startGCJob( details jobspb.SchemaChangeGCDetails, ) error { jobRecord := CreateGCJobRecord(schemaChangeDescription, username, details) + jobID := jobRegistry.MakeJobID() if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - _, err := jobRegistry.CreateJobWithTxn(ctx, jobRecord, txn) + _, err := jobRegistry.CreateJobWithTxn(ctx, jobRecord, jobID, txn) return err }); err != nil { return err } - // TODO (lucy): Add logging once we create the job ID outside the txn closure. + log.Infof(ctx, "starting GC job %d", jobID) return jobRegistry.NotifyToAdoptJobs(ctx) } @@ -840,6 +841,7 @@ func (sc *SchemaChanger) rollbackSchemaChange(ctx context.Context, err error) er // Check if the target table needs to be cleaned up at all. If the target // table was in the ADD state and the schema change failed, then we need to // clean up the descriptor. + gcJobID := sc.jobRegistry.MakeJobID() if err := sc.txn(ctx, func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error { scTable, err := descsCol.GetMutableTableVersionByID(ctx, sc.descID, txn) if err != nil { @@ -877,14 +879,14 @@ func (sc *SchemaChanger) rollbackSchemaChange(ctx context.Context, err error) er }, }, ) - if _, err := sc.jobRegistry.CreateJobWithTxn(ctx, jobRecord, txn); err != nil { + if _, err := sc.jobRegistry.CreateJobWithTxn(ctx, jobRecord, gcJobID, txn); err != nil { return err } return txn.Run(ctx, b) }); err != nil { return err } - // TODO (lucy): Add logging once we create the job ID outside the txn closure. 
+ log.Infof(ctx, "starting GC job %d", gcJobID) return sc.jobRegistry.NotifyToAdoptJobs(ctx) } @@ -981,11 +983,11 @@ func (sc *SchemaChanger) createIndexGCJob( } gcJobRecord := CreateGCJobRecord(jobDesc, sc.job.Payload().UsernameProto.Decode(), indexGCDetails) - indexGCJob, err := sc.jobRegistry.CreateJobWithTxn(ctx, gcJobRecord, txn) - if err != nil { + jobID := sc.jobRegistry.MakeJobID() + if _, err := sc.jobRegistry.CreateJobWithTxn(ctx, gcJobRecord, jobID, txn); err != nil { return err } - log.Infof(ctx, "created index GC job %d", *indexGCJob.ID()) + log.Infof(ctx, "created index GC job %d", jobID) return nil } @@ -2445,14 +2447,14 @@ func (sc *SchemaChanger) queueCleanupJobs( Progress: jobspb.SchemaChangeProgress{}, NonCancelable: true, } - job, err := sc.jobRegistry.CreateJobWithTxn(ctx, jobRecord, txn) - if err != nil { + jobID := sc.jobRegistry.MakeJobID() + if _, err := sc.jobRegistry.CreateJobWithTxn(ctx, jobRecord, jobID, txn); err != nil { return err } - log.Infof(ctx, "created job %d to drop previous columns and indexes", *job.ID()) + log.Infof(ctx, "created job %d to drop previous columns and indexes", jobID) scDesc.MutationJobs = append(scDesc.MutationJobs, descpb.TableDescriptor_MutationJob{ MutationID: mutationID, - JobID: *job.ID(), + JobID: jobID, }) } return nil diff --git a/pkg/sql/tenant.go b/pkg/sql/tenant.go index 6106c41b7f4f..fae7a1367b9a 100644 --- a/pkg/sql/tenant.go +++ b/pkg/sql/tenant.go @@ -319,7 +319,8 @@ func GCTenantJob( Progress: jobspb.SchemaChangeGCProgress{}, NonCancelable: true, } - if _, err := execCfg.JobRegistry.CreateJobWithTxn(ctx, gcJobRecord, txn); err != nil { + if _, err := execCfg.JobRegistry.CreateJobWithTxn( + ctx, gcJobRecord, execCfg.JobRegistry.MakeJobID(), txn); err != nil { return err } return nil diff --git a/pkg/sql/truncate.go b/pkg/sql/truncate.go index 8c7321e982b2..348b0f25bbdd 100644 --- a/pkg/sql/truncate.go +++ b/pkg/sql/truncate.go @@ -254,7 +254,8 @@ func (p *planner) truncateTable( details.InterleavedIndexes = droppedInterleaves } record := CreateGCJobRecord(jobDesc, p.User(), details) - if _, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(ctx, record, p.txn); err != nil { + if _, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn( + ctx, record, p.ExecCfg().JobRegistry.MakeJobID(), p.txn); err != nil { return err }