Pod groups: e2e tests for diverse pods and preemption (#1638)
* WIP: Add more e2e tests for pod groups

* cleanup

* review comment
mimowo authored Jan 29, 2024
1 parent c46f47a commit cb3c2f6
Showing 2 changed files with 166 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pkg/util/testingjobs/pod/wrappers.go
@@ -84,6 +84,12 @@ func (p *PodWrapper) Queue(q string) *PodWrapper {
return p.Label(constants.QueueLabel, q)
}

// PriorityClass updates the priority class name of the Pod
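//
// A minimal usage sketch (values hypothetical; assumes the wrapper's usual
// Obj() finisher), chaining helpers the way the e2e tests below do:
//
//	pod := MakePod("sample", "ns").
//		Queue("user-queue").
//		PriorityClass("high").
//		Request(corev1.ResourceCPU, "1").
//		Obj()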
func (p *PodWrapper) PriorityClass(pc string) *PodWrapper {
p.Spec.PriorityClassName = pc
return p
}

// Name updates the name of the pod
func (p *PodWrapper) Name(n string) *PodWrapper {
p.ObjectMeta.Name = n
160 changes: 160 additions & 0 deletions test/e2e/singlecluster/pod_test.go
@@ -20,7 +20,10 @@ import (
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/client-go/discovery"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -71,6 +74,9 @@ var _ = ginkgo.Describe("Pod groups", func() {
ResourceGroup(
*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "5").Obj(),
).
Preemption(kueue.ClusterQueuePreemption{
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
}).
Obj()
gomega.Expect(k8sClient.Create(ctx, cq)).To(gomega.Succeed())
lq = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(cq.Name).Obj()
@@ -223,6 +229,160 @@ var _ = ginkgo.Describe("Pod groups", func() {

util.ExpectWorkloadToFinish(ctx, k8sClient, client.ObjectKey{Namespace: ns.Name, Name: "group"})
})

ginkgo.It("should allow to schedule a group of diverse pods", func() {
group := podtesting.MakePod("group", ns.Name).
Image("gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}).
Queue(lq.Name).
Request(corev1.ResourceCPU, "3").
MakeGroup(2)
gKey := client.ObjectKey{Namespace: ns.Name, Name: "group"}
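// A note on grouping: the pods form a single Workload (named "group",
// matching gKey) through shared metadata that MakeGroup is assumed to set
// on each pod: the kueue.x-k8s.io/pod-group-name label and the
// kueue.x-k8s.io/pod-group-total-count annotation (here "2"), which tells
// Kueue how many pods to wait for before admitting the group as one unit.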

// Make the group of pods diverse by requesting different amounts of resources.
group[0].Spec.Containers[0].Resources.Requests[corev1.ResourceCPU] = resource.MustParse("2")
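// Until the whole group is admitted, Kueue is assumed to keep each pod
// unschedulable via a scheduling gate (kueue.x-k8s.io/admission), so an
// empty spec.schedulingGates below signals that the pod was admitted.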

ginkgo.By("Group starts", func() {
for _, p := range group {
gomega.Expect(k8sClient.Create(ctx, p.DeepCopy())).To(gomega.Succeed())
}
gomega.Eventually(func(g gomega.Gomega) {
for _, origPod := range group {
var p corev1.Pod
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(origPod), &p)).To(gomega.Succeed())
g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
}
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Group completes", func() {
util.ExpectWorkloadToFinish(ctx, k8sClient, client.ObjectKey{Namespace: ns.Name, Name: "group"})
})
ginkgo.By("Deleting finished Pods", func() {
for _, p := range group {
gomega.Expect(k8sClient.Delete(ctx, p)).To(gomega.Succeed())
}
gomega.Eventually(func(g gomega.Gomega) {
for _, p := range group {
var pCopy corev1.Pod
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(p), &pCopy)).To(testing.BeNotFoundError())
}
var wl kueue.Workload
g.Expect(k8sClient.Get(ctx, gKey, &wl)).Should(testing.BeNotFoundError())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})
})

ginkgo.It("should allow to preempt the lower priority group", func() {
highPriorityClass := testing.MakePriorityClass("high").PriorityValue(100).Obj()
gomega.Expect(k8sClient.Create(ctx, highPriorityClass)).To(gomega.Succeed())
ginkgo.DeferCleanup(func() {
gomega.Expect(k8sClient.Delete(ctx, highPriorityClass)).To(gomega.Succeed())
})
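// Quota math, from the ClusterQueue configured above: the single flavor
// offers 5 CPUs. The default-priority group requests 2 pods x 2 CPUs = 4
// CPUs and the high-priority group 2 pods x 1 CPU = 2 CPUs; 4 + 2 = 6 > 5,
// so the high-priority group fits only by preempting the default-priority
// one, which the WithinClusterQueue: LowerPriority policy allows.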

defaultPriorityGroup := podtesting.MakePod("default-priority-group", ns.Name).
Image("gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"-termination-code=1", "10min"}).
Queue(lq.Name).
Request(corev1.ResourceCPU, "2").
MakeGroup(2)
defaultGroupKey := client.ObjectKey{Namespace: ns.Name, Name: "default-priority-group"}

ginkgo.By("Default-priority group starts", func() {
for _, p := range defaultPriorityGroup {
gomega.Expect(k8sClient.Create(ctx, p.DeepCopy())).To(gomega.Succeed())
}
gomega.Eventually(func(g gomega.Gomega) {
for _, origPod := range defaultPriorityGroup {
var p corev1.Pod
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(origPod), &p)).To(gomega.Succeed())
g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
}
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

highPriorityGroup := podtesting.MakePod("high-priority-group", ns.Name).
Image("gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}).
Queue(lq.Name).
PriorityClass("high").
Request(corev1.ResourceCPU, "1").
MakeGroup(2)
highGroupKey := client.ObjectKey{Namespace: ns.Name, Name: "high-priority-group"}

ginkgo.By("Create the high-priority group", func() {
for _, p := range highPriorityGroup {
gomega.Expect(k8sClient.Create(ctx, p.DeepCopy())).To(gomega.Succeed())
}
gomega.Eventually(func(g gomega.Gomega) {
for _, origPod := range highPriorityGroup {
var p corev1.Pod
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(origPod), &p)).To(gomega.Succeed())
g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
}
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("The default priority workload is preempted", func() {
var updatedWorkload kueue.Workload
gomega.Expect(k8sClient.Get(ctx, defaultGroupKey, &updatedWorkload)).To(gomega.Succeed())
util.ExpectWorkloadsToBePreempted(ctx, k8sClient, &updatedWorkload)
})
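// The preempted pods end up Failed rather than Succeeded (they exit with
// -termination-code=1), so the group cannot finish by itself. Kueue's
// pod-group integration is assumed to treat a newly created pod carrying
// the same group metadata as a replacement for a failed pod, which the
// loop below relies on.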

replacementPods := make(map[types.NamespacedName]types.NamespacedName, len(defaultPriorityGroup))
ginkgo.By("Create replacement pods as soon as the default-priority pods are Failed", func() {
gomega.Eventually(func(g gomega.Gomega) int {
for _, origPod := range defaultPriorityGroup {
origKey := client.ObjectKeyFromObject(origPod)
if _, found := replacementPods[origKey]; !found {
var p corev1.Pod
g.Expect(k8sClient.Get(ctx, origKey, &p)).To(gomega.Succeed())
if p.Status.Phase == corev1.PodFailed {
rep := origPod.DeepCopy()
// For the replacement pods, use args that let them complete quickly.
rep.Name = "replacement-for-" + rep.Name
rep.Spec.Containers[0].Args = []string{"1ms"}
g.Expect(k8sClient.Create(ctx, rep)).To(gomega.Succeed())
replacementPods[origKey] = client.ObjectKeyFromObject(rep)
}
}
}
return len(replacementPods)
}, util.Timeout, util.Interval).Should(gomega.Equal(len(defaultPriorityGroup)))
})

ginkgo.By("Verify the high-priority pods are scheduled", func() {
gomega.Eventually(func(g gomega.Gomega) {
for _, origPod := range highPriorityGroup {
var p corev1.Pod
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(origPod), &p)).To(gomega.Succeed())
g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
}
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Verify the high priority group completes", func() {
util.ExpectWorkloadToFinish(ctx, k8sClient, highGroupKey)
})

ginkgo.By("Await for the replacement pods to be ungated", func() {
for _, replKey := range replacementPods {
gomega.Eventually(func(g gomega.Gomega) {
var p corev1.Pod
g.Expect(k8sClient.Get(ctx, replKey, &p)).To(gomega.Succeed())
g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
}
})

// TODO(#1557): verify the default-priority workload is finished
ginkgo.By("Verify the replacement pods of the default priority workload complete", func() {
for _, replKey := range replacementPods {
gomega.Eventually(func(g gomega.Gomega) {
var p corev1.Pod
g.Expect(k8sClient.Get(ctx, replKey, &p)).To(gomega.Succeed())
g.Expect(p.Status.Phase).To(gomega.Equal(corev1.PodSucceeded))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
}
})
})
})
})

