Skip to content

Commit

Permalink
Emit metrics for eval waitUntil as nomad.nomad.broker.eval_waiting (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
joel0 authored Jan 6, 2022
1 parent af08736 commit eacdc63
Showing 1 changed file with 21 additions and 3 deletions.
24 changes: 21 additions & 3 deletions nomad/eval_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration,
delayedEvalsUpdateCh: make(chan struct{}, 1),
}
b.stats.ByScheduler = make(map[string]*SchedulerStats)
b.stats.DelayedEvals = make(map[string]*structs.Evaluation)

return b, nil
}
Expand Down Expand Up @@ -241,6 +242,7 @@ func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) {
if !eval.WaitUntil.IsZero() {
b.delayHeap.Push(&evalWrapper{eval}, eval.WaitUntil)
b.stats.TotalWaiting += 1
b.stats.DelayedEvals[eval.ID] = eval
// Signal an update.
select {
case b.delayedEvalsUpdateCh <- struct{}{}:
Expand Down Expand Up @@ -723,6 +725,7 @@ func (b *EvalBroker) flush() {
b.stats.TotalUnacked = 0
b.stats.TotalBlocked = 0
b.stats.TotalWaiting = 0
b.stats.DelayedEvals = make(map[string]*structs.Evaluation)
b.stats.ByScheduler = make(map[string]*SchedulerStats)
b.evals = make(map[string]int)
b.jobEvals = make(map[structs.NamespacedID]string)
Expand Down Expand Up @@ -777,6 +780,7 @@ func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context, updateCh <-chan
b.l.Lock()
b.delayHeap.Remove(&evalWrapper{eval})
b.stats.TotalWaiting -= 1
delete(b.stats.DelayedEvals, eval.ID)
b.enqueueLocked(eval, eval.Type)
b.l.Unlock()
case <-updateCh:
Expand Down Expand Up @@ -807,6 +811,7 @@ func (b *EvalBroker) nextDelayedEval() (*structs.Evaluation, time.Time) {
func (b *EvalBroker) Stats() *BrokerStats {
// Allocate a new stats struct
stats := new(BrokerStats)
stats.DelayedEvals = make(map[string]*structs.Evaluation)
stats.ByScheduler = make(map[string]*SchedulerStats)

b.l.RLock()
Expand All @@ -817,10 +822,13 @@ func (b *EvalBroker) Stats() *BrokerStats {
stats.TotalUnacked = b.stats.TotalUnacked
stats.TotalBlocked = b.stats.TotalBlocked
stats.TotalWaiting = b.stats.TotalWaiting
for id, eval := range b.stats.DelayedEvals {
evalCopy := *eval
stats.DelayedEvals[id] = &evalCopy
}
for sched, subStat := range b.stats.ByScheduler {
subStatCopy := new(SchedulerStats)
*subStatCopy = *subStat
stats.ByScheduler[sched] = subStatCopy
subStatCopy := *subStat
stats.ByScheduler[sched] = &subStatCopy
}
return stats
}
Expand All @@ -835,6 +843,15 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
metrics.SetGauge([]string{"nomad", "broker", "total_unacked"}, float32(stats.TotalUnacked))
metrics.SetGauge([]string{"nomad", "broker", "total_blocked"}, float32(stats.TotalBlocked))
metrics.SetGauge([]string{"nomad", "broker", "total_waiting"}, float32(stats.TotalWaiting))
for _, eval := range stats.DelayedEvals {
metrics.SetGaugeWithLabels([]string{"nomad", "broker", "eval_waiting"},
float32(time.Until(eval.WaitUntil).Seconds()),
[]metrics.Label{
{Name: "eval_id", Value: eval.ID},
{Name: "job", Value: eval.JobID},
{Name: "namespace", Value: eval.Namespace},
})
}
for sched, schedStats := range stats.ByScheduler {
metrics.SetGauge([]string{"nomad", "broker", sched, "ready"}, float32(schedStats.Ready))
metrics.SetGauge([]string{"nomad", "broker", sched, "unacked"}, float32(schedStats.Unacked))
Expand All @@ -852,6 +869,7 @@ type BrokerStats struct {
TotalUnacked int
TotalBlocked int
TotalWaiting int
DelayedEvals map[string]*structs.Evaluation
ByScheduler map[string]*SchedulerStats
}

Expand Down

0 comments on commit eacdc63

Please sign in to comment.