diff --git a/pkg/jobs/job_scheduler.go b/pkg/jobs/job_scheduler.go index 2e1bfcd214fd..313bde7d939f 100644 --- a/pkg/jobs/job_scheduler.go +++ b/pkg/jobs/job_scheduler.go @@ -26,7 +26,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" - "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" @@ -118,14 +117,11 @@ const recheckRunningAfter = 1 * time.Minute type loopStats struct { rescheduleWait, rescheduleSkip, started int64 - readyToRun, jobsRunning int64 malformed int64 } func (s *loopStats) updateMetrics(m *SchedulerMetrics) { m.NumStarted.Update(s.started) - m.ReadyToRun.Update(s.readyToRun) - m.NumRunning.Update(s.jobsRunning) m.RescheduleSkip.Update(s.rescheduleSkip) m.RescheduleWait.Update(s.rescheduleWait) m.NumMalformedSchedules.Update(s.malformed) @@ -194,36 +190,6 @@ func (s *jobScheduler) processSchedule( return schedule.Update(ctx, s.InternalExecutor, txn) } -// TODO(yevgeniy): Re-evaluate if we need to have per-loop execution statistics. -func newLoopStats( - ctx context.Context, env scheduledjobs.JobSchedulerEnv, ex sqlutil.InternalExecutor, txn *kv.Txn, -) (*loopStats, error) { - numRunningJobsStmt := fmt.Sprintf( - "SELECT count(*) FROM %s WHERE created_by_type = '%s' AND status NOT IN ('%s', '%s', '%s', '%s')", - env.SystemJobsTableName(), CreatedByScheduledJobs, - StatusSucceeded, StatusCanceled, StatusFailed, StatusRevertFailed) - readyToRunStmt := fmt.Sprintf( - "SELECT count(*) FROM %s WHERE next_run < %s", - env.ScheduledJobsTableName(), env.NowExpr()) - statsStmt := fmt.Sprintf( - "SELECT (%s) numReadySchedules, (%s) numRunningJobs", - readyToRunStmt, numRunningJobsStmt) - - datums, err := ex.QueryRowEx(ctx, "scheduler-stats", txn, - sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, - statsStmt) - if err != nil { - return nil, err - } - if datums == nil { - return nil, errors.New("failed to read scheduler stats") - } - stats := &loopStats{} - stats.readyToRun = int64(tree.MustBeDInt(datums[0])) - stats.jobsRunning = int64(tree.MustBeDInt(datums[1])) - return stats, nil -} - type savePointError struct { err error } @@ -268,11 +234,7 @@ func withSavePoint(ctx context.Context, txn *kv.Txn, fn func() error) error { func (s *jobScheduler) executeSchedules( ctx context.Context, maxSchedules int64, txn *kv.Txn, ) (retErr error) { - stats, err := newLoopStats(ctx, s.env, s.InternalExecutor, txn) - if err != nil { - return err - } - + var stats loopStats defer stats.updateMetrics(&s.metrics) findSchedulesStmt := getFindSchedulesStatement(s.env, maxSchedules) @@ -308,10 +270,10 @@ func (s *jobScheduler) executeSchedules( return contextutil.RunWithTimeout( ctx, fmt.Sprintf("process-schedule-%d", schedule.ScheduleID()), timeout, func(ctx context.Context) error { - return s.processSchedule(ctx, schedule, numRunning, stats, txn) + return s.processSchedule(ctx, schedule, numRunning, &stats, txn) }) } - return s.processSchedule(ctx, schedule, numRunning, stats, txn) + return s.processSchedule(ctx, schedule, numRunning, &stats, txn) }); processErr != nil { if errors.HasType(processErr, (*savePointError)(nil)) { return errors.Wrapf(processErr, "savepoint error for schedule %d", schedule.ScheduleID()) diff --git a/pkg/jobs/job_scheduler_test.go b/pkg/jobs/job_scheduler_test.go index 419bbba4623e..964e9f6d5971 100644 --- a/pkg/jobs/job_scheduler_test.go +++ b/pkg/jobs/job_scheduler_test.go @@ -454,8 +454,13 @@ func TestJobSchedulerDaemonHonorsMaxJobsLimit(t *testing.T) { daemon := newJobScheduler(h.cfg, h.env, metric.NewRegistry()) daemon.runDaemon(ctx, stopper) + readyToRunStmt := fmt.Sprintf( + "SELECT count(*) FROM %s WHERE next_run < %s", + h.env.ScheduledJobsTableName(), h.env.NowExpr()) testutils.SucceedsSoon(t, func() error { - if ready := daemon.metrics.ReadyToRun.Value(); numJobs != ready { + var ready int + h.sqlDB.QueryRow(t, readyToRunStmt).Scan(&ready) + if ready != numJobs-jobsPerIteration { return errors.Errorf("waiting for metric %d = %d", ready, numJobs) } return nil diff --git a/pkg/jobs/schedule_metrics.go b/pkg/jobs/schedule_metrics.go index f613e2b5296f..a60e88f64e25 100644 --- a/pkg/jobs/schedule_metrics.go +++ b/pkg/jobs/schedule_metrics.go @@ -31,12 +31,8 @@ func (m *ExecutorMetrics) MetricStruct() {} // SchedulerMetrics are metrics specific to job scheduler daemon. type SchedulerMetrics struct { - // Number of schedules that were ready to execute. - ReadyToRun *metric.Gauge // Number of scheduled jobs started. NumStarted *metric.Gauge - // Number of jobs started by schedules that are currently running. - NumRunning *metric.Gauge // Number of schedules rescheduled due to SKIP policy. RescheduleSkip *metric.Gauge // Number of schedules rescheduled due to WAIT policy. @@ -51,20 +47,6 @@ type SchedulerMetrics struct { // MakeSchedulerMetrics returns metrics for scheduled job daemon. func MakeSchedulerMetrics() SchedulerMetrics { return SchedulerMetrics{ - ReadyToRun: metric.NewGauge(metric.Metadata{ - Name: "schedules.round.schedules-ready-to-run", - Help: "The number of jobs ready to execute", - Measurement: "Schedules", - Unit: metric.Unit_COUNT, - }), - - NumRunning: metric.NewGauge(metric.Metadata{ - Name: "schedules.round.num-jobs-running", - Help: "The number of jobs started by schedules that are currently running", - Measurement: "Jobs", - Unit: metric.Unit_COUNT, - }), - NumStarted: metric.NewGauge(metric.Metadata{ Name: "schedules.round.jobs-started", Help: "The number of jobs started",