diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index 95fa5700ec0d..a7674e35a701 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -2534,18 +2534,20 @@ func TestMetrics(t *testing.T) { jobs.RegisterConstructor(jobspb.TypeImport, func(_ *jobs.Job, _ *cluster.Settings) jobs.Resumer { return res }) - setup := func(t *testing.T) (s serverutils.TestServerInterface, r *jobs.Registry, cleanup func()) { + setup := func(t *testing.T) ( + s serverutils.TestServerInterface, db *gosql.DB, r *jobs.Registry, cleanup func(), + ) { jobConstructorCleanup := jobs.ResetConstructors() - s, _, _ = serverutils.StartServer(t, base.TestServerArgs{}) + s, db, _ = serverutils.StartServer(t, base.TestServerArgs{}) r = s.JobRegistry().(*jobs.Registry) - return s, r, func() { + return s, db, r, func() { jobConstructorCleanup() s.Stopper().Stop(ctx) } } t.Run("success", func(t *testing.T) { - _, registry, cleanup := setup(t) + _, _, registry, cleanup := setup(t) defer cleanup() rec := jobs.Record{ DescriptorIDs: []descpb.ID{1}, @@ -2561,7 +2563,7 @@ func TestMetrics(t *testing.T) { int64EqSoon(t, backupMetrics.ResumeCompleted.Count, 1) }) t.Run("restart, pause, resume, then success", func(t *testing.T) { - _, registry, cleanup := setup(t) + _, db, registry, cleanup := setup(t) defer cleanup() rec := jobs.Record{ DescriptorIDs: []descpb.ID{1}, @@ -2592,6 +2594,12 @@ func TestMetrics(t *testing.T) { require.Equal(t, int64(0), importMetrics.ResumeCompleted.Count()) require.Equal(t, int64(0), importMetrics.CurrentlyRunning.Value()) } + { + // Wait for the job to be marked paused. + tdb := sqlutils.MakeSQLRunner(db) + q := fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", jobID) + tdb.CheckQueryResultsRetry(t, q, [][]string{{"paused"}}) + } { // Now resume the job and let it succeed. require.NoError(t, registry.Unpause(ctx, nil, jobID)) @@ -2602,7 +2610,7 @@ func TestMetrics(t *testing.T) { } }) t.Run("failure then restarts in revert", func(t *testing.T) { - _, registry, cleanup := setup(t) + _, _, registry, cleanup := setup(t) defer cleanup() rec := jobs.Record{ DescriptorIDs: []descpb.ID{1}, @@ -2637,7 +2645,7 @@ func TestMetrics(t *testing.T) { } }) t.Run("fail, pause, resume, then success on failure", func(t *testing.T) { - _, registry, cleanup := setup(t) + _, db, registry, cleanup := setup(t) defer cleanup() rec := jobs.Record{ DescriptorIDs: []descpb.ID{1}, @@ -2668,6 +2676,12 @@ func TestMetrics(t *testing.T) { require.Equal(t, int64(0), importMetrics.ResumeCompleted.Count()) require.Equal(t, int64(0), importMetrics.CurrentlyRunning.Value()) } + { + // Wait for the job to be marked paused. + tdb := sqlutils.MakeSQLRunner(db) + q := fmt.Sprintf("SELECT status FROM system.jobs WHERE id = %d", jobID) + tdb.CheckQueryResultsRetry(t, q, [][]string{{"paused"}}) + } { // Now resume the job and let it succeed. require.NoError(t, registry.Unpause(ctx, nil, jobID))