diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index c9221790902d..ffaff7411be8 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -327,15 +327,7 @@ func (r *Registry) resumeJob( return err } if !exists { - // 23.1.3 could finalize an upgrade but leave some jobs behind with rows - // not copied to info table. If we get here, try backfilling the info - // table for this job in the txn and proceed if it succeeds. - fixedPayload, err := infoStorage.BackfillLegacyPayload(ctx) - if err != nil { - return errors.Wrap(err, "job payload not found in system.job_info") - } - log.Infof(ctx, "fixed missing payload info for job %d", jobID) - payloadBytes = fixedPayload + return errors.Wrap(&JobNotFoundError{jobID: jobID}, "job payload not found in system.job_info") } if err := protoutil.Unmarshal(payloadBytes, payload); err != nil { return err @@ -346,12 +338,7 @@ func (r *Registry) resumeJob( return err } if !exists { - fixedProgress, err := infoStorage.BackfillLegacyProgress(ctx) - if err != nil { - return errors.Wrap(err, "job progress not found in system.job_info") - } - log.Infof(ctx, "fixed missing progress info for job %d", jobID) - progressBytes = fixedProgress + return errors.Wrap(&JobNotFoundError{jobID: jobID}, "job progress not found in system.job_info") } return protoutil.Unmarshal(progressBytes, progress) }); err != nil { diff --git a/pkg/jobs/job_info_storage.go b/pkg/jobs/job_info_storage.go index 5fdad0af0c9f..d0d0566b1ec6 100644 --- a/pkg/jobs/job_info_storage.go +++ b/pkg/jobs/job_info_storage.go @@ -13,7 +13,6 @@ package jobs import ( "bytes" "context" - "fmt" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -24,7 +23,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" - "github.com/cockroachdb/redact" ) // InfoStorage can be used to read and write rows to system.job_info table. All @@ -320,38 +318,3 @@ func (i InfoStorage) GetLegacyProgress(ctx context.Context) ([]byte, bool, error func (i InfoStorage) WriteLegacyProgress(ctx context.Context, progress []byte) error { return i.Write(ctx, LegacyProgressKey, progress) } - -// BackfillLegacyPayload copies a legacy payload from system.jobs. #104798. -func (i InfoStorage) BackfillLegacyPayload(ctx context.Context) ([]byte, error) { - return i.backfillMissing(ctx, "payload") -} - -// BackfillLegacyProgress copies a legacy progress from system.jobs. #104798. -func (i InfoStorage) BackfillLegacyProgress(ctx context.Context) ([]byte, error) { - return i.backfillMissing(ctx, "progress") -} - -func (i InfoStorage) backfillMissing(ctx context.Context, kind string) ([]byte, error) { - row, err := i.txn.QueryRowEx( - ctx, fmt.Sprintf("job-info-fix-%s", kind), i.txn.KV(), - sessiondata.NodeUserSessionDataOverride, - `INSERT INTO system.job_info (job_id, info_key, value) - SELECT id, 'legacy_`+kind+`', `+kind+` FROM system.jobs WHERE id = $1 AND `+kind+` IS NOT NULL - RETURNING value`, - i.j.ID(), - ) - - if err != nil { - return nil, err - } - - if row == nil { - return nil, errors.Wrapf(&JobNotFoundError{jobID: i.j.ID()}, "job %s not found in system.jobs", redact.SafeString(kind)) - } - - value, ok := row[0].(*tree.DBytes) - if !ok { - return nil, errors.AssertionFailedf("job info: expected value to be DBytes (was %T)", row[0]) - } - return []byte(*value), nil -} diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 69fab5b7e2d1..65a3cc0efaa9 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -1618,7 +1618,7 @@ func TestJobLifecycle(t *testing.T) { }, registry.MakeJobID(), txn) return errors.New("boom") })) - if err := job.Started(ctx); !testutils.IsError(err, "job payload not found in system.jobs") { + if err := job.Started(ctx); !testutils.IsError(err, "not found in system.jobs table") { t.Fatalf("unexpected error %v", err) } }) diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 833ae134a16d..fa420d9bd15f 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -1861,13 +1861,6 @@ func (r *Registry) MarkIdle(job *Job, isIdle bool) { } } -// TestingForgetJob causes the registry to forget it has adopted a job. -func (r *Registry) TestingForgetJob(id jobspb.JobID) { - r.mu.Lock() - defer r.mu.Unlock() - delete(r.mu.adoptedJobs, id) -} - func (r *Registry) cancelAllAdoptedJobs() { r.mu.Lock() defer r.mu.Unlock() diff --git a/pkg/jobs/update.go b/pkg/jobs/update.go index a2b37fe4365f..55b61aaf2c8f 100644 --- a/pkg/jobs/update.go +++ b/pkg/jobs/update.go @@ -108,33 +108,7 @@ func (u Updater) update(ctx context.Context, useReadLock bool, updateFn UpdateFn return err } if row == nil { - // Maybe try to fix a row missed by 23.1.3 backfill and re-read. #104798. - if u.j.registry.settings.Version.IsActive(ctx, clusterversion.V23_1JobInfoTableIsBackfilled) { - i := j.InfoStorage(u.txn) - - _, err := i.BackfillLegacyPayload(ctx) - if err != nil { - return errors.Wrap(err, "failed to backfill job info payload during update") - } - - _, err = i.BackfillLegacyProgress(ctx) - if err != nil { - return errors.Wrap(err, "failed to backfill job info progress during update") - } - - row, err = u.txn.QueryRowEx( - ctx, "select-job", u.txn.KV(), - sessiondata.RootUserSessionDataOverride, - getSelectStmtForJobUpdate(ctx, j.session != nil, useReadLock, u.j.registry.settings.Version), j.ID(), - ) - - if err != nil { - return err - } - } - if row == nil { - return errors.Errorf("not found in system.jobs table") - } + return errors.Errorf("not found in system.jobs table") } if status, err = unmarshalStatus(row[0]); err != nil { diff --git a/pkg/upgrade/upgrades/backfill_job_info_table_migration.go b/pkg/upgrade/upgrades/backfill_job_info_table_migration.go index 63c5875813ac..5df0d554df78 100644 --- a/pkg/upgrade/upgrades/backfill_job_info_table_migration.go +++ b/pkg/upgrade/upgrades/backfill_job_info_table_migration.go @@ -45,15 +45,9 @@ const ( backfillJobInfoProgressStmt = backfillJobInfoSharedPrefix + jobs.LegacyProgressKey + `', progress` + backfillJobInfoSharedSuffix ) -// TestingSkipInfoBackfill is a testing hook. -var TestingSkipInfoBackfill bool - func backfillJobInfoTable( ctx context.Context, cs clusterversion.ClusterVersion, d upgrade.TenantDeps, ) error { - if TestingSkipInfoBackfill { - return nil - } for step, stmt := range []string{backfillJobInfoPayloadStmt, backfillJobInfoProgressStmt} { var resumeAfter int diff --git a/pkg/upgrade/upgrades/backfill_job_info_table_migration_test.go b/pkg/upgrade/upgrades/backfill_job_info_table_migration_test.go index c0cc6d3c2ce1..4aebc75d7356 100644 --- a/pkg/upgrade/upgrades/backfill_job_info_table_migration_test.go +++ b/pkg/upgrade/upgrades/backfill_job_info_table_migration_test.go @@ -12,9 +12,7 @@ package upgrades_test import ( "context" - "fmt" "testing" - "time" "github.com/cockroachdb/cockroach/pkg/base" _ "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" @@ -24,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -109,111 +106,3 @@ WHERE j.id = i.job_id AND (j.payload = i.value OR j.progress = i.value) AND (j.i `, [][]string{{"14"}}) } - -var _ jobs.Resumer = &fakeJob{} - -type fakeJob struct { - job *jobs.Job - ch1, ch2 chan<- string -} - -func (r *fakeJob) Resume(ctx context.Context, _ interface{}) error { - ch := r.ch1 - if r.job.Progress().Details.(*jobspb.Progress_Import).Import.ResumePos[0] == 2 { - ch = r.ch2 - } - select { - case ch <- fmt.Sprintf("%s %v", - r.job.Details().(jobspb.ImportDetails).BackupPath, - r.job.Progress().Details.(*jobspb.Progress_Import).Import.ResumePos): - return nil - case <-ctx.Done(): - return ctx.Err() - } -} - -func (r *fakeJob) OnFailOrCancel(ctx context.Context, execCtx interface{}, _ error) error { - return nil -} - -func TestIncompleteBackfill(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - clusterArgs := base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{Server: &server.TestingKnobs{ - DisableAutomaticVersionUpgrade: make(chan struct{}), - BootstrapVersionKeyOverride: clusterversion.V22_2, - BinaryVersionOverride: clusterversion.ByKey(clusterversion.V22_2), - }}}, - } - - ctx := context.Background() - tc := testcluster.StartTestCluster(t, 1, clusterArgs) - defer tc.Stopper().Stop(ctx) - - r := tc.Server(0).JobRegistry().(*jobs.Registry) - sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0)) - - ch1 := make(chan string, 1) - ch2 := make(chan string, 1) - - jobs.RegisterConstructor( - jobspb.TypeImport, - func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer { return &fakeJob{job: job, ch1: ch1, ch2: ch2} }, - jobs.UsesTenantCostControl, - ) - - var adoptedJob, runningJob *jobs.StartableJob - runningID, adoptedID := jobspb.JobID(5550001), jobspb.JobID(5550002) - require.NoError(t, tc.Server(0).InternalDB().(isql.DB).Txn(ctx, func( - ctx context.Context, txn isql.Txn, - ) (err error) { - - if err := r.CreateStartableJobWithTxn(ctx, &adoptedJob, adoptedID, txn, jobs.Record{ - Username: username.RootUserName(), - Details: jobspb.ImportDetails{BackupPath: "adopted"}, - Progress: jobspb.ImportProgress{ResumePos: []int64{1}}, - }); err != nil { - return err - } - - if err := r.CreateStartableJobWithTxn(ctx, &runningJob, runningID, txn, jobs.Record{ - Username: username.RootUserName(), - Details: jobspb.ImportDetails{BackupPath: "running"}, - Progress: jobspb.ImportProgress{ResumePos: []int64{2}}, - }); err != nil { - return err - } - return nil - })) - - upgrades.TestingSkipInfoBackfill = true - defer func() { - upgrades.TestingSkipInfoBackfill = false - }() - - sqlDB.Exec(t, "SET CLUSTER SETTING version = $1", clusterversion.ByKey(clusterversion.V23_1).String()) - r.TestingForgetJob(adoptedID) - r.NotifyToResume(ctx, adoptedID) - - require.NoError(t, runningJob.Start(ctx)) - ctx, cancel := context.WithTimeout(ctx, time.Second*5) - defer cancel() - require.NoError(t, runningJob.AwaitCompletion(ctx)) - - select { - case res := <-ch1: - require.Equal(t, "adopted [1]", res) - case <-time.After(time.Second * 5): - t.Fatal("timed out waiting for job to run") - } - - select { - case res := <-ch2: - require.Equal(t, "running [2]", res) - case <-time.After(time.Second * 5): - t.Fatal("timed out waiting for job to run") - } - -}