Merge pull request #11531 from k8s-infra-cherrypick-robot/cherry-pick-11526-to-release-1.9

[release-1.9] 🌱 test: add options for additional resources and verify volume detach to node drain test
k8s-ci-robot authored Dec 4, 2024
2 parents 5cb86c2 + 1aa8c01 commit 63592b4
Showing 3 changed files with 137 additions and 7 deletions.
96 changes: 89 additions & 7 deletions test/e2e/node_drain.go
@@ -64,6 +64,16 @@ type NodeDrainTimeoutSpecInput struct {
// Allows injecting a function to be run after the test namespace is created.
// If not specified, this is a no-op.
PostNamespaceCreated func(managementClusterProxy framework.ClusterProxy, workloadClusterNamespace string)

// Enables additional verification for volumes blocking machine deletion.
// Requires appropriate resources to be added via CreateAdditionalResources.
VerifyNodeVolumeDetach bool

// Allows overriding the default function used for unblocking volume detachments.
UnblockNodeVolumeDetachment func(ctx context.Context, bootstrapClusterProxy framework.ClusterProxy, cluster *clusterv1.Cluster)

// Allows creating additional resources.
CreateAdditionalResources func(ctx context.Context, clusterProxy framework.ClusterProxy, cluster *clusterv1.Cluster)
}

// NodeDrainTimeoutSpec goes through the following steps:
@@ -72,13 +82,16 @@ type NodeDrainTimeoutSpecInput struct {
// * Deploy MachineDrainRules
// * Deploy Deployment with unevictable Pods on CP & MD Nodes
// * Deploy Deployment with evictable Pods with finalizer on CP & MD Nodes
// * Deploy additional resources if defined in input
// * Trigger Node drain by scaling down the control plane to 1 and MachineDeployments to 0
// * Get draining control plane and MachineDeployment Machines
// * Verify drain of Deployments with order 1
// * Verify drain of Deployments with order 5
// * Verify skipped Pods are still there and don't have a deletionTimestamp
// * Verify Node drains for control plane and MachineDeployment Machines are blocked (only by PDBs)
// * Set NodeDrainTimeout to 1s to unblock Node drain
// * Verify machine deletion is blocked by waiting for volume detachment (only if VerifyNodeVolumeDetach is enabled)
// * Unblock waiting for volume detachment (only if VerifyNodeVolumeDetach is enabled)
// * Verify scale down succeeded because Node drains were unblocked.
func NodeDrainTimeoutSpec(ctx context.Context, inputGetter func() NodeDrainTimeoutSpecInput) {
var (
@@ -100,6 +113,10 @@ func NodeDrainTimeoutSpec(ctx context.Context, inputGetter func() NodeDrainTimeo
Expect(input.E2EConfig.GetIntervals(specName, "wait-deployment-available")).ToNot(BeNil())
Expect(input.E2EConfig.GetIntervals(specName, "wait-machine-deleted")).ToNot(BeNil())

if input.VerifyNodeVolumeDetach && input.UnblockNodeVolumeDetachment == nil {
input.UnblockNodeVolumeDetachment = unblockNodeVolumeDetachmentFunc(input.E2EConfig.GetIntervals(specName, "wait-control-plane"), input.E2EConfig.GetIntervals(specName, "wait-worker-nodes"))
}

// Setup a Namespace where to host objects for this spec and create a watcher for the namespace events.
namespace, cancelWatches = framework.SetupSpecNamespace(ctx, specName, input.BootstrapClusterProxy, input.ArtifactFolder, input.PostNamespaceCreated)
clusterResources = new(clusterctl.ApplyClusterTemplateAndWaitResult)
@@ -147,6 +164,9 @@ func NodeDrainTimeoutSpec(ctx context.Context, inputGetter func() NodeDrainTimeo
Cluster: cluster,
ModifyControlPlaneTopology: func(topology *clusterv1.ControlPlaneTopology) {
topology.NodeDrainTimeout = &metav1.Duration{Duration: time.Duration(0)}
if input.VerifyNodeVolumeDetach {
topology.NodeVolumeDetachTimeout = &metav1.Duration{Duration: time.Duration(0)}
}
if topology.Metadata.Labels == nil {
topology.Metadata.Labels = map[string]string{}
}
@@ -159,6 +179,9 @@ func NodeDrainTimeoutSpec(ctx context.Context, inputGetter func() NodeDrainTimeo
Cluster: cluster,
ModifyMachineDeploymentTopology: func(topology *clusterv1.MachineDeploymentTopology) {
topology.NodeDrainTimeout = &metav1.Duration{Duration: time.Duration(0)}
if input.VerifyNodeVolumeDetach {
topology.NodeVolumeDetachTimeout = &metav1.Duration{Duration: time.Duration(0)}
}
if topology.Metadata.Labels == nil {
topology.Metadata.Labels = map[string]string{}
}
@@ -174,12 +197,14 @@ func NodeDrainTimeoutSpec(ctx context.Context, inputGetter func() NodeDrainTimeo
workloadClusterProxy := input.BootstrapClusterProxy.GetWorkloadCluster(ctx, cluster.Namespace, cluster.Name)

By("Deploy MachineDrainRules.")
Expect(input.BootstrapClusterProxy.GetClient().Create(ctx,
generateMachineDrainRule(namespace.Name, clusterName, "drain-order-1", 1))).To(Succeed())
Expect(input.BootstrapClusterProxy.GetClient().Create(ctx,
generateMachineDrainRule(namespace.Name, clusterName, "drain-order-5", 5))).To(Succeed())
Expect(input.BootstrapClusterProxy.GetClient().Create(ctx,
generateMachineDrainRule(namespace.Name, clusterName, "drain-order-10", 10))).To(Succeed())
machineDrainRules := []*clusterv1.MachineDrainRule{
generateMachineDrainRule(namespace.Name, clusterName, "drain-order-1", 1),
generateMachineDrainRule(namespace.Name, clusterName, "drain-order-5", 5),
generateMachineDrainRule(namespace.Name, clusterName, "drain-order-10", 10),
}
for _, rule := range machineDrainRules {
Expect(input.BootstrapClusterProxy.GetClient().Create(ctx, rule)).To(Succeed())
}

By("Deploy Deployment with unevictable Pods on control plane and MachineDeployment Nodes.")
framework.DeployUnevictablePod(ctx, framework.DeployUnevictablePodInput{
@@ -248,6 +273,10 @@ func NodeDrainTimeoutSpec(ctx context.Context, inputGetter func() NodeDrainTimeo
}
}

if input.CreateAdditionalResources != nil {
input.CreateAdditionalResources(ctx, input.BootstrapClusterProxy, cluster)
}

By("Trigger Node drain by scaling down the control plane to 1 and MachineDeployments to 0.")
modifyControlPlaneViaClusterAndWait(ctx, modifyControlPlaneViaClusterAndWaitInput{
ClusterProxy: input.BootstrapClusterProxy,
@@ -432,7 +461,35 @@ func NodeDrainTimeoutSpec(ctx context.Context, inputGetter func() NodeDrainTimeo
WaitForMachineDeployments: input.E2EConfig.GetIntervals(specName, "wait-worker-nodes"),
})

By("Verify scale down succeeded because Node drains were unblocked")
if input.VerifyNodeVolumeDetach {
By("Verify Node removal for control plane and MachineDeployment Machines are blocked (only by volume detachments)")
Eventually(func(g Gomega) {
waitingCPMachine := &clusterv1.Machine{}
g.Expect(input.BootstrapClusterProxy.GetClient().Get(ctx, drainingCPMachineKey, waitingCPMachine)).To(Succeed())

condition := conditions.Get(waitingCPMachine, clusterv1.VolumeDetachSucceededCondition)
g.Expect(condition).ToNot(BeNil())
g.Expect(condition.Status).To(Equal(corev1.ConditionFalse))
// Deletion should still be blocked because of the volume.
g.Expect(condition.Message).To(ContainSubstring("Waiting for node volumes to be detached"))
}, input.E2EConfig.GetIntervals(specName, "wait-machine-deleted")...).Should(Succeed())
for _, machineKey := range drainingMDMachineKeys {
Eventually(func(g Gomega) {
drainedMDMachine := &clusterv1.Machine{}
g.Expect(input.BootstrapClusterProxy.GetClient().Get(ctx, machineKey, drainedMDMachine)).To(Succeed())

condition := conditions.Get(drainedMDMachine, clusterv1.VolumeDetachSucceededCondition)
g.Expect(condition).ToNot(BeNil())
g.Expect(condition.Status).To(Equal(corev1.ConditionFalse)) // Deletion should still be blocked because of the volume.
g.Expect(condition.Message).To(ContainSubstring("Waiting for node volumes to be detached"))
}, input.E2EConfig.GetIntervals(specName, "wait-machine-deleted")...).Should(Succeed())
}

By("Executing input.UnblockNodeVolumeDetachment to unblock waiting for volume detachments")
input.UnblockNodeVolumeDetachment(ctx, input.BootstrapClusterProxy, cluster)
}

By("Verify scale down succeeded because Node drains and Volume detachments were unblocked")
// When we scale down the KCP, controlplane machines are deleted one by one, so it requires more time
// MD Machine deletion is done in parallel and will be faster.
nodeDrainTimeoutKCPInterval := getDrainAndDeleteInterval(input.E2EConfig.GetIntervals(specName, "wait-machine-deleted"), drainTimeout, controlPlaneReplicas)
@@ -641,3 +698,28 @@ func getDrainAndDeleteInterval(deleteInterval []interface{}, drainTimeout *metav
res := []interface{}{intervalDuration.String(), deleteInterval[1]}
return res
}

func unblockNodeVolumeDetachmentFunc(waitControlPlaneIntervals, waitWorkerNodeIntervals []interface{}) func(ctx context.Context, bootstrapClusterProxy framework.ClusterProxy, cluster *clusterv1.Cluster) {
return func(ctx context.Context, bootstrapClusterProxy framework.ClusterProxy, cluster *clusterv1.Cluster) {
By("Set NodeVolumeDetachTimeout to 1s to unblock waiting for volume detachments")
// Note: This also verifies that KCP & MachineDeployments are still propagating changes to NodeVolumeDetachTimeout down to
// Machines that already have a deletionTimestamp.
nodeVolumeDetachTimeout := &metav1.Duration{Duration: time.Duration(1) * time.Second}
modifyControlPlaneViaClusterAndWait(ctx, modifyControlPlaneViaClusterAndWaitInput{
ClusterProxy: bootstrapClusterProxy,
Cluster: cluster,
ModifyControlPlaneTopology: func(topology *clusterv1.ControlPlaneTopology) {
topology.NodeVolumeDetachTimeout = nodeVolumeDetachTimeout
},
WaitForControlPlane: waitControlPlaneIntervals,
})
modifyMachineDeploymentViaClusterAndWait(ctx, modifyMachineDeploymentViaClusterAndWaitInput{
ClusterProxy: bootstrapClusterProxy,
Cluster: cluster,
ModifyMachineDeploymentTopology: func(topology *clusterv1.MachineDeploymentTopology) {
topology.NodeVolumeDetachTimeout = nodeVolumeDetachTimeout
},
WaitForMachineDeployments: waitWorkerNodeIntervals,
})
}
}
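The default above unblocks machine deletion by letting NodeVolumeDetachTimeout expire rather than by detaching anything. Callers can instead plug in their own UnblockNodeVolumeDetachment; the following is a minimal sketch (illustrative only, not part of this commit, names are hypothetical) that removes the blocking VolumeAttachments directly, assuming they carry the "test.cluster.x-k8s.io/block" finalizer created in node_drain_test.go below:

func removeBlockingVolumeAttachmentsFunc() func(ctx context.Context, bootstrapClusterProxy framework.ClusterProxy, cluster *clusterv1.Cluster) {
	return func(ctx context.Context, bootstrapClusterProxy framework.ClusterProxy, cluster *clusterv1.Cluster) {
		By("Deleting fake VolumeAttachments to unblock waiting for volume detachments")
		workloadClusterClient := bootstrapClusterProxy.GetWorkloadCluster(ctx, cluster.Namespace, cluster.Name).GetClient()

		vaList := &storagev1.VolumeAttachmentList{}
		Expect(workloadClusterClient.List(ctx, vaList)).To(Succeed())
		for i := range vaList.Items {
			va := &vaList.Items[i]
			// Drop the blocking test finalizer first so the object can actually go away, then delete it.
			va.Finalizers = nil
			Expect(workloadClusterClient.Update(ctx, va)).To(Succeed())
			Expect(workloadClusterClient.Delete(ctx, va)).To(Succeed())
		}
	}
}

Either approach satisfies the spec: it only requires that, once UnblockNodeVolumeDetachment returns, the drained Machines can finish deleting.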
44 changes: 44 additions & 0 deletions test/e2e/node_drain_test.go
@@ -20,8 +20,18 @@ limitations under the License.
package e2e

import (
"context"
"fmt"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"

clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/test/framework"
)

var _ = Describe("When testing Node drain", func() {
@@ -34,6 +44,40 @@ var _ = Describe("When testing Node drain", func() {
SkipCleanup: skipCleanup,
Flavor: ptr.To("topology"),
InfrastructureProvider: ptr.To("docker"),
VerifyNodeVolumeDetach: true,
CreateAdditionalResources: func(ctx context.Context, clusterProxy framework.ClusterProxy, cluster *clusterv1.Cluster) {
workloadClusterClient := clusterProxy.GetWorkloadCluster(ctx, cluster.Namespace, cluster.Name).GetClient()

nodeList := &corev1.NodeList{}
Expect(workloadClusterClient.List(ctx, nodeList)).To(Succeed())

// Create a fake VolumeAttachment object for each Node without a real backing CSI driver.
for _, node := range nodeList.Items {
va := generateVolumeAttachment(node)
Expect(workloadClusterClient.Create(ctx, va)).To(Succeed())
// Set .Status.Attached to true so the VolumeAttachment blocks machine deletion.
va.Status.Attached = true
Expect(workloadClusterClient.Status().Update(ctx, va)).To(Succeed())
}
},
}
})
})

func generateVolumeAttachment(node corev1.Node) *storagev1.VolumeAttachment {
return &storagev1.VolumeAttachment{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("va-%s", node.GetName()),
Finalizers: []string{
"test.cluster.x-k8s.io/block",
},
},
Spec: storagev1.VolumeAttachmentSpec{
Attacher: "manual",
NodeName: node.GetName(),
Source: storagev1.VolumeAttachmentSource{
PersistentVolumeName: ptr.To("foo"),
},
},
}
}
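Machine deletion waits on volumes by checking, among other things, for VolumeAttachments that reference the Machine's Node and report status.attached as true, which is why the fake object above needs both spec.nodeName and the separate status update. A small helper in the same spirit (a sketch only, not part of this commit; assumes the usual controller-runtime client import) can be handy when debugging the test:

func nodeHasAttachedVolumes(ctx context.Context, c client.Client, nodeName string) (bool, error) {
	vaList := &storagev1.VolumeAttachmentList{}
	if err := c.List(ctx, vaList); err != nil {
		return false, err
	}
	for _, va := range vaList.Items {
		// A VolumeAttachment counts as blocking while it targets the Node and reports attached.
		if va.Spec.NodeName == nodeName && va.Status.Attached {
			return true, nil
		}
	}
	return false, nil
}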
4 changes: 4 additions & 0 deletions test/framework/convenience.go
@@ -24,6 +24,7 @@ import (
coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
storagev1 "k8s.io/api/storage/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsv1beta "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
@@ -88,6 +89,9 @@ func TryAddDefaultSchemes(scheme *runtime.Scheme) {
// Add coordination to the schema
// Note: This is e.g. used to trigger kube-controller-manager restarts by stealing its lease.
_ = coordinationv1.AddToScheme(scheme)

// Add storagev1 to the scheme
_ = storagev1.AddToScheme(scheme)
}

// ObjectToKind returns the Kind without the package prefix. Pass in a pointer to a struct
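The scheme addition is what lets the test above create and update VolumeAttachments through the framework clients; the e2e cluster proxies are (as this sketch assumes) built on a scheme populated via TryAddDefaultSchemes, and without storagev1 those calls would fail with a "no kind is registered" error. A minimal illustration of the effect:

scheme := runtime.NewScheme()
framework.TryAddDefaultSchemes(scheme)
// With storagev1 registered, the scheme can resolve VolumeAttachment to its GroupVersionKind,
// which is what a controller-runtime client needs before it can send the request.
gvks, _, err := scheme.ObjectKinds(&storagev1.VolumeAttachment{})
Expect(err).ToNot(HaveOccurred())
Expect(gvks).ToNot(BeEmpty())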
