diff --git a/pkg/util/testingjobs/pod/wrappers.go b/pkg/util/testingjobs/pod/wrappers.go
index 2def9f9f92..aca94e818f 100644
--- a/pkg/util/testingjobs/pod/wrappers.go
+++ b/pkg/util/testingjobs/pod/wrappers.go
@@ -84,6 +84,12 @@ func (p *PodWrapper) Queue(q string) *PodWrapper {
 	return p.Label(constants.QueueLabel, q)
 }
 
+// PriorityClass updates the priority class name of the Pod
+func (p *PodWrapper) PriorityClass(pc string) *PodWrapper {
+	p.Spec.PriorityClassName = pc
+	return p
+}
+
 // Name updated the name of the pod
 func (p *PodWrapper) Name(n string) *PodWrapper {
 	p.ObjectMeta.Name = n
diff --git a/test/e2e/singlecluster/pod_test.go b/test/e2e/singlecluster/pod_test.go
index dbff1c8e82..77f514e59f 100644
--- a/test/e2e/singlecluster/pod_test.go
+++ b/test/e2e/singlecluster/pod_test.go
@@ -20,7 +20,9 @@ import (
 	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/version"
 	"k8s.io/client-go/discovery"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -71,6 +73,9 @@ var _ = ginkgo.Describe("Pod groups", func() {
 				ResourceGroup(
 					*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "5").Obj(),
 				).
+				Preemption(kueue.ClusterQueuePreemption{
+					WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
+				}).
 				Obj()
 			gomega.Expect(k8sClient.Create(ctx, cq)).To(gomega.Succeed())
 			lq = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(cq.Name).Obj()
@@ -223,6 +228,160 @@ var _ = ginkgo.Describe("Pod groups", func() {
 			ginkgo.By("Group completes", func() {
 				util.ExpectWorkloadToFinish(ctx, k8sClient, client.ObjectKey{Namespace: ns.Name, Name: "group"})
 			})
 		})
+
+		ginkgo.It("should allow to schedule a group of diverse pods", func() {
+			group := podtesting.MakePod("group", ns.Name).
+				Image("gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}).
+				Queue(lq.Name).
+				Request(corev1.ResourceCPU, "3").
+				MakeGroup(2)
+			gKey := client.ObjectKey{Namespace: ns.Name, Name: "group"}
+
+			// make the group of pods diverse using different amounts of resources
+			group[0].Spec.Containers[0].Resources.Requests[corev1.ResourceCPU] = resource.MustParse("2")
+
+			ginkgo.By("Group starts", func() {
+				for _, p := range group {
+					gomega.Expect(k8sClient.Create(ctx, p.DeepCopy())).To(gomega.Succeed())
+				}
+				gomega.Eventually(func(g gomega.Gomega) {
+					for _, origPod := range group {
+						var p corev1.Pod
+						g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(origPod), &p)).To(gomega.Succeed())
+						g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
+					}
+				}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			})
+
+			ginkgo.By("Group completes", func() {
+				util.ExpectWorkloadToFinish(ctx, k8sClient, client.ObjectKey{Namespace: ns.Name, Name: "group"})
+			})
+			ginkgo.By("Deleting finished Pods", func() {
+				for _, p := range group {
+					gomega.Expect(k8sClient.Delete(ctx, p)).To(gomega.Succeed())
+				}
+				gomega.Eventually(func(g gomega.Gomega) {
+					for _, p := range group {
+						var pCopy corev1.Pod
+						g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(p), &pCopy)).To(testing.BeNotFoundError())
+					}
+					var wl kueue.Workload
+					g.Expect(k8sClient.Get(ctx, gKey, &wl)).Should(testing.BeNotFoundError())
+				}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			})
+		})
+
+		ginkgo.It("should allow to preempt the lower priority group", func() {
+			highPriorityClass := testing.MakePriorityClass("high").PriorityValue(100).Obj()
+			gomega.Expect(k8sClient.Create(ctx, highPriorityClass)).To(gomega.Succeed())
+			ginkgo.DeferCleanup(func() {
+				gomega.Expect(k8sClient.Delete(ctx, highPriorityClass)).To(gomega.Succeed())
+			})
+
+			defaultPriorityGroup := podtesting.MakePod("default-priority-group", ns.Name).
+				Image("gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"-termination-code=1", "10min"}).
+				Queue(lq.Name).
+				Request(corev1.ResourceCPU, "2").
+				MakeGroup(2)
+			defaultGroupKey := client.ObjectKey{Namespace: ns.Name, Name: "default-priority-group"}
+
+			ginkgo.By("Default-priority group starts", func() {
+				for _, p := range defaultPriorityGroup {
+					gomega.Expect(k8sClient.Create(ctx, p.DeepCopy())).To(gomega.Succeed())
+				}
+				gomega.Eventually(func(g gomega.Gomega) {
+					for _, origPod := range defaultPriorityGroup {
+						var p corev1.Pod
+						g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(origPod), &p)).To(gomega.Succeed())
+						g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
+					}
+				}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			})
+
+			highPriorityGroup := podtesting.MakePod("high-priority-group", ns.Name).
+				Image("gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}).
+				Queue(lq.Name).
+				PriorityClass("high").
+				Request(corev1.ResourceCPU, "1").
+				MakeGroup(2)
+			highGroupKey := client.ObjectKey{Namespace: ns.Name, Name: "high-priority-group"}
+
+			ginkgo.By("Create the high-priority group", func() {
+				for _, p := range highPriorityGroup {
+					gomega.Expect(k8sClient.Create(ctx, p.DeepCopy())).To(gomega.Succeed())
+				}
+				gomega.Eventually(func(g gomega.Gomega) {
+					for _, origPod := range highPriorityGroup {
+						var p corev1.Pod
+						g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(origPod), &p)).To(gomega.Succeed())
+						g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
+					}
+				}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			})
+
+			ginkgo.By("The default priority workload is preempted", func() {
+				var updatedWorkload kueue.Workload
+				gomega.Expect(k8sClient.Get(ctx, defaultGroupKey, &updatedWorkload)).To(gomega.Succeed())
+				util.ExpectWorkloadsToBePreempted(ctx, k8sClient, &updatedWorkload)
+			})
+
+			replacementPods := make(map[types.NamespacedName]types.NamespacedName, len(defaultPriorityGroup))
+			ginkgo.By("Create replacement pods as soon as the default-priority pods are Failed", func() {
+				gomega.Eventually(func(g gomega.Gomega) int {
+					for _, origPod := range defaultPriorityGroup {
+						origKey := client.ObjectKeyFromObject(origPod)
+						if _, found := replacementPods[origKey]; !found {
+							var p corev1.Pod
+							g.Expect(k8sClient.Get(ctx, origKey, &p)).To(gomega.Succeed())
+							if p.Status.Phase == corev1.PodFailed {
+								rep := origPod.DeepCopy()
+								// For replacement pods use args that let them complete fast.
+								rep.Name = "replacement-for-" + rep.Name
+								rep.Spec.Containers[0].Args = []string{"1ms"}
+								gomega.Expect(k8sClient.Create(ctx, rep)).To(gomega.Succeed())
+								replacementPods[origKey] = client.ObjectKeyFromObject(rep)
+							}
+						}
+					}
+					return len(replacementPods)
+				}, util.Timeout, util.Interval).Should(gomega.Equal(len(defaultPriorityGroup)))
+			})
+
+			ginkgo.By("Verify the high-priority pods are scheduled", func() {
+				gomega.Eventually(func(g gomega.Gomega) {
+					for _, origPod := range highPriorityGroup {
+						var p corev1.Pod
+						g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(origPod), &p)).To(gomega.Succeed())
+						g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
+					}
+				}, util.Timeout, util.Interval).Should(gomega.Succeed())
+			})
+
+			ginkgo.By("Verify the high priority group completes", func() {
+				util.ExpectWorkloadToFinish(ctx, k8sClient, highGroupKey)
+			})
+
+			ginkgo.By("Wait for the replacement pods to be ungated", func() {
+				for _, replKey := range replacementPods {
+					gomega.Eventually(func(g gomega.Gomega) {
+						var p corev1.Pod
+						g.Expect(k8sClient.Get(ctx, replKey, &p)).To(gomega.Succeed())
+						g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
+					}, util.Timeout, util.Interval).Should(gomega.Succeed())
+				}
+			})
+
+			// TODO(#1557): verify the default-priority workload is finished
+			ginkgo.By("Verify the replacement pods of the default priority workload complete", func() {
+				for _, replKey := range replacementPods {
+					gomega.Eventually(func(g gomega.Gomega) {
+						var p corev1.Pod
+						g.Expect(k8sClient.Get(ctx, replKey, &p)).To(gomega.Succeed())
+						g.Expect(p.Status.Phase).To(gomega.Equal(corev1.PodSucceeded))
+					}, util.Timeout, util.Interval).Should(gomega.Succeed())
+				}
+			})
+		})
 	})
 })
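
For reviewers, a condensed sketch of the two pieces this change wires together: the ClusterQueue preemption policy and the new `PriorityClass` pod wrapper. It only restates calls that already appear in the diff above; the names and values are the test's own, not new API, and the snippet assumes the test file's imports (`testing`, `podtesting`, `kueue`, `corev1`).

```go
// The ClusterQueue must allow preemption within the queue so that the
// high-priority group can evict the default-priority one.
cq := testing.MakeClusterQueue("cq").
	ResourceGroup(*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "5").Obj()).
	Preemption(kueue.ClusterQueuePreemption{
		WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
	}).
	Obj()

// The new PriorityClass wrapper sets Spec.PriorityClassName on every pod
// of the group built by MakeGroup.
highPriorityGroup := podtesting.MakePod("high-priority-group", ns.Name).
	Image("gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}).
	Queue(lq.Name).
	PriorityClass("high").
	Request(corev1.ResourceCPU, "1").
	MakeGroup(2)
```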