Skip to content

Commit

Permalink
Update to use RepairStatements
Browse files Browse the repository at this point in the history
  • Loading branch information
engedaam committed Nov 8, 2024
1 parent 897855b commit c8bed26
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 69 deletions.
4 changes: 2 additions & 2 deletions kwok/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ func (c CloudProvider) GetSupportedNodeClasses() []status.Object {
return []status.Object{&v1alpha1.KWOKNodeClass{}}
}

func (c *CloudProvider) RepairPolicy() []cloudprovider.RepairPolicy {
return []cloudprovider.RepairPolicy{}
func (c *CloudProvider) RepairPolicy() []cloudprovider.RepairStatement {
return []cloudprovider.RepairStatement{}
}

func (c CloudProvider) getInstanceType(instanceTypeName string) (*cloudprovider.InstanceType, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/cloudprovider/fake/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@ func (c *CloudProvider) IsDrifted(context.Context, *v1.NodeClaim) (cloudprovider
return c.Drifted, nil
}

func (c *CloudProvider) RepairPolicy() []cloudprovider.RepairPolicy {
return []cloudprovider.RepairPolicy{
func (c *CloudProvider) RepairPolicy() []cloudprovider.RepairStatement {
return []cloudprovider.RepairStatement{
{
Type: "HealthyNode",
Status: corev1.ConditionFalse,
Expand Down
8 changes: 4 additions & 4 deletions pkg/cloudprovider/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ var (

type DriftReason string

type RepairPolicy struct {
type RepairStatement struct {
// Type of unhealthy state that is found on the node
Type corev1.NodeConditionType
// Status condition of when a node is unhealthy
// Status condition when a node is unhealthy
Status corev1.ConditionStatus
// TolerationDuration is the duration the controller will wait
// before attempting to terminate nodes that are underutilized.
// before force terminating nodes that are unhealthy.
TolerationDuration time.Duration
}

Expand Down Expand Up @@ -76,7 +76,7 @@ type CloudProvider interface {
IsDrifted(context.Context, *v1.NodeClaim) (DriftReason, error)
// RepairPolicy is for CloudProviders to define a set of unhealthy conditions for Karpenter
// to monitor on the node.
RepairPolicy() []RepairPolicy
RepairPolicy() []RepairStatement
// Name returns the CloudProvider implementation name.
Name() string
// GetSupportedNodeClasses returns CloudProvider NodeClass that implements status.Object
Expand Down
9 changes: 4 additions & 5 deletions pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/awslabs/operatorpkg/status"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
Expand Down Expand Up @@ -93,13 +92,13 @@ func NewControllers(
nodeclaimdisruption.NewController(clock, kubeClient, cloudProvider),
status.NewController[*v1.NodeClaim](kubeClient, mgr.GetEventRecorderFor("karpenter")),
status.NewController[*v1.NodePool](kubeClient, mgr.GetEventRecorderFor("karpenter")),
// status.NewGenericObjectController[*corev1.Node](kubeClient, mgr.GetEventRecorderFor("karpenter")),
// status.NewGenericObjectController[*corev1.Pod](kubeClient, mgr.GetEventRecorderFor("karpenter")),
}

// The cloud proivder must define status condation for the node repair controller to used for dectecting unhealthy nodes
if len(cloudProvider.RepairPolicy()) != 0 && !options.FromContext(ctx).FeatureGates.NodeRepair {
// The cloud provider must define status conditions for the node repair controller to use for detecting unhealthy nodes
if len(cloudProvider.RepairPolicy()) != 0 && options.FromContext(ctx).FeatureGates.NodeRepair {
controllers = append(controllers, health.NewController(kubeClient, cloudProvider, clock))
} else {
log.FromContext(ctx).V(1).Info("node repair has been disabled")
}

return controllers
Expand Down
77 changes: 31 additions & 46 deletions pkg/controllers/node/health/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,11 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -51,69 +49,73 @@ type Controller struct {
// NewController constructs a controller instance
func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider, clock clock.Clock) *Controller {
return &Controller{
clock: clock,
kubeClient: kubeClient,
cloudProvider: cloudProvider,
clock: clock,
}
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named("node.health").
For(&corev1.Node{}).
Complete(reconcile.AsReconciler(m.GetClient(), c))
}

func (c *Controller) Reconcile(ctx context.Context, node *corev1.Node) (reconcile.Result, error) {
ctx = injection.WithControllerName(ctx, "node.health")
nodeHealthCondation := corev1.NodeCondition{}
cloudProivderPolicy := cloudprovider.RepairPolicy{}
nodeHealthCondition := corev1.NodeCondition{}
foundCloudProviderPolicy := cloudprovider.RepairStatement{}

if !node.GetDeletionTimestamp().IsZero() {
// Validate that the node is owned by us and is not being deleted
if !node.GetDeletionTimestamp().IsZero() || !controllerutil.ContainsFinalizer(node, v1.TerminationFinalizer) {
return reconcile.Result{}, nil
}

for _, policy := range c.cloudProvider.RepairPolicy() {
nodeHealthCondation = nodeutils.GetCondition(node, policy.Type)
if nodeHealthCondation.Status == policy.Status {
cloudProivderPolicy = policy
nodeHealthCondition = nodeutils.GetCondition(node, policy.Type)
if nodeHealthCondition.Status == policy.Status {
// found unhealthy condition on the node
foundCloudProviderPolicy = policy
break
}
}

// From here there are three scenarios to handle:
// 1. If node is healthy, exit node healhty loop
if cloudProivderPolicy.Type == "" {
// 1. If node is healthy, exit node repair loop
if foundCloudProviderPolicy.Type == "" {
return reconcile.Result{}, nil
}

// 2. If the Node is unhealthy, but has not reached its full toleration disruption, exit the loop
dusruptionTime := nodeHealthCondation.LastTransitionTime.Add(cloudProivderPolicy.TolerationDuration)
if c.clock.Now().Before(dusruptionTime) {
// Use t.Sub(clock.Now()) instead of time.Until() to ensure we're using the injected clock.
return reconcile.Result{RequeueAfter: dusruptionTime.Sub(c.clock.Now())}, nil
disruptionTime := nodeHealthCondition.LastTransitionTime.Add(foundCloudProviderPolicy.TolerationDuration)
if c.clock.Now().Before(disruptionTime) {
return reconcile.Result{RequeueAfter: disruptionTime.Sub(c.clock.Now())}, nil
}

nodeClaims, err := nodeutils.GetNodeClaims(ctx, node, c.kubeClient)
nodeClaim, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, node)
if err != nil {
return reconcile.Result{}, fmt.Errorf("listing nodeclaims, %w", err)
}
if err := c.annotateTerminationGracePeriodByDefualt(ctx, nodeClaims[0]); err != nil {
return reconcile.Result{}, fmt.Errorf("annotated termination grace peirod on nodeclaim, %w", err)
if err := c.annotateTerminationGracePeriod(ctx, nodeClaim); err != nil {
return reconcile.Result{}, fmt.Errorf("annotated termination grace period on nodeclaim, %w", err)
}

// 3. Otherwise, if the Node is unhealthy we can forcefully remove the node (by deleting it)
if err := c.kubeClient.Delete(ctx, node); err != nil {
return reconcile.Result{}, err
// 3. Otherwise, if the Node is unhealthy and past its tolerationDisruption window we can forcefully terminate the node
if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
// 4. The deletion timestamp has successfully been set for the Node, update relevant metrics.
log.FromContext(ctx).V(1).Info("deleting unhealthy node")
metrics.NodeClaimsDisruptedTotal.With(prometheus.Labels{
metrics.ReasonLabel: metrics.UnhealthyReason,
metrics.NodePoolLabel: nodeClaims[0].Labels[v1.NodePoolLabelKey],
metrics.CapacityTypeLabel: nodeClaims[0].Labels[v1.CapacityTypeLabelKey],
metrics.NodePoolLabel: nodeClaim.Labels[v1.NodePoolLabelKey],
metrics.CapacityTypeLabel: nodeClaim.Labels[v1.CapacityTypeLabelKey],
}).Inc()
return reconcile.Result{}, nil
}

func (c *Controller) annotateTerminationGracePeriodByDefualt(ctx context.Context, nodeClaim *v1.NodeClaim) error {
if _, ok := nodeClaim.ObjectMeta.Annotations[v1.NodeClaimTerminationTimestampAnnotationKey]; ok {
return nil
}

func (c *Controller) annotateTerminationGracePeriod(ctx context.Context, nodeClaim *v1.NodeClaim) error {
stored := nodeClaim.DeepCopy()
terminationTime := c.clock.Now().Format(time.RFC3339)
nodeClaim.ObjectMeta.Annotations = lo.Assign(nodeClaim.ObjectMeta.Annotations, map[string]string{v1.NodeClaimTerminationTimestampAnnotationKey: terminationTime})
Expand All @@ -124,20 +126,3 @@ func (c *Controller) annotateTerminationGracePeriodByDefualt(ctx context.Context

return nil
}

func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named("node.health").
For(&corev1.Node{}).
WithOptions(
controller.Options{
RateLimiter: workqueue.NewTypedMaxOfRateLimiter[reconcile.Request](
workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](100*time.Millisecond, 10*time.Second),
// 10 qps, 100 bucket size
&workqueue.TypedBucketRateLimiter[reconcile.Request]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
),
MaxConcurrentReconciles: 100,
},
).
Complete(reconcile.AsReconciler(m.GetClient(), c))
}
40 changes: 30 additions & 10 deletions pkg/controllers/node/health/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clock "k8s.io/utils/clock/testing"
"sigs.k8s.io/controller-runtime/pkg/cache"

"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/karpenter/pkg/apis"
v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
"sigs.k8s.io/karpenter/pkg/controllers/node/health"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination"
"sigs.k8s.io/karpenter/pkg/controllers/node/termination/terminator"
"sigs.k8s.io/karpenter/pkg/controllers/nodeclaim/lifecycle"
"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/test"
. "sigs.k8s.io/karpenter/pkg/test/expectations"
Expand All @@ -48,6 +52,7 @@ var fakeClock *clock.FakeClock
var cloudProvider *fake.CloudProvider
var recorder *test.EventRecorder
var queue *terminator.Queue
var nodeClaimController *lifecycle.Controller

func TestAPIs(t *testing.T) {
ctx = TestContextWithLogger(t)
Expand All @@ -57,14 +62,19 @@ func TestAPIs(t *testing.T) {

var _ = BeforeSuite(func() {
fakeClock = clock.NewFakeClock(time.Now())
env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...), test.WithFieldIndexers(test.NodeClaimFieldIndexer(ctx), test.VolumeAttachmentFieldIndexer(ctx)))

env = test.NewEnvironment(test.WithCRDs(apis.CRDs...), test.WithCRDs(v1alpha1.CRDs...), test.WithFieldIndexers(test.NodeClaimFieldIndexer(ctx), func(c cache.Cache) error {
return c.IndexField(ctx, &corev1.Node{}, "spec.providerID", func(obj client.Object) []string {
return []string{obj.(*corev1.Node).Spec.ProviderID}
})
}))
cloudProvider = fake.NewCloudProvider()
cloudProvider = fake.NewCloudProvider()
recorder = test.NewEventRecorder()
queue = terminator.NewTestingQueue(env.Client, recorder)
healthController = health.NewController(env.Client, cloudProvider, fakeClock)
terminationController = termination.NewController(fakeClock, env.Client, cloudProvider, terminator.NewTerminator(fakeClock, env.Client, queue, recorder), recorder)
nodeClaimController = lifecycle.NewController(fakeClock, env.Client, cloudProvider, recorder)

})

var _ = AfterSuite(func() {
Expand Down Expand Up @@ -95,7 +105,7 @@ var _ = Describe("Health", func() {
})

Context("Reconciliation", func() {
It("should delete nodes that are unhealthy by the cloud proivder", func() {
It("should delete nodes that are unhealthy by the cloud provider", func() {
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "HealthyNode",
Status: corev1.ConditionFalse,
Expand All @@ -106,11 +116,12 @@ var _ = Describe("Health", func() {
// Determine to delete unhealthy nodes
ExpectObjectReconciled(ctx, env.Client, healthController, node)
// Let the termination controller terminate the node
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectNotFound(ctx, env.Client, node)
})
It("should not reconcile when a node has delation timestamp set", func() {
It("should not reconcile when a node has deletion timestamp set", func() {
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "HealthyNode",
Status: corev1.ConditionFalse,
Expand All @@ -122,11 +133,12 @@ var _ = Describe("Health", func() {
// Determine to delete unhealthy nodes
ExpectObjectReconciled(ctx, env.Client, healthController, node)
// Let the termination controller terminate the node
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectExists(ctx, env.Client, node)
})
It("should not delete node when unhealthy type does not match cloudprovider passed in value", func() {
It("should not delete node when unhealthy type does not match cloud provider passed in value", func() {
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "FakeHealthyNode",
Status: corev1.ConditionFalse,
Expand All @@ -137,11 +149,12 @@ var _ = Describe("Health", func() {
// Determine to delete unhealthy nodes
ExpectObjectReconciled(ctx, env.Client, healthController, node)
// Let the termination controller terminate the node
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectExists(ctx, env.Client, node)
})
It("should not delete node when health status does not match cloudprovider passed in value", func() {
It("should not delete node when health status does not match cloud provider passed in value", func() {
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "HealthyNode",
Status: corev1.ConditionTrue,
Expand All @@ -152,6 +165,7 @@ var _ = Describe("Health", func() {
// Determine to delete unhealthy nodes
ExpectObjectReconciled(ctx, env.Client, healthController, node)
// Let the termination controller terminate the node
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectExists(ctx, env.Client, node)
Expand All @@ -160,14 +174,15 @@ var _ = Describe("Health", func() {
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "HealthyNode",
Status: corev1.ConditionFalse,
// We expect the last transition for HealthyNode condition to wait 30 minites
// We expect the last transition for HealthyNode condition to wait 30 minutes
LastTransitionTime: metav1.Time{Time: fakeClock.Now()},
})
fakeClock.Step(20 * time.Minute)
ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node)
// Determine to delete unhealthy nodes
ExpectObjectReconciled(ctx, env.Client, healthController, node)
// Let the termination controller terminate the node
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectExists(ctx, env.Client, node)
Expand All @@ -176,14 +191,15 @@ var _ = Describe("Health", func() {
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "HealthyNode",
Status: corev1.ConditionFalse,
// We expect the last transition for HealthyNode condition to wait 30 minites
// We expect the last transition for HealthyNode condition to wait 30 minutes
LastTransitionTime: metav1.Time{Time: time.Now()},
})
fakeClock.Step(60 * time.Minute)
ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node)
// Determine to delete unhealthy nodes
ExpectObjectReconciled(ctx, env.Client, healthController, node)
// Let the termination controller terminate the node
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expand All @@ -194,14 +210,15 @@ var _ = Describe("Health", func() {
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "HealthyNode",
Status: corev1.ConditionFalse,
// We expect the last transition for HealthyNode condition to wait 30 minites
// We expect the last transition for HealthyNode condition to wait 30 minutes
LastTransitionTime: metav1.Time{Time: time.Now()},
})
fakeClock.Step(60 * time.Minute)
ExpectApplied(ctx, env.Client, nodePool, nodeClaim, node)
// Determine to delete unhealthy nodes
ExpectObjectReconciled(ctx, env.Client, healthController, node)
// Let the termination controller terminate the node
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
nodeClaim = ExpectExists(ctx, env.Client, nodeClaim)
Expand Down Expand Up @@ -229,11 +246,12 @@ var _ = Describe("Health", func() {
// Determine to delete unhealthy nodes
ExpectObjectReconciled(ctx, env.Client, healthController, node)
// Let the termination controller terminate the node
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectNotFound(ctx, env.Client, node)
})
It("should not respect do-not-disrupt on node ", func() {
It("should not respect do-not-disrupt on node", func() {
node.Annotations = map[string]string{v1.DoNotDisruptAnnotationKey: "true"}
node.Status.Conditions = append(node.Status.Conditions, corev1.NodeCondition{
Type: "HealthyNode",
Expand All @@ -245,6 +263,7 @@ var _ = Describe("Health", func() {
// Determine to delete unhealthy nodes
ExpectObjectReconciled(ctx, env.Client, healthController, node)
// Let the termination controller terminate the node
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectNotFound(ctx, env.Client, node)
Expand All @@ -262,6 +281,7 @@ var _ = Describe("Health", func() {

ExpectObjectReconciled(ctx, env.Client, healthController, node)
// Let the termination controller terminate the node
ExpectObjectReconciled(ctx, env.Client, nodeClaimController, nodeClaim)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectObjectReconciled(ctx, env.Client, terminationController, node)
ExpectNotFound(ctx, env.Client, node)
Expand Down

0 comments on commit c8bed26

Please sign in to comment.