Skip to content

Commit

Permalink
feat: cluster consolidation
Browse files Browse the repository at this point in the history
Implements cluster consolidation via:
- removing nodes if their workloads can run on other nodes
- replacing nodes with cheaper instances

Fixes #1091
  • Loading branch information
tzneal committed Jul 25, 2022
1 parent 4d97adf commit 63e6d43
Show file tree
Hide file tree
Showing 58 changed files with 2,956 additions and 299 deletions.
11 changes: 9 additions & 2 deletions charts/karpenter/crds/karpenter.sh_provisioners.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ spec:
Node properties are determined from a combination of provisioner and
pod scheduling constraints.
properties:
consolidation:
description: Consolidation are the consolidation parameters
properties:
enabled:
description: Enabled enables consolidation if it has been set
type: boolean
type: object
kubeletConfiguration:
description: KubeletConfiguration are options passed to the kubelet
when provisioning nodes
Expand Down Expand Up @@ -192,8 +199,8 @@ spec:
will wait before attempting to delete a node, measured from when
the node is detected to be empty. A Node is considered to be empty
when it does not have pods scheduled to it, excluding daemonsets.
\n Termination due to underutilization is disabled if this field
is not set."
\n Termination due to no utilization is disabled if this field is
not set."
format: int64
type: integer
ttlSecondsUntilExpired:
Expand Down
7 changes: 5 additions & 2 deletions charts/karpenter/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@ rules:
resources: ["awsnodetemplates"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["pods", "nodes", "persistentvolumes", "persistentvolumeclaims"]
resources: ["pods", "nodes", "persistentvolumes", "persistentvolumeclaims", "replicationcontrollers"]
verbs: ["get", "list", "watch"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses", "csinodes"]
verbs: ["get", "watch", "list"]
- apiGroups: ["apps"]
resources: ["daemonsets"]
resources: ["daemonsets", "replicasets", "statefulsets"]
verbs: ["list", "watch"]
- apiGroups: ["admissionregistration.k8s.io"]
resources: ["validatingwebhookconfigurations", "mutatingwebhookconfigurations"]
verbs: ["get", "watch", "list"]
- apiGroups: [ "policy" ]
resources: [ "poddisruptionbudgets" ]
verbs: [ "get", "list", "watch" ]
# Write
- apiGroups: ["karpenter.sh"]
resources: ["provisioners/status"]
Expand Down
13 changes: 9 additions & 4 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/go-logr/zapr"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand All @@ -43,6 +44,7 @@ import (
"github.com/aws/karpenter/pkg/cloudprovider/registry"
"github.com/aws/karpenter/pkg/config"
"github.com/aws/karpenter/pkg/controllers"
"github.com/aws/karpenter/pkg/controllers/consolidation"
"github.com/aws/karpenter/pkg/controllers/counter"
metricspod "github.com/aws/karpenter/pkg/controllers/metrics/pod"
metricsprovisioner "github.com/aws/karpenter/pkg/controllers/metrics/provisioner"
Expand Down Expand Up @@ -112,16 +114,19 @@ func main() {
}

recorder := events.NewDedupeRecorder(events.NewRecorder(manager.GetEventRecorderFor(appName)))
cluster := state.NewCluster(cfg, manager.GetClient(), cloudProvider)
realClock := &clock.RealClock{}
cluster := state.NewCluster(realClock, cfg, manager.GetClient(), cloudProvider)
provisioner := provisioning.NewProvisioner(ctx, cfg, manager.GetClient(), clientSet.CoreV1(), recorder, cloudProvider, cluster)

consolidation.NewController(ctx, realClock, manager.GetClient(), provisioner, cloudProvider, recorder, cluster, manager.Elected())
metricsstate.StartMetricScraper(ctx, cluster)

if err := manager.RegisterControllers(ctx,
provisioning.NewController(ctx, cfg, manager.GetClient(), clientSet.CoreV1(), recorder, cloudProvider, cluster),
provisioning.NewController(manager.GetClient(), provisioner, recorder),
state.NewNodeController(manager.GetClient(), cluster),
state.NewPodController(manager.GetClient(), cluster),
node.NewController(manager.GetClient(), cloudProvider, cluster),
termination.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), recorder, cloudProvider),
termination.NewController(ctx, realClock, manager.GetClient(), clientSet.CoreV1(), recorder, cloudProvider),
node.NewController(realClock, manager.GetClient(), cloudProvider, cluster),
metricspod.NewController(manager.GetClient()),
metricsprovisioner.NewController(manager.GetClient()),
counter.NewController(manager.GetClient(), cluster),
Expand Down
2 changes: 1 addition & 1 deletion hack/docs/metrics_gen_docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func handleVariableDeclaration(v *ast.GenDecl) []metricInfo {
if funcPkg != "prometheus" {
continue
}
if len(ce.Args) != 2 {
if len(ce.Args) == 0 {
continue
}
arg := ce.Args[0].(*ast.CompositeLit)
Expand Down
11 changes: 6 additions & 5 deletions pkg/apis/provisioning/v1alpha5/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ var (
// Karpenter specific domains and labels
KarpenterLabelDomain = "karpenter.sh"

ProvisionerNameLabelKey = Group + "/provisioner-name"
DoNotEvictPodAnnotationKey = Group + "/do-not-evict"
EmptinessTimestampAnnotationKey = Group + "/emptiness-timestamp"
TerminationFinalizer = Group + "/termination"
ProvisionerNameLabelKey = Group + "/provisioner-name"
DoNotEvictPodAnnotationKey = Group + "/do-not-evict"
DoNotConsolidateNodeAnnotationKey = KarpenterLabelDomain + "/do-not-consolidate"
EmptinessTimestampAnnotationKey = Group + "/emptiness-timestamp"
TerminationFinalizer = Group + "/termination"

LabelCapacityType = KarpenterLabelDomain + "/capacity-type"
LabelNodeInitialized = KarpenterLabelDomain + "/initialized"
Expand Down Expand Up @@ -64,7 +65,7 @@ var (
)

// RestrictedLabels are labels that should not be used
// because they may interfer the internal provisioning logic.
// because they may interfere with the internal provisioning logic.
RestrictedLabels = sets.NewString(
EmptinessTimestampAnnotationKey,
v1.LabelHostname,
Expand Down
10 changes: 9 additions & 1 deletion pkg/apis/provisioning/v1alpha5/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type ProvisionerSpec struct {
// detected to be empty. A Node is considered to be empty when it does not
// have pods scheduled to it, excluding daemonsets.
//
// Termination due to underutilization is disabled if this field is not set.
// Termination due to no utilization is disabled if this field is not set.
// +optional
TTLSecondsAfterEmpty *int64 `json:"ttlSecondsAfterEmpty,omitempty"`
// TTLSecondsUntilExpired is the number of seconds the controller will wait
Expand All @@ -70,6 +70,14 @@ type ProvisionerSpec struct {
TTLSecondsUntilExpired *int64 `json:"ttlSecondsUntilExpired,omitempty"`
// Limits define a set of bounds for provisioning capacity.
Limits *Limits `json:"limits,omitempty"`
// Consolidation are the consolidation parameters
// +optional
Consolidation *Consolidation `json:"consolidation,omitempty"`
}

type Consolidation struct {
// Enabled enables consolidation if it has been set
Enabled *bool `json:"enabled,omitempty"`
}

// +kubebuilder:object:generate=false
Expand Down
10 changes: 8 additions & 2 deletions pkg/apis/provisioning/v1alpha5/provisioner_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"

"github.com/aws/aws-sdk-go/aws"

"go.uber.org/multierr"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
Expand All @@ -27,8 +29,8 @@ import (
)

var (
SupportedNodeSelectorOps sets.String = sets.NewString(string(v1.NodeSelectorOpIn), string(v1.NodeSelectorOpNotIn), string(v1.NodeSelectorOpExists), string(v1.NodeSelectorOpDoesNotExist))
SupportedProvisionerOps sets.String = sets.NewString(string(v1.NodeSelectorOpIn), string(v1.NodeSelectorOpNotIn), string(v1.NodeSelectorOpExists))
SupportedNodeSelectorOps = sets.NewString(string(v1.NodeSelectorOpIn), string(v1.NodeSelectorOpNotIn), string(v1.NodeSelectorOpExists), string(v1.NodeSelectorOpDoesNotExist))
SupportedProvisionerOps = sets.NewString(string(v1.NodeSelectorOpIn), string(v1.NodeSelectorOpNotIn), string(v1.NodeSelectorOpExists))
)

const (
Expand Down Expand Up @@ -63,6 +65,10 @@ func (s *ProvisionerSpec) validateTTLSecondsAfterEmpty() (errs *apis.FieldError)
if ptr.Int64Value(s.TTLSecondsAfterEmpty) < 0 {
return errs.Also(apis.ErrInvalidValue("cannot be negative", "ttlSecondsAfterEmpty"))
}
// TTLSecondsAfterEmpty and consolidation are mutually exclusive
if s.Consolidation != nil && aws.BoolValue(s.Consolidation.Enabled) && s.TTLSecondsAfterEmpty != nil {
return errs.Also(apis.ErrMultipleOneOf("ttlSecondsAfterEmpty", "consolidation.enabled"))
}
return errs
}

Expand Down
20 changes: 20 additions & 0 deletions pkg/apis/provisioning/v1alpha5/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package v1alpha5

import (
"context"
"github.com/aws/aws-sdk-go/aws"
"strings"
"testing"

Expand Down Expand Up @@ -64,6 +65,25 @@ var _ = Describe("Validation", func() {
provisioner.Spec.TTLSecondsAfterEmpty = nil
Expect(provisioner.Validate(ctx)).To(Succeed())
})
It("should succeed on a valid empty ttl", func() {
provisioner.Spec.TTLSecondsAfterEmpty = aws.Int64(30)
Expect(provisioner.Validate(ctx)).To(Succeed())
})
It("should fail if both consolidation and TTLSecondsAfterEmpty are enabled", func() {
provisioner.Spec.TTLSecondsAfterEmpty = ptr.Int64(30)
provisioner.Spec.Consolidation = &Consolidation{Enabled: aws.Bool(true)}
Expect(provisioner.Validate(ctx)).ToNot(Succeed())
})
It("should succeed if consolidation is off and TTLSecondsAfterEmpty is set", func() {
provisioner.Spec.TTLSecondsAfterEmpty = ptr.Int64(30)
provisioner.Spec.Consolidation = &Consolidation{Enabled: aws.Bool(false)}
Expect(provisioner.Validate(ctx)).To(Succeed())
})
It("should succeed if consolidation is on and TTLSecondsAfterEmpty is not set", func() {
provisioner.Spec.TTLSecondsAfterEmpty = nil
provisioner.Spec.Consolidation = &Consolidation{Enabled: aws.Bool(true)}
Expect(provisioner.Validate(ctx)).To(Succeed())
})

Context("Limits", func() {
It("should allow undefined limits", func() {
Expand Down
25 changes: 25 additions & 0 deletions pkg/apis/provisioning/v1alpha5/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/cloudprovider/aws/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ func init() {
v1alpha5.NormalizedLabels = functional.UnionStringMaps(v1alpha5.NormalizedLabels, map[string]string{"topology.ebs.csi.aws.com/zone": v1.LabelTopologyZone})
}

var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)

type CloudProvider struct {
instanceTypeProvider *InstanceTypeProvider
subnetProvider *SubnetProvider
Expand Down
Loading

0 comments on commit 63e6d43

Please sign in to comment.