Skip to content

Commit

Permalink
jobs: fix panic when a job has lost its claim during update
Browse files Browse the repository at this point in the history
Jobs can lose their claims. We detect when the claim doesn't match a new
claim but we don't detect when the claim has been set to NULL (which is
effectively equivalent). This is important because the way we clear claims
is to set them to `NULL` and then we adopt jobs which have a `NULL` claim.
This bug was introduced in cockroachdb#55120 and has been released only in the 21.1
alpha.

Fixes cockroachdb#58049.

Release note (bug fix): Fixed a bug from the previous alpha where the binary
could crash if a running node lost its claim to a job while updating.
  • Loading branch information
ajwerner committed Dec 22, 2020
1 parent 25eb03d commit 21c3ad1
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 0 deletions.
54 changes: 54 additions & 0 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -2642,3 +2643,56 @@ func TestMetrics(t *testing.T) {
}
})
}

// TestLoseLeaseDuringExecution tests that it is safe to call update during
// job execution when the job has lost its lease and that that update operation
// will fail.
//
// This is a regression test for #58049.
func TestLoseLeaseDuringExecution(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer jobs.ResetConstructors()()

// Disable the loops from messing with the job execution.
defer jobs.TestingSetAdoptAndCancelIntervals(time.Hour, time.Hour)()

ctx := context.Background()

s, _, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer s.Stopper().Stop(ctx)
registry := s.JobRegistry().(*jobs.Registry)

// The default FormatVersion value in SchemaChangeDetails corresponds to a
// pre-20.1 job.
rec := jobs.Record{
DescriptorIDs: []descpb.ID{1},
Details: jobspb.BackupDetails{},
Progress: jobspb.BackupProgress{},
}

defer jobs.ResetConstructors()()
resumed := make(chan error, 1)
jobs.RegisterConstructor(jobspb.TypeBackup, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
OnResume: func(ctx context.Context, _ chan<- tree.Datums) error {
defer close(resumed)
_, err := s.InternalExecutor().(sqlutil.InternalExecutor).Exec(
ctx, "set-claim-null", nil, /* txn */
`UPDATE system.jobs SET claim_session_id = NULL WHERE id = $1`,
*j.ID())
assert.NoError(t, err)
err = j.WithTxn(nil).Update(ctx, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
return nil
})
resumed <- err
return err
},
}
})

_, err := registry.CreateJobWithTxn(ctx, rec, nil /* txn */)
require.NoError(t, err)
registry.TestingNudgeAdoptionQueue()
require.Regexp(t, `expected session '\w+' but found NULL`, <-resumed)
}
5 changes: 5 additions & 0 deletions pkg/jobs/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ func (j *Job) Update(ctx context.Context, updateFn UpdateFn) error {
}

if j.sessionID != "" {
if row[3] == tree.DNull {
return errors.Errorf(
"job %d: with status '%s': expected session '%s' but found NULL",
*j.ID(), statusString, j.sessionID)
}
storedSession := []byte(*row[3].(*tree.DBytes))
if !bytes.Equal(storedSession, j.sessionID.UnsafeBytes()) {
return errors.Errorf(
Expand Down

0 comments on commit 21c3ad1

Please sign in to comment.