Skip to content

Commit

Permalink
Revert "jobs: attempt to backfill missing info rows on the fly"
Browse files Browse the repository at this point in the history
This reverts commit 9d59881.
  • Loading branch information
dt committed Aug 22, 2023
1 parent a54a41e commit 99c8020
Show file tree
Hide file tree
Showing 7 changed files with 4 additions and 204 deletions.
17 changes: 2 additions & 15 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,15 +327,7 @@ func (r *Registry) resumeJob(
return err
}
if !exists {
// 23.1.3 could finalize an upgrade but leave some jobs behind with rows
// not copied to info table. If we get here, try backfilling the info
// table for this job in the txn and proceed if it succeeds.
fixedPayload, err := infoStorage.BackfillLegacyPayload(ctx)
if err != nil {
return errors.Wrap(err, "job payload not found in system.job_info")
}
log.Infof(ctx, "fixed missing payload info for job %d", jobID)
payloadBytes = fixedPayload
return errors.Wrap(&JobNotFoundError{jobID: jobID}, "job payload not found in system.job_info")
}
if err := protoutil.Unmarshal(payloadBytes, payload); err != nil {
return err
Expand All @@ -346,12 +338,7 @@ func (r *Registry) resumeJob(
return err
}
if !exists {
fixedProgress, err := infoStorage.BackfillLegacyProgress(ctx)
if err != nil {
return errors.Wrap(err, "job progress not found in system.job_info")
}
log.Infof(ctx, "fixed missing progress info for job %d", jobID)
progressBytes = fixedProgress
return errors.Wrap(&JobNotFoundError{jobID: jobID}, "job progress not found in system.job_info")
}
return protoutil.Unmarshal(progressBytes, progress)
}); err != nil {
Expand Down
37 changes: 0 additions & 37 deletions pkg/jobs/job_info_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package jobs
import (
"bytes"
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand All @@ -24,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

// InfoStorage can be used to read and write rows to system.job_info table. All
Expand Down Expand Up @@ -320,38 +318,3 @@ func (i InfoStorage) GetLegacyProgress(ctx context.Context) ([]byte, bool, error
func (i InfoStorage) WriteLegacyProgress(ctx context.Context, progress []byte) error {
return i.Write(ctx, LegacyProgressKey, progress)
}

// BackfillLegacyPayload copies a legacy payload from system.jobs. #104798.
func (i InfoStorage) BackfillLegacyPayload(ctx context.Context) ([]byte, error) {
return i.backfillMissing(ctx, "payload")
}

// BackfillLegacyProgress copies a legacy progress from system.jobs. #104798.
func (i InfoStorage) BackfillLegacyProgress(ctx context.Context) ([]byte, error) {
return i.backfillMissing(ctx, "progress")
}

func (i InfoStorage) backfillMissing(ctx context.Context, kind string) ([]byte, error) {
row, err := i.txn.QueryRowEx(
ctx, fmt.Sprintf("job-info-fix-%s", kind), i.txn.KV(),
sessiondata.NodeUserSessionDataOverride,
`INSERT INTO system.job_info (job_id, info_key, value)
SELECT id, 'legacy_`+kind+`', `+kind+` FROM system.jobs WHERE id = $1 AND `+kind+` IS NOT NULL
RETURNING value`,
i.j.ID(),
)

if err != nil {
return nil, err
}

if row == nil {
return nil, errors.Wrapf(&JobNotFoundError{jobID: i.j.ID()}, "job %s not found in system.jobs", redact.SafeString(kind))
}

value, ok := row[0].(*tree.DBytes)
if !ok {
return nil, errors.AssertionFailedf("job info: expected value to be DBytes (was %T)", row[0])
}
return []byte(*value), nil
}
2 changes: 1 addition & 1 deletion pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1618,7 +1618,7 @@ func TestJobLifecycle(t *testing.T) {
}, registry.MakeJobID(), txn)
return errors.New("boom")
}))
if err := job.Started(ctx); !testutils.IsError(err, "job payload not found in system.jobs") {
if err := job.Started(ctx); !testutils.IsError(err, "not found in system.jobs table") {
t.Fatalf("unexpected error %v", err)
}
})
Expand Down
7 changes: 0 additions & 7 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -1861,13 +1861,6 @@ func (r *Registry) MarkIdle(job *Job, isIdle bool) {
}
}

// TestingForgetJob causes the registry to forget it has adopted a job.
func (r *Registry) TestingForgetJob(id jobspb.JobID) {
r.mu.Lock()
defer r.mu.Unlock()
delete(r.mu.adoptedJobs, id)
}

func (r *Registry) cancelAllAdoptedJobs() {
r.mu.Lock()
defer r.mu.Unlock()
Expand Down
28 changes: 1 addition & 27 deletions pkg/jobs/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,33 +108,7 @@ func (u Updater) update(ctx context.Context, useReadLock bool, updateFn UpdateFn
return err
}
if row == nil {
// Maybe try to fix a row missed by 23.1.3 backfill and re-read. #104798.
if u.j.registry.settings.Version.IsActive(ctx, clusterversion.V23_1JobInfoTableIsBackfilled) {
i := j.InfoStorage(u.txn)

_, err := i.BackfillLegacyPayload(ctx)
if err != nil {
return errors.Wrap(err, "failed to backfill job info payload during update")
}

_, err = i.BackfillLegacyProgress(ctx)
if err != nil {
return errors.Wrap(err, "failed to backfill job info progress during update")
}

row, err = u.txn.QueryRowEx(
ctx, "select-job", u.txn.KV(),
sessiondata.RootUserSessionDataOverride,
getSelectStmtForJobUpdate(ctx, j.session != nil, useReadLock, u.j.registry.settings.Version), j.ID(),
)

if err != nil {
return err
}
}
if row == nil {
return errors.Errorf("not found in system.jobs table")
}
return errors.Errorf("not found in system.jobs table")
}

if status, err = unmarshalStatus(row[0]); err != nil {
Expand Down
6 changes: 0 additions & 6 deletions pkg/upgrade/upgrades/backfill_job_info_table_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,9 @@ const (
backfillJobInfoProgressStmt = backfillJobInfoSharedPrefix + jobs.LegacyProgressKey + `', progress` + backfillJobInfoSharedSuffix
)

// TestingSkipInfoBackfill is a testing hook.
var TestingSkipInfoBackfill bool

func backfillJobInfoTable(
ctx context.Context, cs clusterversion.ClusterVersion, d upgrade.TenantDeps,
) error {
if TestingSkipInfoBackfill {
return nil
}

for step, stmt := range []string{backfillJobInfoPayloadStmt, backfillJobInfoProgressStmt} {
var resumeAfter int
Expand Down
111 changes: 0 additions & 111 deletions pkg/upgrade/upgrades/backfill_job_info_table_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ package upgrades_test

import (
"context"
"fmt"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
_ "github.com/cockroachdb/cockroach/pkg/ccl/backupccl"
Expand All @@ -24,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -109,111 +106,3 @@ WHERE j.id = i.job_id AND (j.payload = i.value OR j.progress = i.value) AND (j.i
`,
[][]string{{"14"}})
}

var _ jobs.Resumer = &fakeJob{}

type fakeJob struct {
job *jobs.Job
ch1, ch2 chan<- string
}

func (r *fakeJob) Resume(ctx context.Context, _ interface{}) error {
ch := r.ch1
if r.job.Progress().Details.(*jobspb.Progress_Import).Import.ResumePos[0] == 2 {
ch = r.ch2
}
select {
case ch <- fmt.Sprintf("%s %v",
r.job.Details().(jobspb.ImportDetails).BackupPath,
r.job.Progress().Details.(*jobspb.Progress_Import).Import.ResumePos):
return nil
case <-ctx.Done():
return ctx.Err()
}
}

func (r *fakeJob) OnFailOrCancel(ctx context.Context, execCtx interface{}, _ error) error {
return nil
}

func TestIncompleteBackfill(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

clusterArgs := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: make(chan struct{}),
BootstrapVersionKeyOverride: clusterversion.V22_2,
BinaryVersionOverride: clusterversion.ByKey(clusterversion.V22_2),
}}},
}

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, clusterArgs)
defer tc.Stopper().Stop(ctx)

r := tc.Server(0).JobRegistry().(*jobs.Registry)
sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))

ch1 := make(chan string, 1)
ch2 := make(chan string, 1)

jobs.RegisterConstructor(
jobspb.TypeImport,
func(job *jobs.Job, _ *cluster.Settings) jobs.Resumer { return &fakeJob{job: job, ch1: ch1, ch2: ch2} },
jobs.UsesTenantCostControl,
)

var adoptedJob, runningJob *jobs.StartableJob
runningID, adoptedID := jobspb.JobID(5550001), jobspb.JobID(5550002)
require.NoError(t, tc.Server(0).InternalDB().(isql.DB).Txn(ctx, func(
ctx context.Context, txn isql.Txn,
) (err error) {

if err := r.CreateStartableJobWithTxn(ctx, &adoptedJob, adoptedID, txn, jobs.Record{
Username: username.RootUserName(),
Details: jobspb.ImportDetails{BackupPath: "adopted"},
Progress: jobspb.ImportProgress{ResumePos: []int64{1}},
}); err != nil {
return err
}

if err := r.CreateStartableJobWithTxn(ctx, &runningJob, runningID, txn, jobs.Record{
Username: username.RootUserName(),
Details: jobspb.ImportDetails{BackupPath: "running"},
Progress: jobspb.ImportProgress{ResumePos: []int64{2}},
}); err != nil {
return err
}
return nil
}))

upgrades.TestingSkipInfoBackfill = true
defer func() {
upgrades.TestingSkipInfoBackfill = false
}()

sqlDB.Exec(t, "SET CLUSTER SETTING version = $1", clusterversion.ByKey(clusterversion.V23_1).String())
r.TestingForgetJob(adoptedID)
r.NotifyToResume(ctx, adoptedID)

require.NoError(t, runningJob.Start(ctx))
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
require.NoError(t, runningJob.AwaitCompletion(ctx))

select {
case res := <-ch1:
require.Equal(t, "adopted [1]", res)
case <-time.After(time.Second * 5):
t.Fatal("timed out waiting for job to run")
}

select {
case res := <-ch2:
require.Equal(t, "running [2]", res)
case <-time.After(time.Second * 5):
t.Fatal("timed out waiting for job to run")
}

}

0 comments on commit 99c8020

Please sign in to comment.