Skip to content

Commit

Permalink
fix: Ensure volumes are unpublished before deleting node
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewSirenko committed Jul 5, 2024
1 parent e356a51 commit 497eb96
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 1 deletion.
3 changes: 3 additions & 0 deletions kwok/charts/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ rules:
- apiGroups: ["karpenter.sh"]
resources: ["nodepools", "nodepools/status", "nodeclaims", "nodeclaims/status"]
verbs: ["get", "list", "watch"]
- apiGroups: [ "storage.k8s.io" ]
resources: [ "volumeattachments" ]
verbs: [ "get", "list", "watch" ]
- apiGroups: [""]
resources: ["pods", "nodes", "persistentvolumes", "persistentvolumeclaims", "replicationcontrollers", "namespaces"]
verbs: ["get", "list", "watch"]
Expand Down
23 changes: 23 additions & 0 deletions pkg/controllers/node/termination/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/metrics"
nodeutils "sigs.k8s.io/karpenter/pkg/utils/node"
volumeattachmentutils "sigs.k8s.io/karpenter/pkg/utils/volumeattachment"
)

// Controller for the resource
Expand Down Expand Up @@ -107,6 +108,15 @@ func (c *Controller) finalize(ctx context.Context, node *v1.Node) (reconcile.Res
}
return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
// In order for stateful pods to smoothly migrate from the terminating Node, we wait for VolumeAttachments
// of drain-able pods to be cleaned up before terminating the node and removing it from the cluster.
areVolumesDetached, err := c.ensureVolumesDetached(ctx, node)
if err != nil {
return reconcile.Result{}, fmt.Errorf("ensuring no volume attachments, %w", err)
}
if !areVolumesDetached {
return reconcile.Result{RequeueAfter: 1 * time.Second}, nil
}
nodeClaims, err := nodeutils.GetNodeClaims(ctx, node, c.kubeClient)
if err != nil {
return reconcile.Result{}, fmt.Errorf("deleting nodeclaims, %w", err)
Expand Down Expand Up @@ -150,6 +160,19 @@ func (c *Controller) deleteAllNodeClaims(ctx context.Context, node *v1.Node) err
return nil
}

// ensureVolumesDetached reports whether the node has no remaining VolumeAttachments
// that should hold up its termination.
//
// It fetches the node's VolumeAttachments, narrows them with
// volumeattachmentutils.FilterVolumeAttachments to the ones that should block
// termination, and returns true only when none are left. Any lookup error is
// returned unwrapped together with false.
func (c *Controller) ensureVolumesDetached(ctx context.Context, node *v1.Node) (volumesDetached bool, err error) {
	attachments, err := nodeutils.GetVolumeAttachments(ctx, c.kubeClient, node)
	if err != nil {
		return false, err
	}
	// Drop attachments that should not block termination (e.g. those tied to
	// non-drain-able pods or to multi-attachable volumes).
	blocking, err := volumeattachmentutils.FilterVolumeAttachments(ctx, c.kubeClient, node, attachments)
	if err != nil {
		return false, err
	}
	if len(blocking) > 0 {
		return false, nil
	}
	return true, nil
}

func (c *Controller) removeFinalizer(ctx context.Context, n *v1.Node) error {
stored := n.DeepCopy()
controllerutil.RemoveFinalizer(n, v1beta1.TerminationFinalizer)
Expand Down
5 changes: 5 additions & 0 deletions pkg/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"sync"
"time"

storagev1 "k8s.io/api/storage/v1"

"github.com/awslabs/operatorpkg/controller"
opmetrics "github.com/awslabs/operatorpkg/metrics"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -204,6 +206,9 @@ func NewOperator() (context.Context, *Operator) {
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &v1beta1.NodeClaim{}, "spec.nodeClassRef.name", func(o client.Object) []string {
return []string{o.(*v1beta1.NodeClaim).Spec.NodeClassRef.Name}
}), "failed to setup nodeclaim nodeclassref name indexer")
lo.Must0(mgr.GetFieldIndexer().IndexField(ctx, &storagev1.VolumeAttachment{}, "spec.nodeName", func(o client.Object) []string {
return []string{o.(*storagev1.VolumeAttachment).Spec.NodeName}
}), "failed to setup volumeattachment indexer")

lo.Must0(mgr.AddReadyzCheck("manager", func(req *http.Request) error {
return lo.Ternary(mgr.GetCache().WaitForCacheSync(req.Context()), nil, fmt.Errorf("failed to sync caches"))
Expand Down
19 changes: 18 additions & 1 deletion pkg/utils/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ import (
"context"
"fmt"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
storagev1 "k8s.io/api/storage/v1"

"github.com/samber/lo"
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sigs.k8s.io/karpenter/pkg/utils/pod"
)

Expand Down Expand Up @@ -75,6 +76,22 @@ func GetProvisionablePods(ctx context.Context, kubeClient client.Client) ([]*v1.
}), nil
}

// GetVolumeAttachments returns pointers to every VolumeAttachment whose
// Spec.NodeName equals the name of the passed node.
//
// All VolumeAttachments are listed and then matched against the node in memory
// instead of selecting on the "spec.nodeName" field index.
// TODO why this field index does not work with kwok...
//
// It returns a wrapped error if the list call fails, and a nil slice when no
// VolumeAttachment references the node.
func GetVolumeAttachments(ctx context.Context, kubeClient client.Client, node *v1.Node) ([]*storagev1.VolumeAttachment, error) {
	var volumeAttachmentList storagev1.VolumeAttachmentList
	// The list must actually be populated; without this call the loop below
	// would iterate over an empty list and always report no attachments.
	if err := kubeClient.List(ctx, &volumeAttachmentList); err != nil {
		return nil, fmt.Errorf("listing volumeattachments, %w", err)
	}
	var volumeAttachments []*storagev1.VolumeAttachment
	for i := range volumeAttachmentList.Items {
		// Index into Items so the returned pointers reference the list's
		// elements rather than a per-iteration copy.
		if volumeAttachmentList.Items[i].Spec.NodeName == node.Name {
			volumeAttachments = append(volumeAttachments, &volumeAttachmentList.Items[i])
		}
	}
	return volumeAttachments, nil
}

func GetCondition(n *v1.Node, match v1.NodeConditionType) v1.NodeCondition {
for _, condition := range n.Status.Conditions {
if condition.Type == match {
Expand Down
78 changes: 78 additions & 0 deletions pkg/utils/volumeattachment/volumeattachment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package volumeattachment

import (
"context"
"github.com/samber/lo"
"sigs.k8s.io/karpenter/pkg/utils/pod"

v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

nodeutil "sigs.k8s.io/karpenter/pkg/utils/node"
volumeutil "sigs.k8s.io/karpenter/pkg/utils/volume"
)

// FilterVolumeAttachments returns the subset of volumeAttachments that should
// block termination of the passed node.
//
// An attachment is kept only when its PersistentVolume name matches the
// Spec.VolumeName of a PersistentVolumeClaim that is
//   - referenced by a volume of one of the node's pods (as returned by
//     nodeutil.GetPods) that does not tolerate the disruption NoSchedule taint, and
//   - reported by CannotMultiAttach as not attachable to multiple nodes.
//
// Attachments without a PersistentVolume source are dropped. An empty input is
// returned unchanged. Errors from listing pods or resolving claims are returned
// as-is with a nil slice.
func FilterVolumeAttachments(ctx context.Context, kubeClient client.Client, node *v1.Node, volumeAttachments []*storagev1.VolumeAttachment) ([]*storagev1.VolumeAttachment, error) {
	// Nothing to filter; hand the input straight back.
	if len(volumeAttachments) == 0 {
		return volumeAttachments, nil
	}

	nodePods, err := nodeutil.GetPods(ctx, kubeClient, node)
	if err != nil {
		return nil, err
	}
	// Only pods that do not tolerate the disruption taint are considered.
	evictable := lo.Filter(nodePods, func(candidate *v1.Pod, _ int) bool {
		return !pod.ToleratesDisruptionNoScheduleTaint(candidate)
	})

	// PersistentVolume name -> whether an attachment of that volume must block
	// termination (true when the claim cannot be multi-attached).
	blocksTermination := map[string]bool{}
	for _, evictablePod := range evictable {
		for _, vol := range evictablePod.Spec.Volumes {
			claim, err := volumeutil.GetPersistentVolumeClaim(ctx, kubeClient, evictablePod, vol)
			if err != nil {
				return nil, err
			}
			if claim == nil {
				continue
			}
			blocksTermination[claim.Spec.VolumeName] = CannotMultiAttach(*claim)
		}
	}

	var blocking []*storagev1.VolumeAttachment
	for _, attachment := range volumeAttachments {
		name := attachment.Spec.Source.PersistentVolumeName
		if name == nil || !blocksTermination[*name] {
			continue
		}
		blocking = append(blocking, attachment)
	}
	return blocking, nil
}

// CannotMultiAttach returns true if the persistentVolumeClaim's underlying volume cannot be attached to multiple nodes,
// i.e. at least one of its access modes is ReadWriteOnce or ReadWriteOncePod.
// It returns false when the claim lists no access modes or none of them is one of those two.
func CannotMultiAttach(pvc v1.PersistentVolumeClaim) bool {
	for _, mode := range pvc.Spec.AccessModes {
		switch mode {
		case v1.ReadWriteOnce, v1.ReadWriteOncePod:
			return true
		}
	}
	return false
}

0 comments on commit 497eb96

Please sign in to comment.