From 0899f555fd9089b9378f37c150cc5e6ea61d1154 Mon Sep 17 00:00:00 2001
From: Michal Wozniak
Date: Wed, 24 Jan 2024 17:35:37 +0100
Subject: [PATCH 1/3] WIP: Add more e2e tests for pod groups

---
 pkg/util/testingjobs/pod/wrappers.go |   6 ++
 test/e2e/singlecluster/pod_test.go   | 132 +++++++++++++++++++++++++++
 2 files changed, 138 insertions(+)

diff --git a/pkg/util/testingjobs/pod/wrappers.go b/pkg/util/testingjobs/pod/wrappers.go
index 2def9f9f92..aca94e818f 100644
--- a/pkg/util/testingjobs/pod/wrappers.go
+++ b/pkg/util/testingjobs/pod/wrappers.go
@@ -84,6 +84,12 @@ func (p *PodWrapper) Queue(q string) *PodWrapper {
 	return p.Label(constants.QueueLabel, q)
 }
 
+// PriorityClass updates the priority class name of the Pod
+func (p *PodWrapper) PriorityClass(pc string) *PodWrapper {
+	p.Spec.PriorityClassName = pc
+	return p
+}
+
 // Name updated the name of the pod
 func (p *PodWrapper) Name(n string) *PodWrapper {
 	p.ObjectMeta.Name = n
diff --git a/test/e2e/singlecluster/pod_test.go b/test/e2e/singlecluster/pod_test.go
index dbff1c8e82..e859f043e3 100644
--- a/test/e2e/singlecluster/pod_test.go
+++ b/test/e2e/singlecluster/pod_test.go
@@ -20,6 +20,8 @@ import (
 	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
 	corev1 "k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/version"
 	"k8s.io/client-go/discovery"
@@ -71,6 +73,9 @@ var _ = ginkgo.Describe("Pod groups", func() {
 			ResourceGroup(
 				*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "5").Obj(),
 			).
+			Preemption(kueue.ClusterQueuePreemption{
+				WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
+			}).
 			Obj()
 		gomega.Expect(k8sClient.Create(ctx, cq)).To(gomega.Succeed())
 		lq = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(cq.Name).Obj()
@@ -223,6 +228,133 @@ var _ = ginkgo.Describe("Pod groups", func() {
 			util.ExpectWorkloadToFinish(ctx, k8sClient, client.ObjectKey{Namespace: ns.Name, Name: "group"})
 		})
+
+		ginkgo.It("should allow scheduling a group of diverse pods", func() {
+			group := podtesting.MakePod("group", ns.Name).
+				Image("gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}).
+				Queue(lq.Name).
+				Request(corev1.ResourceCPU, "3").
+				MakeGroup(2)
+			gKey := client.ObjectKey{Namespace: ns.Name, Name: "group"}
+
+			// make the group of pods diverse by using different amounts of resources
+			group[0].Spec.Containers[0].Resources.Requests[corev1.ResourceCPU] = resource.MustParse("2")
+
+			ginkgo.By("Group starts", func() {
+				for _, p := range group {
+					gomega.Expect(k8sClient.Create(ctx, p.DeepCopy())).To(gomega.Succeed())
+				}
+				gomega.Eventually(func(g gomega.Gomega) {
+					for _, origPod := range group {
+						var p corev1.Pod
+						g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(origPod), &p)).To(gomega.Succeed())
+						g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
+					}
+				}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			})
+
+			ginkgo.By("Group completes", func() {
+				util.ExpectWorkloadToFinish(ctx, k8sClient, client.ObjectKey{Namespace: ns.Name, Name: "group"})
+			})
+			ginkgo.By("Deleting finished Pods", func() {
+				for _, p := range group {
+					gomega.Expect(k8sClient.Delete(ctx, p)).To(gomega.Succeed())
+				}
+				gomega.Eventually(func(g gomega.Gomega) {
+					for _, p := range group {
+						var pCopy corev1.Pod
+						g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(p), &pCopy)).To(testing.BeNotFoundError())
+					}
+					var wl kueue.Workload
+					g.Expect(k8sClient.Get(ctx, gKey, &wl)).Should(testing.BeNotFoundError())
+				}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			})
+		})
+
+		ginkgo.It("should allow preempting the lower-priority group", func() {
+			highPriorityClass := testing.MakePriorityClass("high").PriorityValue(100).Obj()
+			gomega.Expect(k8sClient.Create(ctx, highPriorityClass)).To(gomega.Succeed())
+			ginkgo.DeferCleanup(func() {
+				gomega.Expect(k8sClient.Delete(ctx, highPriorityClass)).To(gomega.Succeed())
+			})
+
+			defaultPriorityGroup := podtesting.MakePod("default-priority-group", ns.Name).
+				Image("gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"-termination-code=1", "10m"}).
+				Queue(lq.Name).
+				Request(corev1.ResourceCPU, "2").
+				MakeGroup(2)
+
+			ginkgo.By("Default-priority group starts", func() {
+				for _, p := range defaultPriorityGroup {
+					gomega.Expect(k8sClient.Create(ctx, p.DeepCopy())).To(gomega.Succeed())
+				}
+				gomega.Eventually(func(g gomega.Gomega) {
+					for _, origPod := range defaultPriorityGroup {
+						var p corev1.Pod
+						g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(origPod), &p)).To(gomega.Succeed())
+						g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
+					}
+				}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			})
+
+			highPriorityGroup := podtesting.MakePod("high-priority-group", ns.Name).
+				Image("gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}).
+				Queue(lq.Name).
+				PriorityClass("high").
+				Request(corev1.ResourceCPU, "1").
+				MakeGroup(2)
+			highGroupKey := client.ObjectKey{Namespace: ns.Name, Name: "high-priority-group"}
+
+			ginkgo.By("High-priority group starts", func() {
+				for _, p := range highPriorityGroup {
+					gomega.Expect(k8sClient.Create(ctx, p.DeepCopy())).To(gomega.Succeed())
+				}
+				gomega.Eventually(func(g gomega.Gomega) {
+					for _, origPod := range highPriorityGroup {
+						var p corev1.Pod
+						g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(origPod), &p)).To(gomega.Succeed())
+						g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
+					}
+				}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			})
+
+			ginkgo.By("High priority group completes", func() {
+				util.ExpectWorkloadToFinish(ctx, k8sClient, highGroupKey)
+			})
+
+			var replacementPods []*corev1.Pod
+			ginkgo.By("Recreate the replacement pods for the default-priority group", func() {
+				for i := range defaultPriorityGroup {
+					rep := defaultPriorityGroup[i].DeepCopy()
+					// For replacement pods use args that let them complete fast.
+					rep.Name = "replacement-for-" + rep.Name
+					rep.Spec.Containers[0].Args = []string{"3s"}
+					replacementPods = append(replacementPods, rep)
+					gomega.Expect(k8sClient.Create(ctx, rep)).To(gomega.Succeed())
+				}
+			})
+
+			ginkgo.By("Wait for the replacement pods to be ungated", func() {
+				for _, rep := range replacementPods {
+					gomega.Eventually(func(g gomega.Gomega) {
+						var p corev1.Pod
+						g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(rep), &p)).To(gomega.Succeed())
+						g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
+					}, util.Timeout, util.Interval).Should(gomega.Succeed())
+				}
+			})
+
+			// TODO(#1557): verify the default-priority workload is finished
+			ginkgo.By("Verify the replacement pods of the default priority workload complete", func() {
+				for _, rep := range replacementPods {
+					gomega.Eventually(func(g gomega.Gomega) {
+						var p corev1.Pod
+						g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(rep), &p)).To(gomega.Succeed())
+						g.Expect(p.Status.Phase).To(gomega.Equal(v1.PodSucceeded))
+					}, util.Timeout, util.Interval).Should(gomega.Succeed())
+				}
+			})
+		})
 	})
 })

From cdca39c8b303d6a0de4decdf041ade997fbe54e9 Mon Sep 17 00:00:00 2001
From: Michal Wozniak
Date: Fri, 26 Jan 2024 11:32:46 +0100
Subject: [PATCH 2/3] Clean up the pod group preemption e2e test

---
 test/e2e/singlecluster/pod_test.go | 64 +++++++++++++++++++++---------
 1 file changed, 46 insertions(+), 18 deletions(-)

diff --git a/test/e2e/singlecluster/pod_test.go b/test/e2e/singlecluster/pod_test.go
index e859f043e3..4ec4d7b008 100644
--- a/test/e2e/singlecluster/pod_test.go
+++ b/test/e2e/singlecluster/pod_test.go
@@ -23,6 +23,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/version"
 	"k8s.io/client-go/discovery"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -279,10 +280,11 @@ var _ = ginkgo.Describe("Pod groups", func() {
 			})
 
 			defaultPriorityGroup := podtesting.MakePod("default-priority-group", ns.Name).
-				Image("gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"-termination-code=1", "10m"}).
+				Image("gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"-termination-code=1", "10min"}).
 				Queue(lq.Name).
 				Request(corev1.ResourceCPU, "2").
 				MakeGroup(2)
+			defaultGroupKey := client.ObjectKey{Namespace: ns.Name, Name: "default-priority-group"}
 
 			ginkgo.By("Default-priority group starts", func() {
 				for _, p := range defaultPriorityGroup {
@@ -305,7 +307,7 @@ var _ = ginkgo.Describe("Pod groups", func() {
 				MakeGroup(2)
 			highGroupKey := client.ObjectKey{Namespace: ns.Name, Name: "high-priority-group"}
 
-			ginkgo.By("High-priority group starts", func() {
+			ginkgo.By("Create the high-priority group", func() {
 				for _, p := range highPriorityGroup {
 					gomega.Expect(k8sClient.Create(ctx, p.DeepCopy())).To(gomega.Succeed())
 				}
@@ -318,27 +320,53 @@ var _ = ginkgo.Describe("Pod groups", func() {
 				}, util.Timeout, util.Interval).Should(gomega.Succeed())
 			})
 
-			ginkgo.By("High priority group completes", func() {
-				util.ExpectWorkloadToFinish(ctx, k8sClient, highGroupKey)
+			ginkgo.By("The default priority workload is preempted", func() {
+				var updatedWorkload kueue.Workload
+				gomega.Expect(k8sClient.Get(ctx, defaultGroupKey, &updatedWorkload)).To(gomega.Succeed())
+				util.ExpectWorkloadsToBePreempted(ctx, k8sClient, &updatedWorkload)
 			})
 
-			var replacementPods []*corev1.Pod
-			ginkgo.By("Recreate the replacement pods for the default-priority group", func() {
-				for i := range defaultPriorityGroup {
-					rep := defaultPriorityGroup[i].DeepCopy()
-					// For replacement pods use args that let them complete fast.
-					rep.Name = "replacement-for-" + rep.Name
-					rep.Spec.Containers[0].Args = []string{"3s"}
-					replacementPods = append(replacementPods, rep)
-					gomega.Expect(k8sClient.Create(ctx, rep)).To(gomega.Succeed())
-				}
+			replacementPods := make(map[types.NamespacedName]types.NamespacedName, len(defaultPriorityGroup))
+			ginkgo.By("Create replacement pods as soon as the default-priority pods are Failed", func() {
+				gomega.Eventually(func(g gomega.Gomega) int {
+					for _, origPod := range defaultPriorityGroup {
+						origKey := client.ObjectKeyFromObject(origPod)
+						if _, found := replacementPods[origKey]; !found {
+							var p corev1.Pod
+							g.Expect(k8sClient.Get(ctx, origKey, &p)).To(gomega.Succeed())
+							if p.Status.Phase == v1.PodFailed {
+								rep := origPod.DeepCopy()
+								// For replacement pods use args that let them complete fast.
+								rep.Name = "replacement-for-" + rep.Name
+								rep.Spec.Containers[0].Args = []string{"1ms"}
+								g.Expect(k8sClient.Create(ctx, rep)).To(gomega.Succeed())
+								replacementPods[origKey] = client.ObjectKeyFromObject(rep)
+							}
+						}
+					}
+					return len(replacementPods)
+				}, util.Timeout, util.Interval).Should(gomega.BeNumerically("==", len(defaultPriorityGroup)))
+			})
+
+			ginkgo.By("Verify the high-priority pods are scheduled", func() {
+				gomega.Eventually(func(g gomega.Gomega) {
+					for _, origPod := range highPriorityGroup {
+						var p corev1.Pod
+						g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(origPod), &p)).To(gomega.Succeed())
+						g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
+					}
+				}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			})
+
+			ginkgo.By("Verify the high priority group completes", func() {
+				util.ExpectWorkloadToFinish(ctx, k8sClient, highGroupKey)
 			})
 
 			ginkgo.By("Wait for the replacement pods to be ungated", func() {
-				for _, rep := range replacementPods {
+				for _, replKey := range replacementPods {
 					gomega.Eventually(func(g gomega.Gomega) {
 						var p corev1.Pod
-						g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(rep), &p)).To(gomega.Succeed())
+						g.Expect(k8sClient.Get(ctx, replKey, &p)).To(gomega.Succeed())
 						g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
 					}, util.Timeout, util.Interval).Should(gomega.Succeed())
 				}
@@ -346,10 +374,10 @@ var _ = ginkgo.Describe("Pod groups", func() {
 
 			// TODO(#1557): verify the default-priority workload is finished
 			ginkgo.By("Verify the replacement pods of the default priority workload complete", func() {
-				for _, rep := range replacementPods {
+				for _, replKey := range replacementPods {
 					gomega.Eventually(func(g gomega.Gomega) {
 						var p corev1.Pod
-						g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(rep), &p)).To(gomega.Succeed())
+						g.Expect(k8sClient.Get(ctx, replKey, &p)).To(gomega.Succeed())
 						g.Expect(p.Status.Phase).To(gomega.Equal(v1.PodSucceeded))
 					}, util.Timeout, util.Interval).Should(gomega.Succeed())
 				}

From 04dd696cc4f519e29c9bae1a7f42f24c4b91dcdf Mon Sep 17 00:00:00 2001
From: Michal Wozniak
Date: Mon, 29 Jan 2024 09:47:43 +0100
Subject: [PATCH 3/3] Address review comment: use gomega.Equal instead of
 BeNumerically

---
 test/e2e/singlecluster/pod_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/e2e/singlecluster/pod_test.go b/test/e2e/singlecluster/pod_test.go
index 4ec4d7b008..77f514e59f 100644
--- a/test/e2e/singlecluster/pod_test.go
+++ b/test/e2e/singlecluster/pod_test.go
@@ -345,7 +345,7 @@ var _ = ginkgo.Describe("Pod groups", func() {
 						}
 					}
 					return len(replacementPods)
-				}, util.Timeout, util.Interval).Should(gomega.BeNumerically("==", len(defaultPriorityGroup)))
+				}, util.Timeout, util.Interval).Should(gomega.Equal(len(defaultPriorityGroup)))
 			})
 
 			ginkgo.By("Verify the high-priority pods are scheduled", func() {