From 0146662d10000a5a23c42935e5bf837af6c4b97b Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Thu, 7 Mar 2019 10:43:13 -0500 Subject: [PATCH] stats: decrease job infrastructure overhead Every job progress update involves a transaction that reads and writes to the jobs table (even if the progress didn't actually change), and this internally does a bunch of KV queries because the system table metadata/leases are not cached. For auto stats (which run frequently) we want to minimize any overhead. This change adds an alternative API that only checks for cancellation. The sample aggregator now uses this API unless progress has changed by at least 1%. The reporting interval is also increased to 5 seconds. Release note: None --- pkg/jobs/jobs.go | 20 +++++++++++++++++- pkg/jobs/jobs_test.go | 4 ++-- pkg/sql/distsqlrun/sample_aggregator.go | 27 ++++++++++++++++--------- 3 files changed, 39 insertions(+), 12 deletions(-) diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index ba3fab51b624..1a4e1ece8286 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -151,12 +151,30 @@ func (j *Job) Started(ctx context.Context) error { }) } +// CheckStatus verifies the status of the job and returns an error if the job's +// status isn't Running. +func (j *Job) CheckStatus(ctx context.Context) error { + return j.updateRow( + ctx, updateProgressOnly, + func( + _ *client.Txn, status *Status, payload *jobspb.Payload, _ *jobspb.Progress, + ) (doUpdate bool, _ error) { + if *status != StatusRunning { + return false, &InvalidStatusError{*j.id, *status, "update progress on", payload.Error} + } + return false, nil + }, + ) +} + // RunningStatus updates the detailed status of a job currently in progress. // It sets the job's RunningStatus field to the value returned by runningStatusFn // and persists runningStatusFn's modifications to the job's details, if any. func (j *Job) RunningStatus(ctx context.Context, runningStatusFn RunningStatusFn) error { return j.updateRow(ctx, updateProgressAndDetails, - func(_ *client.Txn, status *Status, payload *jobspb.Payload, progress *jobspb.Progress) (bool, error) { + func( + _ *client.Txn, status *Status, payload *jobspb.Payload, progress *jobspb.Progress, + ) (doUpdate bool, _ error) { if *status != StatusRunning { return false, &InvalidStatusError{*j.id, *status, "update progress on", payload.Error} } diff --git a/pkg/jobs/jobs_test.go b/pkg/jobs/jobs_test.go index f5bade9f6398..17b15ff83a4b 100644 --- a/pkg/jobs/jobs_test.go +++ b/pkg/jobs/jobs_test.go @@ -299,7 +299,7 @@ func TestRegistryLifecycle(t *testing.T) { check(t) sqlDB.Exec(t, "CANCEL JOB $1", *job.ID()) // Test for a canceled error message. - if err := job.FractionProgressed(ctx, jobs.FractionUpdater(0)); !testutils.IsError(err, "cannot update progress on canceled job") { + if err := job.CheckStatus(ctx); !testutils.IsError(err, "cannot update progress on canceled job") { t.Fatalf("unexpected %v", err) } resumeCheckCh <- struct{}{} @@ -370,7 +370,7 @@ func TestRegistryLifecycle(t *testing.T) { t.Fatal(err) } // Test for a paused error message. - if err := job.FractionProgressed(ctx, jobs.FractionUpdater(0)); !testutils.IsError(err, "cannot update progress on paused job") { + if err := job.CheckStatus(ctx); !testutils.IsError(err, "cannot update progress on paused job") { t.Fatalf("unexpected %v", err) } } diff --git a/pkg/sql/distsqlrun/sample_aggregator.go b/pkg/sql/distsqlrun/sample_aggregator.go index c469112bf86e..bcc292d5d08f 100644 --- a/pkg/sql/distsqlrun/sample_aggregator.go +++ b/pkg/sql/distsqlrun/sample_aggregator.go @@ -60,7 +60,7 @@ const sampleAggregatorProcName = "sample aggregator" // SampleAggregatorProgressInterval is the frequency at which the // SampleAggregator processor will report progress. It is mutable for testing. -var SampleAggregatorProgressInterval = time.Second +var SampleAggregatorProgressInterval = 5 * time.Second func newSampleAggregator( flowCtx *FlowCtx, @@ -152,17 +152,19 @@ func (s *sampleAggregator) mainLoop(ctx context.Context) (earlyExit bool, err er } } - progFn := func(pct float32) error { + lastReportedFractionCompleted := float32(-1) + // Report progress (0 to 1). + progFn := func(fractionCompleted float32) error { if jobID == 0 { return nil } - return job.FractionProgressed(ctx, func(ctx context.Context, _ jobspb.ProgressDetails) float32 { - // Float addition can round such that the sum is > 1. - if pct > 1 { - pct = 1 - } - return pct - }) + // If it changed by less than 1%, just check for cancellation (which is more + // efficient). + if fractionCompleted < 1.0 && fractionCompleted < lastReportedFractionCompleted+0.01 { + return job.CheckStatus(ctx) + } + lastReportedFractionCompleted = fractionCompleted + return job.FractionProgressed(ctx, jobs.FractionUpdater(fractionCompleted)) } fractionCompleted := float32(0) @@ -174,7 +176,14 @@ func (s *sampleAggregator) mainLoop(ctx context.Context) (earlyExit bool, err er if meta != nil { if meta.Progress != nil { inputProg := meta.Progress.Progress.(*jobspb.Progress_FractionCompleted).FractionCompleted + // TODO(radu): this calculation is very dubious. It only works because + // inputProg is always 0 except one time when it is 1. fractionCompleted += inputProg / float32(s.spec.InputProcCnt) + if fractionCompleted > 1 { + // Can happen because of rounding errors. + fractionCompleted = 1 + } + if progressUpdates.ShouldProcess(timeutil.Now()) { // Periodically report fraction progressed and check that the job has // not been paused or canceled.