diff --git a/pkg/jobs/adopt.go b/pkg/jobs/adopt.go index fce7ff9a599f..7160e77ff5d1 100644 --- a/pkg/jobs/adopt.go +++ b/pkg/jobs/adopt.go @@ -420,26 +420,59 @@ func (r *Registry) runJob( log.Errorf(ctx, "job %d: adoption completed with error %v", job.ID(), err) } - r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err) r.maybeRecordExecutionFailure(ctx, err, job) + // NB: After this point, the job may no longer have the claim + // and further updates to the job record from this node may + // fail. + r.maybeClearLease(job, err) + r.maybeDumpTrace(ctx, resumer, int64(job.ID()), int64(span.TraceID()), err) if r.knobs.AfterJobStateMachine != nil { r.knobs.AfterJobStateMachine() } return err } +const clearClaimQuery = ` + UPDATE system.jobs + SET claim_session_id = NULL, claim_instance_id = NULL + WHERE id = $1 + AND claim_session_id = $2 + AND claim_instance_id = $3` + +// maybeClearLease clears the claim on the given job, provided that +// the current lease matches our liveness Session. +func (r *Registry) maybeClearLease(job *Job, jobErr error) { + if jobErr == nil { + return + } + r.clearLeaseForJobID(job.ID(), nil /* txn */) +} + +func (r *Registry) clearLeaseForJobID(jobID jobspb.JobID, txn *kv.Txn) { + // We use the serverCtx here rather than the context from the + // caller since the caller's context may have been canceled. + r.withSession(r.serverCtx, func(ctx context.Context, s sqlliveness.Session) { + n, err := r.ex.ExecEx(ctx, "clear-job-claim", txn, + sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, + clearClaimQuery, jobID, s.ID().UnsafeBytes(), r.ID()) + if err != nil { + log.Warningf(ctx, "could not clear job claim: %s", err.Error()) + return + } + log.VEventf(ctx, 2, "cleared leases for %d jobs", n) + }) +} + const pauseAndCancelUpdate = ` UPDATE system.jobs - SET status = + SET status = CASE WHEN status = '` + string(StatusPauseRequested) + `' THEN '` + string(StatusPaused) + `' WHEN status = '` + string(StatusCancelRequested) + `' THEN '` + string(StatusReverting) + `' ELSE status END, num_runs = 0, - last_run = NULL, - claim_session_id = NULL, - claim_instance_id = NULL + last_run = NULL WHERE (status IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `')) AND ((claim_session_id = $1) AND (claim_instance_id = $2)) RETURNING id, status @@ -469,11 +502,26 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqllivenes statusString := *row[1].(*tree.DString) switch Status(statusString) { case StatusPaused: - r.cancelRegisteredJobContext(id) + if !r.cancelRegisteredJobContext(id) { + // If we didn't already have a running job for this lease, + // clear out the lease here since it won't be cleared be + // cleared out on Resume exit. + r.clearLeaseForJobID(id, txn) + } log.Infof(ctx, "job %d, session %s: paused", id, s.ID()) case StatusReverting: if err := job.Update(ctx, txn, func(txn *kv.Txn, md JobMetadata, ju *JobUpdater) error { - r.cancelRegisteredJobContext(id) + if !r.cancelRegisteredJobContext(id) { + // If we didn't already have a running job for this + // lease, clear out the lease here since it won't be + // cleared be cleared out on Resume exit. + // + // NB: This working as part of the update depends on + // the fact that the job struct does not have a + // claim set and thus won't validate the claim on + // update. + r.clearLeaseForJobID(id, txn) + } md.Payload.Error = errJobCanceled.Error() encodedErr := errors.EncodeError(ctx, errJobCanceled) md.Payload.FinalResumeError = &encodedErr diff --git a/pkg/jobs/delegate_control_test.go b/pkg/jobs/delegate_control_test.go index 612d5dcfed25..12c5c5d90cfc 100644 --- a/pkg/jobs/delegate_control_test.go +++ b/pkg/jobs/delegate_control_test.go @@ -27,12 +27,15 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) func TestScheduleControl(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + th, cleanup := newTestHelper(t) defer cleanup() @@ -159,6 +162,8 @@ func TestScheduleControl(t *testing.T) { func TestJobsControlForSchedules(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + th, cleanup := newTestHelperForTables(t, jobstest.UseSystemTables, nil) defer cleanup() @@ -267,6 +272,7 @@ func TestJobsControlForSchedules(t *testing.T) { // jobs prior to executing the control command. func TestFilterJobsControlForSchedules(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) defer ResetConstructors()() argsFn := func(args *base.TestServerArgs) { @@ -347,6 +353,7 @@ func TestFilterJobsControlForSchedules(t *testing.T) { func TestJobControlByType(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) defer ResetConstructors()() argsFn := func(args *base.TestServerArgs) { diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 9d4ea4cc6e0a..4c28af6189aa 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -18,8 +18,10 @@ import ( "os" "path/filepath" "reflect" + "regexp" "sort" "strings" + "sync" "sync/atomic" "testing" "time" @@ -616,6 +618,53 @@ func TestRegistryLifecycle(t *testing.T) { rts.resumeCheckCh <- struct{}{} rts.check(t, jobs.StatusRunning) + r, err := regexp.Compile("retry txn") + require.NoError(t, err) + + executeWithRetriableTxn := func(db *gosql.DB, fn func(txn *gosql.Tx) error) error { + txn, err := db.Begin() + if err != nil { + return err + } + defer func() { + if err != nil { + _ = txn.Rollback() + } + + }() + + _, err = txn.Exec("SAVEPOINT cockroach_restart") + if err != nil { + return err + } + + maxRetries := 10 + retryCount := 0 + for { + err = fn(txn) + if err == nil { + _, err = txn.Exec("RELEASE SAVEPOINT cockroach_restart") + if err == nil { + return txn.Commit() + } + } + + if !r.MatchString(err.Error()) { + return err + } + + _, rollbackErr := txn.Exec("ROLLBACK TO SAVEPOINT cockroach_restart") + if rollbackErr != nil { + return errors.CombineErrors(rollbackErr, err) + } + + retryCount++ + if retryCount > maxRetries { + return errors.Wrap(err, "retries exhausted") + } + } + } + // Rollback a CANCEL. { txn, err := rts.outerDB.Begin() @@ -653,19 +702,18 @@ func TestRegistryLifecycle(t *testing.T) { } // Now pause it for reals. { - txn, err := rts.outerDB.Begin() + err := executeWithRetriableTxn(rts.outerDB, func(txn *gosql.Tx) error { + if _, err := txn.Exec("PAUSE JOB $1", job.ID()); err != nil { + return err + } + // Not committed yet, so state shouldn't have changed. + // Don't check status in txn. + rts.check(t, "") + return nil + }) if err != nil { t.Fatal(err) } - if _, err := txn.Exec("PAUSE JOB $1", job.ID()); err != nil { - t.Fatal(err) - } - // Not committed yet, so state shouldn't have changed. - // Don't check status in txn. - rts.check(t, "") - if err := txn.Commit(); err != nil { - t.Fatal(err) - } rts.check(t, jobs.StatusPaused) } // Rollback a RESUME. @@ -684,19 +732,18 @@ func TestRegistryLifecycle(t *testing.T) { } // Commit a RESUME. { - txn, err := rts.outerDB.Begin() + err := executeWithRetriableTxn(rts.outerDB, func(txn *gosql.Tx) error { + if _, err := txn.Exec("RESUME JOB $1", job.ID()); err != nil { + return err + } + // Not committed yet, so state shouldn't have changed. + // Don't check status in txn. + rts.check(t, "") + return nil + }) if err != nil { t.Fatal(err) } - if _, err := txn.Exec("RESUME JOB $1", job.ID()); err != nil { - t.Fatal(err) - } - // Not committed yet, so state shouldn't have changed. - // Don't check status in txn. - rts.check(t, "") - if err := txn.Commit(); err != nil { - t.Fatal(err) - } } rts.mu.e.ResumeStart = true rts.check(t, jobs.StatusRunning) @@ -1139,6 +1186,42 @@ func TestRegistryLifecycle(t *testing.T) { <-completeCh }) + t.Run("job with created by fields", func(t *testing.T) { + createdByType := "internal_test" + rts := registryTestSuite{} + rts.setUp(t) + defer rts.tearDown() + + resumerJob := make(chan *jobs.Job, 1) + jobs.RegisterConstructor( + jobspb.TypeBackup, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer { + return jobs.FakeResumer{ + OnResume: func(ctx context.Context) error { + resumerJob <- j + return nil + }, + } + }, jobs.UsesTenantCostControl) + + jobID := rts.registry.MakeJobID() + record := jobs.Record{ + Details: jobspb.BackupDetails{}, + Progress: jobspb.BackupProgress{}, + CreatedBy: &jobs.CreatedByInfo{Name: createdByType, ID: 123}, + } + job, err := rts.registry.CreateAdoptableJobWithTxn(rts.ctx, record, jobID, nil /* txn */) + require.NoError(t, err) + + loadedJob, err := rts.registry.LoadJob(rts.ctx, jobID) + require.NoError(t, err) + require.NotNil(t, loadedJob.CreatedBy()) + require.Equal(t, job.CreatedBy(), loadedJob.CreatedBy()) + rts.registry.TestingNudgeAdoptionQueue() + resumedJob := <-resumerJob + require.NotNil(t, resumedJob.CreatedBy()) + require.Equal(t, job.CreatedBy(), resumedJob.CreatedBy()) + + }) } func checkTraceFiles(t *testing.T, registry *jobs.Registry, expectedNumFiles int) { @@ -1166,6 +1249,10 @@ func checkTraceFiles(t *testing.T, registry *jobs.Registry, expectedNumFiles int } } +// TestJobLifecycle tests the invariants about the job lifecycle +// querires. It does not depend on the registries job management tasks +// and assumes that it maintains the lease on the job through all +// state transitions. func TestJobLifecycle(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1173,14 +1260,16 @@ func TestJobLifecycle(t *testing.T) { ctx := context.Background() - s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{}) + params, _ := tests.CreateTestServerParams() + params.Knobs.JobsTestingKnobs = &jobs.TestingKnobs{DisableRegistryLifecycleManagent: true} + s, sqlDB, _ := serverutils.StartServer(t, params) defer s.Stopper().Stop(ctx) registry := s.JobRegistry().(*jobs.Registry) createJob := func(record jobs.Record) (*jobs.Job, expectation) { beforeTime := timeutil.Now() - job, err := registry.CreateAdoptableJobWithTxn(ctx, record, registry.MakeJobID(), nil /* txn */) + job, err := registry.CreateJobWithTxn(ctx, record, registry.MakeJobID(), nil /* txn */) require.NoError(t, err) payload := job.Payload() return job, expectation{ @@ -1202,37 +1291,6 @@ func TestJobLifecycle(t *testing.T) { return createJob(defaultRecord) } - done := make(chan struct{}) - defer close(done) - - jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, _ *cluster.Settings) jobs.Resumer { - return jobs.FakeResumer{ - OnResume: func(ctx context.Context) error { - select { - case <-ctx.Done(): - return ctx.Err() - case <-done: - return nil - } - }, - } - }, jobs.UsesTenantCostControl) - - startLeasedJob := func(t *testing.T, record jobs.Record) (*jobs.StartableJob, expectation) { - beforeTime := timeutil.Now() - job, err := jobs.TestingCreateAndStartJob(ctx, registry, s.DB(), record) - if err != nil { - t.Fatal(err) - } - payload := job.Payload() - return job, expectation{ - DB: sqlDB, - Record: record, - Type: payload.Type(), - Before: beforeTime, - } - } - t.Run("valid job lifecycles succeed", func(t *testing.T) { // Woody is a successful job. woodyPride, _ := security.MakeSQLUsernameFromUserInput("Woody Pride", security.UsernameValidation) @@ -1440,7 +1498,7 @@ func TestJobLifecycle(t *testing.T) { }) t.Run("cancelable jobs can be paused until finished", func(t *testing.T) { - job, exp := startLeasedJob(t, defaultRecord) + job, exp := createDefaultJob() if err := registry.PauseRequested(ctx, nil, job.ID(), ""); err != nil { t.Fatal(err) @@ -1473,7 +1531,7 @@ func TestJobLifecycle(t *testing.T) { t.Run("cancelable jobs can be canceled until finished", func(t *testing.T) { { - job, exp := startLeasedJob(t, defaultRecord) + job, exp := createDefaultJob() if err := registry.CancelRequested(ctx, nil, job.ID()); err != nil { t.Fatal(err) } @@ -1483,7 +1541,7 @@ func TestJobLifecycle(t *testing.T) { } { - job, exp := startLeasedJob(t, defaultRecord) + job, exp := createDefaultJob() if err := job.Started(ctx); err != nil { t.Fatal(err) } @@ -1496,7 +1554,7 @@ func TestJobLifecycle(t *testing.T) { } { - job, exp := startLeasedJob(t, defaultRecord) + job, exp := createDefaultJob() if err := registry.PauseRequested(ctx, nil, job.ID(), ""); err != nil { t.Fatal(err) } @@ -1512,7 +1570,7 @@ func TestJobLifecycle(t *testing.T) { } { - job, _ := startLeasedJob(t, defaultRecord) + job, _ := createDefaultJob() if err := job.Succeeded(ctx); err != nil { t.Fatal(err) } @@ -1525,7 +1583,7 @@ func TestJobLifecycle(t *testing.T) { t.Run("unpaused jobs cannot be resumed", func(t *testing.T) { { - job, _ := startLeasedJob(t, defaultRecord) + job, _ := createDefaultJob() if err := registry.CancelRequested(ctx, nil, job.ID()); err != nil { t.Fatal(err) } @@ -1535,7 +1593,7 @@ func TestJobLifecycle(t *testing.T) { } { - job, _ := startLeasedJob(t, defaultRecord) + job, _ := createDefaultJob() if err := job.Succeeded(ctx); err != nil { t.Fatal(err) } @@ -1656,7 +1714,7 @@ func TestJobLifecycle(t *testing.T) { }) t.Run("progress on paused job fails", func(t *testing.T) { - job, _ := startLeasedJob(t, defaultRecord) + job, _ := createDefaultJob() if err := registry.PauseRequested(ctx, nil, job.ID(), ""); err != nil { t.Fatal(err) } @@ -1668,7 +1726,7 @@ func TestJobLifecycle(t *testing.T) { }) t.Run("progress on canceled job fails", func(t *testing.T) { - job, _ := startLeasedJob(t, defaultRecord) + job, _ := createDefaultJob() if err := registry.CancelRequested(ctx, nil, job.ID()); err != nil { t.Fatal(err) } @@ -1700,7 +1758,7 @@ func TestJobLifecycle(t *testing.T) { updateStatusStmt := `UPDATE system.jobs SET status = $1 WHERE id = $2` t.Run("set details works", func(t *testing.T) { - job, exp := startLeasedJob(t, defaultRecord) + job, exp := createDefaultJob() require.NoError(t, exp.verify(job.ID(), jobs.StatusRunning)) newDetails := jobspb.ImportDetails{URIs: []string{"new"}} exp.Record.Details = newDetails @@ -1716,7 +1774,7 @@ func TestJobLifecycle(t *testing.T) { }) t.Run("set details fails", func(t *testing.T) { - job, exp := startLeasedJob(t, defaultRecord) + job, exp := createDefaultJob() require.NoError(t, exp.verify(job.ID(), jobs.StatusRunning)) _, err := exp.DB.Exec(updateStatusStmt, jobs.StatusCancelRequested, job.ID()) require.NoError(t, err) @@ -1725,7 +1783,7 @@ func TestJobLifecycle(t *testing.T) { }) t.Run("set progress works", func(t *testing.T) { - job, exp := startLeasedJob(t, defaultRecord) + job, exp := createDefaultJob() require.NoError(t, exp.verify(job.ID(), jobs.StatusRunning)) newProgress := jobspb.ImportProgress{ResumePos: []int64{42}} exp.Record.Progress = newProgress @@ -1740,47 +1798,13 @@ func TestJobLifecycle(t *testing.T) { }) t.Run("set progress fails", func(t *testing.T) { - job, exp := startLeasedJob(t, defaultRecord) + job, exp := createDefaultJob() require.NoError(t, exp.verify(job.ID(), jobs.StatusRunning)) _, err := exp.DB.Exec(updateStatusStmt, jobs.StatusPauseRequested, job.ID()) require.NoError(t, err) require.Error(t, job.SetProgress(ctx, nil /* txn */, jobspb.ImportProgress{ResumePos: []int64{42}})) require.NoError(t, exp.verify(job.ID(), jobs.StatusPauseRequested)) }) - - t.Run("job with created by fields", func(t *testing.T) { - createdByType := "internal_test" - - resumerJob := make(chan *jobs.Job, 1) - jobs.RegisterConstructor( - jobspb.TypeBackup, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer { - return jobs.FakeResumer{ - OnResume: func(ctx context.Context) error { - resumerJob <- j - return nil - }, - } - }, jobs.UsesTenantCostControl) - - jobID := registry.MakeJobID() - record := jobs.Record{ - Details: jobspb.BackupDetails{}, - Progress: jobspb.BackupProgress{}, - CreatedBy: &jobs.CreatedByInfo{Name: createdByType, ID: 123}, - } - job, err := registry.CreateAdoptableJobWithTxn(ctx, record, jobID, nil /* txn */) - require.NoError(t, err) - - loadedJob, err := registry.LoadJob(ctx, jobID) - require.NoError(t, err) - require.NotNil(t, loadedJob.CreatedBy()) - require.Equal(t, job.CreatedBy(), loadedJob.CreatedBy()) - registry.TestingNudgeAdoptionQueue() - resumedJob := <-resumerJob - require.NotNil(t, resumedJob.CreatedBy()) - require.Equal(t, job.CreatedBy(), resumedJob.CreatedBy()) - - }) } // TestShowJobs manually inserts a row into system.jobs and checks that the @@ -3161,6 +3185,35 @@ func checkBundle(t *testing.T, zipFile string, expectedFiles []string) { require.Equal(t, expectedFiles, filesInZip) } +type resumeStartedSignaler struct { + syncutil.Mutex + cond *sync.Cond + isStarted bool +} + +func newResumeStartedSignaler() *resumeStartedSignaler { + ret := &resumeStartedSignaler{} + ret.cond = sync.NewCond(&ret.Mutex) + return ret + +} + +func (r *resumeStartedSignaler) SignalResumeStarted() { + r.Lock() + r.isStarted = true + r.cond.Signal() + r.Unlock() +} + +func (r *resumeStartedSignaler) WaitForResumeStarted() { + r.Lock() + for !r.isStarted { + r.cond.Wait() + } + r.isStarted = false + r.Unlock() +} + // TestPauseReason tests pausing a job with a user specified reason. func TestPauseReason(t *testing.T) { defer leaktest.AfterTest(t)() @@ -3177,10 +3230,11 @@ func TestPauseReason(t *testing.T) { done := make(chan struct{}) defer close(done) - + resumeSignaler := newResumeStartedSignaler() jobs.RegisterConstructor(jobspb.TypeImport, func(job *jobs.Job, settings *cluster.Settings) jobs.Resumer { return jobs.FakeResumer{ OnResume: func(ctx context.Context) error { + resumeSignaler.SignalResumeStarted() select { case <-ctx.Done(): return ctx.Err() @@ -3212,9 +3266,16 @@ func TestPauseReason(t *testing.T) { return n } mustNotHaveClaim := func() { - require.Equal(t, 0, countRowsWithClaimInfo()) + t.Helper() + testutils.SucceedsSoon(t, func() error { + if countRowsWithClaimInfo() == 0 { + return nil + } + return errors.New("still waiting for claim to clear") + }) } mustHaveClaim := func() { + t.Helper() testutils.SucceedsSoon(t, func() error { if countRowsWithClaimInfo() == 1 { return nil @@ -3227,6 +3288,7 @@ func TestPauseReason(t *testing.T) { q := fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", jobID) tdb.CheckQueryResultsRetry(t, q, [][]string{{"running"}}) mustHaveClaim() + resumeSignaler.WaitForResumeStarted() getStatusAndPayload := func(t *testing.T, id jobspb.JobID) (string, jobspb.Payload) { var payloadBytes []byte @@ -3260,6 +3322,7 @@ func TestPauseReason(t *testing.T) { checkStatusAndPauseReason(t, jobID, "running", "for testing") mustHaveClaim() + resumeSignaler.WaitForResumeStarted() } { // Pause the job again with a different reason. Verify that the job is paused with the reason. diff --git a/pkg/jobs/lease_test.go b/pkg/jobs/lease_test.go index ba9c2d027fcf..816882de5ee9 100644 --- a/pkg/jobs/lease_test.go +++ b/pkg/jobs/lease_test.go @@ -20,12 +20,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) func TestJobsTableClaimFamily(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index a0f1fe2e72ee..bc579d59c5a5 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -704,6 +704,10 @@ func (r *Registry) withSession(ctx context.Context, f withSessionFunc) { // jobs if it observes a failure. Otherwise it starts all the main daemons of // registry that poll the jobs table and start/cancel/gc jobs. func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error { + if r.knobs.DisableRegistryLifecycleManagent { + return nil + } + // Since the job polling system is outside user control, exclude it from cost // accounting and control. Individual jobs are not part of this exclusion. ctx = multitenant.WithTenantCostControlExemption(ctx) @@ -1450,12 +1454,14 @@ func (r *Registry) unregister(jobID jobspb.JobID) { } } -func (r *Registry) cancelRegisteredJobContext(jobID jobspb.JobID) { +func (r *Registry) cancelRegisteredJobContext(jobID jobspb.JobID) bool { r.mu.Lock() defer r.mu.Unlock() - if aj, ok := r.mu.adoptedJobs[jobID]; ok { + aj, ok := r.mu.adoptedJobs[jobID] + if ok { aj.cancel() } + return ok } func (r *Registry) getClaimedJob(jobID jobspb.JobID) (*Job, error) { @@ -1522,7 +1528,7 @@ func (r *Registry) maybeRecordExecutionFailure(ctx context.Context, err error, j return } if updateErr != nil { - log.Warningf(ctx, "failed to record error for job %d: %v: %v", j.ID(), err, err) + log.Warningf(ctx, "failed to record error for job %d: %v: %v", j.ID(), err, updateErr) } } diff --git a/pkg/jobs/registry_test.go b/pkg/jobs/registry_test.go index c8e9dc9e4bb2..6657ea76b6f0 100644 --- a/pkg/jobs/registry_test.go +++ b/pkg/jobs/registry_test.go @@ -976,6 +976,7 @@ func TestRunWithoutLoop(t *testing.T) { func TestJobIdleness(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) ctx := context.Background() intervalOverride := time.Millisecond @@ -1114,6 +1115,7 @@ func TestJobIdleness(t *testing.T) { // allow other job registries in the cluster to claim and run this job. func TestDisablingJobAdoptionClearsClaimSessionID(t *testing.T) { defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) intervalOverride := time.Millisecond s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ diff --git a/pkg/jobs/testing_knobs.go b/pkg/jobs/testing_knobs.go index d8f262a4059f..069f3e6beda2 100644 --- a/pkg/jobs/testing_knobs.go +++ b/pkg/jobs/testing_knobs.go @@ -71,6 +71,9 @@ type TestingKnobs struct { // DisableAdoptions disables job adoptions. DisableAdoptions bool + + // DisableRegistryLifecycleManagement + DisableRegistryLifecycleManagent bool } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.