From 4de1c9e7131d8185c95bab217f3e8ec5a380d7b4 Mon Sep 17 00:00:00 2001 From: Higan Date: Wed, 24 Jul 2024 23:21:28 +0800 Subject: [PATCH] add SingletonRescheduled metric for executor. --- executor.go | 12 ++++++++---- monitor.go | 7 ++++--- scheduler.go | 2 +- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/executor.go b/executor.go index 1b13285d..ae346b1e 100644 --- a/executor.go +++ b/executor.go @@ -196,7 +196,7 @@ func (e *executor) start() { default: // runner is busy, reschedule the work for later // which means we just skip it here and do nothing - // TODO when metrics are added, this should increment a rescheduled metric + e.incrementJobCounter(*j, SingletonRescheduled) e.sendOutForRescheduling(&jIn) } } else { @@ -397,9 +397,7 @@ func (e *executor) runJob(j internalJob, jIn jobIn) { startTime := time.Now() err := e.callJobWithRecover(j) - if e.monitor != nil { - e.monitor.RecordJobTiming(startTime, time.Now(), j.id, j.name, j.tags) - } + e.recordJobTiming(startTime, time.Now(), j) if err != nil { _ = callJobFuncWithParams(j.afterJobRunsWithError, j.id, j.name, err) e.incrementJobCounter(j, Fail) @@ -422,6 +420,12 @@ func (e *executor) callJobWithRecover(j internalJob) (err error) { return callJobFuncWithParams(j.function, j.parameters...) } +func (e *executor) recordJobTiming(start time.Time, end time.Time, j internalJob) { + if e.monitor != nil { + e.monitor.RecordJobTiming(start, end, j.id, j.name, j.tags) + } +} + func (e *executor) incrementJobCounter(j internalJob, status JobStatus) { if e.monitor != nil { e.monitor.IncrementJob(j.id, j.name, j.tags, status) diff --git a/monitor.go b/monitor.go index ecf28805..d3c5bbd9 100644 --- a/monitor.go +++ b/monitor.go @@ -11,9 +11,10 @@ type JobStatus string // The different statuses of job that can be used. const ( - Fail JobStatus = "fail" - Success JobStatus = "success" - Skip JobStatus = "skip" + Fail JobStatus = "fail" + Success JobStatus = "success" + Skip JobStatus = "skip" + SingletonRescheduled JobStatus = "singleton_rescheduled" ) // Monitor represents the interface to collect jobs metrics. diff --git a/scheduler.go b/scheduler.go index 4131747d..ec740042 100644 --- a/scheduler.go +++ b/scheduler.go @@ -311,7 +311,7 @@ func (s *scheduler) selectRemoveJob(id uuid.UUID) { } // Jobs coming back from the executor to the scheduler that -// need to evaluated for rescheduling. +// need to be evaluated for rescheduling. func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) { select { case <-s.shutdownCtx.Done():