From d488fd5e2970fc88babe9990c641777fb46fb0ad Mon Sep 17 00:00:00 2001 From: prashanth26 Date: Sun, 2 May 2021 12:08:20 +0530 Subject: [PATCH] Wait for re-attachment of PVs during serialized eviction of pods - Draining of pods with PVs (Persistent Volumes) now waits for the PVs to be re-attached on a different node. - When VolumeAttachment support is enabled on the cluster, volume attachments are tracked to determine this. - Otherwise, it falls back to the configured default PV reattachment timeout. The default value is 3 minutes. Co-authored-by: Amshuman K R --- .gitignore | 1 + pkg/util/backoff/wait_test.go | 3 - pkg/util/k8sutils/helper.go | 74 +++++ pkg/util/provider/app/app.go | 1 + pkg/util/provider/app/options/options.go | 2 + pkg/util/provider/drain/drain.go | 219 +++++++++++--- pkg/util/provider/drain/drain_test.go | 280 +++++++++++++++--- pkg/util/provider/drain/fake_controller.go | 18 +- pkg/util/provider/drain/util.go | 25 ++ pkg/util/provider/drain/volume_attachment.go | 115 +++++++ .../provider/machinecontroller/controller.go | 77 +++-- .../provider/machinecontroller/machine.go | 18 +- .../machinecontroller/machine_safety.go | 2 +- .../machinecontroller/machine_util.go | 4 + pkg/util/provider/machinecontroller/secret.go | 4 +- pkg/util/provider/options/types.go | 2 + 16 files changed, 725 insertions(+), 120 deletions(-) create mode 100644 pkg/util/k8sutils/helper.go create mode 100644 pkg/util/provider/drain/util.go create mode 100644 pkg/util/provider/drain/volume_attachment.go diff --git a/.gitignore b/.gitignore index ad847e22c..7abbeb352 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,7 @@ managevm controller_manager mcm.out kubectl +.cache_ggshield # Binary files of MCM ./machine-controller-manager diff --git a/pkg/util/backoff/wait_test.go b/pkg/util/backoff/wait_test.go index a9c9b06dc..1c2f07f70 100644 --- a/pkg/util/backoff/wait_test.go +++ b/pkg/util/backoff/wait_test.go @@ -23,7 +23,6 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo/extensions/table" . "github.com/onsi/gomega" - "k8s.io/klog" ) var ( @@ -132,8 +131,6 @@ var _ = Describe("#wait", func() { action: action{ operation: func() error { invokationCount += 1 - klog.Error(invokationCount) - if invokationCount > 4 { return nil } diff --git a/pkg/util/k8sutils/helper.go b/pkg/util/k8sutils/helper.go new file mode 100644 index 000000000..d9ae05f13 --- /dev/null +++ b/pkg/util/k8sutils/helper.go @@ -0,0 +1,74 @@ +/* +Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/ + +// Package k8sutils is used to provide helper consts and functions for k8s operations +package k8sutils + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes" + "k8s.io/klog" +) + +const ( + // VolumeAttachmentGroupName group name + VolumeAttachmentGroupName = "storage.k8s.io" + // VolumeAttachmentResourceName is the resource name used for VolumeAttachment + VolumeAttachmentResourceName = "volumeattachments" +) + +// IsResourceSupported uses the Discovery API to find out if the server supports +// the given GroupResource. +// It returns true if the resource is supported and false otherwise. +func IsResourceSupported( + clientset kubernetes.Interface, + gr schema.GroupResource, +) bool { + var ( + foundDesiredGroup bool + desiredGroupVersion string + ) + + discoveryClient := clientset.Discovery() + groupList, err := discoveryClient.ServerGroups() + if err != nil { + return false + } + + for _, group := range groupList.Groups { + if group.Name == gr.Group { + foundDesiredGroup = true + desiredGroupVersion = group.PreferredVersion.GroupVersion + break + } + } + if !foundDesiredGroup { + return false + } + + resourceList, err := discoveryClient.ServerResourcesForGroupVersion(desiredGroupVersion) + if err != nil { + return false + } + + for _, resource := range resourceList.APIResources { + if resource.Name == gr.Resource { + klog.V(3).Infof("Found Resource: %s/%s", gr.Group, gr.Resource) + return true + } + } + return false +} diff --git a/pkg/util/provider/app/app.go b/pkg/util/provider/app/app.go index cc2e8e455..0f4930846 100644 --- a/pkg/util/provider/app/app.go +++ b/pkg/util/provider/app/app.go @@ -262,6 +262,7 @@ func StartControllers(s *options.MCServer, controlCoreInformerFactory.Core().V1().Secrets(), targetCoreInformerFactory.Core().V1().Nodes(), targetCoreInformerFactory.Policy().V1beta1().PodDisruptionBudgets(), + targetCoreInformerFactory.Storage().V1().VolumeAttachments(), machineSharedInformers.MachineClasses(), machineSharedInformers.Machines(), recorder, diff --git a/pkg/util/provider/app/options/options.go b/pkg/util/provider/app/options/options.go index b0da27243..7b25e39b6 100644 --- a/pkg/util/provider/app/options/options.go +++ b/pkg/util/provider/app/options/options.go @@ -68,6 +68,7 @@ func NewMCServer() *MCServer { MachineDrainTimeout: metav1.Duration{Duration: drain.DefaultMachineDrainTimeout}, MaxEvictRetries: drain.DefaultMaxEvictRetries, PvDetachTimeout: metav1.Duration{Duration: 2 * time.Minute}, + PvReattachTimeout: metav1.Duration{Duration: 90 * time.Second}, MachineSafetyOrphanVMsPeriod: metav1.Duration{Duration: 30 * time.Minute}, MachineSafetyAPIServerStatusCheckPeriod: metav1.Duration{Duration: 1 * time.Minute}, MachineSafetyAPIServerStatusCheckTimeout: metav1.Duration{Duration: 30 * time.Second}, @@ -100,6 +101,7 @@ func (s *MCServer) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&s.SafetyOptions.MachineDrainTimeout.Duration, "machine-drain-timeout", drain.DefaultMachineDrainTimeout, "Timeout (in durartion) used while draining of machine before deletion, beyond which MCM forcefully deletes machine.") fs.Int32Var(&s.SafetyOptions.MaxEvictRetries, "machine-max-evict-retries", drain.DefaultMaxEvictRetries, "Maximum number of times evicts would be attempted on a pod before it is forcibly deleted during draining of a machine.") fs.DurationVar(&s.SafetyOptions.PvDetachTimeout.Duration, "machine-pv-detach-timeout", s.SafetyOptions.PvDetachTimeout.Duration, "Timeout (in duration) used while waiting for detach of PV while
evicting/deleting pods") + fs.DurationVar(&s.SafetyOptions.PvReattachTimeout.Duration, "machine-pv-reattach-timeout", s.SafetyOptions.PvReattachTimeout.Duration, "Timeout (in duration) used while waiting for reattach of PV onto a different node") fs.DurationVar(&s.SafetyOptions.MachineSafetyAPIServerStatusCheckTimeout.Duration, "machine-safety-apiserver-statuscheck-timeout", s.SafetyOptions.MachineSafetyAPIServerStatusCheckTimeout.Duration, "Timeout (in duration) for which the APIServer can be down before declare the machine controller frozen by safety controller") fs.DurationVar(&s.SafetyOptions.MachineSafetyOrphanVMsPeriod.Duration, "machine-safety-orphan-vms-period", s.SafetyOptions.MachineSafetyOrphanVMsPeriod.Duration, "Time period (in durartion) used to poll for orphan VMs by safety controller.") diff --git a/pkg/util/provider/drain/drain.go b/pkg/util/provider/drain/drain.go index 05f622e0f..d33b0c265 100644 --- a/pkg/util/provider/drain/drain.go +++ b/pkg/util/provider/drain/drain.go @@ -36,6 +36,7 @@ import ( api "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1" policyv1beta1 "k8s.io/api/policy/v1beta1" + storagev1 "k8s.io/api/storage/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -50,35 +51,47 @@ import ( // Options are configurable options while draining a node before deletion type Options struct { client kubernetes.Interface + DeleteLocalData bool + Driver driver.Driver + drainStartedOn time.Time + drainEndedOn time.Time + ErrOut io.Writer ForceDeletePods bool - IgnorePodsWithoutControllers bool GracePeriodSeconds int + IgnorePodsWithoutControllers bool IgnoreDaemonsets bool - Timeout time.Duration MaxEvictRetries int32 PvDetachTimeout time.Duration - DeleteLocalData bool + PvReattachTimeout time.Duration nodeName string Out io.Writer - ErrOut io.Writer - Driver driver.Driver pvcLister corelisters.PersistentVolumeClaimLister pvLister corelisters.PersistentVolumeLister pdbLister policylisters.PodDisruptionBudgetLister - drainStartedOn time.Time - drainEndedOn time.Time + nodeLister corelisters.NodeLister + volumeAttachmentHandler *VolumeAttachmentHandler + Timeout time.Duration } // Takes a pod and returns a bool indicating whether or not to operate on the // pod, an optional warning message, and an optional fatal error. 
type podFilter func(api.Pod) (include bool, w *warning, f *fatal) + type warning struct { string } + type fatal struct { string } +// PodVolumeInfo is the struct used to hold the PersistentVolume names and volume IDs +// for all the PVs attached to the pod +type PodVolumeInfo struct { + persistantVolumeList []string + volumeList []string +} + const ( // EvictionKind is the kind used for eviction EvictionKind = "Eviction" @@ -111,6 +124,7 @@ const ( localStorageWarning = "Deleting pods with local storage" unmanagedFatal = "pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet (use --force to override)" unmanagedWarning = "Deleting pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet" + reattachTimeoutErr = "Timeout occurred while waiting for PV to reattach to a different node" ) var ( @@ -124,6 +138,7 @@ func NewDrainOptions( timeout time.Duration, maxEvictRetries int32, pvDetachTimeout time.Duration, + pvReattachTimeout time.Duration, nodeName string, gracePeriodSeconds int, forceDeletePods bool, @@ -136,8 +151,9 @@ func NewDrainOptions( pvcLister corelisters.PersistentVolumeClaimLister, pvLister corelisters.PersistentVolumeLister, pdbLister policylisters.PodDisruptionBudgetLister, + nodeLister corelisters.NodeLister, + volumeAttachmentHandler *VolumeAttachmentHandler, ) *Options { - return &Options{ client: client, ForceDeletePods: forceDeletePods, @@ -147,6 +163,7 @@ func NewDrainOptions( MaxEvictRetries: maxEvictRetries, Timeout: timeout, PvDetachTimeout: pvDetachTimeout, + PvReattachTimeout: pvReattachTimeout, DeleteLocalData: deleteLocalData, nodeName: nodeName, Out: out, @@ -155,8 +172,9 @@ func NewDrainOptions( pvcLister: pvcLister, pvLister: pvLister, pdbLister: pdbLister, + nodeLister: nodeLister, + volumeAttachmentHandler: volumeAttachmentHandler, } - } // RunDrain runs the 'drain' command @@ -477,31 +495,40 @@ func sortPodsByPriority(pods []*corev1.Pod) { }) } -// doAccountingOfPvs returns the map with keys as pod names and values as array of attached volumes' IDs -func (o *Options) doAccountingOfPvs(pods []*corev1.Pod) map[string][]string { - volMap := make(map[string][]string) - pvMap := make(map[string][]string) +// doAccountingOfPvs returns a map with the pod's namespace/name as key +// and a PodVolumeInfo object as value +func (o *Options) doAccountingOfPvs(pods []*corev1.Pod) map[string]PodVolumeInfo { + var ( + pvMap = make(map[string][]string) + podVolumeInfoMap = make(map[string]PodVolumeInfo) + ) for _, pod := range pods { - podPVs, _ := o.getPvs(pod) - pvMap[pod.Namespace+"/"+pod.Name] = podPVs + podPVs, _ := o.getPVList(pod) + pvMap[getPodKey(pod)] = podPVs } - klog.V(4).Info("PV map: ", pvMap) + // Filter the list of shared PVs filterSharedPVs(pvMap) - for i := range pvMap { - pvList := pvMap[i] - vols, err := o.getVolIDsFromDriver(pvList) + for podKey, persistantVolumeList := range pvMap { + persistantVolumeListDeepCopy := persistantVolumeList + volumeList, err := o.getVolIDsFromDriver(persistantVolumeList) if err != nil { // In case of error, log and skip this set of volumes - klog.Errorf("Error getting volume ID from cloud provider. Skipping volumes for pod: %v. Err: %v", i, err) + klog.Errorf("Error getting volume ID from cloud provider. Skipping volumes for pod: %v.
Err: %v", podKey, err) continue } - volMap[i] = vols + + podVolumeInfo := PodVolumeInfo{ + persistantVolumeList: persistantVolumeListDeepCopy, + volumeList: volumeList, + } + podVolumeInfoMap[podKey] = podVolumeInfo } - klog.V(4).Info("Volume map: ", volMap) - return volMap + klog.V(4).Infof("PodVolumeInfoMap = %v", podVolumeInfoMap) + + return podVolumeInfoMap } // filterSharedPVs filters out the PVs that are shared among pods. @@ -548,7 +575,7 @@ func (o *Options) evictPodsWithPv(attemptEvict bool, pods []*corev1.Pod, ) { sortPodsByPriority(pods) - volMap := o.doAccountingOfPvs(pods) + podVolumeInfoMap := o.doAccountingOfPvs(pods) var ( remainingPods []*api.Pod @@ -558,7 +585,7 @@ func (o *Options) evictPodsWithPv(attemptEvict bool, pods []*corev1.Pod, if attemptEvict { for i := 0; i < nretries; i++ { - remainingPods, fastTrack = o.evictPodsWithPVInternal(attemptEvict, pods, volMap, policyGroupVersion, getPodFn, returnCh) + remainingPods, fastTrack = o.evictPodsWithPVInternal(attemptEvict, pods, podVolumeInfoMap, policyGroupVersion, getPodFn, returnCh) if fastTrack || len(remainingPods) == 0 { //Either all pods got evicted or we need to fast track the return (node deletion detected) break @@ -576,10 +603,10 @@ func (o *Options) evictPodsWithPv(attemptEvict bool, pods []*corev1.Pod, if !fastTrack && len(remainingPods) > 0 { // Force delete the pods remaining after evict retries. pods = remainingPods - remainingPods, _ = o.evictPodsWithPVInternal(false, pods, volMap, policyGroupVersion, getPodFn, returnCh) + remainingPods, _ = o.evictPodsWithPVInternal(false, pods, podVolumeInfoMap, policyGroupVersion, getPodFn, returnCh) } } else { - remainingPods, _ = o.evictPodsWithPVInternal(false, pods, volMap, policyGroupVersion, getPodFn, returnCh) + remainingPods, _ = o.evictPodsWithPVInternal(false, pods, podVolumeInfoMap, policyGroupVersion, getPodFn, returnCh) } // Placate the caller by returning the nil status for the remaining pods. 
@@ -599,10 +626,14 @@ func (o *Options) evictPodsWithPv(attemptEvict bool, pods []*corev1.Pod, return } -func (o *Options) evictPodsWithPVInternal(attemptEvict bool, pods []*corev1.Pod, volMap map[string][]string, +func (o *Options) evictPodsWithPVInternal( + attemptEvict bool, + pods []*corev1.Pod, + podVolumeInfoMap map[string]PodVolumeInfo, policyGroupVersion string, getPodFn func(namespace, name string) (*api.Pod, error), - returnCh chan error) (remainingPods []*api.Pod, fastTrack bool) { + returnCh chan error, +) (remainingPods []*api.Pod, fastTrack bool) { var ( mainContext context.Context cancelMainContext context.CancelFunc @@ -621,11 +652,15 @@ func (o *Options) evictPodsWithPVInternal(attemptEvict bool, pods []*corev1.Pod, } var ( - err error - podEvictionStartTime time.Time + err error + volumeAttachmentEventCh chan *storagev1.VolumeAttachment + podEvictionStartTime = time.Now() ) - podEvictionStartTime = time.Now() + if o.volumeAttachmentHandler != nil { + // Initialize the event handler before triggering pod delete/evict to avoid missing events + volumeAttachmentEventCh = o.volumeAttachmentHandler.AddWorker() + } if attemptEvict { err = o.evictPod(pod, policyGroupVersion) @@ -668,9 +703,9 @@ func (o *Options) evictPodsWithPVInternal(attemptEvict bool, pods []*corev1.Pod, time.Since(podEvictionStartTime), ) - pvs := volMap[pod.Namespace+"/"+pod.Name] + podVolumeInfo := podVolumeInfoMap[getPodKey(pod)] ctx, cancelFn := context.WithTimeout(mainContext, o.getTerminationGracePeriod(pod)+o.PvDetachTimeout) - err = o.waitForDetach(ctx, pvs, o.nodeName) + err = o.waitForDetach(ctx, podVolumeInfo, o.nodeName) cancelFn() if apierrors.IsNotFound(err) { @@ -682,20 +717,47 @@ func (o *Options) evictPodsWithPVInternal(attemptEvict bool, pods []*corev1.Pod, returnCh <- err continue } - klog.V(3).Infof( - "Volume detached for Pod %s/%s in Node %q and took %v (including pod eviction/deletion time).", + klog.V(4).Infof( + "Pod + volume detachment for Pod %s/%s from Node %s took %v", pod.Namespace, pod.Name, pod.Spec.NodeName, time.Since(podEvictionStartTime), ) + + ctx, cancelFn = context.WithTimeout(mainContext, o.PvReattachTimeout) + err = o.waitForReattach(ctx, podVolumeInfo, o.nodeName, volumeAttachmentEventCh) + cancelFn() + + if err != nil { + if err.Error() == reattachTimeoutErr { + klog.Warningf("Timeout occurred while waiting for the following volumes to reattach: %v", podVolumeInfo.persistantVolumeList) + } else { + klog.Errorf("Error when waiting for volume reattachment.
Err: %v", err) + returnCh <- err + continue + } + } + + if o.volumeAttachmentHandler != nil { + o.volumeAttachmentHandler.DeleteWorker(volumeAttachmentEventCh) + } + + klog.V(3).Infof( + "Pod + volume detachment from node %s + volume reattachment to another node for Pod %s/%s took %v", + o.nodeName, + pod.Namespace, + pod.Name, + time.Since(podEvictionStartTime), + ) + returnCh <- nil } return retryPods, false } -func (o *Options) getPvs(pod *corev1.Pod) ([]string, error) { +func (o *Options) getPVList(pod *corev1.Pod) ([]string, error) { pvs := []string{} for i := range pod.Spec.Volumes { vol := &pod.Spec.Volumes[i] @@ -731,14 +793,14 @@ func (o *Options) getPvs(pod *corev1.Pod) ([]string, error) { return pvs, nil } -func (o *Options) waitForDetach(ctx context.Context, volumeIDs []string, nodeName string) error { - if volumeIDs == nil || len(volumeIDs) == 0 || nodeName == "" { +func (o *Options) waitForDetach(ctx context.Context, podVolumeInfo PodVolumeInfo, nodeName string) error { + if len(podVolumeInfo.volumeList) == 0 || nodeName == "" { // If volume or node name is not available, nothing to do. Just log this as warning - klog.Warningf("Node name: %q, list of pod PVs to wait for detach: %v", nodeName, volumeIDs) + klog.Warningf("Node name: %q, list of pod PVs to wait for detach: %v", nodeName, podVolumeInfo.volumeList) return nil } - klog.V(4).Info("Waiting for following volumes to detach: ", volumeIDs) + klog.V(4).Info("Waiting for following volumes to detach: ", podVolumeInfo.volumeList) found := true @@ -769,18 +831,17 @@ func (o *Options) waitForDetach(ctx context.Context, volumeIDs []string, nodeNam } LookUpVolume: - for i := range volumeIDs { - volumeID := &volumeIDs[i] + for _, volumeID := range podVolumeInfo.volumeList { for j := range attachedVols { attachedVol := &attachedVols[j] - found, _ = regexp.MatchString(*volumeID, string(attachedVol.Name)) + found, _ = regexp.MatchString(volumeID, string(attachedVol.Name)) if found { klog.V(4).Infof( "Found volume:%s still attached to node %q. Will re-check in %s", - *volumeID, + volumeID, nodeName, VolumeDetachPollInterval, ) @@ -791,7 +852,71 @@ func (o *Options) waitForDetach(ctx context.Context, volumeIDs []string, nodeNam } } - klog.V(4).Infof("Detached volumes:%s from node %q", volumeIDs, nodeName) + klog.V(4).Infof("Detached volumes:%s from node %q", podVolumeInfo.volumeList, nodeName) + return nil +} + +func isDesiredReattachment(volumeAttachment *storagev1.VolumeAttachment, previousNodeName string) bool { + if volumeAttachment.Status.Attached && volumeAttachment.Spec.NodeName != previousNodeName { + klog.V(4).Infof("Reattachment successful for PV: %q", *volumeAttachment.Spec.Source.PersistentVolumeName) + return true + } + + return false +} + +// waitForReattach handles 2 cases +// 1. If VolumeAttachment tracking is supported, use VolumeAttachment events to determine reattachment +// 2. If all else fails, fall back to the static reattachment timeout +func (o *Options) waitForReattach(ctx context.Context, podVolumeInfo PodVolumeInfo, previousNodeName string, volumeAttachmentEventCh chan *storagev1.VolumeAttachment) error { + if len(podVolumeInfo.persistantVolumeList) == 0 || previousNodeName == "" { + // If volume or node name is not available, nothing to do.
Just log this as warning + klog.Warningf("List of pod PVs to wait for reattachment is empty: %v", podVolumeInfo.persistantVolumeList) + return nil + } + + klog.V(3).Infof("Waiting for following volumes to reattach: %v", podVolumeInfo.persistantVolumeList) + + var pvsWaitingForReattachments map[string]bool + if volumeAttachmentEventCh != nil { + pvsWaitingForReattachments = make(map[string]bool) + for _, persistantVolumeName := range podVolumeInfo.persistantVolumeList { + pvsWaitingForReattachments[persistantVolumeName] = true + } + } + + // This loop exits in either of the following cases + // 1. Context timeout occurs - PV reattachment timeout or Drain Timeout + // 2. All PVs for the given pod are reattached successfully + for { + select { + + case <-ctx.Done(): + // Timeout occurred waiting for reattachment, exit function with error + klog.Warningf("Timeout occurred while waiting for PVs %v to reattach to a different node", podVolumeInfo.persistantVolumeList) + return fmt.Errorf(reattachTimeoutErr) + + case incomingEvent := <-volumeAttachmentEventCh: + persistantVolumeName := *incomingEvent.Spec.Source.PersistentVolumeName + klog.V(5).Infof("VolumeAttachment event received for PV: %s", persistantVolumeName) + + // Check if the event is for a PV that is being waited on + if _, present := pvsWaitingForReattachments[persistantVolumeName]; present { + // Check if reattachment was successful + if reattachmentSuccess := isDesiredReattachment(incomingEvent, previousNodeName); reattachmentSuccess { + delete(pvsWaitingForReattachments, persistantVolumeName) + } + } + } + + if len(pvsWaitingForReattachments) == 0 { + // If all PVs have been reattached, break out of for loop + break + } + } + + klog.V(3).Infof("Successfully reattached volumes: %s", podVolumeInfo.persistantVolumeList) return nil } @@ -964,6 +1089,8 @@ func SupportEviction(clientset kubernetes.Interface) (string, error) { // RunCordonOrUncordon runs either Cordon or Uncordon. The desired value for // "Unschedulable" is passed as the first arg. func (o *Options) RunCordonOrUncordon(desired bool) error { + // TODO: Change this to lister call + // node, err := o.nodeLister.Get(o.nodeName) node, err := o.client.CoreV1().Nodes().Get(o.nodeName, metav1.GetOptions{}) if err != nil { // Deletion could be triggered when machine is just being created, no node present then diff --git a/pkg/util/provider/drain/drain_test.go b/pkg/util/provider/drain/drain_test.go index f1398d26c..d5e2467bb 100644 --- a/pkg/util/provider/drain/drain_test.go +++ b/pkg/util/provider/drain/drain_test.go @@ -13,15 +13,16 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -*/ // Package drain is used to drain nodes +*/ + +// Package drain is used to drain nodes package drain -/* -TODO: Fix timeout issue for tests import ( "context" "fmt" "regexp" + "strconv" "sync" "time" @@ -32,38 +33,46 @@ import ( .
"github.com/onsi/gomega" api "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1" + storagev1 "k8s.io/api/storage/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes" k8stesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" ) var _ = Describe("drain", func() { - const testNodeName = "node" - const terminationGracePeriodShort = 5 * time.Second - const terminationGracePeriodShortBy4 = terminationGracePeriodShort / 4 - const terminationGracePeriodShortBy8 = terminationGracePeriodShort / 8 - const terminationGracePeriodMedium = 10 * time.Second - const terminationGracePeriodDefault = 20 * time.Second - const terminationGracePeriodLong = 2 * time.Minute - const testNamespace = "test" + const ( + oldNodeName = "old-node" + newNodeName = "new-node" + terminationGracePeriodShort = 5 * time.Second + terminationGracePeriodShortBy4 = terminationGracePeriodShort / 4 + terminationGracePeriodShortBy8 = terminationGracePeriodShort / 8 + terminationGracePeriodMedium = 10 * time.Second + terminationGracePeriodDefault = 20 * time.Second + terminationGracePeriodLong = 2 * time.Minute + testNamespace = "test" + ) type stats struct { nPodsWithoutPV int nPodsWithOnlyExclusivePV int nPodsWithOnlySharedPV int nPodsWithExclusiveAndSharedPV int + nPVsPerPodWithExclusivePV int } type setup struct { stats - attemptEviction bool - maxEvictRetries int32 - terminationGracePeriod time.Duration - force bool - evictError error - deleteError error + attemptEviction bool + volumeAttachmentSupported bool + maxEvictRetries int32 + terminationGracePeriod time.Duration + pvReattachTimeout time.Duration + force bool + evictError error + deleteError error } type expectation struct { @@ -83,16 +92,16 @@ var _ = Describe("drain", func() { wg := sync.WaitGroup{} - podsWithoutPV := getPodsWithoutPV(setup.nPodsWithoutPV, testNamespace, "nopv-", testNodeName, setup.terminationGracePeriod, map[string]string{ + podsWithoutPV := getPodsWithoutPV(setup.nPodsWithoutPV, testNamespace, "nopv-", oldNodeName, setup.terminationGracePeriod, map[string]string{ "volumes": "none", }) - podsWithOnlyExclusivePV := getPodsWithPV(setup.nPodsWithOnlyExclusivePV, setup.nPodsWithOnlyExclusivePV, 0, testNamespace, "expv-", "expv-", "", testNodeName, setup.terminationGracePeriod, map[string]string{ + podsWithOnlyExclusivePV := getPodsWithPV(setup.nPodsWithOnlyExclusivePV, setup.nPodsWithOnlyExclusivePV, 0, setup.nPVsPerPodWithExclusivePV, testNamespace, "expv-", "expv-", "", oldNodeName, setup.terminationGracePeriod, map[string]string{ "volumes": "only-exclusive", }) - podsWithOnlySharedPV := getPodsWithPV(setup.nPodsWithOnlySharedPV, 0, setup.nPodsWithOnlySharedPV/2, testNamespace, "shpv-", "", "shpv-", testNodeName, setup.terminationGracePeriod, map[string]string{ + podsWithOnlySharedPV := getPodsWithPV(setup.nPodsWithOnlySharedPV, 0, setup.nPodsWithOnlySharedPV/2, setup.nPVsPerPodWithExclusivePV, testNamespace, "shpv-", "", "shpv-", oldNodeName, setup.terminationGracePeriod, map[string]string{ "volumes": "only-shared", }) - nPodsWithExclusiveAndSharedPV := getPodsWithPV(setup.nPodsWithExclusiveAndSharedPV, setup.nPodsWithExclusiveAndSharedPV, setup.nPodsWithExclusiveAndSharedPV/2, testNamespace, "exshpv-", "exshexpv-", "exshshpv-", testNodeName, setup.terminationGracePeriod, map[string]string{ + nPodsWithExclusiveAndSharedPV := getPodsWithPV(setup.nPodsWithExclusiveAndSharedPV, 
setup.nPodsWithExclusiveAndSharedPV, setup.nPodsWithExclusiveAndSharedPV/2, setup.nPVsPerPodWithExclusivePV, testNamespace, "exshpv-", "exshexpv-", "exshshpv-", oldNodeName, setup.terminationGracePeriod, map[string]string{ "volumes": "exclusive-and-shared", }) @@ -104,41 +113,65 @@ var _ = Describe("drain", func() { pvcs := getPVCs(pods) pvs := getPVs(pvcs) - nodes := []*corev1.Node{getNode(testNodeName, pvs)} + nodes := []*corev1.Node{getNode(oldNodeName, pvs)} var targetCoreObjects []runtime.Object targetCoreObjects = appendPods(targetCoreObjects, pods) targetCoreObjects = appendPVCs(targetCoreObjects, pvcs) targetCoreObjects = appendPVs(targetCoreObjects, pvs) targetCoreObjects = appendNodes(targetCoreObjects, nodes) - fakeTargetCoreClient, fakePVLister, fakePVCLister, tracker := createFakeController( + + var volumeAttachmentHandler *VolumeAttachmentHandler + // If volumeAttachmentSupported is enabled + // setup volume attachments as well + if setup.volumeAttachmentSupported { + volumeAttachmentHandler = NewVolumeAttachmentHandler() + volumeAttachments := getVolumeAttachments(pvs, oldNodeName) + targetCoreObjects = appendVolumeAttachments(targetCoreObjects, volumeAttachments) + } + + fakeTargetCoreClient, fakePVLister, fakePVCLister, fakeNodeLister, pvcSynced, pvSynced, nodeSynced, tracker := createFakeController( stop, testNamespace, targetCoreObjects, ) defer tracker.Stop() - //Expect(cache.WaitForCacheSync(stop, fakePVCLister)).To(BeTrue()) + // Waiting for cache sync + Expect(cache.WaitForCacheSync(stop, pvcSynced, pvSynced, nodeSynced)).To(BeTrue()) - //fakeDriver := driver.NewFakeDriver(driver.FakeDriver{Err: nil,}) maxEvictRetries := setup.maxEvictRetries if maxEvictRetries <= 0 { maxEvictRetries = 3 } + + pvReattachTimeout := setup.pvReattachTimeout + if pvReattachTimeout == time.Duration(0) { + // To mock quick reattachments by setting + // reattachment time to 1 millisecond + pvReattachTimeout = 1 * time.Millisecond + } + d := &Options{ + client: fakeTargetCoreClient, DeleteLocalData: true, Driver: &drainDriver{}, + drainStartedOn: time.Time{}, + drainEndedOn: time.Time{}, ErrOut: GinkgoWriter, ForceDeletePods: setup.force, - IgnorePodsWithoutControllers: true, GracePeriodSeconds: 30, + IgnorePodsWithoutControllers: true, IgnoreDaemonsets: true, MaxEvictRetries: maxEvictRetries, + PvDetachTimeout: 30 * time.Second, + PvReattachTimeout: pvReattachTimeout, + nodeName: oldNodeName, Out: GinkgoWriter, - PvDetachTimeout: 3 * time.Minute, - Timeout: time.Minute, - client: fakeTargetCoreClient, - nodeName: testNodeName, - pvLister: fakePVLister, pvcLister: fakePVCLister, + pvLister: fakePVLister, + pdbLister: nil, + nodeLister: fakeNodeLister, + Timeout: 2 * time.Minute, + volumeAttachmentHandler: volumeAttachmentHandler, } // Get the pod directly from the ObjectTracker to avoid locking issues in the Fake object. 
@@ -194,9 +227,16 @@ var _ = Describe("drain", func() { } found := false - for _, pv := range pvs { - if va.Name == corev1.UniqueVolumeName(getDrainTestVolumeName(&pv.Spec)) { + n := len(pvs) + for i := range pvs { + // Inverting reattachment logic to support to test out of order reattach + j := n - i - 1 + if va.Name == corev1.UniqueVolumeName(getDrainTestVolumeName(&pvs[j].Spec)) { found = true + if setup.volumeAttachmentSupported { + // Serially reattach + updateVolumeAttachments(d, pvs[j].Name, newNodeName) + } break } } @@ -215,6 +255,9 @@ var _ = Describe("drain", func() { _, err = nodes.Update(node) fmt.Fprintln(GinkgoWriter, err) + + _, err = nodes.UpdateStatus(node) + fmt.Fprintln(GinkgoWriter, err) } }() @@ -381,6 +424,7 @@ var _ = Describe("drain", func() { nPodsWithOnlyExclusivePV: 0, nPodsWithOnlySharedPV: 0, nPodsWithExclusiveAndSharedPV: 0, + nPVsPerPodWithExclusivePV: 1, }, attemptEviction: false, terminationGracePeriod: terminationGracePeriodShort, @@ -406,6 +450,7 @@ var _ = Describe("drain", func() { nPodsWithOnlyExclusivePV: 0, nPodsWithOnlySharedPV: 0, nPodsWithExclusiveAndSharedPV: 0, + nPVsPerPodWithExclusivePV: 1, }, attemptEviction: true, terminationGracePeriod: terminationGracePeriodShort, @@ -433,6 +478,7 @@ var _ = Describe("drain", func() { nPodsWithOnlyExclusivePV: 2, nPodsWithOnlySharedPV: 0, nPodsWithExclusiveAndSharedPV: 0, + nPVsPerPodWithExclusivePV: 1, }, attemptEviction: false, terminationGracePeriod: terminationGracePeriodShort, @@ -460,6 +506,7 @@ var _ = Describe("drain", func() { nPodsWithOnlyExclusivePV: 2, nPodsWithOnlySharedPV: 0, nPodsWithExclusiveAndSharedPV: 0, + nPVsPerPodWithExclusivePV: 1, }, attemptEviction: true, terminationGracePeriod: terminationGracePeriodShort, @@ -480,6 +527,66 @@ var _ = Describe("drain", func() { // Because waitForDetach polling Interval is equal to terminationGracePeriodShort minDrainDuration: terminationGracePeriodMedium, }), + Entry("Successful drain with support for eviction of pods with exclusive volumes with volume attachments", + &setup{ + stats: stats{ + nPodsWithoutPV: 0, + nPodsWithOnlyExclusivePV: 2, + nPodsWithOnlySharedPV: 0, + nPodsWithExclusiveAndSharedPV: 0, + nPVsPerPodWithExclusivePV: 1, + }, + attemptEviction: true, + volumeAttachmentSupported: true, + pvReattachTimeout: 30 * time.Second, + terminationGracePeriod: terminationGracePeriodShort, + }, + []podDrainHandler{deletePod, sleepFor(terminationGracePeriodShortBy8), detachExclusiveVolumes}, + &expectation{ + stats: stats{ + nPodsWithoutPV: 0, + nPodsWithOnlyExclusivePV: 0, + nPodsWithOnlySharedPV: 0, + nPodsWithExclusiveAndSharedPV: 0, + }, + // Because waitForDetach polling Interval is equal to terminationGracePeriodDefault + timeout: terminationGracePeriodLong, + drainTimeout: false, + drainError: nil, + nEvictions: 2, + // Because waitForDetach polling Interval is equal to terminationGracePeriodMedium + minDrainDuration: terminationGracePeriodMedium, + }), + Entry("Successful drain with support for eviction of pods with 2 exclusive volumes with volume attachments", + &setup{ + stats: stats{ + nPodsWithoutPV: 0, + nPodsWithOnlyExclusivePV: 1, + nPodsWithOnlySharedPV: 0, + nPodsWithExclusiveAndSharedPV: 0, + nPVsPerPodWithExclusivePV: 2, + }, + attemptEviction: true, + volumeAttachmentSupported: true, + pvReattachTimeout: 30 * time.Second, + terminationGracePeriod: terminationGracePeriodShort, + }, + []podDrainHandler{deletePod, sleepFor(terminationGracePeriodShortBy8), detachExclusiveVolumes}, + &expectation{ + stats: stats{ + 
nPodsWithoutPV: 0, + nPodsWithOnlyExclusivePV: 0, + nPodsWithOnlySharedPV: 0, + nPodsWithExclusiveAndSharedPV: 0, + }, + // Because waitForDetach polling Interval is equal to terminationGracePeriodDefault + timeout: terminationGracePeriodDefault, + drainTimeout: false, + drainError: nil, + nEvictions: 1, + // Because waitForDetach polling Interval is equal to terminationGracePeriodMedium + minDrainDuration: terminationGracePeriodMedium, + }), Entry("Successful drain without support for eviction of pods with shared volumes", &setup{ stats: stats{ @@ -487,6 +594,7 @@ var _ = Describe("drain", func() { nPodsWithOnlyExclusivePV: 0, nPodsWithOnlySharedPV: 2, nPodsWithExclusiveAndSharedPV: 0, + nPVsPerPodWithExclusivePV: 1, }, attemptEviction: false, terminationGracePeriod: terminationGracePeriodShort, @@ -512,6 +620,7 @@ var _ = Describe("drain", func() { nPodsWithOnlyExclusivePV: 0, nPodsWithOnlySharedPV: 2, nPodsWithExclusiveAndSharedPV: 0, + nPVsPerPodWithExclusivePV: 1, }, attemptEviction: true, terminationGracePeriod: terminationGracePeriodShort, @@ -537,6 +646,7 @@ var _ = Describe("drain", func() { nPodsWithOnlyExclusivePV: 0, nPodsWithOnlySharedPV: 0, nPodsWithExclusiveAndSharedPV: 2, + nPVsPerPodWithExclusivePV: 1, }, attemptEviction: false, terminationGracePeriod: terminationGracePeriodShort, @@ -564,6 +674,7 @@ var _ = Describe("drain", func() { nPodsWithOnlyExclusivePV: 0, nPodsWithOnlySharedPV: 0, nPodsWithExclusiveAndSharedPV: 2, + nPVsPerPodWithExclusivePV: 1, }, attemptEviction: true, terminationGracePeriod: terminationGracePeriodShort, @@ -591,6 +702,7 @@ var _ = Describe("drain", func() { nPodsWithOnlyExclusivePV: 2, nPodsWithOnlySharedPV: 0, nPodsWithExclusiveAndSharedPV: 0, + nPVsPerPodWithExclusivePV: 1, }, attemptEviction: false, terminationGracePeriod: terminationGracePeriodShort, @@ -618,6 +730,7 @@ var _ = Describe("drain", func() { nPodsWithOnlyExclusivePV: 2, nPodsWithOnlySharedPV: 0, nPodsWithExclusiveAndSharedPV: 0, + nPVsPerPodWithExclusivePV: 1, }, attemptEviction: true, terminationGracePeriod: terminationGracePeriodShort, @@ -645,6 +758,7 @@ var _ = Describe("drain", func() { nPodsWithOnlyExclusivePV: 2, nPodsWithOnlySharedPV: 0, nPodsWithExclusiveAndSharedPV: 0, + nPVsPerPodWithExclusivePV: 1, }, attemptEviction: false, terminationGracePeriod: terminationGracePeriodShort, @@ -671,6 +785,7 @@ var _ = Describe("drain", func() { nPodsWithOnlyExclusivePV: 2, nPodsWithOnlySharedPV: 0, nPodsWithExclusiveAndSharedPV: 0, + nPVsPerPodWithExclusivePV: 1, }, attemptEviction: true, terminationGracePeriod: terminationGracePeriodShort, @@ -697,6 +812,7 @@ var _ = Describe("drain", func() { nPodsWithOnlyExclusivePV: 2, nPodsWithOnlySharedPV: 0, nPodsWithExclusiveAndSharedPV: 0, + nPVsPerPodWithExclusivePV: 1, }, maxEvictRetries: 1, attemptEviction: true, @@ -725,6 +841,7 @@ var _ = Describe("drain", func() { nPodsWithOnlyExclusivePV: 2, nPodsWithOnlySharedPV: 0, nPodsWithExclusiveAndSharedPV: 0, + nPVsPerPodWithExclusivePV: 1, }, attemptEviction: true, terminationGracePeriod: terminationGracePeriodLong, @@ -777,7 +894,7 @@ func getPodsWithoutPV(n int, ns, podPrefix, nodeName string, terminationGracePer return pods } -func getPodWithPV(ns, name, exclusivePV, sharedPV, nodeName string, terminationGracePeriod time.Duration, labels map[string]string) *corev1.Pod { +func getPodWithPV(ns, name, exclusivePV, sharedPV, nodeName string, terminationGracePeriod time.Duration, labels map[string]string, numberOfExclusivePVs int) *corev1.Pod { pod := getPodWithoutPV(ns, name, nodeName, 
terminationGracePeriod, labels) appendVolume := func(pod *api.Pod, vol string) { @@ -792,7 +909,9 @@ func getPodWithPV(ns, name, exclusivePV, sharedPV, nodeName string, terminationG } if exclusivePV != "" { - appendVolume(pod, exclusivePV) + for i := 0; i < numberOfExclusivePVs; i++ { + appendVolume(pod, exclusivePV+"-"+strconv.Itoa(i)) + } } if sharedPV != "" { appendVolume(pod, sharedPV) @@ -800,7 +919,7 @@ func getPodWithPV(ns, name, exclusivePV, sharedPV, nodeName string, terminationG return pod } -func getPodsWithPV(nPod, nExclusivePV, nSharedPV int, ns, podPrefix, exclusivePVPrefix, sharedPVPrefix, nodeName string, terminationGracePeriod time.Duration, labels map[string]string) []*corev1.Pod { +func getPodsWithPV(nPod, nExclusivePV, nSharedPV, numberOfExclusivePVs int, ns, podPrefix, exclusivePVPrefix, sharedPVPrefix, nodeName string, terminationGracePeriod time.Duration, labels map[string]string) []*corev1.Pod { pods := make([]*corev1.Pod, nPod) for i := range pods { var ( @@ -814,7 +933,7 @@ func getPodsWithPV(nPod, nExclusivePV, nSharedPV int, ns, podPrefix, exclusivePV if nSharedPV > 0 { sharedPV = fmt.Sprintf("%s%d", sharedPVPrefix, i%nSharedPV) } - pods[i] = getPodWithPV(ns, podName, exclusivePV, sharedPV, nodeName, terminationGracePeriod, labels) + pods[i] = getPodWithPV(ns, podName, exclusivePV, sharedPV, nodeName, terminationGracePeriod, labels, numberOfExclusivePVs) } return pods } @@ -883,6 +1002,35 @@ func getPVs(pvcs []*corev1.PersistentVolumeClaim) []*corev1.PersistentVolume { return pvs } +func getVolumeAttachments(pvs []*corev1.PersistentVolume, nodeName string) []*storagev1.VolumeAttachment { + volumeAttachments := make([]*storagev1.VolumeAttachment, 0) + + for _, pv := range pvs { + pvName := pv.Name + + volumeAttachment := &storagev1.VolumeAttachment{ + ObjectMeta: metav1.ObjectMeta{ + // TODO: Get random value + Name: "csi-old-" + pv.Name, + }, + Spec: storagev1.VolumeAttachmentSpec{ + Attacher: "disk.csi.azure.com", + Source: storagev1.VolumeAttachmentSource{ + PersistentVolumeName: &pvName, + }, + NodeName: nodeName, + }, + Status: storagev1.VolumeAttachmentStatus{ + Attached: true, + }, + } + + volumeAttachments = append(volumeAttachments, volumeAttachment) + } + + return volumeAttachments +} + func getNode(name string, pvs []*corev1.PersistentVolume) *corev1.Node { n := &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -950,4 +1098,60 @@ func appendNodes(objects []runtime.Object, nodes []*corev1.Node) []runtime.Objec } return objects } -*/ + +func appendVolumeAttachments(objects []runtime.Object, volumeAttachments []*storagev1.VolumeAttachment) []runtime.Object { + for _, va := range volumeAttachments { + objects = append(objects, va) + } + return objects +} + +func updateVolumeAttachments(drainOptions *Options, pvName string, nodeName string) { + var ( + found bool + volumeAttachment storagev1.VolumeAttachment + ) + const reattachmentDelay = 5 * time.Second + time.Sleep(reattachmentDelay) + + // Delete existing volume attachment + volumeAttachments, err := drainOptions.client.StorageV1().VolumeAttachments().List(metav1.ListOptions{}) + Expect(err).To(BeNil()) + + for _, volumeAttachment = range volumeAttachments.Items { + if *volumeAttachment.Spec.Source.PersistentVolumeName == pvName { + found = true + break + } + } + + Expect(found).To(BeTrue()) + err = drainOptions.client.StorageV1().VolumeAttachments().Delete(volumeAttachment.Name, &metav1.DeleteOptions{}) + Expect(err).To(BeNil()) + + // Create new volumeAttachment object + newVolumeAttachment := 
&storagev1.VolumeAttachment{ + TypeMeta: metav1.TypeMeta{}, + ObjectMeta: metav1.ObjectMeta{ + Name: "csi-new-" + pvName, + }, + Spec: storagev1.VolumeAttachmentSpec{ + Attacher: "disk.csi.azure.com", + Source: storagev1.VolumeAttachmentSource{ + PersistentVolumeName: &pvName, + }, + NodeName: nodeName, + }, + Status: storagev1.VolumeAttachmentStatus{ + Attached: true, + }, + } + + newVolumeAttachment, err = drainOptions.client.StorageV1().VolumeAttachments().Create(newVolumeAttachment) + Expect(err).To(BeNil()) + + newVolumeAttachment, err = drainOptions.client.StorageV1().VolumeAttachments().UpdateStatus(newVolumeAttachment) + Expect(err).To(BeNil()) + + drainOptions.volumeAttachmentHandler.AddVolumeAttachment(newVolumeAttachment) +} diff --git a/pkg/util/provider/drain/fake_controller.go b/pkg/util/provider/drain/fake_controller.go index de028b0b6..267f7c33a 100644 --- a/pkg/util/provider/drain/fake_controller.go +++ b/pkg/util/provider/drain/fake_controller.go @@ -32,7 +32,15 @@ func createFakeController( stop <-chan struct{}, namespace string, targetCoreObjects []runtime.Object, -) (kubernetes.Interface, corelisters.PersistentVolumeLister, corelisters.PersistentVolumeClaimLister, *customfake.FakeObjectTracker) { +) ( + kubernetes.Interface, + corelisters.PersistentVolumeLister, + corelisters.PersistentVolumeClaimLister, + corelisters.NodeLister, + func() bool, + func() bool, + func() bool, + *customfake.FakeObjectTracker) { fakeTargetCoreClient, targetCoreObjectTracker := customfake.NewCoreClientSet(targetCoreObjects...) go targetCoreObjectTracker.Start() @@ -47,9 +55,15 @@ func createFakeController( coreTargetSharedInformers := coreTargetInformerFactory.Core().V1() pvcs := coreTargetSharedInformers.PersistentVolumeClaims() pvs := coreTargetSharedInformers.PersistentVolumes() + nodes := coreTargetSharedInformers.Nodes() pvcLister := pvcs.Lister() pvLister := pvs.Lister() + nodeLister := nodes.Lister() - return fakeTargetCoreClient, pvLister, pvcLister, targetCoreObjectTracker + pvcSynced := pvcs.Informer().HasSynced + pvSynced := pvs.Informer().HasSynced + nodeSynced := nodes.Informer().HasSynced + + return fakeTargetCoreClient, pvLister, pvcLister, nodeLister, pvcSynced, pvSynced, nodeSynced, targetCoreObjectTracker } diff --git a/pkg/util/provider/drain/util.go b/pkg/util/provider/drain/util.go new file mode 100644 index 000000000..bdf5f941d --- /dev/null +++ b/pkg/util/provider/drain/util.go @@ -0,0 +1,25 @@ +/* +Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+ +*/ + +// Package drain is used to drain nodes +package drain + +import corev1 "k8s.io/api/core/v1" + +func getPodKey(pod *corev1.Pod) string { + return pod.Namespace + "/" + pod.Name +} diff --git a/pkg/util/provider/drain/volume_attachment.go b/pkg/util/provider/drain/volume_attachment.go new file mode 100644 index 000000000..5d01e43ad --- /dev/null +++ b/pkg/util/provider/drain/volume_attachment.go @@ -0,0 +1,115 @@ +/* +Copyright (c) 2021 SAP SE or an SAP affiliate company. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + +// Package drain is used to drain nodes +package drain + +import ( + "sync" + + storagev1 "k8s.io/api/storage/v1" + "k8s.io/klog" +) + +// VolumeAttachmentHandler is a handler used to distribute +// incoming VolumeAttachment requests to all listening workers +type VolumeAttachmentHandler struct { + sync.Mutex + workers []chan *storagev1.VolumeAttachment +} + +// NewVolumeAttachmentHandler returns a new VolumeAttachmentHandler +func NewVolumeAttachmentHandler() *VolumeAttachmentHandler { + return &VolumeAttachmentHandler{ + Mutex: sync.Mutex{}, + workers: []chan *storagev1.VolumeAttachment{}, + } +} + +func (v *VolumeAttachmentHandler) dispatch(obj interface{}) { + if len(v.workers) == 0 { + // As no workers are registered, nothing to do here. + return + } + + volumeAttachment, ok := obj.(*storagev1.VolumeAttachment) + if !ok { + klog.Errorf("Couldn't convert to volumeAttachment from object %v", obj) + return + } + + klog.V(4).Infof("Dispatching request for PV %s", *volumeAttachment.Spec.Source.PersistentVolumeName) + + v.Lock() + defer v.Unlock() + + for i, worker := range v.workers { + klog.V(4).Infof("Dispatching request for PV %s to worker %d", *volumeAttachment.Spec.Source.PersistentVolumeName, i) + worker <- volumeAttachment + } +} + +// AddVolumeAttachment is the event handler for VolumeAttachment add +func (v *VolumeAttachmentHandler) AddVolumeAttachment(obj interface{}) { + klog.V(5).Infof("Adding volume attachment object") + v.dispatch(obj) +} + +// UpdateVolumeAttachment is the event handler for VolumeAttachment update +func (v *VolumeAttachmentHandler) UpdateVolumeAttachment(oldObj, newObj interface{}) { + klog.V(5).Info("Updating volume attachment object") + v.dispatch(newObj) +} + +// AddWorker is the method used to add a new worker +func (v *VolumeAttachmentHandler) AddWorker() chan *storagev1.VolumeAttachment { + // chanSize is the channel buffer size to hold requests. + // This assumes that not more than 20 unprocessed objects would exist at a given time. + const chanSize = 20 + + klog.V(4).Infof("Adding new worker. Current active workers %d", len(v.workers)) + + v.Lock() + defer v.Unlock() + + newWorker := make(chan *storagev1.VolumeAttachment, chanSize) + v.workers = append(v.workers, newWorker) + + klog.V(4).Infof("Successfully added new worker %v.
Current active workers %d", newWorker, len(v.workers)) + return newWorker +} + +// DeleteWorker is the method used to delete an existing worker +func (v *VolumeAttachmentHandler) DeleteWorker(desiredWorker chan *storagev1.VolumeAttachment) { + klog.V(4).Infof("Deleting an existing worker %v. Current active workers %d", desiredWorker, len(v.workers)) + + v.Lock() + defer v.Unlock() + + finalWorkers := []chan *storagev1.VolumeAttachment{} + + for i, worker := range v.workers { + if worker == desiredWorker { + close(worker) + klog.V(4).Infof("Deleting worker %d from worker list", i) + } else { + finalWorkers = append(finalWorkers, worker) + } + } + + v.workers = finalWorkers + klog.V(4).Infof("Successfully removed worker. Current active workers %d", len(v.workers)) +} diff --git a/pkg/util/provider/machinecontroller/controller.go b/pkg/util/provider/machinecontroller/controller.go index df153bcd7..4275b1a21 100644 --- a/pkg/util/provider/machinecontroller/controller.go +++ b/pkg/util/provider/machinecontroller/controller.go @@ -28,18 +28,23 @@ import ( machineinformers "github.com/gardener/machine-controller-manager/pkg/client/informers/externalversions/machine/v1alpha1" machinelisters "github.com/gardener/machine-controller-manager/pkg/client/listers/machine/v1alpha1" "github.com/gardener/machine-controller-manager/pkg/handlers" + "github.com/gardener/machine-controller-manager/pkg/util/k8sutils" + "github.com/gardener/machine-controller-manager/pkg/util/provider/drain" "github.com/gardener/machine-controller-manager/pkg/util/provider/driver" "github.com/gardener/machine-controller-manager/pkg/util/provider/options" "github.com/prometheus/client_golang/prometheus" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" runtimeutil "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" coreinformers "k8s.io/client-go/informers/core/v1" policyinformers "k8s.io/client-go/informers/policy/v1beta1" + storageinformers "k8s.io/client-go/informers/storage/v1" "k8s.io/client-go/kubernetes" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" corelisters "k8s.io/client-go/listers/core/v1" policylisters "k8s.io/client-go/listers/policy/v1beta1" + storagelisters "k8s.io/client-go/listers/storage/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" @@ -75,6 +80,7 @@ func NewController( secretInformer coreinformers.SecretInformer, nodeInformer coreinformers.NodeInformer, pdbInformer policyinformers.PodDisruptionBudgetInformer, + volumeAttachmentInformer storageinformers.VolumeAttachmentInformer, machineClassInformer machineinformers.MachineClassInformer, machineInformer machineinformers.MachineInformer, recorder record.EventRecorder, @@ -82,6 +88,15 @@ func NewController( nodeConditions string, bootstrapTokenAuthExtraGroups string, ) (Controller, error) { + const ( + // volumeAttachmentGroupName group name + volumeAttachmentGroupName = "storage.k8s.io" + // volumenAttachmentKind is the kind used for VolumeAttachment + volumeAttachmentResourceName = "volumeattachments" + // volumeAttachmentResource is the kind used for VolumeAttachment + volumeAttachmentResourceKind = "VolumeAttachment" + ) + controller := &controller{ namespace: namespace, controlMachineClient: controlMachineClient, @@ -98,6 +113,7 @@ func NewController( nodeConditions: nodeConditions, driver: driver, bootstrapTokenAuthExtraGroups: bootstrapTokenAuthExtraGroups, + volumeAttachmentHandler: nil, } controller.internalExternalScheme = 
runtime.NewScheme() @@ -119,13 +135,18 @@ func NewController( controller.pvLister = pvInformer.Lister() controller.secretLister = secretInformer.Lister() controller.pdbLister = pdbInformer.Lister() + // TODO: Need to handle K8s versions below 1.13 differently + controller.volumeAttachementLister = volumeAttachmentInformer.Lister() controller.machineClassLister = machineClassInformer.Lister() controller.nodeLister = nodeInformer.Lister() controller.machineLister = machineInformer.Lister() // Controller syncs + controller.pvcSynced = pvcInformer.Informer().HasSynced + controller.pvSynced = pvInformer.Informer().HasSynced controller.secretSynced = secretInformer.Informer().HasSynced controller.pdbSynced = pdbInformer.Informer().HasSynced + controller.volumeAttachementSynced = volumeAttachmentInformer.Informer().HasSynced controller.machineClassSynced = machineClassInformer.Informer().HasSynced controller.nodeSynced = nodeInformer.Informer().HasSynced controller.machineSynced = machineInformer.Informer().HasSynced @@ -169,18 +190,31 @@ func NewController( }) // MachineSafety Controller Informers - // We follow the kubernetes way of reconciling the safety controller // done by adding empty key objects. We initialize it, to trigger // running of different safety loop on MCM startup. controller.machineSafetyOrphanVMsQueue.Add("") controller.machineSafetyAPIServerQueue.Add("") - machineInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ // deleteMachineToSafety makes sure that orphan VM handler is invoked DeleteFunc: controller.deleteMachineToSafety, }) + // Drain Controller Informers + if k8sutils.IsResourceSupported( + targetCoreClient, + schema.GroupResource{ + Group: k8sutils.VolumeAttachmentGroupName, + Resource: k8sutils.VolumeAttachmentResourceName, + }, + ) { + controller.volumeAttachmentHandler = drain.NewVolumeAttachmentHandler() + volumeAttachmentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.volumeAttachmentHandler.AddVolumeAttachment, + UpdateFunc: controller.volumeAttachmentHandler.UpdateVolumeAttachment, + }) + } + return controller, nil } @@ -202,19 +236,21 @@ type controller struct { controlCoreClient kubernetes.Interface targetCoreClient kubernetes.Interface - recorder record.EventRecorder - safetyOptions options.SafetyOptions - internalExternalScheme *runtime.Scheme - driver driver.Driver + recorder record.EventRecorder + safetyOptions options.SafetyOptions + internalExternalScheme *runtime.Scheme + driver driver.Driver + volumeAttachmentHandler *drain.VolumeAttachmentHandler // listers - pvcLister corelisters.PersistentVolumeClaimLister - pvLister corelisters.PersistentVolumeLister - secretLister corelisters.SecretLister - nodeLister corelisters.NodeLister - pdbLister policylisters.PodDisruptionBudgetLister - machineClassLister machinelisters.MachineClassLister - machineLister machinelisters.MachineLister + pvcLister corelisters.PersistentVolumeClaimLister + pvLister corelisters.PersistentVolumeLister + secretLister corelisters.SecretLister + nodeLister corelisters.NodeLister + pdbLister policylisters.PodDisruptionBudgetLister + volumeAttachementLister storagelisters.VolumeAttachmentLister + machineClassLister machinelisters.MachineClassLister + machineLister machinelisters.MachineLister // queues secretQueue workqueue.RateLimitingInterface nodeQueue workqueue.RateLimitingInterface @@ -223,11 +259,14 @@ type controller struct { machineSafetyOrphanVMsQueue workqueue.RateLimitingInterface machineSafetyAPIServerQueue 
workqueue.RateLimitingInterface // syncs - secretSynced cache.InformerSynced - pdbSynced cache.InformerSynced - nodeSynced cache.InformerSynced - machineClassSynced cache.InformerSynced - machineSynced cache.InformerSynced + pvcSynced cache.InformerSynced + pvSynced cache.InformerSynced + secretSynced cache.InformerSynced + pdbSynced cache.InformerSynced + volumeAttachementSynced cache.InformerSynced + nodeSynced cache.InformerSynced + machineClassSynced cache.InformerSynced + machineSynced cache.InformerSynced } func (c *controller) Run(workers int, stopCh <-chan struct{}) { @@ -244,7 +283,7 @@ func (c *controller) Run(workers int, stopCh <-chan struct{}) { defer c.machineSafetyOrphanVMsQueue.ShutDown() defer c.machineSafetyAPIServerQueue.ShutDown() - if !cache.WaitForCacheSync(stopCh, c.secretSynced, c.pdbSynced, c.nodeSynced, c.machineClassSynced, c.machineSynced) { + if !cache.WaitForCacheSync(stopCh, c.secretSynced, c.pvcSynced, c.pvSynced, c.pdbSynced, c.volumeAttachementSynced, c.nodeSynced, c.machineClassSynced, c.machineSynced) { runtimeutil.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) return } diff --git a/pkg/util/provider/machinecontroller/machine.go b/pkg/util/provider/machinecontroller/machine.go index 77d21e20f..df0cf2bb6 100644 --- a/pkg/util/provider/machinecontroller/machine.go +++ b/pkg/util/provider/machinecontroller/machine.go @@ -46,17 +46,17 @@ import ( Machine controller - Machine add, update, delete watches */ func (c *controller) addMachine(obj interface{}) { - klog.V(4).Infof("Adding machine object") + klog.V(5).Infof("Adding machine object") c.enqueueMachine(obj) } func (c *controller) updateMachine(oldObj, newObj interface{}) { - klog.V(4).Info("Updating machine object") + klog.V(5).Info("Updating machine object") c.enqueueMachine(newObj) } func (c *controller) deleteMachine(obj interface{}) { - klog.V(4).Info("Deleting machine object") + klog.V(5).Info("Deleting machine object") c.enqueueMachine(obj) } @@ -73,14 +73,14 @@ func (c *controller) isToBeEnqueued(obj interface{}) (bool, string) { func (c *controller) enqueueMachine(obj interface{}) { if toBeEnqueued, key := c.isToBeEnqueued(obj); toBeEnqueued { - klog.V(4).Infof("Adding machine object to the queue %q", key) + klog.V(5).Infof("Adding machine object to the queue %q", key) c.machineQueue.Add(key) } } func (c *controller) enqueueMachineAfter(obj interface{}, after time.Duration) { if toBeEnqueued, key := c.isToBeEnqueued(obj); toBeEnqueued { - klog.V(4).Infof("Adding machine object to the queue %q after %s", key, after) + klog.V(5).Infof("Adding machine object to the queue %q after %s", key, after) c.machineQueue.AddAfter(key, after) } } @@ -102,7 +102,7 @@ func (c *controller) reconcileClusterMachineKey(key string) error { } retryPeriod, err := c.reconcileClusterMachine(machine) - klog.V(4).Info(err, retryPeriod) + klog.V(5).Info(err, retryPeriod) c.enqueueMachineAfter(machine, time.Duration(retryPeriod)) @@ -110,8 +110,8 @@ func (c *controller) reconcileClusterMachineKey(key string) error { } func (c *controller) reconcileClusterMachine(machine *v1alpha1.Machine) (machineutils.RetryPeriod, error) { - klog.V(4).Infof("Start Reconciling machine: %q , nodeName: %q ,providerID: %q", machine.Name, getNodeName(machine), getProviderID(machine)) - defer klog.V(4).Infof("Stop Reconciling machine %q, nodeName: %q ,providerID: %q", machine.Name, getNodeName(machine), getProviderID(machine)) + klog.V(5).Infof("Start Reconciling machine: %q , nodeName: %q ,providerID: %q", machine.Name, 
getNodeName(machine), getProviderID(machine)) + defer klog.V(5).Infof("Stop Reconciling machine %q, nodeName: %q ,providerID: %q", machine.Name, getNodeName(machine), getProviderID(machine)) if c.safetyOptions.MachineControllerFrozen && machine.DeletionTimestamp == nil { // If Machine controller is frozen and @@ -199,7 +199,7 @@ func (c *controller) addNodeToMachine(obj interface{}) { } if machine.Status.CurrentStatus.Phase != v1alpha1.MachineCrashLoopBackOff && nodeConditionsHaveChanged(machine.Status.Conditions, node.Status.Conditions) { - klog.V(4).Infof("Enqueue machine object %q as conditions of backing node %q have changed", machine.Name, getNodeName(machine)) + klog.V(5).Infof("Enqueue machine object %q as conditions of backing node %q have changed", machine.Name, getNodeName(machine)) c.enqueueMachine(machine) } } diff --git a/pkg/util/provider/machinecontroller/machine_safety.go b/pkg/util/provider/machinecontroller/machine_safety.go index cb684bfe9..a5c5941e0 100644 --- a/pkg/util/provider/machinecontroller/machine_safety.go +++ b/pkg/util/provider/machinecontroller/machine_safety.go @@ -104,7 +104,7 @@ func (c *controller) reconcileClusterMachineSafetyAPIServer(key string) error { return err } - klog.V(2).Info("SafetyController: Reinitializing machine health check for machine: %q with backing node: %q and providerID: %q", machine.Name, getNodeName(machine), getProviderID(machine)) + klog.V(2).Infof("SafetyController: Reinitializing machine health check for machine: %q with backing node: %q and providerID: %q", machine.Name, getNodeName(machine), getProviderID(machine)) } // En-queue after 30 seconds, to ensure all machine states are reconciled diff --git a/pkg/util/provider/machinecontroller/machine_util.go b/pkg/util/provider/machinecontroller/machine_util.go index ecd274b43..953813338 100644 --- a/pkg/util/provider/machinecontroller/machine_util.go +++ b/pkg/util/provider/machinecontroller/machine_util.go @@ -926,6 +926,7 @@ func (c *controller) drainNode(deleteMachineRequest *driver.DeleteMachineRequest machine = deleteMachineRequest.Machine maxEvictRetries = int32(math.Min(float64(*c.getEffectiveMaxEvictRetries(machine)), c.getEffectiveDrainTimeout(machine).Seconds()/drain.PodEvictionRetryInterval.Seconds())) pvDetachTimeOut = c.safetyOptions.PvDetachTimeout.Duration + pvReattachTimeOut = c.safetyOptions.PvReattachTimeout.Duration timeOutDuration = c.getEffectiveDrainTimeout(deleteMachineRequest.Machine).Duration forceDeleteLabelPresent = machine.Labels["force-deletion"] == "True" nodeName = machine.Labels["node"] @@ -1016,6 +1017,7 @@ func (c *controller) drainNode(deleteMachineRequest *driver.DeleteMachineRequest timeOutDuration, maxEvictRetries, pvDetachTimeOut, + pvReattachTimeOut, nodeName, -1, forceDeletePods, @@ -1028,6 +1030,8 @@ func (c *controller) drainNode(deleteMachineRequest *driver.DeleteMachineRequest c.pvcLister, c.pvLister, c.pdbLister, + c.nodeLister, + c.volumeAttachmentHandler, ) err = drainOptions.RunDrain() if err == nil { diff --git a/pkg/util/provider/machinecontroller/secret.go b/pkg/util/provider/machinecontroller/secret.go index 3d2831d8e..2495d9352 100644 --- a/pkg/util/provider/machinecontroller/secret.go +++ b/pkg/util/provider/machinecontroller/secret.go @@ -59,10 +59,10 @@ func (c *controller) reconcileClusterSecretKey(key string) error { func (c *controller) reconcileClusterSecret(secret *corev1.Secret) error { startTime := time.Now() - klog.V(4).Infof("Start syncing %q", secret.Name) + klog.V(5).Infof("Start syncing %q", secret.Name) defer 
func() { c.enqueueSecretAfter(secret, 10*time.Minute) - klog.V(4).Infof("Finished syncing %q (%v)", secret.Name, time.Since(startTime)) + klog.V(5).Infof("Finished syncing %q (%v)", secret.Name, time.Since(startTime)) }() // Check if machineClasses are referring to this secret diff --git a/pkg/util/provider/options/types.go b/pkg/util/provider/options/types.go index c2448eef4..5a752b692 100644 --- a/pkg/util/provider/options/types.go +++ b/pkg/util/provider/options/types.go @@ -104,6 +104,8 @@ type SafetyOptions struct { MaxEvictRetries int32 // Timeout (in duration) used while waiting for PV to detach PvDetachTimeout metav1.Duration + // Timeout (in duration) used while waiting for PV to reattach on new node + PvReattachTimeout metav1.Duration // Timeout (in duration) for which the APIServer can be down before // declare the machine controller frozen by safety controller