feat: drain and volume detachment status conditions #1876
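This PR adds two status conditions to the NodeClaim API, `Drained` and `VolumesDetached`, and updates the node termination controller to resolve a single NodeClaim for the terminating Node instead of listing all matching NodeClaims. As the controller steps through drain, volume detachment, and instance termination, it now records progress on the NodeClaim's status, patching each condition with an optimistic lock and requeueing after every write so that subsequent reconciles read their own writes.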

Open · wants to merge 3 commits into base: main
2 changes: 2 additions & 0 deletions pkg/apis/v1/nodeclaim_status.go
@@ -28,6 +28,8 @@ const (
 	ConditionTypeInitialized          = "Initialized"
 	ConditionTypeConsolidatable       = "Consolidatable"
 	ConditionTypeDrifted              = "Drifted"
+	ConditionTypeDrained              = "Drained"
+	ConditionTypeVolumesDetached      = "VolumesDetached"
 	ConditionTypeInstanceTerminating  = "InstanceTerminating"
 	ConditionTypeConsistentStateFound = "ConsistentStateFound"
 	ConditionTypeDisruptionReason     = "DisruptionReason"
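For orientation, the sketch below shows how a consumer of the API might read the two new conditions, using the same `StatusConditions()` accessors the controller uses later in this diff. The helper function and its name are illustrative only, not part of this PR; the import path assumes the upstream `sigs.k8s.io/karpenter` module.

```go
import (
	"fmt"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// reportTerminationProgress is a hypothetical helper, not part of this PR. It
// reads the two conditions the termination controller now maintains.
func reportTerminationProgress(nodeClaim *v1.NodeClaim) {
	// Get(...).IsTrue() mirrors the controller's own usage in this diff:
	// an unset condition simply reports false.
	drained := nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsTrue()
	volumesDetached := nodeClaim.StatusConditions().Get(v1.ConditionTypeVolumesDetached).IsTrue()
	fmt.Printf("drained=%t volumesDetached=%t\n", drained, volumesDetached)
}
```

On a live cluster, the same information should surface through `kubectl get nodeclaim <name> -o jsonpath='{.status.conditions[?(@.type=="Drained")].status}'` once the controller has patched the status.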
159 changes: 105 additions & 54 deletions pkg/controllers/node/termination/controller.go
@@ -29,6 +29,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2"
 	"k8s.io/utils/clock"
 	controllerruntime "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -76,6 +77,7 @@ func NewController(clk clock.Clock, kubeClient client.Client, cloudProvider clou
 
 func (c *Controller) Reconcile(ctx context.Context, n *corev1.Node) (reconcile.Result, error) {
 	ctx = injection.WithControllerName(ctx, "node.termination")
+	ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("Node", klog.KRef(n.Namespace, n.Name)))
 
 	if !n.GetDeletionTimestamp().IsZero() {
 		return c.finalize(ctx, n)
@@ -92,99 +94,148 @@ func (c *Controller) finalize(ctx context.Context, node *corev1.Node) (reconcile
 		return reconcile.Result{}, nil
 	}
 
-	nodeClaims, err := nodeutils.GetNodeClaims(ctx, c.kubeClient, node)
+	nodeClaim, err := nodeutils.NodeClaimForNode(ctx, c.kubeClient, node)
 	if err != nil {
-		return reconcile.Result{}, fmt.Errorf("listing nodeclaims, %w", err)
+		if nodeutils.IsDuplicateNodeClaimError(err) || nodeutils.IsNodeClaimNotFoundError(err) {
+			return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("failed to terminate node, %w", err))
+		}
+		return reconcile.Result{}, err
 	}
+	ctx = log.IntoContext(ctx, log.FromContext(ctx).WithValues("NodeClaim", klog.KRef(nodeClaim.Namespace, nodeClaim.Name)))
+	if nodeClaim.DeletionTimestamp.IsZero() {
+		if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil {
+			if errors.IsNotFound(err) {
+				return reconcile.Result{Requeue: true}, nil
+			}
+			return reconcile.Result{}, fmt.Errorf("deleting nodeclaim, %w", err)
+		}
+	}
 
-	if err = c.deleteAllNodeClaims(ctx, nodeClaims...); err != nil {
-		return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
+	// If the underlying NodeClaim no longer exists, we want to delete to avoid trying to gracefully drain nodes that are
+	// no longer alive. We do a check on the Ready condition of the node since, even though the CloudProvider says the
+	// instance is not around, we know that the kubelet process is still running if the Node Ready condition is true.
+	// Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144
+	if nodeutils.GetCondition(node, corev1.NodeReady).Status != corev1.ConditionTrue {
+		if _, err = c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil {
+			if cloudprovider.IsNodeClaimNotFoundError(err) {
+				return reconcile.Result{}, c.removeFinalizer(ctx, node)
+			}
+			return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err)
+		}
 	}
 
-	nodeTerminationTime, err := c.nodeTerminationTime(node, nodeClaims...)
+	nodeTerminationTime, err := c.nodeTerminationTime(node, nodeClaim)
 	if err != nil {
 		return reconcile.Result{}, err
 	}
 
 	if err = c.terminator.Taint(ctx, node, v1.DisruptedNoScheduleTaint); err != nil {
-		if errors.IsConflict(err) {
+		if errors.IsConflict(err) || errors.IsNotFound(err) {
 			return reconcile.Result{Requeue: true}, nil
 		}
-		return reconcile.Result{}, client.IgnoreNotFound(fmt.Errorf("tainting node with %s, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err))
+		return reconcile.Result{}, fmt.Errorf("tainting node with %s, %w", pretty.Taint(v1.DisruptedNoScheduleTaint), err)
 	}
 	if err = c.terminator.Drain(ctx, node, nodeTerminationTime); err != nil {
 		if !terminator.IsNodeDrainError(err) {
 			return reconcile.Result{}, fmt.Errorf("draining node, %w", err)
 		}
 		c.recorder.Publish(terminatorevents.NodeFailedToDrain(node, err))
-		// If the underlying NodeClaim no longer exists, we want to delete to avoid trying to gracefully draining
-		// on nodes that are no longer alive. We do a check on the Ready condition of the node since, even
-		// though the CloudProvider says the instance is not around, we know that the kubelet process is still running
-		// if the Node Ready condition is true
-		// Similar logic to: https://github.com/kubernetes/kubernetes/blob/3a75a8c8d9e6a1ebd98d8572132e675d4980f184/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go#L144
-		if nodeutils.GetCondition(node, corev1.NodeReady).Status != corev1.ConditionTrue {
-			if _, err = c.cloudProvider.Get(ctx, node.Spec.ProviderID); err != nil {
-				if cloudprovider.IsNodeClaimNotFoundError(err) {
-					return reconcile.Result{}, c.removeFinalizer(ctx, node)
-				}
-				return reconcile.Result{}, fmt.Errorf("getting nodeclaim, %w", err)
-			}
-		}
+		stored := nodeClaim.DeepCopy()
+		if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeDrained, "Draining", "Draining"); modified {
+			if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
+				if errors.IsConflict(err) || errors.IsNotFound(err) {
+					return reconcile.Result{Requeue: true}, nil
+				}
+				return reconcile.Result{}, err
+			}
+		}
 
 		return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
 	}
-	NodesDrainedTotal.Inc(map[string]string{
-		metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey],
-	})
+	if !nodeClaim.StatusConditions().Get(v1.ConditionTypeDrained).IsTrue() {
+		stored := nodeClaim.DeepCopy()
+		_ = nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeDrained)
+		if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
+			if errors.IsConflict(err) || errors.IsNotFound(err) {
+				return reconcile.Result{Requeue: true}, nil
+			}
+			return reconcile.Result{}, err
+		}
+		NodesDrainedTotal.Inc(map[string]string{
+			metrics.NodePoolLabel: node.Labels[v1.NodePoolLabelKey],
+		})
+		// We requeue after a patch operation since we want to ensure we read our own writes before any subsequent
+		// operations on the NodeClaim.
+		return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
+	}
 
 	// In order for Pods associated with PersistentVolumes to smoothly migrate from the terminating Node, we wait
 	// for VolumeAttachments of drain-able Pods to be cleaned up before terminating Node and removing its finalizer.
 	// However, if TerminationGracePeriod is configured for Node, and we are past that period, we will skip waiting.
-	if nodeTerminationTime == nil || c.clock.Now().Before(*nodeTerminationTime) {
-		areVolumesDetached, err := c.ensureVolumesDetached(ctx, node)
-		if err != nil {
-			return reconcile.Result{}, fmt.Errorf("ensuring no volume attachments, %w", err)
-		}
-		if !areVolumesDetached {
-			return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
-		}
-	}
-	nodeClaims, err = nodeutils.GetNodeClaims(ctx, c.kubeClient, node)
+	volumesDetached, err := c.ensureVolumesDetached(ctx, node)
 	if err != nil {
-		return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
+		return reconcile.Result{}, fmt.Errorf("ensuring no volume attachments, %w", err)
 	}
-	for _, nodeClaim := range nodeClaims {
-		isInstanceTerminated, err := termination.EnsureTerminated(ctx, c.kubeClient, nodeClaim, c.cloudProvider)
-		if err != nil {
-			// 404 = the nodeClaim no longer exists
-			if errors.IsNotFound(err) {
-				continue
-			}
-			// 409 - The nodeClaim exists, but its status has already been modified
-			if errors.IsConflict(err) {
-				return reconcile.Result{Requeue: true}, nil
-			}
-			return reconcile.Result{}, fmt.Errorf("ensuring instance termination, %w", err)
-		}
-		if !isInstanceTerminated {
-			return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
-		}
+	if volumesDetached {
+		stored := nodeClaim.DeepCopy()
+		if modified := nodeClaim.StatusConditions().SetTrue(v1.ConditionTypeVolumesDetached); modified {
+			if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
+				if errors.IsConflict(err) || errors.IsNotFound(err) {
+					return reconcile.Result{Requeue: true}, nil
+				}
+				return reconcile.Result{}, err
+			}
+			// We requeue after a patch operation since we want to ensure we read our own writes before any subsequent
+			// operations on the NodeClaim.
+			return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
+		}
+	} else if !c.hasTerminationGracePeriodElapsed(nodeTerminationTime) {
+		c.recorder.Publish(terminatorevents.NodeAwaitingVolumeDetachmentEvent(node))
+		stored := nodeClaim.DeepCopy()
+		if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeVolumesDetached, "AwaitingVolumeDetachment", "AwaitingVolumeDetachment"); modified {
+			if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
+				if errors.IsConflict(err) || errors.IsNotFound(err) {
+					return reconcile.Result{Requeue: true}, nil
+				}
+				return reconcile.Result{}, err
+			}
+		}
+		return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
+	} else {
+		stored := nodeClaim.DeepCopy()
+		if modified := nodeClaim.StatusConditions().SetFalse(v1.ConditionTypeVolumesDetached, "TerminationGracePeriodElapsed", "TerminationGracePeriodElapsed"); modified {
+			if err := c.kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
+				if errors.IsConflict(err) || errors.IsNotFound(err) {
+					return reconcile.Result{Requeue: true}, nil
+				}
+				return reconcile.Result{}, err
+			}
+			// We requeue after a patch operation since we want to ensure we read our own writes before any subsequent
+			// operations on the NodeClaim.
+			return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
+		}
+	}
+
+	isInstanceTerminated, err := termination.EnsureTerminated(ctx, c.kubeClient, nodeClaim, c.cloudProvider)
+	if client.IgnoreNotFound(err) != nil {
+		// 409 - The nodeClaim exists, but its status has already been modified
+		if errors.IsConflict(err) {
+			return reconcile.Result{Requeue: true}, nil
+		}
+		return reconcile.Result{}, fmt.Errorf("ensuring instance termination, %w", err)
+	}
+	if !isInstanceTerminated {
+		return reconcile.Result{RequeueAfter: 5 * time.Second}, nil
 	}
 	if err := c.removeFinalizer(ctx, node); err != nil {
 		return reconcile.Result{}, err
 	}
 	return reconcile.Result{}, nil
 }
 
-func (c *Controller) deleteAllNodeClaims(ctx context.Context, nodeClaims ...*v1.NodeClaim) error {
-	for _, nodeClaim := range nodeClaims {
-		// If we still get the NodeClaim, but it's already marked as terminating, we don't need to call Delete again
-		if nodeClaim.DeletionTimestamp.IsZero() {
-			if err := c.kubeClient.Delete(ctx, nodeClaim); err != nil {
-				return client.IgnoreNotFound(err)
-			}
-		}
-	}
-	return nil
+func (c *Controller) hasTerminationGracePeriodElapsed(nodeTerminationTime *time.Time) bool {
+	if nodeTerminationTime == nil {
+		return false
+	}
+	return !c.clock.Now().Before(*nodeTerminationTime)
 }
 
 func (c *Controller) ensureVolumesDetached(ctx context.Context, node *corev1.Node) (volumesDetached bool, err error) {
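Every status write above follows one pattern: deep-copy the NodeClaim, mutate a condition, patch the status with an optimistic lock so a stale resourceVersion surfaces as a 409 conflict rather than silently clobbering a concurrent writer, then requeue so the next reconcile reads its own write. Below is a minimal sketch of that pattern factored into a standalone helper; the helper name and signature are illustrative, not part of this PR.

```go
import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/api/errors"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

// patchConditionAndRequeue is a hypothetical helper capturing the PR's
// patch-and-requeue idiom. mutate returns true if it changed a condition.
func patchConditionAndRequeue(ctx context.Context, kubeClient client.Client, nodeClaim *v1.NodeClaim, mutate func(*v1.NodeClaim) bool) (reconcile.Result, error) {
	stored := nodeClaim.DeepCopy()
	if !mutate(nodeClaim) {
		return reconcile.Result{}, nil // nothing changed, nothing to write
	}
	// MergeFromWithOptimisticLock embeds stored's resourceVersion in the patch,
	// so a concurrent update fails with a 409 Conflict instead of being overwritten.
	if err := kubeClient.Status().Patch(ctx, nodeClaim, client.MergeFromWithOptions(stored, client.MergeFromWithOptimisticLock{})); err != nil {
		if errors.IsConflict(err) || errors.IsNotFound(err) {
			return reconcile.Result{Requeue: true}, nil
		}
		return reconcile.Result{}, err
	}
	// Requeue after the patch so the next reconcile observes our own write
	// before taking any further action on the NodeClaim.
	return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
```

The requeue-after-write choice trades a small amount of latency for a simpler mental model: each reconcile acts on a fresh read of the NodeClaim rather than on locally mutated state.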