Abstract prometheus metrics into interfaces
jonathan-innis committed Nov 8, 2024
1 parent fea57a1 commit be4981e
Showing 44 changed files with 457 additions and 326 deletions.
2 changes: 1 addition & 1 deletion kwok/apis/crds/karpenter.kwok.sh_kwoknodeclasses.yaml
@@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  annotations:
-    controller-gen.kubebuilder.io/version: v0.16.5
+    controller-gen.kubebuilder.io/version: v0.16.3
  name: kwoknodeclasses.karpenter.kwok.sh
spec:
  group: karpenter.kwok.sh
2 changes: 1 addition & 1 deletion kwok/charts/crds/karpenter.sh_nodeclaims.yaml
@@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  annotations:
-    controller-gen.kubebuilder.io/version: v0.16.5
+    controller-gen.kubebuilder.io/version: v0.16.3
  name: nodeclaims.karpenter.sh
spec:
  group: karpenter.sh
2 changes: 1 addition & 1 deletion kwok/charts/crds/karpenter.sh_nodepools.yaml
@@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  annotations:
-    controller-gen.kubebuilder.io/version: v0.16.5
+    controller-gen.kubebuilder.io/version: v0.16.3
  name: nodepools.karpenter.sh
spec:
  group: karpenter.sh
2 changes: 1 addition & 1 deletion pkg/apis/crds/karpenter.sh_nodeclaims.yaml
@@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  annotations:
-    controller-gen.kubebuilder.io/version: v0.16.5
+    controller-gen.kubebuilder.io/version: v0.16.3
  name: nodeclaims.karpenter.sh
spec:
  group: karpenter.sh
2 changes: 1 addition & 1 deletion pkg/apis/crds/karpenter.sh_nodepools.yaml
@@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  annotations:
-    controller-gen.kubebuilder.io/version: v0.16.5
+    controller-gen.kubebuilder.io/version: v0.16.3
  name: nodepools.karpenter.sh
spec:
  group: karpenter.sh
38 changes: 18 additions & 20 deletions pkg/cloudprovider/metrics/cloudprovider.go
@@ -44,7 +44,8 @@ const (
// decorator implements CloudProvider
var _ cloudprovider.CloudProvider = (*decorator)(nil)

-var methodDuration = prometheus.NewHistogramVec(
+var MethodDuration = metrics.NewPrometheusHistogram(
+crmetrics.Registry,
prometheus.HistogramOpts{
Namespace: metrics.Namespace,
Subsystem: "cloudprovider",
@@ -59,7 +60,8 @@ var (
)

var (
-errorsTotal = prometheus.NewCounterVec(
+ErrorsTotal = metrics.NewPrometheusCounter(
+crmetrics.Registry,
prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: "cloudprovider",
@@ -75,10 +77,6 @@ var (
)
)

-func init() {
-crmetrics.Registry.MustRegister(methodDuration, errorsTotal)
-}

type decorator struct {
cloudprovider.CloudProvider
}
@@ -96,68 +94,68 @@ func Decorate(cloudProvider cloudprovider.CloudProvider) cloudprovider.CloudProv

func (d *decorator) Create(ctx context.Context, nodeClaim *v1.NodeClaim) (*v1.NodeClaim, error) {
method := "Create"
-defer metrics.Measure(methodDuration.With(getLabelsMapForDuration(ctx, d, method)))()
+defer metrics.Measure(MethodDuration, getLabelsMapForDuration(ctx, d, method))()
nodeClaim, err := d.CloudProvider.Create(ctx, nodeClaim)
if err != nil {
-errorsTotal.With(getLabelsMapForError(ctx, d, method, err)).Inc()
+ErrorsTotal.Inc(getLabelsMapForError(ctx, d, method, err))
}
return nodeClaim, err
}

func (d *decorator) Delete(ctx context.Context, nodeClaim *v1.NodeClaim) error {
method := "Delete"
-defer metrics.Measure(methodDuration.With(getLabelsMapForDuration(ctx, d, method)))()
+defer metrics.Measure(MethodDuration, getLabelsMapForDuration(ctx, d, method))()
err := d.CloudProvider.Delete(ctx, nodeClaim)
if err != nil {
-errorsTotal.With(getLabelsMapForError(ctx, d, method, err)).Inc()
+ErrorsTotal.Inc(getLabelsMapForError(ctx, d, method, err))
}
return err
}

func (d *decorator) Get(ctx context.Context, id string) (*v1.NodeClaim, error) {
method := "Get"
-defer metrics.Measure(methodDuration.With(getLabelsMapForDuration(ctx, d, method)))()
+defer metrics.Measure(MethodDuration, getLabelsMapForDuration(ctx, d, method))()
nodeClaim, err := d.CloudProvider.Get(ctx, id)
if err != nil {
-errorsTotal.With(getLabelsMapForError(ctx, d, method, err)).Inc()
+ErrorsTotal.Inc(getLabelsMapForError(ctx, d, method, err))
}
return nodeClaim, err
}

func (d *decorator) List(ctx context.Context) ([]*v1.NodeClaim, error) {
method := "List"
-defer metrics.Measure(methodDuration.With(getLabelsMapForDuration(ctx, d, method)))()
+defer metrics.Measure(MethodDuration, getLabelsMapForDuration(ctx, d, method))()
nodeClaims, err := d.CloudProvider.List(ctx)
if err != nil {
-errorsTotal.With(getLabelsMapForError(ctx, d, method, err)).Inc()
+ErrorsTotal.Inc(getLabelsMapForError(ctx, d, method, err))
}
return nodeClaims, err
}

func (d *decorator) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool) ([]*cloudprovider.InstanceType, error) {
method := "GetInstanceTypes"
-defer metrics.Measure(methodDuration.With(getLabelsMapForDuration(ctx, d, method)))()
+defer metrics.Measure(MethodDuration, getLabelsMapForDuration(ctx, d, method))()
instanceType, err := d.CloudProvider.GetInstanceTypes(ctx, nodePool)
if err != nil {
-errorsTotal.With(getLabelsMapForError(ctx, d, method, err)).Inc()
+ErrorsTotal.Inc(getLabelsMapForError(ctx, d, method, err))
}
return instanceType, err
}

func (d *decorator) IsDrifted(ctx context.Context, nodeClaim *v1.NodeClaim) (cloudprovider.DriftReason, error) {
method := "IsDrifted"
-defer metrics.Measure(methodDuration.With(getLabelsMapForDuration(ctx, d, method)))()
+defer metrics.Measure(MethodDuration, getLabelsMapForDuration(ctx, d, method))()
isDrifted, err := d.CloudProvider.IsDrifted(ctx, nodeClaim)
if err != nil {
-errorsTotal.With(getLabelsMapForError(ctx, d, method, err)).Inc()
+ErrorsTotal.Inc(getLabelsMapForError(ctx, d, method, err))
}
return isDrifted, err
}

// getLabelsMapForDuration is a convenience func that constructs a map[string]string
// for a prometheus Label map used to compose a duration metric spec
func getLabelsMapForDuration(ctx context.Context, d *decorator, method string) map[string]string {
-return prometheus.Labels{
+return map[string]string{
metricLabelController: injection.GetControllerName(ctx),
metricLabelMethod: method,
metricLabelProvider: d.Name(),
@@ -167,7 +165,7 @@ func getLabelsMapForDuration(ctx context.Context, d *decorator, method string) m
// getLabelsMapForError is a convenience func that constructs a map[string]string
// for a prometheus Label map used to compose a counter metric spec
func getLabelsMapForError(ctx context.Context, d *decorator, method string, err error) map[string]string {
-return prometheus.Labels{
+return map[string]string{
metricLabelController: injection.GetControllerName(ctx),
metricLabelMethod: method,
metricLabelProvider: d.Name(),
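The call sites above no longer build prometheus label sets themselves: the histogram and counter are constructed through metrics.NewPrometheusHistogram and metrics.NewPrometheusCounter, which take the controller-runtime registry as their first argument, and recording goes through methods such as Inc that accept a plain map[string]string. The concrete wrapper lives in pkg/metrics and is not part of this excerpt; the sketch below is only an inference from these call sites (the CounterMetric interface and prometheusCounter struct are assumed names), showing roughly how such a wrapper could register at construction time and translate label maps.

```go
// Sketch only: inferred from the call sites in this diff, not the actual
// pkg/metrics implementation. CounterMetric and prometheusCounter are assumed names.
package metrics

import "github.com/prometheus/client_golang/prometheus"

// CounterMetric hides *prometheus.CounterVec behind a small interface so that
// callers pass ordinary label maps instead of prometheus.Labels.
type CounterMetric interface {
	Inc(labels map[string]string)
	Add(v float64, labels map[string]string)
}

type prometheusCounter struct {
	vec *prometheus.CounterVec
}

// NewPrometheusCounter builds the underlying CounterVec and registers it with
// the supplied registry, removing the need for a separate init() MustRegister call.
func NewPrometheusCounter(registry prometheus.Registerer, opts prometheus.CounterOpts, labelNames []string) CounterMetric {
	vec := prometheus.NewCounterVec(opts, labelNames)
	registry.MustRegister(vec)
	return &prometheusCounter{vec: vec}
}

func (c *prometheusCounter) Inc(labels map[string]string) {
	c.vec.With(prometheus.Labels(labels)).Inc()
}

func (c *prometheusCounter) Add(v float64, labels map[string]string) {
	c.vec.With(prometheus.Labels(labels)).Add(v)
}
```

With a shape like that, ErrorsTotal.Inc(getLabelsMapForError(ctx, d, method, err)) in the decorator maps one-to-one onto the old errorsTotal.With(...).Inc() call.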
12 changes: 6 additions & 6 deletions pkg/controllers/disruption/controller.go
@@ -160,17 +160,17 @@ func (c *Controller) Reconcile(ctx context.Context) (reconcile.Result, error) {
}

func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, error) {
-defer metrics.Measure(EvaluationDurationSeconds.With(map[string]string{
+defer metrics.Measure(EvaluationDurationSeconds, map[string]string{
metrics.ReasonLabel: strings.ToLower(string(disruption.Reason())),
consolidationTypeLabel: disruption.ConsolidationType(),
-}))()
+})()
candidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.recorder, c.clock, c.cloudProvider, disruption.ShouldDisrupt, disruption.Class(), c.queue)
if err != nil {
return false, fmt.Errorf("determining candidates, %w", err)
}
-EligibleNodes.With(map[string]string{
+EligibleNodes.Set(float64(len(candidates)), map[string]string{
metrics.ReasonLabel: strings.ToLower(string(disruption.Reason())),
-}).Set(float64(len(candidates)))
+})

// If there are no candidates, move to the next disruption
if len(candidates) == 0 {
@@ -244,11 +244,11 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
}

// An action is only performed and pods/nodes are only disrupted after a successful add to the queue
-DecisionsPerformedTotal.With(map[string]string{
+DecisionsPerformedTotal.Inc(map[string]string{
decisionLabel: string(cmd.Decision()),
metrics.ReasonLabel: strings.ToLower(string(m.Reason())),
consolidationTypeLabel: m.ConsolidationType(),
-}).Inc()
+})
return nil
}

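The timing pattern also changes shape: metrics.Measure now takes the abstracted histogram plus a label map and returns a closure, so defer metrics.Measure(EvaluationDurationSeconds, map[string]string{...})() starts the timer immediately and records the elapsed seconds when the deferred closure runs at function exit. A minimal sketch of such a helper, assuming an ObservationMetric interface with an Observe(value, labels) method (names are assumptions, not the actual pkg/metrics API):

```go
// Sketch only: a Measure helper matching the deferred-call pattern used above.
package metrics

import "time"

// ObservationMetric is the assumed histogram-side interface: Observe records a
// value against labels supplied as a plain map.
type ObservationMetric interface {
	Observe(v float64, labels map[string]string)
}

// Measure starts a timer and returns a closure that, when invoked (typically
// via defer), observes the elapsed seconds on m with the given labels.
func Measure(m ObservationMetric, labels map[string]string) func() {
	start := time.Now()
	return func() {
		m.Observe(time.Since(start).Seconds(), labels)
	}
}
```

Note the trailing () in the call sites: Measure(...) runs immediately to capture the start time, and defer invokes the returned closure on return.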
4 changes: 2 additions & 2 deletions pkg/controllers/disruption/helpers.go
@@ -234,9 +234,9 @@ func BuildDisruptionBudgetMapping(ctx context.Context, cluster *state.Cluster, c
allowedDisruptions := nodePool.MustGetAllowedDisruptions(clk, numNodes[nodePool.Name], reason)
disruptionBudgetMapping[nodePool.Name] = lo.Max([]int{allowedDisruptions - disrupting[nodePool.Name], 0})

-NodePoolAllowedDisruptions.With(map[string]string{
+NodePoolAllowedDisruptions.Set(float64(allowedDisruptions), map[string]string{
metrics.NodePoolLabel: nodePool.Name, metrics.ReasonLabel: string(reason),
-}).Set(float64(allowedDisruptions))
+})
if allowedDisruptions == 0 {
recorder.Publish(disruptionevents.NodePoolBlockedForDisruptionReason(lo.ToPtr(nodePool), reason))
}
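Gauges follow the same pattern: instead of gauge.With(labels).Set(v), the wrapper exposes Set(v, labels), as in the NodePoolAllowedDisruptions call above. A corresponding sketch of the assumed gauge-side interface, written as a fragment continuing the earlier counter sketch (same package and prometheus import; the constructor would mirror NewPrometheusCounter):

```go
// Sketch only: the gauge-side counterpart to the counter sketch above.
// GaugeMetric is the assumed interface behind calls like
// NodePoolAllowedDisruptions.Set(value, labels).
type GaugeMetric interface {
	Set(v float64, labels map[string]string)
}

type prometheusGauge struct {
	vec *prometheus.GaugeVec
}

func (g *prometheusGauge) Set(v float64, labels map[string]string) {
	g.vec.With(prometheus.Labels(labels)).Set(v)
}
```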
25 changes: 10 additions & 15 deletions pkg/controllers/disruption/metrics.go
@@ -23,24 +23,15 @@ import (
"sigs.k8s.io/karpenter/pkg/metrics"
)

-func init() {
-crmetrics.Registry.MustRegister(
-EvaluationDurationSeconds,
-DecisionsPerformedTotal,
-EligibleNodes,
-ConsolidationTimeoutsTotal,
-NodePoolAllowedDisruptions,
-)
-}

const (
voluntaryDisruptionSubsystem = "voluntary_disruption"
decisionLabel = "decision"
consolidationTypeLabel = "consolidation_type"
)

var (
-EvaluationDurationSeconds = prometheus.NewHistogramVec(
+EvaluationDurationSeconds = metrics.NewPrometheusHistogram(
+crmetrics.Registry,
prometheus.HistogramOpts{
Namespace: metrics.Namespace,
Subsystem: voluntaryDisruptionSubsystem,
@@ -50,7 +41,8 @@
},
[]string{metrics.ReasonLabel, consolidationTypeLabel},
)
-DecisionsPerformedTotal = prometheus.NewCounterVec(
+DecisionsPerformedTotal = metrics.NewPrometheusCounter(
+crmetrics.Registry,
prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: voluntaryDisruptionSubsystem,
@@ -59,7 +51,8 @@
},
[]string{decisionLabel, metrics.ReasonLabel, consolidationTypeLabel},
)
-EligibleNodes = prometheus.NewGaugeVec(
+EligibleNodes = metrics.NewPrometheusGauge(
+crmetrics.Registry,
prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: voluntaryDisruptionSubsystem,
@@ -68,7 +61,8 @@
},
[]string{metrics.ReasonLabel},
)
-ConsolidationTimeoutsTotal = prometheus.NewCounterVec(
+ConsolidationTimeoutsTotal = metrics.NewPrometheusCounter(
+crmetrics.Registry,
prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: voluntaryDisruptionSubsystem,
@@ -77,7 +71,8 @@
},
[]string{consolidationTypeLabel},
)
-NodePoolAllowedDisruptions = prometheus.NewGaugeVec(
+NodePoolAllowedDisruptions = metrics.NewPrometheusGauge(
+crmetrics.Registry,
prometheus.GaugeOpts{
Namespace: metrics.Namespace,
Subsystem: metrics.NodePoolSubsystem,
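Beyond deleting the init() registration block, routing every call through small metric interfaces means the controllers no longer depend on concrete prometheus vector types, so in principle a test or an alternative metrics backend could substitute its own implementation. A hedged illustration of that benefit, using the assumed CounterMetric interface from the earlier sketch (fakeCounter is hypothetical, not code from this commit):

```go
// Sketch only: a recording fake that satisfies the assumed CounterMetric
// interface, illustrating how abstracted metrics could be swapped out in tests.
type fakeCounter struct {
	total    float64
	byLabels []map[string]string
}

func (f *fakeCounter) Inc(labels map[string]string) { f.Add(1, labels) }

func (f *fakeCounter) Add(v float64, labels map[string]string) {
	f.total += v
	f.byLabels = append(f.byLabels, labels)
}
```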
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/multinodeconsolidation.go
@@ -123,7 +123,7 @@ func (m *MultiNodeConsolidation) firstNConsolidationOption(ctx context.Context,
// binary search to find the maximum number of NodeClaims we can terminate
for min <= max {
if m.clock.Now().After(timeout) {
-ConsolidationTimeoutsTotal.WithLabelValues(m.ConsolidationType()).Inc()
+ConsolidationTimeoutsTotal.Inc(map[string]string{consolidationTypeLabel: m.ConsolidationType()})
if lastSavedCommand.candidates == nil {
log.FromContext(ctx).V(1).Info(fmt.Sprintf("failed to find a multi-node consolidation after timeout, last considered batch had %d", (min+max)/2))
} else {
7 changes: 2 additions & 5 deletions pkg/controllers/disruption/orchestration/metrics.go
@@ -23,18 +23,15 @@ import (
"sigs.k8s.io/karpenter/pkg/metrics"
)

-func init() {
-crmetrics.Registry.MustRegister(disruptionQueueFailuresTotal)
-}

const (
voluntaryDisruptionSubsystem = "voluntary_disruption"
consolidationTypeLabel = "consolidation_type"
decisionLabel = "decision"
)

var (
-disruptionQueueFailuresTotal = prometheus.NewCounterVec(
+DisruptionQueueFailuresTotal = metrics.NewPrometheusCounter(
+crmetrics.Registry,
prometheus.CounterOpts{
Namespace: metrics.Namespace,
Subsystem: voluntaryDisruptionSubsystem,
9 changes: 4 additions & 5 deletions pkg/controllers/disruption/orchestration/queue.go
@@ -25,7 +25,6 @@ import (
"time"

"github.com/awslabs/operatorpkg/singleton"
"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
"go.uber.org/multierr"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -195,11 +194,11 @@ func (q *Queue) Reconcile(ctx context.Context) (reconcile.Result, error) {
failedLaunches := lo.Filter(cmd.Replacements, func(r Replacement, _ int) bool {
return !r.Initialized
})
-disruptionQueueFailuresTotal.With(map[string]string{
+DisruptionQueueFailuresTotal.Add(float64(len(failedLaunches)), map[string]string{
decisionLabel: cmd.Decision(),
metrics.ReasonLabel: string(cmd.reason),
consolidationTypeLabel: cmd.consolidationType,
-}).Add(float64(len(failedLaunches)))
+})
multiErr := multierr.Combine(err, cmd.lastError, state.RequireNoScheduleTaint(ctx, q.kubeClient, false, cmd.candidates...))
// Log the error
log.FromContext(ctx).WithValues("nodes", strings.Join(lo.Map(cmd.candidates, func(s *state.StateNode, _ int) string {
@@ -265,11 +264,11 @@ func (q *Queue) waitOrTerminate(ctx context.Context, cmd *Command) error {
if err := q.kubeClient.Delete(ctx, candidate.NodeClaim); err != nil {
multiErr = multierr.Append(multiErr, client.IgnoreNotFound(err))
} else {
-metrics.NodeClaimsDisruptedTotal.With(prometheus.Labels{
+metrics.NodeClaimsDisruptedTotal.Inc(map[string]string{
metrics.ReasonLabel: string(cmd.reason),
metrics.NodePoolLabel: cmd.candidates[i].NodeClaim.Labels[v1.NodePoolLabelKey],
metrics.CapacityTypeLabel: cmd.candidates[i].NodeClaim.Labels[v1.CapacityTypeLabelKey],
-}).Inc()
+})
}
}
// If there were any deletion failures, we should requeue.
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/singlenodeconsolidation.go
@@ -68,7 +68,7 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
continue
}
if s.clock.Now().After(timeout) {
-ConsolidationTimeoutsTotal.WithLabelValues(s.ConsolidationType()).Inc()
+ConsolidationTimeoutsTotal.Inc(map[string]string{consolidationTypeLabel: s.ConsolidationType()})
log.FromContext(ctx).V(1).Info(fmt.Sprintf("abandoning single-node consolidation due to timeout after evaluating %d candidates", i))
return Command{}, scheduling.Results{}, nil
}
(The remaining changed files in this commit are not shown here.)
