Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-23.1: jobs: only force jobs.MaybeGenerateForcedRetryableError in 23.1 #113967

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/replicationutils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/streamingccl/streamclient",
"//pkg/clusterversion",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/kv/kvpb",
Expand Down
9 changes: 5 additions & 4 deletions pkg/ccl/streamingccl/replicationutils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
Expand Down Expand Up @@ -174,9 +175,9 @@ func ReplicatedTimeFromProgress(p *jobspb.Progress) hlc.Timestamp {
// LoadIngestionProgress loads the latest persisted stream ingestion progress.
// The method returns nil if the progress does not exist yet.
func LoadIngestionProgress(
ctx context.Context, db isql.DB, jobID jobspb.JobID,
ctx context.Context, db isql.DB, jobID jobspb.JobID, cv clusterversion.Handle,
) (*jobspb.StreamIngestionProgress, error) {
progress, err := jobs.LoadJobProgress(ctx, db, jobID)
progress, err := jobs.LoadJobProgress(ctx, db, jobID, cv)
if err != nil || progress == nil {
return nil, err
}
Expand All @@ -192,9 +193,9 @@ func LoadIngestionProgress(
// LoadReplicationProgress loads the latest persisted stream replication progress.
// The method returns nil if the progress does not exist yet.
func LoadReplicationProgress(
ctx context.Context, db isql.DB, jobID jobspb.JobID,
ctx context.Context, db isql.DB, jobID jobspb.JobID, cv clusterversion.Handle,
) (*jobspb.StreamReplicationProgress, error) {
progress, err := jobs.LoadJobProgress(ctx, db, jobID)
progress, err := jobs.LoadJobProgress(ctx, db, jobID, cv)
if err != nil || progress == nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/streamingccl/streamingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//pkg/cloud",
"//pkg/cloud/externalconn",
"//pkg/cloud/externalconn/connectionpb",
"//pkg/clusterversion",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobsprofiler",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -911,12 +911,16 @@ func TestLoadProducerAndIngestionProgress(t *testing.T) {
c.WaitUntilReplicatedTime(srcTime, jobspb.JobID(replicationJobID))

srcDB := c.SrcSysServer.ExecutorConfig().(sql.ExecutorConfig).InternalDB
producerProgress, err := replicationutils.LoadReplicationProgress(ctx, srcDB, jobspb.JobID(producerJobID))
producerProgress, err := replicationutils.LoadReplicationProgress(ctx, srcDB, jobspb.JobID(producerJobID),
c.SrcSysServer.ExecutorConfig().(sql.ExecutorConfig).Settings.Version,
)
require.NoError(t, err)
require.Equal(t, jobspb.StreamReplicationProgress_NOT_FINISHED, producerProgress.StreamIngestionStatus)

destDB := c.DestSysServer.ExecutorConfig().(sql.ExecutorConfig).InternalDB
ingestionProgress, err := replicationutils.LoadIngestionProgress(ctx, destDB, jobspb.JobID(replicationJobID))
ingestionProgress, err := replicationutils.LoadIngestionProgress(ctx, destDB, jobspb.JobID(replicationJobID),
c.DestSysServer.ExecutorConfig().(sql.ExecutorConfig).Settings.Version,
)
require.NoError(t, err)
require.Equal(t, jobspb.Replicating, ingestionProgress.ReplicationStatus)
}
7 changes: 4 additions & 3 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts"
Expand Down Expand Up @@ -312,7 +313,7 @@ func ingestWithRetries(
log.Warningf(ctx, msgFmt, err)
updateRunningStatus(ctx, ingestionJob, jobspb.ReplicationError,
fmt.Sprintf(msgFmt, err))
newReplicatedTime := loadReplicatedTime(ctx, execCtx.ExecCfg().InternalDB, ingestionJob)
newReplicatedTime := loadReplicatedTime(ctx, execCtx.ExecCfg().InternalDB, ingestionJob, execCtx.ExecCfg().Settings.Version)
if lastReplicatedTime.Less(newReplicatedTime) {
r.Reset()
lastReplicatedTime = newReplicatedTime
Expand All @@ -326,8 +327,8 @@ func ingestWithRetries(
return nil
}

func loadReplicatedTime(ctx context.Context, db isql.DB, ingestionJob *jobs.Job) hlc.Timestamp {
latestProgress, err := replicationutils.LoadIngestionProgress(ctx, db, ingestionJob.ID())
func loadReplicatedTime(ctx context.Context, db isql.DB, ingestionJob *jobs.Job, cv clusterversion.Handle) hlc.Timestamp {
latestProgress, err := replicationutils.LoadIngestionProgress(ctx, db, ingestionJob.ID(), cv)
if err != nil {
log.Warningf(ctx, "error loading job progress: %s", err)
return hlc.Timestamp{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/replicationutils"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
Expand Down Expand Up @@ -270,6 +271,7 @@ func newStreamIngestionDataProcessor(
cutoverProvider: &cutoverFromJobProgress{
jobID: jobspb.JobID(spec.JobID),
db: flowCtx.Cfg.DB,
cv: flowCtx.Cfg.Settings.Version,
},
cutoverCh: make(chan struct{}),
closePoller: make(chan struct{}),
Expand Down Expand Up @@ -1182,10 +1184,11 @@ type cutoverProvider interface {
type cutoverFromJobProgress struct {
// db is the database handle passed to
// replicationutils.LoadIngestionProgress by cutoverReached.
db isql.DB
// jobID identifies the job whose ingestion progress cutoverReached loads.
jobID jobspb.JobID
// cv is the cluster version handle forwarded to
// replicationutils.LoadIngestionProgress by cutoverReached.
cv clusterversion.Handle
}

func (c *cutoverFromJobProgress) cutoverReached(ctx context.Context) (bool, error) {
ingestionProgress, err := replicationutils.LoadIngestionProgress(ctx, c.db, c.jobID)
ingestionProgress, err := replicationutils.LoadIngestionProgress(ctx, c.db, c.jobID, c.cv)
if err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamproducer/producer_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (p *producerJobResumer) Resume(ctx context.Context, execCtx interface{}) er
case <-p.timer.Ch():
p.timer.MarkRead()
p.timer.Reset(streamingccl.StreamReplicationStreamLivenessTrackFrequency.Get(execCfg.SV()))
progress, err := replicationutils.LoadReplicationProgress(ctx, execCfg.InternalDB, p.job.ID())
progress, err := replicationutils.LoadReplicationProgress(ctx, execCfg.InternalDB, p.job.ID(), execCfg.Settings.Version)
if knobs := execCfg.StreamingTestingKnobs; knobs != nil && knobs.AfterResumerJobLoad != nil {
err = knobs.AfterResumerJobLoad(err)
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/jobs/job_info_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"bytes"
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
Expand All @@ -30,18 +31,19 @@ import (
type InfoStorage struct {
// j is the job whose info is accessed. InfoStorageForJob populates it with
// a Job that carries only the job ID.
j *Job
// txn is the caller-supplied transaction. Write passes its KV handle to
// MaybeGenerateForcedRetryableError when the underlying write fails.
txn isql.Txn
// cv is the cluster version handle that Write forwards to
// MaybeGenerateForcedRetryableError.
cv clusterversion.Handle
}

// InfoStorage returns a new InfoStorage with the passed in job and txn.
func (j *Job) InfoStorage(txn isql.Txn) InfoStorage {
return InfoStorage{j: j, txn: txn}
return InfoStorage{j: j, txn: txn, cv: j.registry.settings.Version}
}

// InfoStorageForJob returns a new InfoStorage with the passed in
// job ID and txn. It avoids loading the job record. The resulting
// job_info writes will not check the job session ID.
func InfoStorageForJob(txn isql.Txn, jobID jobspb.JobID) InfoStorage {
return InfoStorage{j: &Job{id: jobID}, txn: txn}
func InfoStorageForJob(txn isql.Txn, jobID jobspb.JobID, cv clusterversion.Handle) InfoStorage {
return InfoStorage{j: &Job{id: jobID}, txn: txn, cv: cv}
}

func (i InfoStorage) checkClaimSession(ctx context.Context) error {
Expand Down Expand Up @@ -236,7 +238,7 @@ func (i InfoStorage) Write(ctx context.Context, infoKey string, value []byte) er
return errors.AssertionFailedf("missing value (infoKey %q)", infoKey)
}
if err := i.write(ctx, infoKey, value); err != nil {
return MaybeGenerateForcedRetryableError(ctx, i.txn.KV(), err)
return MaybeGenerateForcedRetryableError(ctx, i.txn.KV(), err, i.cv)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/job_info_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func TestJobInfoUpgradeRegressionTests(t *testing.T) {
require.NoError(t, err)

err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := jobs.InfoStorageForJob(txn, jobID)
infoStorage := jobs.InfoStorageForJob(txn, jobID, execCfg.Settings.Version)
payloadBytes, _, err := infoStorage.Get(ctx, jobs.GetLegacyPayloadKey())
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1151,14 +1151,14 @@ func FormatRetriableExecutionErrorLogToStringArray(
// LoadJobProgress returns the job progress from the info table. Note that the
// progress can be nil if none is recorded.
func LoadJobProgress(
ctx context.Context, db isql.DB, jobID jobspb.JobID,
ctx context.Context, db isql.DB, jobID jobspb.JobID, cv clusterversion.Handle,
) (*jobspb.Progress, error) {
var (
progressBytes []byte
exists bool
)
if err := db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := InfoStorageForJob(txn, jobID)
infoStorage := InfoStorageForJob(txn, jobID, cv)
var err error
progressBytes, exists, err = infoStorage.GetLegacyProgress(ctx)
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3698,7 +3698,7 @@ func TestLoadJobProgress(t *testing.T) {
_, err := r.CreateJobWithTxn(ctx, rec, 7, nil)
require.NoError(t, err)

p, err := jobs.LoadJobProgress(ctx, s.InternalDB().(isql.DB), 7)
p, err := jobs.LoadJobProgress(ctx, s.InternalDB().(isql.DB), 7, s.ClusterSettings().Version)
require.NoError(t, err)
require.Equal(t, []float32{7.1}, p.GetDetails().(*jobspb.Progress_Import).Import.ReadProgress)
}
2 changes: 1 addition & 1 deletion pkg/jobs/jobsprofiler/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func StorePlanDiagram(
}

const infoKey = "dsp-diag-url-%d"
infoStorage := jobs.InfoStorageForJob(txn, jobID)
infoStorage := jobs.InfoStorageForJob(txn, jobID, cv)
return infoStorage.Write(ctx, fmt.Sprintf(infoKey, timeutil.Now().UnixNano()),
[]byte(diagURL.String()))
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/jobsprofiler/profiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestProfilerStorePlanDiagram(t *testing.T) {
testutils.SucceedsSoon(t, func() error {
var count int
err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
infoStorage := jobs.InfoStorageForJob(txn, jobID)
infoStorage := jobs.InfoStorageForJob(txn, jobID, execCfg.Settings.Version)
return infoStorage.Iterate(ctx, "dsp-diag-url", func(infoKey string, value []byte) error {
count++
return nil
Expand Down
10 changes: 7 additions & 3 deletions pkg/jobs/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,16 @@ func isJobInfoTableDoesNotExistError(err error) bool {
// txn is pushed to a higher timestamp at which the upgrade will have completed
// and the table/column will be visible. The longer term fix is being tracked in
// https://github.com/cockroachdb/cockroach/issues/106764.
func MaybeGenerateForcedRetryableError(ctx context.Context, txn *kv.Txn, err error) error {
if err != nil && isJobTypeColumnDoesNotExistError(err) {
func MaybeGenerateForcedRetryableError(ctx context.Context, txn *kv.Txn, err error, cv clusterversion.Handle) error {
if err == nil || !cv.IsActive(ctx, clusterversion.V23_1) {
return err
}

if isJobTypeColumnDoesNotExistError(err) {
return txn.GenerateForcedRetryableError(ctx, "synthetic error "+
"to push timestamp to after the `job_type` upgrade has run")
}
if err != nil && isJobInfoTableDoesNotExistError(err) {
if isJobInfoTableDoesNotExistError(err) {
return txn.GenerateForcedRetryableError(ctx, "synthetic error "+
"to push timestamp to after the `job_info` upgrade has run")
}
Expand Down
1 change: 1 addition & 0 deletions pkg/server/autoconfig/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/server/autoconfig",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/security/username",
Expand Down
8 changes: 4 additions & 4 deletions pkg/server/autoconfig/auto_config_env_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (r *envRunner) maybeRunNextTask(

err = execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) (resErr error) {
// Re-check whether any other task has already been started.
otherTaskID, _, err := getCurrentlyStartedTaskID(ctx, txn, r.envID)
otherTaskID, _, err := getCurrentlyStartedTaskID(ctx, txn, r.envID, execCfg.Settings.Version)
if err != nil {
return err
}
Expand All @@ -156,7 +156,7 @@ func (r *envRunner) maybeRunNextTask(
}

// Find the latest completed task.
lastTaskID, err := getLastCompletedTaskID(ctx, txn, r.envID)
lastTaskID, err := getLastCompletedTaskID(ctx, txn, r.envID, execCfg.Settings.Version)
if err != nil {
return err
}
Expand Down Expand Up @@ -185,7 +185,7 @@ func (r *envRunner) maybeRunNextTask(
// maybeWaitForCurrentTaskJob(), which is an optimization. Storing
// the job ID is not strictly required for sequencing the tasks.
if err := writeStartMarker(ctx, txn,
InfoKeyTaskRef{Environment: r.envID, Task: nextTaskID}, jobID); err != nil {
InfoKeyTaskRef{Environment: r.envID, Task: nextTaskID}, jobID, execCfg.Settings.Version); err != nil {
return errors.Wrapf(err, "unable to write start marker for task %d", nextTaskID)
}

Expand Down Expand Up @@ -214,7 +214,7 @@ func (r *envRunner) maybeWaitForCurrentTaskJob(

if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
var err error
prevTaskID, prevJobID, err = getCurrentlyStartedTaskID(ctx, txn, r.envID)
prevTaskID, prevJobID, err = getCurrentlyStartedTaskID(ctx, txn, r.envID, execCfg.Settings.Version)
return err
}); err != nil {
return errors.Wrap(err, "checking latest task job")
Expand Down
4 changes: 2 additions & 2 deletions pkg/server/autoconfig/auto_config_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (r *taskRunner) OnFailOrCancel(ctx context.Context, execCtx interface{}, jo
if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
return markTaskComplete(ctx, txn,
InfoKeyTaskRef{Environment: r.envID, Task: r.task.TaskID},
[]byte("task error"))
[]byte("task error"), execCfg.Settings.Version)
}); err != nil {
return err
}
Expand Down Expand Up @@ -175,7 +175,7 @@ func execSimpleSQL(
log.Infof(ctx, "finished executing txn statements")
return markTaskComplete(ctx, txn,
InfoKeyTaskRef{Environment: envID, Task: taskID},
[]byte("task success"))
[]byte("task success"), execCfg.Settings.Version)
})
}

Expand Down
17 changes: 9 additions & 8 deletions pkg/server/autoconfig/task_markers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"strconv"
"strings"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
Expand Down Expand Up @@ -141,9 +142,9 @@ func (tr *InfoKeyTaskRef) decodeInternal(prefix, infoKey string) error {
// writeStartMarker writes a start marker for the given task ID and
// also writes its job ID into the value part.
func writeStartMarker(
ctx context.Context, txn isql.Txn, taskRef InfoKeyTaskRef, jobID jobspb.JobID,
ctx context.Context, txn isql.Txn, taskRef InfoKeyTaskRef, jobID jobspb.JobID, cv clusterversion.Handle,
) error {
infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID)
infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID, cv)
return infoStorage.Write(ctx,
taskRef.EncodeStartMarkerKey(),
[]byte(strconv.FormatUint(uint64(jobID), 10)))
Expand All @@ -152,9 +153,9 @@ func writeStartMarker(
// getCurrentlyStartedTaskID retrieves the ID of the last task which
// has a start marker in job_info.
func getCurrentlyStartedTaskID(
ctx context.Context, txn isql.Txn, env EnvironmentID,
ctx context.Context, txn isql.Txn, env EnvironmentID, cv clusterversion.Handle,
) (prevTaskID TaskID, prevJobID jobspb.JobID, err error) {
infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID)
infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID, cv)

if err := infoStorage.GetLast(ctx,
InfoKeyStartPrefix(env),
Expand Down Expand Up @@ -184,9 +185,9 @@ func getCurrentlyStartedTaskID(
// getLastCompletedTaskID retrieves the task ID of the last task which
// has a completion marker in job_info.
func getLastCompletedTaskID(
ctx context.Context, txn isql.Txn, env EnvironmentID,
ctx context.Context, txn isql.Txn, env EnvironmentID, cv clusterversion.Handle,
) (lastTaskID TaskID, err error) {
infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID)
infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID, cv)

if err := infoStorage.GetLast(ctx,
InfoKeyCompletionPrefix(env),
Expand All @@ -208,9 +209,9 @@ func getLastCompletedTaskID(
// markTaskComplete transactionally removes the task's start marker
// and creates a completion marker.
func markTaskComplete(
ctx context.Context, txn isql.Txn, taskRef InfoKeyTaskRef, completionValue []byte,
ctx context.Context, txn isql.Txn, taskRef InfoKeyTaskRef, completionValue []byte, cv clusterversion.Handle,
) error {
infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID)
infoStorage := jobs.InfoStorageForJob(txn, jobs.AutoConfigRunnerJobID, cv)

// Remove the start marker.
if err := infoStorage.Delete(ctx, taskRef.EncodeStartMarkerKey()); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1064,7 +1064,7 @@ func populateSystemJobsTableRows(
params...,
)
if err != nil {
return matched, jobs.MaybeGenerateForcedRetryableError(ctx, p.Txn(), err)
return matched, jobs.MaybeGenerateForcedRetryableError(ctx, p.Txn(), err, p.execCfg.Settings.Version)
}

cleanup := func(ctx context.Context) {
Expand All @@ -1077,7 +1077,7 @@ func populateSystemJobsTableRows(
for {
hasNext, err := it.Next(ctx)
if !hasNext || err != nil {
return matched, jobs.MaybeGenerateForcedRetryableError(ctx, p.Txn(), err)
return matched, jobs.MaybeGenerateForcedRetryableError(ctx, p.Txn(), err, p.execCfg.Settings.Version)
}

currentRow := it.Cur()
Expand Down