Skip to content

Commit

Permalink
Fairness preemption metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
tanveergill committed Jan 10, 2024
1 parent daf1d90 commit 0d5eb84
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 58 deletions.
25 changes: 14 additions & 11 deletions docs/content/reference/observability/prometheus-metrics/agent.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,17 +252,20 @@ This document describes the Prometheus metrics generated by Aperture Agents.

<!-- vale off -->

| Name | Type | Labels | Unit | Description |
| ----------------------------------- | ------- | -------------------------------------------------------------------------------------------------------------------------------- | --------------- | --------------------------------------------------------------------------------- |
| token_bucket_lm_ratio | Gauge | agent_group, instance, job, process_uuid, policy_name, policy_hash, component_id | percentage | A gauge that tracks the load multiplier |
| token_bucket_fill_rate | Gauge | agent_group, instance, job, process_uuid, policy_name, policy_hash, component_id | tokens/s | A gauge that tracks the fill rate of token bucket |
| token_bucket_capacity_total | Gauge | agent_group, instance, job, process_uuid, policy_name, policy_hash, component_id | count (no unit) | A gauge that tracks the capacity of token bucket |
| token_bucket_available_tokens_total | Gauge | agent_group, instance, job, process_uuid, policy_name, policy_hash, component_id | count (no unit) | A gauge that tracks the number of tokens available in token bucket |
| workload_requests_total | Counter | agent_group, instance, job, process_uuid, policy_name, policy_hash, component_id, workload_index, decision_type, limiter_dropped | count (no unit) | A counter of workload requests |
| request_in_queue_duration_ms | Summary | agent_group, instance, job, process_uuid, policy_name, policy_hash, component_id, workload_index | ms | Metric used for grouping durations for requests by workload in queue of Scheduler |
| workload_preempted_tokens | Summary | agent_group, instance, job, process_uuid, policy_name, policy_hash, component_id, workload_index | token | Metric used for counting tokens preempted per request |
| workload_delayed_tokens | Summary | agent_group, instance, job, process_uuid, policy_name, policy_hash, component_id, workload_index | token | Metric used for counting tokens delayed per request |
| workload_on_time_total | Counter | agent_group, instance, job, process_uuid, policy_name, policy_hash, component_id, workload_index, decision_type, limiter_dropped | count (no unit) | Metric used for counting requests that are on time, neither preempted nor delayed |
| Name | Type | Labels | Unit | Description |
| ----------------------------------- | ------- | -------------------------------------------------------------------------------------------------------------------------------- | --------------- | -------------------------------------------------------------------------------------------------------------------------------------------- |
| token_bucket_lm_ratio | Gauge | agent_group, instance, job, process_uuid, policy_name, policy_hash, component_id | percentage | A gauge that tracks the load multiplier |
| token_bucket_fill_rate | Gauge | agent_group, instance, job, process_uuid, policy_name, policy_hash, component_id | tokens/s | A gauge that tracks the fill rate of token bucket |
| token_bucket_capacity_total | Gauge | agent_group, instance, job, process_uuid, policy_name, policy_hash, component_id | count (no unit) | A gauge that tracks the capacity of token bucket |
| token_bucket_available_tokens_total | Gauge | agent_group, instance, job, process_uuid, policy_name, policy_hash, component_id | count (no unit) | A gauge that tracks the number of tokens available in token bucket |
| workload_requests_total | Counter | agent_group, instance, job, process_uuid, policy_name, policy_hash, component_id, workload_index, decision_type, limiter_dropped | count (no unit) | A counter of workload requests |
| request_in_queue_duration_ms | Summary | agent_group, instance, job, process_uuid, policy_name, policy_hash, component_id, workload_index | ms | Metric used for grouping durations for requests by workload in queue of Scheduler |
| workload_preempted_tokens           | Summary | agent_group, instance, job, process_uuid, policy_name, policy_hash, component_id, workload_index                                  | token           | Metric used for counting tokens preempted per request, measured end-to-end in the scheduler across all workloads.                             |
| workload_delayed_tokens             | Summary | agent_group, instance, job, process_uuid, policy_name, policy_hash, component_id, workload_index                                  | token           | Metric used for counting tokens delayed per request, measured end-to-end in the scheduler across all workloads.                               |
| workload_on_time_total              | Counter | agent_group, instance, job, process_uuid, policy_name, policy_hash, component_id, workload_index, decision_type, limiter_dropped  | count (no unit) | Metric used for counting requests that are on time, neither preempted nor delayed, measured end-to-end in the scheduler across all workloads. |
| fairness_preempted_tokens           | Summary | agent_group, instance, job, process_uuid, policy_name, policy_hash, component_id, fairness_index                                  | token           | Metric used for counting tokens preempted per request, measured at fairness queues within the same workload.                                  |
| fairness_delayed_tokens             | Summary | agent_group, instance, job, process_uuid, policy_name, policy_hash, component_id, fairness_index                                  | token           | Metric used for counting tokens delayed per request, measured at fairness queues within the same workload.                                    |
| fairness_on_time_total              | Counter | agent_group, instance, job, process_uuid, policy_name, policy_hash, component_id, fairness_index, decision_type, limiter_dropped  | count (no unit) | Metric used for counting requests that are on time, neither preempted nor delayed, measured at fairness queues within the same workload.      |

<!-- vale on -->

Expand Down
12 changes: 9 additions & 3 deletions pkg/metrics/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,18 @@ const (
WorkloadCounterMetricName = "workload_requests_total"
// RequestInQueueDurationMetricName - metric used for grouping durations for requests in queue of Scheduler.
RequestInQueueDurationMetricName = "request_in_queue_duration_ms"
// WorkloadPreemptedTokensMetricName - metric used for counting tokens preempted per request.
// WorkloadPreemptedTokensMetricName - metric used for counting tokens preempted per request, measured end-to-end in the scheduler across all workloads.
WorkloadPreemptedTokensMetricName = "workload_preempted_tokens"
// WorkloadDelayedTokensMetricName - metric used for counting tokens delayed per request.
// WorkloadDelayedTokensMetricName - metric used for counting tokens delayed per request, measured end-to-end in the scheduler across all workloads.
WorkloadDelayedTokensMetricName = "workload_delayed_tokens"
// WorkloadOnTimeMetricName - metric used for counting requests that are on time, neither preempted nor delayed.
// WorkloadOnTimeMetricName - metric used for counting requests that are on time, neither preempted nor delayed, measured end-to-end in the scheduler across all workloads.
WorkloadOnTimeMetricName = "workload_on_time_total"
// FairnessPreemptedTokensMetricName - metric used for counting tokens preempted per request, measured at fairness queues within the same workload.
FairnessPreemptedTokensMetricName = "fairness_preempted_tokens"
// FairnessDelayedTokensMetricName - metric used for counting tokens delayed per request, measured at fairness queues within the same workload.
FairnessDelayedTokensMetricName = "fairness_delayed_tokens"
// FairnessOnTimeMetricName - metric used for counting requests that are on time, neither preempted nor delayed, measured at fairness queues within the same workload.
FairnessOnTimeMetricName = "fairness_on_time_total"

// IncomingTokensMetricName - total work measured in tokens of all incoming requests.
IncomingTokensMetricName = "incoming_tokens_total"
Expand Down
91 changes: 88 additions & 3 deletions pkg/policies/flowcontrol/actuators/workload-scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type Factory struct {
workloadPreemptedTokensSummaryVec *prometheus.SummaryVec
workloadDelayedTokensSummaryVec *prometheus.SummaryVec
workloadOnTimeCounterVec *prometheus.CounterVec

fairnessPreemptedTokensSummaryVec *prometheus.SummaryVec
fairnessDelayedTokensSummaryVec *prometheus.SummaryVec
fairnessOnTimeCounterVec *prometheus.CounterVec
}

// newFactory sets up the load scheduler module in the main fx app.
Expand Down Expand Up @@ -121,7 +125,7 @@ func newFactory(

wsFactory.workloadPreemptedTokensSummaryVec = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: metrics.WorkloadPreemptedTokensMetricName,
Help: "Number of tokens a request was preempted by",
Help: "Number of tokens a request was preempted, measured end-to-end in the scheduler across all workloads.",
}, []string{
metrics.PolicyNameLabel,
metrics.PolicyHashLabel,
Expand All @@ -131,7 +135,7 @@ func newFactory(

wsFactory.workloadDelayedTokensSummaryVec = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: metrics.WorkloadDelayedTokensMetricName,
Help: "Number of tokens a request was delayed by",
Help: "Number of tokens a request was delayed by, measured end-to-end in the scheduler across all workloads.",
}, []string{
metrics.PolicyNameLabel,
metrics.PolicyHashLabel,
Expand All @@ -141,7 +145,37 @@ func newFactory(

wsFactory.workloadOnTimeCounterVec = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: metrics.WorkloadOnTimeMetricName,
Help: "Counter of workload requests that were on time",
Help: "Counter of workload requests that were on time, measured end-to-end in the scheduler across all workloads.",
}, []string{
metrics.PolicyNameLabel,
metrics.PolicyHashLabel,
metrics.ComponentIDLabel,
metrics.WorkloadIndexLabel,
})

wsFactory.fairnessPreemptedTokensSummaryVec = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: metrics.FairnessPreemptedTokensMetricName,
Help: "Number of tokens a request was preempted, measured at fairness queues within the same workload.",
}, []string{
metrics.PolicyNameLabel,
metrics.PolicyHashLabel,
metrics.ComponentIDLabel,
metrics.WorkloadIndexLabel,
})

wsFactory.fairnessDelayedTokensSummaryVec = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: metrics.FairnessDelayedTokensMetricName,
Help: "Number of tokens a request was delayed by, measured at fairness queues within the same workload.",
}, []string{
metrics.PolicyNameLabel,
metrics.PolicyHashLabel,
metrics.ComponentIDLabel,
metrics.WorkloadIndexLabel,
})

wsFactory.fairnessOnTimeCounterVec = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: metrics.FairnessOnTimeMetricName,
Help: "Counter of workload requests that were on time, measured at fairness queues within the same workload.",
}, []string{
metrics.PolicyNameLabel,
metrics.PolicyHashLabel,
Expand Down Expand Up @@ -189,6 +223,18 @@ func newFactory(
if err != nil {
merr = multierr.Append(merr, err)
}
err = prometheusRegistry.Register(wsFactory.fairnessPreemptedTokensSummaryVec)
if err != nil {
merr = multierr.Append(merr, err)
}
err = prometheusRegistry.Register(wsFactory.fairnessDelayedTokensSummaryVec)
if err != nil {
merr = multierr.Append(merr, err)
}
err = prometheusRegistry.Register(wsFactory.fairnessOnTimeCounterVec)
if err != nil {
merr = multierr.Append(merr, err)
}

return merr
},
Expand Down Expand Up @@ -231,6 +277,18 @@ func newFactory(
err := fmt.Errorf("failed to unregister workload_on_time_total metric")
merr = multierr.Append(merr, err)
}
if !prometheusRegistry.Unregister(wsFactory.fairnessPreemptedTokensSummaryVec) {
err := fmt.Errorf("failed to unregister fairness_preempted_tokens metric")
merr = multierr.Append(merr, err)
}
if !prometheusRegistry.Unregister(wsFactory.fairnessDelayedTokensSummaryVec) {
err := fmt.Errorf("failed to unregister fairness_delayed_tokens metric")
merr = multierr.Append(merr, err)
}
if !prometheusRegistry.Unregister(wsFactory.fairnessOnTimeCounterVec) {
err := fmt.Errorf("failed to unregister fairness_on_time_total metric")
merr = multierr.Append(merr, err)
}

return merr
},
Expand Down Expand Up @@ -293,6 +351,9 @@ func (wsFactory *Factory) NewSchedulerMetrics(metricLabels prometheus.Labels) (*
WorkloadPreemptedTokensSummary: wsFactory.workloadPreemptedTokensSummaryVec,
WorkloadDelayedTokensSummary: wsFactory.workloadDelayedTokensSummaryVec,
WorkloadOnTimeCounter: wsFactory.workloadOnTimeCounterVec,
FairnessPreemptedTokensSummary: wsFactory.fairnessPreemptedTokensSummaryVec,
FairnessDelayedTokensSummary: wsFactory.fairnessDelayedTokensSummaryVec,
FairnessOnTimeCounter: wsFactory.fairnessOnTimeCounterVec,
}

return &SchedulerMetrics{
Expand Down Expand Up @@ -342,6 +403,18 @@ func (sm *SchedulerMetrics) Delete() error {
if deletedCount == 0 {
log.Warn().Msg("Could not delete workload_on_time_total counter from its metric vector.")
}
deletedCount = sm.wsFactory.fairnessPreemptedTokensSummaryVec.DeletePartialMatch(sm.metricLabels)
if deletedCount == 0 {
log.Warn().Msg("Could not delete fairness_preempted_tokens summary from its metric vector.")
}
deletedCount = sm.wsFactory.fairnessDelayedTokensSummaryVec.DeletePartialMatch(sm.metricLabels)
if deletedCount == 0 {
log.Warn().Msg("Could not delete fairness_delayed_tokens summary from its metric vector.")
}
deletedCount = sm.wsFactory.fairnessOnTimeCounterVec.DeletePartialMatch(sm.metricLabels)
if deletedCount == 0 {
log.Warn().Msg("Could not delete fairness_on_time_total counter from its metric vector.")
}
return merr
}

Expand Down Expand Up @@ -396,6 +469,18 @@ func (wsFactory *Factory) NewScheduler(
if err != nil {
return fmt.Errorf("%w: failed to get workload_on_time_total counter", err)
}
_, err = schedulerMetrics.wsFactory.fairnessPreemptedTokensSummaryVec.GetMetricWith(workloadLabels)
if err != nil {
return fmt.Errorf("%w: failed to get fairness_preempted_tokens summary", err)
}
_, err = schedulerMetrics.wsFactory.fairnessDelayedTokensSummaryVec.GetMetricWith(workloadLabels)
if err != nil {
return fmt.Errorf("%w: failed to get fairness_delayed_tokens summary", err)
}
_, err = schedulerMetrics.wsFactory.fairnessOnTimeCounterVec.GetMetricWith(workloadLabels)
if err != nil {
return fmt.Errorf("%w: failed to get fairness_on_time_total counter", err)
}
return nil
}

Expand Down
Loading

0 comments on commit 0d5eb84

Please sign in to comment.