[pods] Consider deleted pods without spec.nodeName inactive. (#2212)
* [pods] Consider Unschedulable pods inactive.

* Review Remarks

* Review Remarks

* Fix unit tests

* Review remarks

* Review Remarks

* Fix test nit.
trasc authored May 17, 2024
1 parent 9e307f3 commit e11d776
Showing 5 changed files with 201 additions and 4 deletions.
4 changes: 2 additions & 2 deletions pkg/controller/jobs/pod/pod_controller.go
@@ -737,9 +737,9 @@ func (p *Pod) notRunnableNorSucceededPods() []corev1.Pod {
}

// isPodRunnableOrSucceeded returns whether the Pod can eventually run, is Running or Succeeded.
- // A Pod cannot run if it's gated and has a deletionTimestamp.
+ // A Pod cannot run if it's gated or has no node assignment while having a deletionTimestamp.
func isPodRunnableOrSucceeded(p *corev1.Pod) bool {
- if p.DeletionTimestamp != nil && len(p.Spec.SchedulingGates) > 0 {
+ if p.DeletionTimestamp != nil && len(p.Spec.NodeName) == 0 {
return false
}
return p.Status.Phase != corev1.PodFailed
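For reference, here is a sketch of how the updated helper reads once this hunk is applied. It is reconstructed from the visible lines only; the trailing closing brace and the surrounding package context (including the corev1 "k8s.io/api/core/v1" import) are assumed, and the inline comments are editorial.

// Sketch reconstructed from the hunk above; closing brace and imports assumed.
// import corev1 "k8s.io/api/core/v1"

// isPodRunnableOrSucceeded returns whether the Pod can eventually run, is Running or Succeeded.
// A Pod cannot run if it's gated or has no node assignment while having a deletionTimestamp.
func isPodRunnableOrSucceeded(p *corev1.Pod) bool {
	// A terminating Pod that was never bound to a node can no longer start.
	// This covers scheduling-gated Pods (the previous check) as well as any
	// other pending Pod without spec.nodeName, e.g. an Unschedulable one.
	if p.DeletionTimestamp != nil && len(p.Spec.NodeName) == 0 {
		return false
	}
	// Any other Pod counts as runnable or succeeded unless it has already failed.
	return p.Status.Phase != corev1.PodFailed
}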
108 changes: 107 additions & 1 deletion pkg/controller/jobs/pod/pod_controller_test.go
@@ -1674,6 +1674,7 @@ func TestReconciler(t *testing.T) {
KueueFinalizer().
Queue("test-queue").
Group("test-group").
NodeName("test-node").
GroupTotalCount("2").
Delete().
Obj(),
@@ -1684,6 +1685,7 @@ func TestReconciler(t *testing.T) {
KueueFinalizer().
Queue("test-queue").
Group("test-group").
NodeName("test-node").
GroupTotalCount("2").
Delete().
Obj(),
@@ -1695,6 +1697,7 @@ func TestReconciler(t *testing.T) {
KueueFinalizer().
Queue("test-queue").
Group("test-group").
NodeName("test-node").
GroupTotalCount("2").
Delete().
Obj(),
@@ -1705,6 +1708,7 @@ func TestReconciler(t *testing.T) {
KueueFinalizer().
Queue("test-queue").
Group("test-group").
NodeName("test-node").
GroupTotalCount("2").
Delete().
Obj(),
@@ -1724,7 +1728,7 @@
},
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("test-group", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
- PodSets(*utiltesting.MakePodSet("dc85db45", 2).Request(corev1.ResourceCPU, "1").Obj()).
+ PodSets(*utiltesting.MakePodSet("dc85db45", 2).Request(corev1.ResourceCPU, "1").NodeName("test-node").Obj()).
Queue("test-queue").
Priority(0).
OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod", "test-uid").
@@ -3756,6 +3760,108 @@ func TestReconciler(t *testing.T) {
},
},
},
"deleted unschedulable pods are finalized": {
pods: []corev1.Pod{
*basePodWrapper.
Clone().
Name("pod1").
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
StatusPhase(corev1.PodRunning).
Group("test-group").
GroupTotalCount("2").
CreationTimestamp(time.Now().Add(-time.Hour)).
Obj(),
*basePodWrapper.
Clone().
Name("pod2").
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
Delete().
StatusPhase(corev1.PodPending).
Group("test-group").
GroupTotalCount("2").
CreationTimestamp(time.Now().Add(-time.Hour)).
Obj(),
*basePodWrapper.
Clone().
Name("replacement").
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
KueueSchedulingGate().
Group("test-group").
GroupTotalCount("2").
CreationTimestamp(time.Now()).
Obj(),
},
workloadCmpOpts: defaultWorkloadCmpOpts,
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("test-group", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(
*utiltesting.MakePodSet("dc85db45", 2).
Request(corev1.ResourceCPU, "1").
Obj(),
).
Queue("user-queue").
Priority(0).
OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod1", "test-uid").
OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod2", "test-uid").
ReserveQuota(utiltesting.MakeAdmission("cq", "dc85db45").AssignmentPodCount(2).Obj()).
Admitted(true).
Obj(),
},
wantPods: []corev1.Pod{
*basePodWrapper.
Clone().
Name("pod1").
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
StatusPhase(corev1.PodRunning).
Group("test-group").
GroupTotalCount("2").
CreationTimestamp(time.Now().Add(-time.Hour)).
Obj(),
*basePodWrapper.
Clone().
Name("replacement").
Label("kueue.x-k8s.io/managed", "true").
KueueFinalizer().
Group("test-group").
GroupTotalCount("2").
CreationTimestamp(time.Now()).
Obj(),
},
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("test-group", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(
*utiltesting.MakePodSet("dc85db45", 2).
Request(corev1.ResourceCPU, "1").
Obj(),
).
Queue("user-queue").
Priority(0).
OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod1", "test-uid").
OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "pod2", "test-uid").
OwnerReference(corev1.SchemeGroupVersion.WithKind("Pod"), "replacement", "test-uid").
ReserveQuota(utiltesting.MakeAdmission("cq", "dc85db45").AssignmentPodCount(2).Obj()).
Admitted(true).
Obj(),
},
wantEvents: []utiltesting.EventRecord{
{
Key: types.NamespacedName{Name: "replacement", Namespace: "ns"},
EventType: "Normal",
Reason: "Started",
Message: "Admitted by clusterQueue cq",
},
{
Key: types.NamespacedName{Name: "test-group", Namespace: "ns"},
EventType: "Normal",
Reason: "OwnerReferencesAdded",
Message: "Added 1 owner reference(s)",
},
},
},
}

for name, tc := range testCases {
5 changes: 5 additions & 0 deletions pkg/util/testing/wrappers.go
@@ -431,6 +431,11 @@ func (p *PodSetWrapper) NodeSelector(kv map[string]string) *PodSetWrapper {
return p
}

func (p *PodSetWrapper) NodeName(name string) *PodSetWrapper {
p.Template.Spec.NodeName = name
return p
}

func (p *PodSetWrapper) Labels(kv map[string]string) *PodSetWrapper {
p.Template.Labels = kv
return p
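A brief usage sketch of the new PodSetWrapper.NodeName builder, mirroring how the reconciler test above constructs the expected workload's pod set (the hashed pod-set name, resource value, and node name are taken from that test and are illustrative; utiltesting refers to this wrappers package):

// Usage sketch of PodSetWrapper.NodeName (mirrors the reconciler test above).
podSet := utiltesting.MakePodSet("dc85db45", 2).
	Request(corev1.ResourceCPU, "1").
	NodeName("test-node").
	Obj()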
6 changes: 6 additions & 0 deletions pkg/util/testingjobs/pod/wrappers.go
@@ -158,6 +158,12 @@ func (p *PodWrapper) NodeSelector(k, v string) *PodWrapper {
return p
}

// NodeName sets a node name to the Pod.
func (p *PodWrapper) NodeName(name string) *PodWrapper {
p.Spec.NodeName = name
return p
}

// Request adds a resource request to the default container.
func (p *PodWrapper) Request(r corev1.ResourceName, v string) *PodWrapper {
p.Spec.Containers[0].Resources.Requests[r] = resource.MustParse(v)
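Similarly, the Pod wrapper gains a NodeName setter. A usage sketch based on the test fixtures above; under the updated controller logic a deleted Pod that already has a node assignment is still counted as active, which is exactly why the existing fixtures gained NodeName("test-node"):

// Usage sketch of PodWrapper.NodeName (fixture style borrowed from pod_controller_test.go).
pod := basePodWrapper.
	Clone().
	Name("pod1").
	KueueFinalizer().
	NodeName("test-node"). // bound to a node, so deletion alone does not make it inactive
	Delete().
	Obj()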
82 changes: 81 additions & 1 deletion test/e2e/singlecluster/pod_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package e2e

import (
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
@@ -204,7 +205,7 @@ var _ = ginkgo.Describe("Pod groups", func() {
gomega.Eventually(func(g gomega.Gomega) {
for _, origPod := range group {
var p corev1.Pod
- gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(origPod), &p)).To(gomega.Succeed())
+ g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(origPod), &p)).To(gomega.Succeed())
g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
}
}, util.Timeout, util.Interval).Should(gomega.Succeed())
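The one-line change in this hunk swaps the package-level gomega.Expect for the g gomega.Gomega instance passed into the polled closure. With Gomega's Eventually(func(g gomega.Gomega) {...}) form, an assertion made through g only fails that polling attempt and is retried until the timeout, whereas the package-level gomega.Expect reports straight to the global fail handler and fails the spec immediately instead of retrying. A minimal sketch of the intended pattern (identifiers such as k8sClient, origPod, and util.Timeout come from the surrounding test):

// Assert through the g passed into the closure so transient failures are retried.
gomega.Eventually(func(g gomega.Gomega) {
	var p corev1.Pod
	g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(origPod), &p)).To(gomega.Succeed())
	g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
}, util.Timeout, util.Interval).Should(gomega.Succeed())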
@@ -254,6 +255,85 @@ var _ = ginkgo.Describe("Pod groups", func() {
util.ExpectWorkloadToFinish(ctx, k8sClient, client.ObjectKey{Namespace: ns.Name, Name: "group"})
})

ginkgo.It("Unscheduled Pod which is deleted can be replaced in group", func() {
eventList := corev1.EventList{}
eventWatcher, err := k8sClient.Watch(ctx, &eventList, &client.ListOptions{
Namespace: ns.Name,
})
gomega.Expect(err).NotTo(gomega.HaveOccurred())

ginkgo.DeferCleanup(func() {
eventWatcher.Stop()
})

group := podtesting.MakePod("group", ns.Name).
Image("gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}).
Queue(lq.Name).
Request(corev1.ResourceCPU, "1").
MakeGroup(2)

// The first pod has a node selector for a missing node.
group[0].Spec.NodeSelector = map[string]string{"missing-node-key": "missing-node-value"}

ginkgo.By("Group starts", func() {
for _, p := range group {
gomega.Expect(k8sClient.Create(ctx, p.DeepCopy())).To(gomega.Succeed())
}
gomega.Eventually(func(g gomega.Gomega) {
for _, origPod := range group {
var p corev1.Pod
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(origPod), &p)).To(gomega.Succeed())
g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
}
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Check the second pod is no longer pending", func() {
gomega.Eventually(func(g gomega.Gomega) {
var p corev1.Pod
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(group[1]), &p)).To(gomega.Succeed())
g.Expect(p.Status.Phase).NotTo(gomega.Equal(corev1.PodPending))
g.Expect(p.Spec.NodeName).NotTo(gomega.BeEmpty())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Check the first pod is Unschedulable", func() {
gomega.Eventually(func(g gomega.Gomega) {
var p corev1.Pod
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(group[0]), &p)).To(gomega.Succeed())
g.Expect(p.Status.Phase).To(gomega.Equal(corev1.PodPending))
g.Expect(p.Status.Conditions).To(gomega.ContainElement(gomega.BeComparableTo(corev1.PodCondition{
Type: corev1.PodScheduled,
Status: corev1.ConditionFalse,
Reason: corev1.PodReasonUnschedulable,
}, cmpopts.IgnoreFields(corev1.PodCondition{}, "LastProbeTime", "LastTransitionTime", "Message"))))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Deleting the pod it remains Unschedulable", func() {
gomega.Expect(k8sClient.Delete(ctx, group[0])).To(gomega.Succeed())
gomega.Eventually(func(g gomega.Gomega) {
var p corev1.Pod
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(group[0]), &p)).To(gomega.Succeed())
g.Expect(p.DeletionTimestamp.IsZero()).NotTo(gomega.BeTrue())
g.Expect(p.Status.Phase).To(gomega.Equal(corev1.PodPending))
g.Expect(p.Spec.NodeName).To(gomega.BeEmpty())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("Replacement pod is un-gated, and the failed one is deleted", func() {
rep := group[0].DeepCopy()
rep.Name = "replacement"
gomega.Expect(k8sClient.Create(ctx, rep)).To(gomega.Succeed())
gomega.Eventually(func(g gomega.Gomega) {
var p corev1.Pod
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(rep), &p)).To(gomega.Succeed())
g.Expect(p.Spec.SchedulingGates).To(gomega.BeEmpty())
g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(group[0]), &p)).To(testing.BeNotFoundError())
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})
})

ginkgo.It("should allow to schedule a group of diverse pods", func() {
group := podtesting.MakePod("group", ns.Name).
Image("gcr.io/k8s-staging-perf-tests/sleep:v0.1.0", []string{"1ms"}).
