From f1c81b0f011cd37a0e1c4a2e671eefca93db4e66 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Tue, 7 Nov 2023 17:39:34 +0000 Subject: [PATCH] jobs: only force jobs.MaybeGenerateForcedRetryableError in 23.1 Release note: none. Epic: none. --- .../streamingccl/replicationutils/BUILD.bazel | 1 + pkg/ccl/streamingccl/replicationutils/utils.go | 9 +++++---- pkg/ccl/streamingccl/streamingest/BUILD.bazel | 1 + .../streamingest/replication_stream_e2e_test.go | 8 ++++++-- .../streamingest/stream_ingestion_job.go | 7 ++++--- .../streamingest/stream_ingestion_processor.go | 5 ++++- .../streamingccl/streamproducer/producer_job.go | 2 +- pkg/jobs/job_info_storage.go | 10 ++++++---- pkg/jobs/job_info_storage_test.go | 2 +- pkg/jobs/jobs.go | 4 ++-- pkg/jobs/jobs_test.go | 2 +- pkg/jobs/jobsprofiler/profiler.go | 2 +- pkg/jobs/jobsprofiler/profiler_test.go | 2 +- pkg/jobs/utils.go | 10 +++++++--- pkg/server/autoconfig/BUILD.bazel | 1 + pkg/server/autoconfig/auto_config_env_runner.go | 8 ++++---- pkg/server/autoconfig/auto_config_task.go | 4 ++-- pkg/server/autoconfig/task_markers.go | 17 +++++++++-------- pkg/sql/crdb_internal.go | 4 ++-- 19 files changed, 59 insertions(+), 40 deletions(-) diff --git a/pkg/ccl/streamingccl/replicationutils/BUILD.bazel b/pkg/ccl/streamingccl/replicationutils/BUILD.bazel index 1f7fe2da5498..9b3790854fc9 100644 --- a/pkg/ccl/streamingccl/replicationutils/BUILD.bazel +++ b/pkg/ccl/streamingccl/replicationutils/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/ccl/streamingccl/streamclient", + "//pkg/clusterversion", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/kv/kvpb", diff --git a/pkg/ccl/streamingccl/replicationutils/utils.go b/pkg/ccl/streamingccl/replicationutils/utils.go index 26bc21eb6a37..1564d2bbbf35 100644 --- a/pkg/ccl/streamingccl/replicationutils/utils.go +++ b/pkg/ccl/streamingccl/replicationutils/utils.go @@ -15,6 +15,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -174,9 +175,9 @@ func ReplicatedTimeFromProgress(p *jobspb.Progress) hlc.Timestamp { // LoadIngestionProgress loads the latest persisted stream ingestion progress. // The method returns nil if the progress does not exist yet. func LoadIngestionProgress( - ctx context.Context, db isql.DB, jobID jobspb.JobID, + ctx context.Context, db isql.DB, jobID jobspb.JobID, cv clusterversion.Handle, ) (*jobspb.StreamIngestionProgress, error) { - progress, err := jobs.LoadJobProgress(ctx, db, jobID) + progress, err := jobs.LoadJobProgress(ctx, db, jobID, cv) if err != nil || progress == nil { return nil, err } @@ -192,9 +193,9 @@ func LoadIngestionProgress( // LoadReplicationProgress loads the latest persisted stream replication progress. // The method returns nil if the progress does not exist yet. func LoadReplicationProgress( - ctx context.Context, db isql.DB, jobID jobspb.JobID, + ctx context.Context, db isql.DB, jobID jobspb.JobID, cv clusterversion.Handle, ) (*jobspb.StreamReplicationProgress, error) { - progress, err := jobs.LoadJobProgress(ctx, db, jobID) + progress, err := jobs.LoadJobProgress(ctx, db, jobID, cv) if err != nil || progress == nil { return nil, err } diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel index 69e46cf7b5ec..d2f19963cd90 100644 --- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel +++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "//pkg/cloud", "//pkg/cloud/externalconn", "//pkg/cloud/externalconn/connectionpb", + "//pkg/clusterversion", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/jobs/jobsprofiler", diff --git a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go index f4985ab2310f..44bf151fdd1b 100644 --- a/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go +++ b/pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go @@ -911,12 +911,16 @@ func TestLoadProducerAndIngestionProgress(t *testing.T) { c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(replicationJobID)) srcDB := c.SrcSysServer.ExecutorConfig().(sql.ExecutorConfig).InternalDB - producerProgress, err := replicationutils.LoadReplicationProgress(ctx, srcDB, jobspb.JobID(producerJobID)) + producerProgress, err := replicationutils.LoadReplicationProgress(ctx, srcDB, jobspb.JobID(producerJobID), + c.SrcSysServer.ExecutorConfig().(sql.ExecutorConfig).Settings.Version, + ) require.NoError(t, err) require.Equal(t, jobspb.StreamReplicationProgress_NOT_FINISHED, producerProgress.StreamIngestionStatus) destDB := c.DestSysServer.ExecutorConfig().(sql.ExecutorConfig).InternalDB - ingestionProgress, err := replicationutils.LoadIngestionProgress(ctx, destDB, jobspb.JobID(replicationJobID)) + ingestionProgress, err := replicationutils.LoadIngestionProgress(ctx, destDB, jobspb.JobID(replicationJobID), + c.DestSysServer.ExecutorConfig().(sql.ExecutorConfig).Settings.Version, + ) require.NoError(t, err) require.Equal(t, jobspb.Replicating, ingestionProgress.ReplicationStatus) } diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index 6d66e80072c4..c44e91392c61 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts" @@ -312,7 +313,7 @@ func ingestWithRetries( log.Warningf(ctx, msgFmt, err) updateRunningStatus(ctx, ingestionJob, jobspb.ReplicationError, fmt.Sprintf(msgFmt, err)) - newReplicatedTime := loadReplicatedTime(ctx, execCtx.ExecCfg().InternalDB, ingestionJob) + newReplicatedTime := loadReplicatedTime(ctx, execCtx.ExecCfg().InternalDB, ingestionJob, execCtx.ExecCfg().Settings.Version) if lastReplicatedTime.Less(newReplicatedTime) { r.Reset() lastReplicatedTime = newReplicatedTime @@ -326,8 +327,8 @@ func ingestWithRetries( return nil } -func loadReplicatedTime(ctx context.Context, db isql.DB, ingestionJob *jobs.Job) hlc.Timestamp { - latestProgress, err := replicationutils.LoadIngestionProgress(ctx, db, ingestionJob.ID()) +func loadReplicatedTime(ctx context.Context, db isql.DB, ingestionJob *jobs.Job, cv clusterversion.Handle) hlc.Timestamp { + latestProgress, err := replicationutils.LoadIngestionProgress(ctx, db, ingestionJob.ID(), cv) if err != nil { log.Warningf(ctx, "error loading job progress: %s", err) return hlc.Timestamp{} diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go index 21dd35ea9d39..2201c03d6a9b 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" @@ -270,6 +271,7 @@ func newStreamIngestionDataProcessor( cutoverProvider: &cutoverFromJobProgress{ jobID: jobspb.JobID(spec.JobID), db: flowCtx.Cfg.DB, + cv: flowCtx.Cfg.Settings.Version, }, cutoverCh: make(chan struct{}), closePoller: make(chan struct{}), @@ -1182,10 +1184,11 @@ type cutoverProvider interface { type cutoverFromJobProgress struct { db isql.DB jobID jobspb.JobID + cv clusterversion.Handle } func (c *cutoverFromJobProgress) cutoverReached(ctx context.Context) (bool, error) { - ingestionProgress, err := replicationutils.LoadIngestionProgress(ctx, c.db, c.jobID) + ingestionProgress, err := replicationutils.LoadIngestionProgress(ctx, c.db, c.jobID, c.cv) if err != nil { return false, err } diff --git a/pkg/ccl/streamingccl/streamproducer/producer_job.go b/pkg/ccl/streamingccl/streamproducer/producer_job.go index 7605107b4ecd..2a3029c045f2 100644 --- a/pkg/ccl/streamingccl/streamproducer/producer_job.go +++ b/pkg/ccl/streamingccl/streamproducer/producer_job.go @@ -95,7 +95,7 @@ func (p *producerJobResumer) Resume(ctx context.Context, execCtx interface{}) er case <-p.timer.Ch(): p.timer.MarkRead() p.timer.Reset(streamingccl.StreamReplicationStreamLivenessTrackFrequency.Get(execCfg.SV())) - progress, err := replicationutils.LoadReplicationProgress(ctx, execCfg.InternalDB, p.job.ID()) + progress, err := replicationutils.LoadReplicationProgress(ctx, execCfg.InternalDB, p.job.ID(), execCfg.Settings.Version) if knobs := execCfg.StreamingTestingKnobs; knobs != nil && knobs.AfterResumerJobLoad != nil { err = knobs.AfterResumerJobLoad(err) } diff --git a/pkg/jobs/job_info_storage.go b/pkg/jobs/job_info_storage.go index d749910f835c..04a0ddc5f2db 100644 --- a/pkg/jobs/job_info_storage.go +++ b/pkg/jobs/job_info_storage.go @@ -14,6 +14,7 @@ import ( "bytes" "context" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/isql" @@ -30,18 +31,19 @@ import ( type InfoStorage struct { j *Job txn isql.Txn + cv clusterversion.Handle } // InfoStorage returns a new InfoStorage with the passed in job and txn. func (j *Job) InfoStorage(txn isql.Txn) InfoStorage { - return InfoStorage{j: j, txn: txn} + return InfoStorage{j: j, txn: txn, cv: j.registry.settings.Version} } // InfoStorageForJobID returns a new InfoStorage with the passed in // job ID and txn. It avoids loading the job record. The resulting // job_info writes will not check the job session ID. -func InfoStorageForJob(txn isql.Txn, jobID jobspb.JobID) InfoStorage { - return InfoStorage{j: &Job{id: jobID}, txn: txn} +func InfoStorageForJob(txn isql.Txn, jobID jobspb.JobID, cv clusterversion.Handle) InfoStorage { + return InfoStorage{j: &Job{id: jobID}, txn: txn, cv: cv} } func (i InfoStorage) checkClaimSession(ctx context.Context) error { @@ -236,7 +238,7 @@ func (i InfoStorage) Write(ctx context.Context, infoKey string, value []byte) er return errors.AssertionFailedf("missing value (infoKey %q)", infoKey) } if err := i.write(ctx, infoKey, value); err != nil { - return MaybeGenerateForcedRetryableError(ctx, i.txn.KV(), err) + return MaybeGenerateForcedRetryableError(ctx, i.txn.KV(), err, i.cv) } return nil } diff --git a/pkg/jobs/job_info_storage_test.go b/pkg/jobs/job_info_storage_test.go index cfe107e0b2f6..0dcfe578296f 100644 --- a/pkg/jobs/job_info_storage_test.go +++ b/pkg/jobs/job_info_storage_test.go @@ -384,7 +384,7 @@ func TestJobInfoUpgradeRegressionTests(t *testing.T) { require.NoError(t, err) err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - infoStorage := jobs.InfoStorageForJob(txn, jobID) + infoStorage := jobs.InfoStorageForJob(txn, jobID, execCfg.Settings.Version) payloadBytes, _, err := infoStorage.Get(ctx, jobs.GetLegacyPayloadKey()) if err != nil { return err diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index bb7e2bd0225c..d1af85ebc6d3 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -1151,14 +1151,14 @@ func FormatRetriableExecutionErrorLogToStringArray( // LoadJobProgress returns the job progress from the info table. Note that the // progress can be nil if none is recorded. func LoadJobProgress( - ctx context.Context, db isql.DB, jobID jobspb.JobID, + ctx context.Context, db isql.DB, jobID jobspb.JobID, cv clusterversion.Handle, ) (*jobspb.Progress, error) { var ( progressBytes []byte exists bool ) if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - infoStorage := InfoStorageForJob(txn, jobID) + infoStorage := InfoStorageForJob(txn, jobID, cv) var err error progressBytes, exists, err = infoStorage.GetLegacyProgress(ctx) return err diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 5231cc754b48..fa493a205740 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -3698,7 +3698,7 @@ func TestLoadJobProgress(t *testing.T) { _, err := r.CreateJobWithTxn(ctx, rec, 7, nil) require.NoError(t, err) - p, err := jobs.LoadJobProgress(ctx, s.InternalDB().(isql.DB), 7) + p, err := jobs.LoadJobProgress(ctx, s.InternalDB().(isql.DB), 7, s.ClusterSettings().Version) require.NoError(t, err) require.Equal(t, []float32{7.1}, p.GetDetails().(*jobspb.Progress_Import).Import.ReadProgress) } diff --git a/pkg/jobs/jobsprofiler/profiler.go b/pkg/jobs/jobsprofiler/profiler.go index 28a4d1bb23f0..a3edb20ca690 100644 --- a/pkg/jobs/jobsprofiler/profiler.go +++ b/pkg/jobs/jobsprofiler/profiler.go @@ -53,7 +53,7 @@ func StorePlanDiagram( } const infoKey = "dsp-diag-url-%d" - infoStorage := jobs.InfoStorageForJob(txn, jobID) + infoStorage := jobs.InfoStorageForJob(txn, jobID, cv) return infoStorage.Write(ctx, fmt.Sprintf(infoKey, timeutil.Now().UnixNano()), []byte(diagURL.String())) }) diff --git a/pkg/jobs/jobsprofiler/profiler_test.go b/pkg/jobs/jobsprofiler/profiler_test.go index ae7d81b151a8..08de2b5c6281 100644 --- a/pkg/jobs/jobsprofiler/profiler_test.go +++ b/pkg/jobs/jobsprofiler/profiler_test.go @@ -82,7 +82,7 @@ func TestProfilerStorePlanDiagram(t *testing.T) { testutils.SucceedsSoon(t, func() error { var count int err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { - infoStorage := jobs.InfoStorageForJob(txn, jobID) + infoStorage := jobs.InfoStorageForJob(txn, jobID, execCfg.Settings.Version) return infoStorage.Iterate(ctx, "dsp-diag-url", func(infoKey string, value []byte) error { count++ return nil diff --git a/pkg/jobs/utils.go b/pkg/jobs/utils.go index 11f4ca568731..a1cfdd515eef 100644 --- a/pkg/jobs/utils.go +++ b/pkg/jobs/utils.go @@ -174,12 +174,16 @@ func isJobInfoTableDoesNotExistError(err error) bool { // txn is pushed to a higher timestamp at which the upgrade will have completed // and the table/column will be visible. The longer term fix is being tracked in // https://github.com/cockroachdb/cockroach/issues/106764. -func MaybeGenerateForcedRetryableError(ctx context.Context, txn *kv.Txn, err error) error { - if err != nil && isJobTypeColumnDoesNotExistError(err) { +func MaybeGenerateForcedRetryableError(ctx context.Context, txn *kv.Txn, err error, cv clusterversion.Handle) error { + if err == nil || !cv.IsActive(ctx, clusterversion.V23_1) { + return err + } + + if isJobTypeColumnDoesNotExistError(err) { return txn.GenerateForcedRetryableError(ctx, "synthetic error "+ "to push timestamp to after the `job_type` upgrade has run") } - if err != nil && isJobInfoTableDoesNotExistError(err) { + if isJobInfoTableDoesNotExistError(err) { return txn.GenerateForcedRetryableError(ctx, "synthetic error "+ "to push timestamp to after the `job_info` upgrade has run") } diff --git a/pkg/server/autoconfig/BUILD.bazel b/pkg/server/autoconfig/BUILD.bazel index 5385006c3ce9..719f9e348e8c 100644 --- a/pkg/server/autoconfig/BUILD.bazel +++ b/pkg/server/autoconfig/BUILD.bazel @@ -13,6 +13,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/server/autoconfig", visibility = ["//visibility:public"], deps = [ + "//pkg/clusterversion", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/security/username", diff --git a/pkg/server/autoconfig/auto_config_env_runner.go b/pkg/server/autoconfig/auto_config_env_runner.go index 024c54d9c6ac..69f10b81f889 100644 --- a/pkg/server/autoconfig/auto_config_env_runner.go +++ b/pkg/server/autoconfig/auto_config_env_runner.go @@ -141,7 +141,7 @@ func (r *envRunner) maybeRunNextTask( err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) (resErr error) { // Re-check if there any other started task already. - otherTaskID, _, err := getCurrentlyStartedTaskID(ctx, txn, r.envID) + otherTaskID, _, err := getCurrentlyStartedTaskID(ctx, txn, r.envID, execCfg.Settings.Version) if err != nil { return err } @@ -156,7 +156,7 @@ func (r *envRunner) maybeRunNextTask( } // Find the latest completed task. - lastTaskID, err := getLastCompletedTaskID(ctx, txn, r.envID) + lastTaskID, err := getLastCompletedTaskID(ctx, txn, r.envID, execCfg.Settings.Version) if err != nil { return err } @@ -185,7 +185,7 @@ func (r *envRunner) maybeRunNextTask( // maybeWaitForCurrentTaskJob(), which is an optimization. Storing // the job ID is not strictly required for sequencing the tasks. if err := writeStartMarker(ctx, txn, - InfoKeyTaskRef{Environment: r.envID, Task: nextTaskID}, jobID); err != nil { + InfoKeyTaskRef{Environment: r.envID, Task: nextTaskID}, jobID, execCfg.Settings.Version); err != nil { return errors.Wrapf(err, "unable to write start marker for task %d", nextTaskID) } @@ -214,7 +214,7 @@ func (r *envRunner) maybeWaitForCurrentTaskJob( if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { var err error - prevTaskID, prevJobID, err = getCurrentlyStartedTaskID(ctx, txn, r.envID) + prevTaskID, prevJobID, err = getCurrentlyStartedTaskID(ctx, txn, r.envID, execCfg.Settings.Version) return err }); err != nil { return errors.Wrap(err, "checking latest task job") diff --git a/pkg/server/autoconfig/auto_config_task.go b/pkg/server/autoconfig/auto_config_task.go index 9b313f488f5f..770d5315d595 100644 --- a/pkg/server/autoconfig/auto_config_task.go +++ b/pkg/server/autoconfig/auto_config_task.go @@ -78,7 +78,7 @@ func (r *taskRunner) OnFailOrCancel(ctx context.Context, execCtx interface{}, jo if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { return markTaskComplete(ctx, txn, InfoKeyTaskRef{Environment: r.envID, Task: r.task.TaskID}, - []byte("task error")) + []byte("task error"), execCfg.Settings.Version) }); err != nil { return err } @@ -175,7 +175,7 @@ func execSimpleSQL( log.Infof(ctx, "finished executing txn statements") return markTaskComplete(ctx, txn, InfoKeyTaskRef{Environment: envID, Task: taskID}, - []byte("task success")) + []byte("task success"), execCfg.Settings.Version) }) } diff --git a/pkg/server/autoconfig/task_markers.go b/pkg/server/autoconfig/task_markers.go index 2b861e8db18c..17071b42b255 100644 --- a/pkg/server/autoconfig/task_markers.go +++ b/pkg/server/autoconfig/task_markers.go @@ -16,6 +16,7 @@ import ( "strconv" "strings" + "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/sql/isql" @@ -141,9 +142,9 @@ func (tr *InfoKeyTaskRef) decodeInternal(prefix, infoKey string) error { // writeStartMarker writes a start marker for the given task ID and // also writes its job ID into the value part. func writeStartMarker( - ctx context.Context, txn isql.Txn, taskRef InfoKeyTaskRef, jobID jobspb.JobID, + ctx context.Context, txn isql.Txn, taskRef InfoKeyTaskRef, jobID jobspb.JobID, cv clusterversion.Handle, ) error { - infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID) + infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID, cv) return infoStorage.Write(ctx, taskRef.EncodeStartMarkerKey(), []byte(strconv.FormatUint(uint64(jobID), 10))) @@ -152,9 +153,9 @@ func writeStartMarker( // getCurrentlyStartedTaskID retrieves the ID of the last task which // has a start marker in job_info. func getCurrentlyStartedTaskID( - ctx context.Context, txn isql.Txn, env EnvironmentID, + ctx context.Context, txn isql.Txn, env EnvironmentID, cv clusterversion.Handle, ) (prevTaskID TaskID, prevJobID jobspb.JobID, err error) { - infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID) + infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID, cv) if err := infoStorage.GetLast(ctx, InfoKeyStartPrefix(env), @@ -184,9 +185,9 @@ func getCurrentlyStartedTaskID( // getLastCompletedTaskID retrieves the task ID of the last task which // has a completion marker in job_info. func getLastCompletedTaskID( - ctx context.Context, txn isql.Txn, env EnvironmentID, + ctx context.Context, txn isql.Txn, env EnvironmentID, cv clusterversion.Handle, ) (lastTaskID TaskID, err error) { - infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID) + infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID, cv) if err := infoStorage.GetLast(ctx, InfoKeyCompletionPrefix(env), @@ -208,9 +209,9 @@ func getLastCompletedTaskID( // markTaskCompletes transactionally removes the task's start marker // and creates a completion marker. func markTaskComplete( - ctx context.Context, txn isql.Txn, taskRef InfoKeyTaskRef, completionValue []byte, + ctx context.Context, txn isql.Txn, taskRef InfoKeyTaskRef, completionValue []byte, cv clusterversion.Handle, ) error { - infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID) + infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID, cv) // Remove the start marker. if err := infoStorage.Delete(ctx, taskRef.EncodeStartMarkerKey()); err != nil { diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 291706d4eb71..050ba83817c4 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -1064,7 +1064,7 @@ func populateSystemJobsTableRows( params..., ) if err != nil { - return matched, jobs.MaybeGenerateForcedRetryableError(ctx, p.Txn(), err) + return matched, jobs.MaybeGenerateForcedRetryableError(ctx, p.Txn(), err, p.execCfg.Settings.Version) } cleanup := func(ctx context.Context) { @@ -1077,7 +1077,7 @@ func populateSystemJobsTableRows( for { hasNext, err := it.Next(ctx) if !hasNext || err != nil { - return matched, jobs.MaybeGenerateForcedRetryableError(ctx, p.Txn(), err) + return matched, jobs.MaybeGenerateForcedRetryableError(ctx, p.Txn(), err, p.execCfg.Settings.Version) } currentRow := it.Cur()