Skip to content

Commit

Permalink
feat: cluster consolidation
Browse files Browse the repository at this point in the history
Implements cluster consolidation via:
- removing nodes if their workloads can run on other nodes
- replacing nodes with cheaper instances

Fixes #1091
  • Loading branch information
tzneal committed Jul 14, 2022
1 parent 2c98771 commit e55c61c
Show file tree
Hide file tree
Showing 57 changed files with 2,953 additions and 325 deletions.
11 changes: 9 additions & 2 deletions charts/karpenter/crds/karpenter.sh_provisioners.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ spec:
Node properties are determined from a combination of provisioner and
pod scheduling constraints.
properties:
consolidation:
description: Consolidation are the consolidation parameters
properties:
enabled:
description: Enabled enables consolidation if it has been set
type: boolean
type: object
kubeletConfiguration:
description: KubeletConfiguration are options passed to the kubelet
when provisioning nodes
Expand Down Expand Up @@ -192,8 +199,8 @@ spec:
will wait before attempting to delete a node, measured from when
the node is detected to be empty. A Node is considered to be empty
when it does not have pods scheduled to it, excluding daemonsets.
\n Termination due to underutilization is disabled if this field
is not set."
\n Termination due to no utilization is disabled if this field is
not set."
format: int64
type: integer
ttlSecondsUntilExpired:
Expand Down
7 changes: 5 additions & 2 deletions charts/karpenter/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@ rules:
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses", "csinodes"]
verbs: ["get", "watch", "list"]
- apiGroups: ["apps"]
resources: ["daemonsets"]
- apiGroups: ["apps", ""]
resources: ["daemonsets","replicasets", "replicationcontrollers"]
verbs: ["list", "watch"]
- apiGroups: ["admissionregistration.k8s.io"]
resources: ["validatingwebhookconfigurations", "mutatingwebhookconfigurations"]
verbs: ["get", "watch", "list"]
- apiGroups: [ "policy" ]
resources: [ "poddisruptionbudgets" ]
verbs: [ "get", "list", "watch" ]
# Write
- apiGroups: ["karpenter.sh"]
resources: ["provisioners/status"]
Expand Down
4 changes: 3 additions & 1 deletion charts/karpenter/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ controller:
# -- SecurityContext for the controller container.
securityContext: {}
# -- Additional environment variables for the controller pod.
env: []
env:
- name: ENABLE_PROFILING
value: "true"
# - name: AWS_REGION
# value: eu-west-1

Expand Down
13 changes: 9 additions & 4 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/go-logr/zapr"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand All @@ -43,6 +44,7 @@ import (
"github.com/aws/karpenter/pkg/cloudprovider/registry"
"github.com/aws/karpenter/pkg/config"
"github.com/aws/karpenter/pkg/controllers"
"github.com/aws/karpenter/pkg/controllers/consolidation"
"github.com/aws/karpenter/pkg/controllers/counter"
metricspod "github.com/aws/karpenter/pkg/controllers/metrics/pod"
metricsprovisioner "github.com/aws/karpenter/pkg/controllers/metrics/provisioner"
Expand Down Expand Up @@ -112,16 +114,19 @@ func main() {
}

recorder := events.NewDedupeRecorder(events.NewRecorder(manager.GetEventRecorderFor(appName)))
cluster := state.NewCluster(cfg, manager.GetClient(), cloudProvider)
realClock := &clock.RealClock{}
cluster := state.NewCluster(realClock, cfg, manager.GetClient(), cloudProvider)
provisioner := provisioning.NewProvisioner(ctx, cfg, manager.GetClient(), clientSet.CoreV1(), recorder, cloudProvider, cluster)

consolidation.NewController(ctx, realClock, manager.GetClient(), provisioner, cloudProvider, recorder, cluster, manager.Elected())
statemetrics.StartMetricScraper(ctx, cluster)

if err := manager.RegisterControllers(ctx,
provisioning.NewController(ctx, cfg, manager.GetClient(), clientSet.CoreV1(), recorder, cloudProvider, cluster),
provisioning.NewController(manager.GetClient(), provisioner, recorder),
state.NewNodeController(manager.GetClient(), cluster),
state.NewPodController(manager.GetClient(), cluster),
node.NewController(manager.GetClient(), cloudProvider, cluster),
termination.NewController(ctx, manager.GetClient(), clientSet.CoreV1(), recorder, cloudProvider),
termination.NewController(ctx, realClock, manager.GetClient(), clientSet.CoreV1(), recorder, cloudProvider),
node.NewController(realClock, manager.GetClient(), cloudProvider, cluster),
metricspod.NewController(manager.GetClient()),
metricsprovisioner.NewController(manager.GetClient()),
counter.NewController(manager.GetClient(), cluster),
Expand Down
2 changes: 1 addition & 1 deletion hack/docs/metrics_gen_docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func handleVariableDeclaration(v *ast.GenDecl) []metricInfo {
if funcPkg != "prometheus" {
continue
}
if len(ce.Args) != 2 {
if len(ce.Args) == 0 {
continue
}
arg := ce.Args[0].(*ast.CompositeLit)
Expand Down
11 changes: 6 additions & 5 deletions pkg/apis/provisioning/v1alpha5/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ var (
// Karpenter specific domains and labels
KarpenterLabelDomain = "karpenter.sh"

ProvisionerNameLabelKey = Group + "/provisioner-name"
DoNotEvictPodAnnotationKey = Group + "/do-not-evict"
EmptinessTimestampAnnotationKey = Group + "/emptiness-timestamp"
TerminationFinalizer = Group + "/termination"
ProvisionerNameLabelKey = Group + "/provisioner-name"
DoNotEvictPodAnnotationKey = Group + "/do-not-evict"
DoNotConsolidateNodeAnnotationKey = KarpenterLabelDomain + "/do-not-consolidate"
EmptinessTimestampAnnotationKey = Group + "/emptiness-timestamp"
TerminationFinalizer = Group + "/termination"

LabelCapacityType = KarpenterLabelDomain + "/capacity-type"
LabelNodeInitialized = KarpenterLabelDomain + "/initialized"
Expand Down Expand Up @@ -64,7 +65,7 @@ var (
)

// RestrictedLabels are labels that should not be used
// because they may interfer the internal provisioning logic.
// because they may interfere with the internal provisioning logic.
RestrictedLabels = sets.NewString(
EmptinessTimestampAnnotationKey,
v1.LabelHostname,
Expand Down
10 changes: 9 additions & 1 deletion pkg/apis/provisioning/v1alpha5/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type ProvisionerSpec struct {
// detected to be empty. A Node is considered to be empty when it does not
// have pods scheduled to it, excluding daemonsets.
//
// Termination due to underutilization is disabled if this field is not set.
// Termination due to no utilization is disabled if this field is not set.
// +optional
TTLSecondsAfterEmpty *int64 `json:"ttlSecondsAfterEmpty,omitempty"`
// TTLSecondsUntilExpired is the number of seconds the controller will wait
Expand All @@ -70,6 +70,14 @@ type ProvisionerSpec struct {
TTLSecondsUntilExpired *int64 `json:"ttlSecondsUntilExpired,omitempty"`
// Limits define a set of bounds for provisioning capacity.
Limits *Limits `json:"limits,omitempty"`
// Consolidation are the consolidation parameters
// +optional
Consolidation *Consolidation `json:"consolidation,omitempty"`
}

type Consolidation struct {
// Enabled enables consolidation if it has been set
Enabled *bool `json:"enabled,omitempty"`
}

// +kubebuilder:object:generate=false
Expand Down
10 changes: 8 additions & 2 deletions pkg/apis/provisioning/v1alpha5/provisioner_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"

"github.com/aws/aws-sdk-go/aws"

"go.uber.org/multierr"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
Expand All @@ -27,8 +29,8 @@ import (
)

var (
SupportedNodeSelectorOps sets.String = sets.NewString(string(v1.NodeSelectorOpIn), string(v1.NodeSelectorOpNotIn), string(v1.NodeSelectorOpExists), string(v1.NodeSelectorOpDoesNotExist))
SupportedProvisionerOps sets.String = sets.NewString(string(v1.NodeSelectorOpIn), string(v1.NodeSelectorOpNotIn), string(v1.NodeSelectorOpExists))
SupportedNodeSelectorOps = sets.NewString(string(v1.NodeSelectorOpIn), string(v1.NodeSelectorOpNotIn), string(v1.NodeSelectorOpExists), string(v1.NodeSelectorOpDoesNotExist))
SupportedProvisionerOps = sets.NewString(string(v1.NodeSelectorOpIn), string(v1.NodeSelectorOpNotIn), string(v1.NodeSelectorOpExists))
)

const (
Expand Down Expand Up @@ -62,6 +64,10 @@ func (s *ProvisionerSpec) validateTTLSecondsAfterEmpty() (errs *apis.FieldError)
if ptr.Int64Value(s.TTLSecondsAfterEmpty) < 0 {
return errs.Also(apis.ErrInvalidValue("cannot be negative", "ttlSecondsAfterEmpty"))
}
// TTLSecondsAfterEmpty and consolidation are mutually exclusive
if s.Consolidation != nil && aws.BoolValue(s.Consolidation.Enabled) && s.TTLSecondsAfterEmpty != nil {
return errs.Also(apis.ErrGeneric("must not set the field if consolidation is enabled", "ttlSecondsAfterEmpty"))
}
return errs
}

Expand Down
20 changes: 20 additions & 0 deletions pkg/apis/provisioning/v1alpha5/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package v1alpha5

import (
"context"
"github.com/aws/aws-sdk-go/aws"
"strings"
"testing"

Expand Down Expand Up @@ -64,6 +65,25 @@ var _ = Describe("Validation", func() {
provisioner.Spec.TTLSecondsAfterEmpty = nil
Expect(provisioner.Validate(ctx)).To(Succeed())
})
It("should succeed on a valid empty ttl", func() {
provisioner.Spec.TTLSecondsAfterEmpty = aws.Int64(30)
Expect(provisioner.Validate(ctx)).To(Succeed())
})
It("should fail if both consolidation and TTLSecondsAfterEmpty are enabled", func() {
provisioner.Spec.TTLSecondsAfterEmpty = ptr.Int64(30)
provisioner.Spec.Consolidation = &Consolidation{Enabled: aws.Bool(true)}
Expect(provisioner.Validate(ctx)).ToNot(Succeed())
})
It("should succeed if consolidation is off and TTLSecondsAfterEmpty is set", func() {
provisioner.Spec.TTLSecondsAfterEmpty = ptr.Int64(30)
provisioner.Spec.Consolidation = &Consolidation{Enabled: aws.Bool(false)}
Expect(provisioner.Validate(ctx)).To(Succeed())
})
It("should succeed if consolidation is on and TTLSecondsAfterEmpty is not set", func() {
provisioner.Spec.TTLSecondsAfterEmpty = nil
provisioner.Spec.Consolidation = &Consolidation{Enabled: aws.Bool(true)}
Expect(provisioner.Validate(ctx)).To(Succeed())
})

Context("Limits", func() {
It("should allow undefined limits", func() {
Expand Down
25 changes: 25 additions & 0 deletions pkg/apis/provisioning/v1alpha5/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 35 additions & 9 deletions pkg/cloudprovider/aws/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,23 @@ import (
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ssm"
"github.com/patrickmn/go-cache"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/transport"
"knative.dev/pkg/logging"
"knative.dev/pkg/ptr"
k8sClient "sigs.k8s.io/controller-runtime/pkg/client"

awsv1alpha1 "github.com/aws/karpenter/pkg/apis/awsnodetemplate/v1alpha1"
"github.com/aws/karpenter/pkg/apis/provisioning/v1alpha5"
"github.com/aws/karpenter/pkg/cloudprovider"
"github.com/aws/karpenter/pkg/cloudprovider/aws/amifamily"
"github.com/aws/karpenter/pkg/cloudprovider/aws/apis/v1alpha1"
"github.com/aws/karpenter/pkg/scheduling"
"github.com/aws/karpenter/pkg/utils/functional"
"github.com/aws/karpenter/pkg/utils/injection"
"github.com/aws/karpenter/pkg/utils/project"

k8sClient "sigs.k8s.io/controller-runtime/pkg/client"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/transport"
"knative.dev/pkg/logging"
"knative.dev/pkg/ptr"
)

const (
Expand All @@ -67,6 +66,8 @@ func init() {
v1alpha5.NormalizedLabels = functional.UnionStringMaps(v1alpha5.NormalizedLabels, map[string]string{"topology.ebs.csi.aws.com/zone": v1.LabelTopologyZone})
}

var _ cloudprovider.CloudProvider = (*CloudProvider)(nil)

type CloudProvider struct {
instanceTypeProvider *InstanceTypeProvider
subnetProvider *SubnetProvider
Expand Down Expand Up @@ -147,6 +148,31 @@ func (c *CloudProvider) Delete(ctx context.Context, node *v1.Node) error {
return c.instanceProvider.Terminate(ctx, node)
}

func (c *CloudProvider) ShouldConsolidate(existingNode *v1.Node, newNodeRequirements scheduling.Requirements, instanceTypeOptions []cloudprovider.InstanceType) bool {
// no replacement node is being launched, so we always prefer to delete a node
if newNodeRequirements == nil {
return true
}

// If the entire list of replacement instance types are de-prioritized, then don't perform the node replacement.
// This prevents us from doing something like a c6g.large -> c6g.medium -> a1.medium.
allAreDeprioritized := true
for _, it := range instanceTypeOptions {
it, ok := it.(*InstanceType)
if !ok {
continue
}
allAreDeprioritized = allAreDeprioritized && isDeprioritized(it)
}
if allAreDeprioritized {
return false
}

// We currently don't replace spot nodes as we don't know if the replacement node is less available than the node we already
// have.
return existingNode.Labels[v1alpha5.LabelCapacityType] != v1alpha1.CapacityTypeSpot
}

// Name returns the CloudProvider implementation name.
func (c *CloudProvider) Name() string {
return "aws"
Expand Down
44 changes: 28 additions & 16 deletions pkg/cloudprovider/aws/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,31 +351,43 @@ func (p *InstanceProvider) getCapacityType(nodeRequest *cloudprovider.NodeReques
return v1alpha1.CapacityTypeOnDemand
}

func isDeprioritized(it *InstanceType) bool {
// allow regular instance families and prioritize all others last
if !functional.HasAnyPrefix(*it.InstanceType, "m", "c", "r", "a", "t", "i") {
return true
}

// deprioritize some older instance types including 1st/2nd gen burstable, compute and graviton
if functional.HasAnyPrefix(*it.InstanceType, "t1", "t2", "a1", "c1") {
return true
}

// deprioritize metal
if aws.BoolValue(it.BareMetal) {
return true
}
itRes := it.Resources()

// deprioritize GPU instance types
if !resources.IsZero(itRes[v1alpha1.ResourceAWSNeuron]) ||
!resources.IsZero(itRes[v1alpha1.ResourceAMDGPU]) ||
!resources.IsZero(itRes[v1alpha1.ResourceNVIDIAGPU]) {
return true
}
return false
}

// filterInstanceTypes is used to eliminate less desirable instance types (like GPUs) from the list of possible instance types when
// a set of more appropriate instance types would work. If a set of more desirable instance types is not found, then the original slice
// of instance types are returned.
func (p *InstanceProvider) filterInstanceTypes(instanceTypes []cloudprovider.InstanceType) []cloudprovider.InstanceType {
var genericInstanceTypes []cloudprovider.InstanceType
for _, it := range instanceTypes {
it := it.(*InstanceType)
// allow regular instance families and prioritize all others last
if !functional.HasAnyPrefix(*it.InstanceType, "m", "c", "r", "a", "t", "i") {
continue
}
// deprioritize some older instance types including 1st/2nd gen burstable, compute and graviton
if functional.HasAnyPrefix(*it.InstanceType, "t1", "t2", "a1", "c1") {
continue
}
// deprioritize metal
if aws.BoolValue(it.BareMetal) {
continue
}
itRes := it.Resources()
if !resources.IsZero(itRes[v1alpha1.ResourceAWSNeuron]) ||
!resources.IsZero(itRes[v1alpha1.ResourceAMDGPU]) ||
!resources.IsZero(itRes[v1alpha1.ResourceNVIDIAGPU]) {
if isDeprioritized(it) {
continue
}

genericInstanceTypes = append(genericInstanceTypes, it)
}
// if we got some subset of instance types, then prefer to use those
Expand Down
Loading

0 comments on commit e55c61c

Please sign in to comment.