Skip to content

Commit

Permalink
Merge #113494
Browse files Browse the repository at this point in the history
113494: jobs: deflake TestRetriesWithExponentialBackoff r=adityamaru a=stevendanna

This attempts to deflake TestRetriesWithExponentialBackoff.

The majority of the problems with this test are the result of pausing a
job clearing its backoff state, which means the pause test cases need to
take a different path through most of this test.

If this PR doesn't solve the flakes, I recommend that we just remove those
two test cases and write a new test for them.  

The only reason I haven't done that here is that this test has revealed interesting
job system behaviour in the past since it is so involved.

See individual commits for more details.

Fixes #112763

First two commits are from #113382

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
craig[bot] and stevendanna committed Dec 6, 2023
2 parents ccdc317 + 3e9a4bf commit 6032f68
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 9 deletions.
24 changes: 24 additions & 0 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -1463,6 +1463,30 @@ func getRegisterOptions(typ jobspb.Type) (registerOptions, bool) {
return opts, ok
}

// TestingClearConstructors swaps the global constructor and options
// registries for fresh, empty maps. Tests can use it to make sure the
// job system only knows about the job types they register afterwards.
//
// It returns a function that reinstates the maps that were in place
// when TestingClearConstructors was called; callers should invoke it
// at the end of the test. Anything registered between the clear and
// the restore lives only in the temporary maps and is dropped on
// restore.
func TestingClearConstructors() func() {
	globalMu.Lock()
	defer globalMu.Unlock()

	// Hold on to the current registries so the closure below can put
	// them back untouched.
	savedConstructors, savedOptions := globalMu.constructors, globalMu.options

	globalMu.constructors = map[jobspb.Type]Constructor{}
	globalMu.options = map[jobspb.Type]registerOptions{}

	restore := func() {
		globalMu.Lock()
		defer globalMu.Unlock()
		globalMu.constructors, globalMu.options = savedConstructors, savedOptions
	}
	return restore
}

// RegisterConstructor registers a Resumer constructor for a certain job type.
//
// NOTE: You must pass either jobs.UsesTenantCostControl or
Expand Down
37 changes: 28 additions & 9 deletions pkg/jobs/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,9 @@ func TestRetriesWithExponentialBackoff(t *testing.T) {
cancel = false
)

restoreConstructors := TestingClearConstructors()
defer restoreConstructors()

// createJob creates a mock job.
createJob := func(
t *testing.T, ctx context.Context, s serverutils.ApplicationLayerInterface, r *Registry, tdb *sqlutils.SQLRunner, db isql.DB,
Expand Down Expand Up @@ -667,6 +670,7 @@ func TestRetriesWithExponentialBackoff(t *testing.T) {
// expectImmediateRetry is true if the test should expect immediate
// resumption on retry, such as after pausing and resuming job.
expectImmediateRetry bool
allowCompletion func()
}
testInfraSetUp := func(t *testing.T, ctx context.Context, bti *BackoffTestInfra) func() {
// We use a manual clock to control and evaluate job execution times.
Expand Down Expand Up @@ -785,15 +789,17 @@ func TestRetriesWithExponentialBackoff(t *testing.T) {
require.GreaterOrEqual(t, maxDelay, delay, "delay exceeds the max")
// Advance the clock such that it is before the next expected retry time.
bti.clock.AdvanceTo(lastRun.Add(delay - unitTime))
// This allows adopt-loops to run for a few times, which ensures that
// adopt-loops do not resume jobs without correctly following the job
// schedules.
waitUntilCount(t, bti.adopted, bti.adopted.Count()+2)
if bti.expectImmediateRetry && i > 0 {
// Validate that the job did not wait to resume on retry.
// Validate that the job does not wait to resume on retry.
waitUntilCount(t, bti.resumed, expectedResumed+1)
require.Equal(t, expectedResumed+1, bti.resumed.Count(), "unexpected number of jobs resumed in retry %d", i)
} else {
// Validate that the job is not resumed yet.

// This allows adopt-loops to run for a few times, which ensures that
// adopt-loops do not resume jobs without correctly following the job
// schedules.
waitUntilCount(t, bti.adopted, bti.adopted.Count()+2)
require.Equal(t, expectedResumed, bti.resumed.Count(), "unexpected number of jobs resumed in retry %d", i)
}
// Advance the clock by delta from the expected time of next retry.
Expand All @@ -813,9 +819,13 @@ func TestRetriesWithExponentialBackoff(t *testing.T) {
}
lastRun = bti.clock.Now()
}
bti.done.Store(true)
// Let the job be retried one more time.
bti.clock.Advance(nextDelay(retryCnt, initialDelay, maxDelay))

if bti.allowCompletion != nil {
bti.allowCompletion()
} else {
bti.done.Store(true)
bti.clock.Advance(nextDelay(retryCnt, initialDelay, maxDelay))
}
// Wait until the job completes.
testutils.SucceedsSoon(t, func() error {
var found Status
Expand Down Expand Up @@ -856,6 +866,11 @@ func TestRetriesWithExponentialBackoff(t *testing.T) {
t.Run("pause running", func(t *testing.T) {
ctx := context.Background()
bti := BackoffTestInfra{expectImmediateRetry: true}
bti.allowCompletion = func() {
<-bti.resumeCh
bti.errCh <- nil
<-bti.transitionCh
}
bti.afterJobStateMachineKnob = func() {
if bti.done.Load().(bool) {
return
Expand Down Expand Up @@ -936,7 +951,11 @@ func TestRetriesWithExponentialBackoff(t *testing.T) {
t.Run("pause reverting", func(t *testing.T) {
ctx := context.Background()
bti := BackoffTestInfra{expectImmediateRetry: true}

bti.allowCompletion = func() {
<-bti.failOrCancelCh
bti.errCh <- nil
<-bti.transitionCh
}
bti.afterJobStateMachineKnob = func() {
if bti.done.Load().(bool) {
return
Expand Down

0 comments on commit 6032f68

Please sign in to comment.