Skip to content

Commit

Permalink
stats: decrease job infrastructure overhead
Browse files Browse the repository at this point in the history
Every job progress update involves a transaction that reads and writes
to the jobs table (even if the progress didn't actually change), and
this internally does a bunch of KV queries because the system table
metadata/leases are not cached. For auto stats (which run frequently)
we want to minimize any overhead.

This change adds an alternative API that only checks for cancellation.
The sample aggregator now uses this API unless progress has changed by
at least 1%. The reporting interval is also increased to 5 seconds.

Release note: None
  • Loading branch information
RaduBerinde committed Mar 8, 2019
1 parent f0f024d commit 0146662
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 12 deletions.
20 changes: 19 additions & 1 deletion pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,30 @@ func (j *Job) Started(ctx context.Context) error {
})
}

// CheckStatus verifies the status of the job and returns an error if the job's
// status isn't Running.
func (j *Job) CheckStatus(ctx context.Context) error {
return j.updateRow(
ctx, updateProgressOnly,
func(
_ *client.Txn, status *Status, payload *jobspb.Payload, _ *jobspb.Progress,
) (doUpdate bool, _ error) {
if *status != StatusRunning {
return false, &InvalidStatusError{*j.id, *status, "update progress on", payload.Error}
}
return false, nil
},
)
}

// RunningStatus updates the detailed status of a job currently in progress.
// It sets the job's RunningStatus field to the value returned by runningStatusFn
// and persists runningStatusFn's modifications to the job's details, if any.
func (j *Job) RunningStatus(ctx context.Context, runningStatusFn RunningStatusFn) error {
return j.updateRow(ctx, updateProgressAndDetails,
func(_ *client.Txn, status *Status, payload *jobspb.Payload, progress *jobspb.Progress) (bool, error) {
func(
_ *client.Txn, status *Status, payload *jobspb.Payload, progress *jobspb.Progress,
) (doUpdate bool, _ error) {
if *status != StatusRunning {
return false, &InvalidStatusError{*j.id, *status, "update progress on", payload.Error}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/jobs/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func TestRegistryLifecycle(t *testing.T) {
check(t)
sqlDB.Exec(t, "CANCEL JOB $1", *job.ID())
// Test for a canceled error message.
if err := job.FractionProgressed(ctx, jobs.FractionUpdater(0)); !testutils.IsError(err, "cannot update progress on canceled job") {
if err := job.CheckStatus(ctx); !testutils.IsError(err, "cannot update progress on canceled job") {
t.Fatalf("unexpected %v", err)
}
resumeCheckCh <- struct{}{}
Expand Down Expand Up @@ -370,7 +370,7 @@ func TestRegistryLifecycle(t *testing.T) {
t.Fatal(err)
}
// Test for a paused error message.
if err := job.FractionProgressed(ctx, jobs.FractionUpdater(0)); !testutils.IsError(err, "cannot update progress on paused job") {
if err := job.CheckStatus(ctx); !testutils.IsError(err, "cannot update progress on paused job") {
t.Fatalf("unexpected %v", err)
}
}
Expand Down
27 changes: 18 additions & 9 deletions pkg/sql/distsqlrun/sample_aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ const sampleAggregatorProcName = "sample aggregator"

// SampleAggregatorProgressInterval is the frequency at which the
// SampleAggregator processor will report progress. It is mutable for testing.
var SampleAggregatorProgressInterval = time.Second
var SampleAggregatorProgressInterval = 5 * time.Second

func newSampleAggregator(
flowCtx *FlowCtx,
Expand Down Expand Up @@ -152,17 +152,19 @@ func (s *sampleAggregator) mainLoop(ctx context.Context) (earlyExit bool, err er
}
}

progFn := func(pct float32) error {
lastReportedFractionCompleted := float32(-1)
// Report progress (0 to 1).
progFn := func(fractionCompleted float32) error {
if jobID == 0 {
return nil
}
return job.FractionProgressed(ctx, func(ctx context.Context, _ jobspb.ProgressDetails) float32 {
// Float addition can round such that the sum is > 1.
if pct > 1 {
pct = 1
}
return pct
})
// If it changed by less than 1%, just check for cancellation (which is more
// efficient).
if fractionCompleted < 1.0 && fractionCompleted < lastReportedFractionCompleted+0.01 {
return job.CheckStatus(ctx)
}
lastReportedFractionCompleted = fractionCompleted
return job.FractionProgressed(ctx, jobs.FractionUpdater(fractionCompleted))
}

fractionCompleted := float32(0)
Expand All @@ -174,7 +176,14 @@ func (s *sampleAggregator) mainLoop(ctx context.Context) (earlyExit bool, err er
if meta != nil {
if meta.Progress != nil {
inputProg := meta.Progress.Progress.(*jobspb.Progress_FractionCompleted).FractionCompleted
// TODO(radu): this calculation is very dubious. It only works because
// inputProg is always 0 except one time when it is 1.
fractionCompleted += inputProg / float32(s.spec.InputProcCnt)
if fractionCompleted > 1 {
// Can happen because of rounding errors.
fractionCompleted = 1
}

if progressUpdates.ShouldProcess(timeutil.Now()) {
// Periodically report fraction progressed and check that the job has
// not been paused or canceled.
Expand Down

0 comments on commit 0146662

Please sign in to comment.