diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index 6277df392d7c..39372e22169d 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -152,12 +152,30 @@ func (j *Job) Started(ctx context.Context) error { }) } +// CheckStatus verifies the status of the job and returns an error if the job's +// status isn't Running. +func (j *Job) CheckStatus(ctx context.Context) error { + return j.updateRow( + ctx, updateProgressOnly, + func( + _ *client.Txn, status *Status, payload *jobspb.Payload, _ *jobspb.Progress, + ) (doUpdate bool, _ error) { + if *status != StatusRunning { + return false, &InvalidStatusError{*j.id, *status, "update progress on", payload.Error} + } + return false, nil + }, + ) +} + // RunningStatus updates the detailed status of a job currently in progress. // It sets the job's RunningStatus field to the value returned by runningStatusFn // and persists runningStatusFn's modifications to the job's details, if any. func (j *Job) RunningStatus(ctx context.Context, runningStatusFn RunningStatusFn) error { return j.updateRow(ctx, updateProgressAndDetails, - func(_ *client.Txn, status *Status, payload *jobspb.Payload, progress *jobspb.Progress) (bool, error) { + func( + _ *client.Txn, status *Status, payload *jobspb.Payload, progress *jobspb.Progress, + ) (doUpdate bool, _ error) { if *status != StatusRunning { return false, &InvalidStatusError{*j.id, *status, "update progress on", payload.Error} } diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index f5bade9f6398..17b15ff83a4b 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -299,7 +299,7 @@ func TestRegistryLifecycle(t *testing.T) { check(t) sqlDB.Exec(t, "CANCEL JOB $1", *job.ID()) // Test for a canceled error message. - if err := job.FractionProgressed(ctx, jobs.FractionUpdater(0)); !testutils.IsError(err, "cannot update progress on canceled job") { + if err := job.CheckStatus(ctx); !testutils.IsError(err, "cannot update progress on canceled job") { t.Fatalf("unexpected %v", err) } resumeCheckCh <- struct{}{} @@ -370,7 +370,7 @@ func TestRegistryLifecycle(t *testing.T) { t.Fatal(err) } // Test for a paused error message. - if err := job.FractionProgressed(ctx, jobs.FractionUpdater(0)); !testutils.IsError(err, "cannot update progress on paused job") { + if err := job.CheckStatus(ctx); !testutils.IsError(err, "cannot update progress on paused job") { t.Fatalf("unexpected %v", err) } } diff --git a/pkg/sql/distsqlrun/sample_aggregator.go b/pkg/sql/distsqlrun/sample_aggregator.go index c469112bf86e..bcc292d5d08f 100644 --- a/pkg/sql/distsqlrun/sample_aggregator.go +++ b/pkg/sql/distsqlrun/sample_aggregator.go @@ -60,7 +60,7 @@ const sampleAggregatorProcName = "sample aggregator" // SampleAggregatorProgressInterval is the frequency at which the // SampleAggregator processor will report progress. It is mutable for testing. -var SampleAggregatorProgressInterval = time.Second +var SampleAggregatorProgressInterval = 5 * time.Second func newSampleAggregator( flowCtx *FlowCtx, @@ -152,17 +152,19 @@ func (s *sampleAggregator) mainLoop(ctx context.Context) (earlyExit bool, err er } } - progFn := func(pct float32) error { + lastReportedFractionCompleted := float32(-1) + // Report progress (0 to 1). + progFn := func(fractionCompleted float32) error { if jobID == 0 { return nil } - return job.FractionProgressed(ctx, func(ctx context.Context, _ jobspb.ProgressDetails) float32 { - // Float addition can round such that the sum is > 1. - if pct > 1 { - pct = 1 - } - return pct - }) + // If it changed by less than 1%, just check for cancellation (which is more + // efficient). + if fractionCompleted < 1.0 && fractionCompleted < lastReportedFractionCompleted+0.01 { + return job.CheckStatus(ctx) + } + lastReportedFractionCompleted = fractionCompleted + return job.FractionProgressed(ctx, jobs.FractionUpdater(fractionCompleted)) } fractionCompleted := float32(0) @@ -174,7 +176,14 @@ func (s *sampleAggregator) mainLoop(ctx context.Context) (earlyExit bool, err er if meta != nil { if meta.Progress != nil { inputProg := meta.Progress.Progress.(*jobspb.Progress_FractionCompleted).FractionCompleted + // TODO(radu): this calculation is very dubious. It only works because + // inputProg is always 0 except one time when it is 1. fractionCompleted += inputProg / float32(s.spec.InputProcCnt) + if fractionCompleted > 1 { + // Can happen because of rounding errors. + fractionCompleted = 1 + } + if progressUpdates.ShouldProcess(timeutil.Now()) { // Periodically report fraction progressed and check that the job has // not been paused or canceled.