Skip to content

Commit

Permalink
jobs: clear claim for already-dead paused jobs
Browse files Browse the repository at this point in the history
Previously we only cleared the claim after the state machine returned
and only if the status wasn't pause-requested or
cancel-requested. This filter on status, however, was unnecessary.

The job may still be in the cancel-requested or pause-requested state
when we go to clear the claim because the transaction that resulted in
the canceled context may not have completed. But, it is still fine to
clear the claim. There are two possible cases:

1) Either the transaction that cancelled us fails and we are thus
   still in the state cancel-requested or pause-requested with no
   claim. This is fine. The adoption loop will adopt the job and move
   the state to paused or reverting, just with no context to cancel.

2) The transaction succeeds and we are in paused or reverting without
   a claim set. Just as we wanted.

Here we remove the where clause to always clear the claim when we
return from the state machine.

In the case of (1), when processing the cancel-requested or
pause-requested state the second time, we may still want the claim
cleared. Here, we make sure it gets cleared even in the case where
there is no running job that actually needs to be canceled.

Fixes #92112

Release note: None
  • Loading branch information
stevendanna committed Jan 3, 2023
1 parent e2b9737 commit aa4f388
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 93 deletions.
29 changes: 23 additions & 6 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,22 +437,24 @@ const clearClaimQuery = `
SET claim_session_id = NULL, claim_instance_id = NULL
WHERE id = $1
AND claim_session_id = $2
AND claim_instance_id = $3
AND status NOT IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `')`
AND claim_instance_id = $3`

// maybeClearLease releases the claim held on job when the job's run
// ended with an error. A nil jobErr leaves the claim untouched.
// The actual clearing is delegated to clearLeaseForJobID, invoked
// without a transaction; the claim is only cleared if it still matches
// our liveness session and instance ID (see clearClaimQuery).
func (r *Registry) maybeClearLease(job *Job, jobErr error) {
	if jobErr != nil {
		r.clearLeaseForJobID(job.ID(), nil /* txn */)
	}
}

func (r *Registry) clearLeaseForJobID(jobID jobspb.JobID, txn *kv.Txn) {
// We use the serverCtx here rather than the context from the
// caller since the caller's context may have been canceled.
r.withSession(r.serverCtx, func(ctx context.Context, s sqlliveness.Session) {
n, err := r.ex.ExecEx(ctx, "clear-job-claim", nil, /* txn */
n, err := r.ex.ExecEx(ctx, "clear-job-claim", txn,
sessiondata.InternalExecutorOverride{User: security.NodeUserName()},
clearClaimQuery, job.ID(), s.ID().UnsafeBytes(), r.ID())
clearClaimQuery, jobID, s.ID().UnsafeBytes(), r.ID())
if err != nil {
log.Warningf(ctx, "could not clear job claim: %s", err.Error())
return
Expand Down Expand Up @@ -500,11 +502,26 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes
statusString := *row[1].(*tree.DString)
switch Status(statusString) {
case StatusPaused:
r.cancelRegisteredJobContext(id)
if !r.cancelRegisteredJobContext(id) {
// If we didn't already have a running job for this lease,
// clear out the lease here since it won't be cleared out on
// Resume exit.
r.clearLeaseForJobID(id, txn)
}
log.Infof(ctx, "job %d, session %s: paused", id, s.ID())
case StatusReverting:
if err := job.Update(ctx, txn, func(txn *kv.Txn, md JobMetadata, ju *JobUpdater) error {
r.cancelRegisteredJobContext(id)
if !r.cancelRegisteredJobContext(id) {
// If we didn't already have a running job for this
// lease, clear out the lease here since it won't be
// cleared out on Resume exit.
//
// NB: This working as part of the update depends on
// the fact that the job struct does not have a
// claim set and thus won't validate the claim on
// update.
r.clearLeaseForJobID(id, txn)
}
md.Payload.Error = errJobCanceled.Error()
encodedErr := errors.EncodeError(ctx, errJobCanceled)
md.Payload.FinalResumeError = &encodedErr
Expand Down
142 changes: 57 additions & 85 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1186,6 +1186,42 @@ func TestRegistryLifecycle(t *testing.T) {

<-completeCh
})
t.Run("job with created by fields", func(t *testing.T) {
createdByType := "internal_test"
rts := registryTestSuite{}
rts.setUp(t)
defer rts.tearDown()

resumerJob := make(chan *jobs.Job, 1)
jobs.RegisterConstructor(
jobspb.TypeBackup, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
OnResume: func(ctx context.Context) error {
resumerJob <- j
return nil
},
}
}, jobs.UsesTenantCostControl)

jobID := rts.registry.MakeJobID()
record := jobs.Record{
Details: jobspb.BackupDetails{},
Progress: jobspb.BackupProgress{},
CreatedBy: &jobs.CreatedByInfo{Name: createdByType, ID: 123},
}
job, err := rts.registry.CreateAdoptableJobWithTxn(rts.ctx, record, jobID, nil /* txn */)
require.NoError(t, err)

loadedJob, err := rts.registry.LoadJob(rts.ctx, jobID)
require.NoError(t, err)
require.NotNil(t, loadedJob.CreatedBy())
require.Equal(t, job.CreatedBy(), loadedJob.CreatedBy())
rts.registry.TestingNudgeAdoptionQueue()
resumedJob := <-resumerJob
require.NotNil(t, resumedJob.CreatedBy())
require.Equal(t, job.CreatedBy(), resumedJob.CreatedBy())

})
}

func checkTraceFiles(t *testing.T, registry *jobs.Registry, expectedNumFiles int) {
Expand Down Expand Up @@ -1213,21 +1249,27 @@ func checkTraceFiles(t *testing.T, registry *jobs.Registry, expectedNumFiles int
}
}

// TestJobLifecycle tests the invariants about the job lifecycle
// queries. It does not depend on the registry's job management tasks
// and assumes that it maintains the lease on the job through all
// state transitions.
func TestJobLifecycle(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer jobs.ResetConstructors()()

ctx := context.Background()

s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
params, _ := tests.CreateTestServerParams()
params.Knobs.JobsTestingKnobs = &jobs.TestingKnobs{DisableRegistryLifecycleManagent: true}
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)

registry := s.JobRegistry().(*jobs.Registry)

createJob := func(record jobs.Record) (*jobs.Job, expectation) {
beforeTime := timeutil.Now()
job, err := registry.CreateAdoptableJobWithTxn(ctx, record, registry.MakeJobID(), nil /* txn */)
job, err := registry.CreateJobWithTxn(ctx, record, registry.MakeJobID(), nil /* txn */)
require.NoError(t, err)
payload := job.Payload()
return job, expectation{
Expand All @@ -1249,38 +1291,6 @@ func TestJobLifecycle(t *testing.T) {
return createJob(defaultRecord)
}

done := make(chan struct{})
defer close(done)
resumeSignaler := newResumeStartedSignaler()
jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
OnResume: func(ctx context.Context) error {
resumeSignaler.SignalResumeStarted()
select {
case <-ctx.Done():
return ctx.Err()
case <-done:
return nil
}
},
}
}, jobs.UsesTenantCostControl)

startLeasedJob := func(t *testing.T, record jobs.Record) (*jobs.StartableJob, expectation) {
beforeTime := timeutil.Now()
job, err := jobs.TestingCreateAndStartJob(ctx, registry, s.DB(), record)
if err != nil {
t.Fatal(err)
}
payload := job.Payload()
return job, expectation{
DB: sqlDB,
Record: record,
Type: payload.Type(),
Before: beforeTime,
}
}

t.Run("valid job lifecycles succeed", func(t *testing.T) {
// Woody is a successful job.
woodyPride, _ := security.MakeSQLUsernameFromUserInput("Woody Pride", security.UsernameValidation)
Expand Down Expand Up @@ -1488,7 +1498,7 @@ func TestJobLifecycle(t *testing.T) {
})

t.Run("cancelable jobs can be paused until finished", func(t *testing.T) {
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()

if err := registry.PauseRequested(ctx, nil, job.ID(), ""); err != nil {
t.Fatal(err)
Expand All @@ -1510,10 +1520,6 @@ func TestJobLifecycle(t *testing.T) {
t.Fatal(err)
}

// Wait for job to be adopted so that we have the
// lease and can move to succeeded.
resumeSignaler.WaitForResumeStarted()

// PauseRequested fails after job is successful.
if err := job.Succeeded(ctx); err != nil {
t.Fatal(err)
Expand All @@ -1525,7 +1531,7 @@ func TestJobLifecycle(t *testing.T) {

t.Run("cancelable jobs can be canceled until finished", func(t *testing.T) {
{
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()
if err := registry.CancelRequested(ctx, nil, job.ID()); err != nil {
t.Fatal(err)
}
Expand All @@ -1535,7 +1541,7 @@ func TestJobLifecycle(t *testing.T) {
}

{
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()
if err := job.Started(ctx); err != nil {
t.Fatal(err)
}
Expand All @@ -1548,7 +1554,7 @@ func TestJobLifecycle(t *testing.T) {
}

{
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()
if err := registry.PauseRequested(ctx, nil, job.ID(), ""); err != nil {
t.Fatal(err)
}
Expand All @@ -1564,7 +1570,7 @@ func TestJobLifecycle(t *testing.T) {
}

{
job, _ := startLeasedJob(t, defaultRecord)
job, _ := createDefaultJob()
if err := job.Succeeded(ctx); err != nil {
t.Fatal(err)
}
Expand All @@ -1577,7 +1583,7 @@ func TestJobLifecycle(t *testing.T) {

t.Run("unpaused jobs cannot be resumed", func(t *testing.T) {
{
job, _ := startLeasedJob(t, defaultRecord)
job, _ := createDefaultJob()
if err := registry.CancelRequested(ctx, nil, job.ID()); err != nil {
t.Fatal(err)
}
Expand All @@ -1587,7 +1593,7 @@ func TestJobLifecycle(t *testing.T) {
}

{
job, _ := startLeasedJob(t, defaultRecord)
job, _ := createDefaultJob()
if err := job.Succeeded(ctx); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1708,7 +1714,7 @@ func TestJobLifecycle(t *testing.T) {
})

t.Run("progress on paused job fails", func(t *testing.T) {
job, _ := startLeasedJob(t, defaultRecord)
job, _ := createDefaultJob()
if err := registry.PauseRequested(ctx, nil, job.ID(), ""); err != nil {
t.Fatal(err)
}
Expand All @@ -1720,7 +1726,7 @@ func TestJobLifecycle(t *testing.T) {
})

t.Run("progress on canceled job fails", func(t *testing.T) {
job, _ := startLeasedJob(t, defaultRecord)
job, _ := createDefaultJob()
if err := registry.CancelRequested(ctx, nil, job.ID()); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1752,7 +1758,7 @@ func TestJobLifecycle(t *testing.T) {
updateStatusStmt := `UPDATE system.jobs SET status = $1 WHERE id = $2`

t.Run("set details works", func(t *testing.T) {
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()
require.NoError(t, exp.verify(job.ID(), jobs.StatusRunning))
newDetails := jobspb.ImportDetails{URIs: []string{"new"}}
exp.Record.Details = newDetails
Expand All @@ -1768,7 +1774,7 @@ func TestJobLifecycle(t *testing.T) {
})

t.Run("set details fails", func(t *testing.T) {
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()
require.NoError(t, exp.verify(job.ID(), jobs.StatusRunning))
_, err := exp.DB.Exec(updateStatusStmt, jobs.StatusCancelRequested, job.ID())
require.NoError(t, err)
Expand All @@ -1777,7 +1783,7 @@ func TestJobLifecycle(t *testing.T) {
})

t.Run("set progress works", func(t *testing.T) {
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()
require.NoError(t, exp.verify(job.ID(), jobs.StatusRunning))
newProgress := jobspb.ImportProgress{ResumePos: []int64{42}}
exp.Record.Progress = newProgress
Expand All @@ -1792,47 +1798,13 @@ func TestJobLifecycle(t *testing.T) {
})

t.Run("set progress fails", func(t *testing.T) {
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()
require.NoError(t, exp.verify(job.ID(), jobs.StatusRunning))
_, err := exp.DB.Exec(updateStatusStmt, jobs.StatusPauseRequested, job.ID())
require.NoError(t, err)
require.Error(t, job.SetProgress(ctx, nil /* txn */, jobspb.ImportProgress{ResumePos: []int64{42}}))
require.NoError(t, exp.verify(job.ID(), jobs.StatusPauseRequested))
})

t.Run("job with created by fields", func(t *testing.T) {
createdByType := "internal_test"

resumerJob := make(chan *jobs.Job, 1)
jobs.RegisterConstructor(
jobspb.TypeBackup, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
OnResume: func(ctx context.Context) error {
resumerJob <- j
return nil
},
}
}, jobs.UsesTenantCostControl)

jobID := registry.MakeJobID()
record := jobs.Record{
Details: jobspb.BackupDetails{},
Progress: jobspb.BackupProgress{},
CreatedBy: &jobs.CreatedByInfo{Name: createdByType, ID: 123},
}
job, err := registry.CreateAdoptableJobWithTxn(ctx, record, jobID, nil /* txn */)
require.NoError(t, err)

loadedJob, err := registry.LoadJob(ctx, jobID)
require.NoError(t, err)
require.NotNil(t, loadedJob.CreatedBy())
require.Equal(t, job.CreatedBy(), loadedJob.CreatedBy())
registry.TestingNudgeAdoptionQueue()
resumedJob := <-resumerJob
require.NotNil(t, resumedJob.CreatedBy())
require.Equal(t, job.CreatedBy(), resumedJob.CreatedBy())

})
}

// TestShowJobs manually inserts a row into system.jobs and checks that the
Expand Down
10 changes: 8 additions & 2 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,10 @@ func (r *Registry) withSession(ctx context.Context, f withSessionFunc) {
// jobs if it observes a failure. Otherwise it starts all the main daemons of
// registry that poll the jobs table and start/cancel/gc jobs.
func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {
if r.knobs.DisableRegistryLifecycleManagent {
return nil
}

// Since the job polling system is outside user control, exclude it from cost
// accounting and control. Individual jobs are not part of this exclusion.
ctx = multitenant.WithTenantCostControlExemption(ctx)
Expand Down Expand Up @@ -1450,12 +1454,14 @@ func (r *Registry) unregister(jobID jobspb.JobID) {
}
}

func (r *Registry) cancelRegisteredJobContext(jobID jobspb.JobID) {
func (r *Registry) cancelRegisteredJobContext(jobID jobspb.JobID) bool {
r.mu.Lock()
defer r.mu.Unlock()
if aj, ok := r.mu.adoptedJobs[jobID]; ok {
aj, ok := r.mu.adoptedJobs[jobID]
if ok {
aj.cancel()
}
return ok
}

func (r *Registry) getClaimedJob(jobID jobspb.JobID) (*Job, error) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/jobs/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type TestingKnobs struct {

// DisableAdoptions disables job adoptions.
DisableAdoptions bool

// DisableRegistryLifecycleManagent (sic), if set, makes Registry.Start
// return immediately without starting the registry's daemons.
DisableRegistryLifecycleManagent bool
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down

0 comments on commit aa4f388

Please sign in to comment.