Skip to content

Commit

Permalink
jobs: Remove unnecessary and expensive scheduled jobs metrics.
Browse files Browse the repository at this point in the history
Remove `schedules.round.schedules-ready-to-run` and `schedules.round.num-jobs-running`
metrics from job scheduler.  These metrics are very expensive to compute as they
involve running wider table scans against both `system.jobs` and `system.scheduled_job`.

In addition to being expensive to compute, these metrics are not needed since
the query can be executed directly if needed, and, in addition these metrics are confusing
since these metrics are per node, while the number of running jobs/schedules is cluster wide.
More importantly, they can lead to job scheduler query being more expensive since they increase
the read set of the scheduler transaction, thus causing txn restarts to be more expensive.

Fixes cockroachdb#78447

Release Notes (enterprise): Remove expensive, unnecessary, and never used
`schedules.round.schedules-ready-to-run` and `schedules.round.num-jobs-running` metrics from
job schedulers.

Release Justification: Stability fix for scheduled job system.
  • Loading branch information
Yevgeniy Miretskiy committed Mar 25, 2022
1 parent fc9493a commit 3c9380f
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 60 deletions.
44 changes: 3 additions & 41 deletions pkg/jobs/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
Expand Down Expand Up @@ -118,14 +117,11 @@ const recheckRunningAfter = 1 * time.Minute

type loopStats struct {
rescheduleWait, rescheduleSkip, started int64
readyToRun, jobsRunning int64
malformed int64
}

func (s *loopStats) updateMetrics(m *SchedulerMetrics) {
m.NumStarted.Update(s.started)
m.ReadyToRun.Update(s.readyToRun)
m.NumRunning.Update(s.jobsRunning)
m.RescheduleSkip.Update(s.rescheduleSkip)
m.RescheduleWait.Update(s.rescheduleWait)
m.NumMalformedSchedules.Update(s.malformed)
Expand Down Expand Up @@ -194,36 +190,6 @@ func (s *jobScheduler) processSchedule(
return schedule.Update(ctx, s.InternalExecutor, txn)
}

// TODO(yevgeniy): Re-evaluate if we need to have per-loop execution statistics.
func newLoopStats(
ctx context.Context, env scheduledjobs.JobSchedulerEnv, ex sqlutil.InternalExecutor, txn *kv.Txn,
) (*loopStats, error) {
numRunningJobsStmt := fmt.Sprintf(
"SELECT count(*) FROM %s WHERE created_by_type = '%s' AND status NOT IN ('%s', '%s', '%s', '%s')",
env.SystemJobsTableName(), CreatedByScheduledJobs,
StatusSucceeded, StatusCanceled, StatusFailed, StatusRevertFailed)
readyToRunStmt := fmt.Sprintf(
"SELECT count(*) FROM %s WHERE next_run < %s",
env.ScheduledJobsTableName(), env.NowExpr())
statsStmt := fmt.Sprintf(
"SELECT (%s) numReadySchedules, (%s) numRunningJobs",
readyToRunStmt, numRunningJobsStmt)

datums, err := ex.QueryRowEx(ctx, "scheduler-stats", txn,
sessiondata.InternalExecutorOverride{User: security.NodeUserName()},
statsStmt)
if err != nil {
return nil, err
}
if datums == nil {
return nil, errors.New("failed to read scheduler stats")
}
stats := &loopStats{}
stats.readyToRun = int64(tree.MustBeDInt(datums[0]))
stats.jobsRunning = int64(tree.MustBeDInt(datums[1]))
return stats, nil
}

type savePointError struct {
err error
}
Expand Down Expand Up @@ -268,11 +234,7 @@ func withSavePoint(ctx context.Context, txn *kv.Txn, fn func() error) error {
func (s *jobScheduler) executeSchedules(
ctx context.Context, maxSchedules int64, txn *kv.Txn,
) (retErr error) {
stats, err := newLoopStats(ctx, s.env, s.InternalExecutor, txn)
if err != nil {
return err
}

var stats loopStats
defer stats.updateMetrics(&s.metrics)

findSchedulesStmt := getFindSchedulesStatement(s.env, maxSchedules)
Expand Down Expand Up @@ -314,10 +276,10 @@ func (s *jobScheduler) executeSchedules(
return contextutil.RunWithTimeout(
ctx, fmt.Sprintf("process-schedule-%d", schedule.ScheduleID()), timeout,
func(ctx context.Context) error {
return s.processSchedule(ctx, schedule, numRunning, stats, txn)
return s.processSchedule(ctx, schedule, numRunning, &stats, txn)
})
}
return s.processSchedule(ctx, schedule, numRunning, stats, txn)
return s.processSchedule(ctx, schedule, numRunning, &stats, txn)
}); processErr != nil {
if errors.HasType(processErr, (*savePointError)(nil)) {
return errors.Wrapf(processErr, "savepoint error for schedule %d", schedule.ScheduleID())
Expand Down
7 changes: 6 additions & 1 deletion pkg/jobs/job_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,13 @@ func TestJobSchedulerDaemonHonorsMaxJobsLimit(t *testing.T) {
daemon := newJobScheduler(h.cfg, h.env, metric.NewRegistry())
daemon.runDaemon(ctx, stopper)

readyToRunStmt := fmt.Sprintf(
"SELECT count(*) FROM %s WHERE next_run < %s",
h.env.ScheduledJobsTableName(), h.env.NowExpr())
testutils.SucceedsSoon(t, func() error {
if ready := daemon.metrics.ReadyToRun.Value(); numJobs != ready {
var ready int
h.sqlDB.QueryRow(t, readyToRunStmt).Scan(&ready)
if ready != numJobs-jobsPerIteration {
return errors.Errorf("waiting for metric %d = %d", ready, numJobs)
}
return nil
Expand Down
18 changes: 0 additions & 18 deletions pkg/jobs/schedule_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,8 @@ func (m *ExecutorMetrics) MetricStruct() {}

// SchedulerMetrics are metrics specific to job scheduler daemon.
type SchedulerMetrics struct {
// Number of schedules that were ready to execute.
ReadyToRun *metric.Gauge
// Number of scheduled jobs started.
NumStarted *metric.Gauge
// Number of jobs started by schedules that are currently running.
NumRunning *metric.Gauge
// Number of schedules rescheduled due to SKIP policy.
RescheduleSkip *metric.Gauge
// Number of schedules rescheduled due to WAIT policy.
Expand All @@ -51,20 +47,6 @@ type SchedulerMetrics struct {
// MakeSchedulerMetrics returns metrics for scheduled job daemon.
func MakeSchedulerMetrics() SchedulerMetrics {
return SchedulerMetrics{
ReadyToRun: metric.NewGauge(metric.Metadata{
Name: "schedules.round.schedules-ready-to-run",
Help: "The number of jobs ready to execute",
Measurement: "Schedules",
Unit: metric.Unit_COUNT,
}),

NumRunning: metric.NewGauge(metric.Metadata{
Name: "schedules.round.num-jobs-running",
Help: "The number of jobs started by schedules that are currently running",
Measurement: "Jobs",
Unit: metric.Unit_COUNT,
}),

NumStarted: metric.NewGauge(metric.Metadata{
Name: "schedules.round.jobs-started",
Help: "The number of jobs started",
Expand Down

0 comments on commit 3c9380f

Please sign in to comment.