diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 8b91ec5e9619..c3d59fb352c3 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -1463,6 +1463,30 @@ func getRegisterOptions(typ jobspb.Type) (registerOptions, bool) { return opts, ok } +// TestingClearConstructors clears all previously registered +// constructors. This is useful in tests when you want to ensure that +// the job system will only run a particular job. +// +// The returned function should be called at the end of the test to +// restore the constructors. +func TestingClearConstructors() func() { + globalMu.Lock() + defer globalMu.Unlock() + + oldConstructors := globalMu.constructors + oldOptions := globalMu.options + + globalMu.constructors = make(map[jobspb.Type]Constructor) + globalMu.options = make(map[jobspb.Type]registerOptions) + return func() { + globalMu.Lock() + defer globalMu.Unlock() + globalMu.constructors = oldConstructors + globalMu.options = oldOptions + } + +} + // RegisterConstructor registers a Resumer constructor for a certain job type. // // NOTE: You must pass either jobs.UsesTenantCostControl or diff --git a/pkg/jobs/registry_test.go b/pkg/jobs/registry_test.go index 62a0d5e6a0b7..2f295f3d773f 100644 --- a/pkg/jobs/registry_test.go +++ b/pkg/jobs/registry_test.go @@ -591,6 +591,9 @@ func TestRetriesWithExponentialBackoff(t *testing.T) { cancel = false ) + restoreConstructors := TestingClearConstructors() + defer restoreConstructors() + // createJob creates a mock job. createJob := func( t *testing.T, ctx context.Context, s serverutils.ApplicationLayerInterface, r *Registry, tdb *sqlutils.SQLRunner, db isql.DB, @@ -667,6 +670,7 @@ func TestRetriesWithExponentialBackoff(t *testing.T) { // expectImmediateRetry is true if the test should expect immediate // resumption on retry, such as after pausing and resuming job. expectImmediateRetry bool + allowCompletion func() } testInfraSetUp := func(t *testing.T, ctx context.Context, bti *BackoffTestInfra) func() { // We use a manual clock to control and evaluate job execution times. @@ -785,15 +789,17 @@ func TestRetriesWithExponentialBackoff(t *testing.T) { require.GreaterOrEqual(t, maxDelay, delay, "delay exceeds the max") // Advance the clock such that it is before the next expected retry time. bti.clock.AdvanceTo(lastRun.Add(delay - unitTime)) - // This allows adopt-loops to run for a few times, which ensures that - // adopt-loops do not resume jobs without correctly following the job - // schedules. - waitUntilCount(t, bti.adopted, bti.adopted.Count()+2) if bti.expectImmediateRetry && i > 0 { - // Validate that the job did not wait to resume on retry. + // Validate that the job does not wait to resume on retry. + waitUntilCount(t, bti.resumed, expectedResumed+1) require.Equal(t, expectedResumed+1, bti.resumed.Count(), "unexpected number of jobs resumed in retry %d", i) } else { // Validate that the job is not resumed yet. + + // This allows adopt-loops to run for a few times, which ensures that + // adopt-loops do not resume jobs without correctly following the job + // schedules. + waitUntilCount(t, bti.adopted, bti.adopted.Count()+2) require.Equal(t, expectedResumed, bti.resumed.Count(), "unexpected number of jobs resumed in retry %d", i) } // Advance the clock by delta from the expected time of next retry. @@ -813,9 +819,13 @@ func TestRetriesWithExponentialBackoff(t *testing.T) { } lastRun = bti.clock.Now() } - bti.done.Store(true) - // Let the job be retried one more time. - bti.clock.Advance(nextDelay(retryCnt, initialDelay, maxDelay)) + + if bti.allowCompletion != nil { + bti.allowCompletion() + } else { + bti.done.Store(true) + bti.clock.Advance(nextDelay(retryCnt, initialDelay, maxDelay)) + } // Wait until the job completes. testutils.SucceedsSoon(t, func() error { var found Status @@ -856,6 +866,11 @@ func TestRetriesWithExponentialBackoff(t *testing.T) { t.Run("pause running", func(t *testing.T) { ctx := context.Background() bti := BackoffTestInfra{expectImmediateRetry: true} + bti.allowCompletion = func() { + <-bti.resumeCh + bti.errCh <- nil + <-bti.transitionCh + } bti.afterJobStateMachineKnob = func() { if bti.done.Load().(bool) { return @@ -936,7 +951,11 @@ func TestRetriesWithExponentialBackoff(t *testing.T) { t.Run("pause reverting", func(t *testing.T) { ctx := context.Background() bti := BackoffTestInfra{expectImmediateRetry: true} - + bti.allowCompletion = func() { + <-bti.failOrCancelCh + bti.errCh <- nil + <-bti.transitionCh + } bti.afterJobStateMachineKnob = func() { if bti.done.Load().(bool) { return