Pod groups: e2e tests for diverse pods and preemption #1638

Merged: 3 commits, Jan 29, 2024
Changes from 2 commits
6 changes: 6 additions & 0 deletions pkg/util/testingjobs/pod/wrappers.go
@@ -84,6 +84,12 @@ func (p *PodWrapper) Queue(q string) *PodWrapper {
return p.Label(constants.QueueLabel, q)
}

// PriorityClass updates the priority class of the Pod
func (p *PodWrapper) PriorityClass(pc string) *PodWrapper {
p.Spec.PriorityClassName = pc
return p
}

// Name updates the name of the Pod
func (p *PodWrapper) Name(n string) *PodWrapper {
p.ObjectMeta.Name = n
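A minimal usage sketch of the new wrapper (illustrative, not part of the diff): it mirrors how the e2e test below builds its high-priority pod group, with lq, ns, and the podtesting alias as defined in that test.

// PriorityClass sets Spec.PriorityClassName on the wrapped Pod before
// MakeGroup expands it into a group of two pods.
group := podtesting.MakePod("high-priority-group", ns.Name).
	Queue(lq.Name).
	PriorityClass("high").
	Request(corev1.ResourceCPU, "1").
	MakeGroup(2)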
160 changes: 160 additions & 0 deletions test/e2e/singlecluster/pod_test.go
@@ -20,7 +20,10 @@ import (
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/client-go/discovery"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -71,6 +74,9 @@ var _ = ginkgo.Describe("Pod groups", func() {
ResourceGroup(
*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "5").Obj(),
).
Preemption(kueue.ClusterQueuePreemption{
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
}).
Obj()
gomega.Expect(k8sClient.Create(ctx, cq)).To(gomega.Succeed())
lq = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(cq.Name).Obj()
@@ -223,6 +229,160 @@ var _ = ginkgo.Describe("Pod groups", func() {

util.ExpectWorkloadToFinish(ctx, k8sClient, client.ObjectKey{Namespace: ns.Name, Name: "group"})
})

ginkgo.It("should allow to schedule a group of diverse pods", func() {
group := podtesting.MakePod("group", ns.Name).
Image("gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}).
Queue(lq.Name).
Request(corev1.ResourceCPU, "3").
MakeGroup(2)
gKey := client.ObjectKey{Namespace: ns.Name, Name: "group"}

// Make the group of pods diverse by giving them different resource requests.
group[0].Spec.Containers[0].Resources.Requests[corev1.ResourceCPU] = resource.MustParse("2")
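// With this override the first pod requests 2 CPU while the second keeps the
// default 3 CPU, so together the group needs 5 CPU, which fits the ClusterQueue's
// nominal quota of 5 CPU.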

ginkgo.By("Group starts", func() {
for _, p := range group {
gomega.Expect(k8sClient.Create(ctx, p.DeepCopy())).To(gomega.Succeed())
}
gomega.Eventually(func(g gomega.Gomega) {
for _, origPod := range group {
var p corev1.Pod
gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(origPod), &p)).To(gomega.Succeed())
g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
}
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Group completes", func() {
util.ExpectWorkloadToFinish(ctx, k8sClient, gKey)
})
ginkgo.By("Deleting finished Pods", func() {
for _, p := range group {
gomega.Expect(k8sClient.Delete(ctx, p)).To(gomega.Succeed())
}
gomega.Eventually(func(g gomega.Gomega) {
for _, p := range group {
var pCopy corev1.Pod
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(p), &pCopy)).To(testing.BeNotFoundError())
}
var wl kueue.Workload
g.Expect(k8sClient.Get(ctx, gKey, &wl)).Should(testing.BeNotFoundError())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})
})

ginkgo.It("should allow to preempt the lower priority group", func() {
highPriorityClass := testing.MakePriorityClass("high").PriorityValue(100).Obj()
gomega.Expect(k8sClient.Create(ctx, highPriorityClass)).To(gomega.Succeed())
ginkgo.DeferCleanup(func() {
gomega.Expect(k8sClient.Delete(ctx, highPriorityClass)).To(gomega.Succeed())
})
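
// Quota math: the ClusterQueue has a nominal quota of 5 CPU ("on-demand" flavor).
// The default-priority group below asks for 2 pods x 2 CPU = 4 CPU and the
// high-priority group for 2 pods x 1 CPU = 2 CPU, so both groups cannot be
// admitted at once and the high-priority group has to preempt
// (WithinClusterQueue: LowerPriority). The default-priority pods use
// -termination-code=1 so that, when they are terminated by preemption, they
// report Failed rather than Succeeded, which is what lets the test create
// replacement pods for them below.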

defaultPriorityGroup := podtesting.MakePod("default-priority-group", ns.Name).
Image("gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"-termination-code=1", "10min"}).
Queue(lq.Name).
Request(corev1.ResourceCPU, "2").
MakeGroup(2)
defaultGroupKey := client.ObjectKey{Namespace: ns.Name, Name: "default-priority-group"}

ginkgo.By("Default-priority group starts", func() {
for _, p := range defaultPriorityGroup {
gomega.Expect(k8sClient.Create(ctx, p.DeepCopy())).To(gomega.Succeed())
}
gomega.Eventually(func(g gomega.Gomega) {
for _, origPod := range defaultPriorityGroup {
var p corev1.Pod
gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(origPod), &p)).To(gomega.Succeed())
g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
}
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

highPriorityGroup := podtesting.MakePod("high-priority-group", ns.Name).
Image("gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}).
Queue(lq.Name).
PriorityClass("high").
Request(corev1.ResourceCPU, "1").
MakeGroup(2)
highGroupKey := client.ObjectKey{Namespace: ns.Name, Name: "high-priority-group"}

ginkgo.By("Create the high-priority group", func() {
for _, p := range highPriorityGroup {
gomega.Expect(k8sClient.Create(ctx, p.DeepCopy())).To(gomega.Succeed())
}
gomega.Eventually(func(g gomega.Gomega) {
for _, origPod := range highPriorityGroup {
var p corev1.Pod
gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(origPod), &p)).To(gomega.Succeed())
g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
}
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("The default priority workload is preempted", func() {
var updatedWorkload kueue.Workload
gomega.Expect(k8sClient.Get(ctx, defaultGroupKey, &updatedWorkload)).To(gomega.Succeed())
util.ExpectWorkloadsToBePreempted(ctx, k8sClient, &updatedWorkload)
})

replacementPods := make(map[types.NamespacedName]types.NamespacedName, len(defaultPriorityGroup))
ginkgo.By("Create replacement pods as soon as the default-priority pods are Failed", func() {
gomega.Eventually(func(g gomega.Gomega) int {
for _, origPod := range defaultPriorityGroup {
origKey := client.ObjectKeyFromObject(origPod)
if _, found := replacementPods[origKey]; !found {
var p corev1.Pod
gomega.Expect(k8sClient.Get(ctx, origKey, &p)).To(gomega.Succeed())
if p.Status.Phase == corev1.PodFailed {
rep := origPod.DeepCopy()
// For the replacement pods, use args that let them complete quickly.
rep.Name = "replacement-for-" + rep.Name
rep.Spec.Containers[0].Args = []string{"1ms"}
gomega.Expect(k8sClient.Create(ctx, rep)).To(gomega.Succeed())

Review comment (Contributor):

I wonder if there is potential for flakiness here, as Kueue might not have observed the Pod as Failed yet.

I'm not sure whether events within a kind cluster are ordered. If they are, Kueue would only see the replacement Pod after it has seen the other Pod fail, in which case there wouldn't be flakiness.

Let's run the tests a few times.

Otherwise, we might have to implement logic in which, instead of deleting excess Pods, they are left gated until there is space.

Reply (Contributor, author):

I assume there is no issue with flakiness: I ran this in a loop locally for over an hour and all attempts passed, and all 6 attempts on GitHub CI passed as well.

replacementPods[origKey] = client.ObjectKeyFromObject(rep)
}
}
}
return len(replacementPods)
}, util.Timeout, util.Interval).Should(gomega.BeNumerically("==", len(defaultPriorityGroup)))
})

ginkgo.By("Verify the high-priority pods are scheduled", func() {
gomega.Eventually(func(g gomega.Gomega) {
for _, origPod := range highPriorityGroup {
var p corev1.Pod
gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(origPod), &p)).To(gomega.Succeed())
g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
}
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Verify the high priority group completes", func() {
util.ExpectWorkloadToFinish(ctx, k8sClient, highGroupKey)
})

ginkgo.By("Await for the replacement pods to be ungated", func() {
for _, replKey := range replacementPods {
gomega.Eventually(func(g gomega.Gomega) {
var p corev1.Pod
g.Expect(k8sClient.Get(ctx, replKey, &p)).To(gomega.Succeed())
g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
}
})

// TODO(#1557): verify the default-priority workload is finished
ginkgo.By("Verify the replacement pods of the default priority workload complete", func() {
for _, replKey := range replacementPods {
gomega.Eventually(func(g gomega.Gomega) {
var p corev1.Pod
g.Expect(k8sClient.Get(ctx, replKey, &p)).To(gomega.Succeed())
g.Expect(p.Status.Phase).To(gomega.Equal(corev1.PodSucceeded))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
}
})
})
})
})
