Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jobs: clear claim for already-dead paused jobs #92121

Merged
merged 1 commit into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,22 +434,24 @@ const clearClaimQuery = `
SET claim_session_id = NULL, claim_instance_id = NULL
WHERE id = $1
AND claim_session_id = $2
AND claim_instance_id = $3
AND status NOT IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `')`
AND claim_instance_id = $3`

// maybeClearLease clears the claim on the given job, provided that
// the current lease matches our liveness Session.
//
// It is a no-op when jobErr is nil. Otherwise it delegates to
// clearLeaseForJobID using the job's ID and a nil transaction.
func (r *Registry) maybeClearLease(job *Job, jobErr error) {
if jobErr == nil {
return
}
r.clearLeaseForJobID(job.ID(), nil /* txn */)
}

func (r *Registry) clearLeaseForJobID(jobID jobspb.JobID, txn *kv.Txn) {
// We use the serverCtx here rather than the context from the
// caller since the caller's context may have been canceled.
r.withSession(r.serverCtx, func(ctx context.Context, s sqlliveness.Session) {
n, err := r.ex.ExecEx(ctx, "clear-job-claim", nil, /* txn */
n, err := r.ex.ExecEx(ctx, "clear-job-claim", txn,
sessiondata.InternalExecutorOverride{User: username.NodeUserName()},
clearClaimQuery, job.ID(), s.ID().UnsafeBytes(), r.ID())
clearClaimQuery, jobID, s.ID().UnsafeBytes(), r.ID())
if err != nil {
log.Warningf(ctx, "could not clear job claim: %s", err.Error())
return
Expand Down Expand Up @@ -497,11 +499,26 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes
statusString := *row[1].(*tree.DString)
switch Status(statusString) {
case StatusPaused:
r.cancelRegisteredJobContext(id)
if !r.cancelRegisteredJobContext(id) {
// If we didn't already have a running job for this lease,
// clear out the lease here since it won't be cleared out
// on Resume exit.
r.clearLeaseForJobID(id, txn)
}
log.Infof(ctx, "job %d, session %s: paused", id, s.ID())
case StatusReverting:
if err := job.Update(ctx, txn, func(txn *kv.Txn, md JobMetadata, ju *JobUpdater) error {
r.cancelRegisteredJobContext(id)
if !r.cancelRegisteredJobContext(id) {
// If we didn't already have a running job for this
// lease, clear out the lease here since it won't be
// cleared out on Resume exit.
//
// NB: This working as part of the update depends on
// the fact that the job struct does not have a
// claim set and thus won't validate the claim on
// update.
r.clearLeaseForJobID(id, txn)
}
md.Payload.Error = errJobCanceled.Error()
encodedErr := errors.EncodeError(ctx, errJobCanceled)
md.Payload.FinalResumeError = &encodedErr
Expand Down
142 changes: 57 additions & 85 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1193,6 +1193,42 @@ func TestRegistryLifecycle(t *testing.T) {

<-completeCh
})
t.Run("job with created by fields", func(t *testing.T) {
createdByType := "internal_test"
rts := registryTestSuite{}
rts.setUp(t)
defer rts.tearDown()

resumerJob := make(chan *jobs.Job, 1)
jobs.RegisterConstructor(
jobspb.TypeBackup, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
OnResume: func(ctx context.Context) error {
resumerJob <- j
return nil
},
}
}, jobs.UsesTenantCostControl)

jobID := rts.registry.MakeJobID()
record := jobs.Record{
Details: jobspb.BackupDetails{},
Progress: jobspb.BackupProgress{},
CreatedBy: &jobs.CreatedByInfo{Name: createdByType, ID: 123},
}
job, err := rts.registry.CreateAdoptableJobWithTxn(rts.ctx, record, jobID, nil /* txn */)
require.NoError(t, err)

loadedJob, err := rts.registry.LoadJob(rts.ctx, jobID)
require.NoError(t, err)
require.NotNil(t, loadedJob.CreatedBy())
require.Equal(t, job.CreatedBy(), loadedJob.CreatedBy())
rts.registry.TestingNudgeAdoptionQueue()
resumedJob := <-resumerJob
require.NotNil(t, resumedJob.CreatedBy())
require.Equal(t, job.CreatedBy(), resumedJob.CreatedBy())

})
}

func checkTraceFiles(t *testing.T, registry *jobs.Registry, expectedNumFiles int) {
Expand Down Expand Up @@ -1220,21 +1256,27 @@ func checkTraceFiles(t *testing.T, registry *jobs.Registry, expectedNumFiles int
}
}

// TestJobLifecycle tests the invariants about the job lifecycle
// queries. It does not depend on the registry's job management tasks
// and assumes that it maintains the lease on the job through all
// state transitions.
func TestJobLifecycle(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer jobs.ResetConstructors()()

ctx := context.Background()

s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
params, _ := tests.CreateTestServerParams()
params.Knobs.JobsTestingKnobs = &jobs.TestingKnobs{DisableRegistryLifecycleManagent: true}
s, sqlDB, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(ctx)

registry := s.JobRegistry().(*jobs.Registry)

createJob := func(record jobs.Record) (*jobs.Job, expectation) {
beforeTime := timeutil.Now()
job, err := registry.CreateAdoptableJobWithTxn(ctx, record, registry.MakeJobID(), nil /* txn */)
job, err := registry.CreateJobWithTxn(ctx, record, registry.MakeJobID(), nil /* txn */)
require.NoError(t, err)
payload := job.Payload()
return job, expectation{
Expand All @@ -1256,38 +1298,6 @@ func TestJobLifecycle(t *testing.T) {
return createJob(defaultRecord)
}

done := make(chan struct{})
defer close(done)
resumeSignaler := newResumeStartedSignaler()
jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
OnResume: func(ctx context.Context) error {
resumeSignaler.SignalResumeStarted()
select {
case <-ctx.Done():
return ctx.Err()
case <-done:
return nil
}
},
}
}, jobs.UsesTenantCostControl)

startLeasedJob := func(t *testing.T, record jobs.Record) (*jobs.StartableJob, expectation) {
beforeTime := timeutil.Now()
job, err := jobs.TestingCreateAndStartJob(ctx, registry, s.DB(), record)
if err != nil {
t.Fatal(err)
}
payload := job.Payload()
return job, expectation{
DB: sqlDB,
Record: record,
Type: payload.Type(),
Before: beforeTime,
}
}

t.Run("valid job lifecycles succeed", func(t *testing.T) {
// Woody is a successful job.
woodyPride, _ := username.MakeSQLUsernameFromUserInput("Woody Pride", username.PurposeValidation)
Expand Down Expand Up @@ -1495,7 +1505,7 @@ func TestJobLifecycle(t *testing.T) {
})

t.Run("cancelable jobs can be paused until finished", func(t *testing.T) {
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()

if err := registry.PauseRequested(ctx, nil, job.ID(), ""); err != nil {
t.Fatal(err)
Expand All @@ -1517,10 +1527,6 @@ func TestJobLifecycle(t *testing.T) {
t.Fatal(err)
}

// Wait for job to be adopted so that we have the
// lease and can move to succeeded.
resumeSignaler.WaitForResumeStarted()

// PauseRequested fails after job is successful.
if err := job.Succeeded(ctx); err != nil {
t.Fatal(err)
Expand All @@ -1532,7 +1538,7 @@ func TestJobLifecycle(t *testing.T) {

t.Run("cancelable jobs can be canceled until finished", func(t *testing.T) {
{
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()
if err := registry.CancelRequested(ctx, nil, job.ID()); err != nil {
t.Fatal(err)
}
Expand All @@ -1542,7 +1548,7 @@ func TestJobLifecycle(t *testing.T) {
}

{
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()
if err := job.Started(ctx); err != nil {
t.Fatal(err)
}
Expand All @@ -1555,7 +1561,7 @@ func TestJobLifecycle(t *testing.T) {
}

{
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()
if err := registry.PauseRequested(ctx, nil, job.ID(), ""); err != nil {
t.Fatal(err)
}
Expand All @@ -1571,7 +1577,7 @@ func TestJobLifecycle(t *testing.T) {
}

{
job, _ := startLeasedJob(t, defaultRecord)
job, _ := createDefaultJob()
if err := job.Succeeded(ctx); err != nil {
t.Fatal(err)
}
Expand All @@ -1584,7 +1590,7 @@ func TestJobLifecycle(t *testing.T) {

t.Run("unpaused jobs cannot be resumed", func(t *testing.T) {
{
job, _ := startLeasedJob(t, defaultRecord)
job, _ := createDefaultJob()
if err := registry.CancelRequested(ctx, nil, job.ID()); err != nil {
t.Fatal(err)
}
Expand All @@ -1594,7 +1600,7 @@ func TestJobLifecycle(t *testing.T) {
}

{
job, _ := startLeasedJob(t, defaultRecord)
job, _ := createDefaultJob()
if err := job.Succeeded(ctx); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1715,7 +1721,7 @@ func TestJobLifecycle(t *testing.T) {
})

t.Run("progress on paused job fails", func(t *testing.T) {
job, _ := startLeasedJob(t, defaultRecord)
job, _ := createDefaultJob()
if err := registry.PauseRequested(ctx, nil, job.ID(), ""); err != nil {
t.Fatal(err)
}
Expand All @@ -1727,7 +1733,7 @@ func TestJobLifecycle(t *testing.T) {
})

t.Run("progress on canceled job fails", func(t *testing.T) {
job, _ := startLeasedJob(t, defaultRecord)
job, _ := createDefaultJob()
if err := registry.CancelRequested(ctx, nil, job.ID()); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1759,7 +1765,7 @@ func TestJobLifecycle(t *testing.T) {
updateStatusStmt := `UPDATE system.jobs SET status = $1 WHERE id = $2`

t.Run("set details works", func(t *testing.T) {
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()
require.NoError(t, exp.verify(job.ID(), jobs.StatusRunning))
newDetails := jobspb.ImportDetails{URIs: []string{"new"}}
exp.Record.Details = newDetails
Expand All @@ -1775,7 +1781,7 @@ func TestJobLifecycle(t *testing.T) {
})

t.Run("set details fails", func(t *testing.T) {
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()
require.NoError(t, exp.verify(job.ID(), jobs.StatusRunning))
_, err := exp.DB.Exec(updateStatusStmt, jobs.StatusCancelRequested, job.ID())
require.NoError(t, err)
Expand All @@ -1784,7 +1790,7 @@ func TestJobLifecycle(t *testing.T) {
})

t.Run("set progress works", func(t *testing.T) {
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()
require.NoError(t, exp.verify(job.ID(), jobs.StatusRunning))
newProgress := jobspb.ImportProgress{ResumePos: []int64{42}}
exp.Record.Progress = newProgress
Expand All @@ -1799,47 +1805,13 @@ func TestJobLifecycle(t *testing.T) {
})

t.Run("set progress fails", func(t *testing.T) {
job, exp := startLeasedJob(t, defaultRecord)
job, exp := createDefaultJob()
require.NoError(t, exp.verify(job.ID(), jobs.StatusRunning))
_, err := exp.DB.Exec(updateStatusStmt, jobs.StatusPauseRequested, job.ID())
require.NoError(t, err)
require.Error(t, job.SetProgress(ctx, nil /* txn */, jobspb.ImportProgress{ResumePos: []int64{42}}))
require.NoError(t, exp.verify(job.ID(), jobs.StatusPauseRequested))
})

t.Run("job with created by fields", func(t *testing.T) {
createdByType := "internal_test"

resumerJob := make(chan *jobs.Job, 1)
jobs.RegisterConstructor(
jobspb.TypeBackup, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
return jobs.FakeResumer{
OnResume: func(ctx context.Context) error {
resumerJob <- j
return nil
},
}
}, jobs.UsesTenantCostControl)

jobID := registry.MakeJobID()
record := jobs.Record{
Details: jobspb.BackupDetails{},
Progress: jobspb.BackupProgress{},
CreatedBy: &jobs.CreatedByInfo{Name: createdByType, ID: 123},
}
job, err := registry.CreateAdoptableJobWithTxn(ctx, record, jobID, nil /* txn */)
require.NoError(t, err)

loadedJob, err := registry.LoadJob(ctx, jobID)
require.NoError(t, err)
require.NotNil(t, loadedJob.CreatedBy())
require.Equal(t, job.CreatedBy(), loadedJob.CreatedBy())
registry.TestingNudgeAdoptionQueue()
resumedJob := <-resumerJob
require.NotNil(t, resumedJob.CreatedBy())
require.Equal(t, job.CreatedBy(), resumedJob.CreatedBy())

})
}

// TestShowJobs manually inserts a row into system.jobs and checks that the
Expand Down
10 changes: 8 additions & 2 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,10 @@ func (r *Registry) withSession(ctx context.Context, f withSessionFunc) {
// jobs if it observes a failure. Otherwise it starts all the main daemons of
// registry that poll the jobs table and start/cancel/gc jobs.
func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {
if r.knobs.DisableRegistryLifecycleManagent {
return nil
}

// Since the job polling system is outside user control, exclude it from cost
// accounting and control. Individual jobs are not part of this exclusion.
ctx = multitenant.WithTenantCostControlExemption(ctx)
Expand Down Expand Up @@ -1504,12 +1508,14 @@ func (r *Registry) unregister(jobID jobspb.JobID) {
}
}

func (r *Registry) cancelRegisteredJobContext(jobID jobspb.JobID) {
func (r *Registry) cancelRegisteredJobContext(jobID jobspb.JobID) bool {
r.mu.Lock()
defer r.mu.Unlock()
if aj, ok := r.mu.adoptedJobs[jobID]; ok {
aj, ok := r.mu.adoptedJobs[jobID]
if ok {
aj.cancel()
}
return ok
}

func (r *Registry) getClaimedJob(jobID jobspb.JobID) (*Job, error) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/jobs/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type TestingKnobs struct {
// DisableAdoptions disables job adoptions.
DisableAdoptions bool

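// DisableRegistryLifecycleManagent, when set, makes Registry.Start
// return immediately without starting any of the registry's daemons.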
DisableRegistryLifecycleManagent bool

// BeforeWaitForJobsQuery is called once per invocation of the
// poll-show-jobs query in WaitForJobs.
BeforeWaitForJobsQuery func()
Expand Down