Skip to content

Commit

Permalink
[scheduler] Validate resource requests before admission
Browse files Browse the repository at this point in the history
  • Loading branch information
trasc committed Mar 21, 2023
1 parent 0e805f0 commit d1b04f1
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 0 deletions.
63 changes: 63 additions & 0 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"sort"
"strings"
"time"

"github.com/go-logr/logr"
Expand All @@ -32,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/utils/field"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand All @@ -43,6 +45,8 @@ import (
"sigs.k8s.io/kueue/pkg/scheduler/flavorassigner"
"sigs.k8s.io/kueue/pkg/scheduler/preemption"
"sigs.k8s.io/kueue/pkg/util/api"
"sigs.k8s.io/kueue/pkg/util/limitrange"
"sigs.k8s.io/kueue/pkg/util/resource"
"sigs.k8s.io/kueue/pkg/util/routine"
"sigs.k8s.io/kueue/pkg/workload"
)
Expand Down Expand Up @@ -249,6 +253,10 @@ func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, sna
} else if !cq.NamespaceSelector.Matches(labels.Set(ns.Labels)) {
e.inadmissibleMsg = "Workload namespace doesn't match ClusterQueue selector"
e.requeueReason = queue.RequeueReasonNamespaceMismatch
} else if err := s.validateRresources(&w); err != nil {
e.inadmissibleMsg = err.Error()
} else if err := s.validateLimitRange(ctx, &w); err != nil {
e.inadmissibleMsg = err.Error()
} else {
e.assignment = flavorassigner.AssignFlavors(log, &e.Info, snap.ResourceFlavors, cq)
e.inadmissibleMsg = e.assignment.Message()
Expand All @@ -258,6 +266,61 @@ func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, sna
return entries
}

// validateResources validates that requested resources are less or equal
// to limits
// validateRresources checks every podSet of the workload and reports an
// error when any container (init or regular) requests more of a resource
// than its own declared limit allows. A nil return means all requests fit
// within their limits; otherwise the error aggregates one reason per
// offending container, identified by its field path.
func (s *Scheduler) validateRresources(wi *workload.Info) error {
	podsetsPath := field.NewPath("podSets")
	var allReasons []string
	for psIdx := range wi.Obj.Spec.PodSets {
		ps := &wi.Obj.Spec.PodSets[psIdx]
		psPath := podsetsPath.Child(ps.Name)
		// checkContainers appends a reason for each container whose
		// requests are not covered by its limits.
		checkContainers := func(containers []corev1.Container, fieldName string) {
			for ci := range containers {
				res := &containers[ci].Resources
				if !resource.IsLessOrEqual(res.Requests, res.Limits) {
					allReasons = append(allReasons, fmt.Sprintf("%s requests exceed it's limits", psPath.Child(fieldName).Index(ci).String()))
				}
			}
		}
		checkContainers(ps.Template.Spec.InitContainers, "initContainers")
		checkContainers(ps.Template.Spec.Containers, "containers")
	}
	if len(allReasons) == 0 {
		return nil
	}
	return fmt.Errorf("resource validation failed: %s", strings.Join(allReasons, "; "))
}

// validateLimitRange validates that the requested resources fit into the namespace defined
// limitRanges
// validateLimitRange verifies that the workload's pod specs satisfy the
// LimitRange objects defined in the workload's namespace. When the
// namespace has no LimitRanges the workload trivially passes; otherwise
// all LimitRanges are summarized and each podSet's pod spec is validated
// against the summary. Returns nil on success, a listing error, or an
// aggregated validation error.
func (s *Scheduler) validateLimitRange(ctx context.Context, wi *workload.Info) error {
	// Fetch every LimitRange in the workload's namespace.
	list := corev1.LimitRangeList{}
	if err := s.client.List(ctx, &list, &client.ListOptions{Namespace: wi.Obj.Namespace}); err != nil {
		return err
	}
	if len(list.Items) == 0 {
		// Nothing to enforce.
		return nil
	}
	summary := limitrange.Summarize(list.Items...)

	// Validate each podSet's pod template against the combined ranges.
	podsetsPath := field.NewPath("podSets")
	var allReasons []string
	for i := range wi.Obj.Spec.PodSets {
		ps := &wi.Obj.Spec.PodSets[i]
		allReasons = append(allReasons, summary.ValidatePodSpec(&ps.Template.Spec, podsetsPath.Child(ps.Name))...)
	}
	if len(allReasons) == 0 {
		return nil
	}
	return fmt.Errorf("limits validation failed: %s", strings.Join(allReasons, "; "))
}

// admit sets the admitting clusterQueue and flavors into the workload of
// the entry, and asynchronously updates the object in the apiserver after
// assuming it in the cache.
Expand Down
68 changes: 68 additions & 0 deletions test/integration/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/util/pointer"
"sigs.k8s.io/kueue/pkg/util/testing"
"sigs.k8s.io/kueue/pkg/workload"
"sigs.k8s.io/kueue/test/util"
)

Expand Down Expand Up @@ -964,4 +965,71 @@ var _ = ginkgo.Describe("Scheduler", func() {
util.ExpectPendingWorkloadsMetric(cq, 0, 1)
})
})

// Integration coverage for the new scheduler-side request validation:
// workloads whose podSet requests exceed their own limits, or fall outside
// the namespace LimitRange, must not be admitted, and the admission
// condition message must carry the corresponding validation error.
ginkgo.When("The workload's podSet resource requests are not valid", func() {
var (
cq *kueue.ClusterQueue
queue *kueue.LocalQueue
)

// Create a ResourceFlavor, a ClusterQueue with 5 CPUs of on-demand quota,
// and a LocalQueue pointing at it, so admission is only blocked by the
// request validation under test (quota itself is ample).
ginkgo.BeforeEach(func() {
gomega.Expect(k8sClient.Create(ctx, onDemandFlavor)).To(gomega.Succeed())
cq = testing.MakeClusterQueue("cluster-queue").
ResourceGroup(
*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "5").Obj(),
).
Obj()
gomega.Expect(k8sClient.Create(ctx, cq)).Should(gomega.Succeed())
queue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(cq.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, queue)).Should(gomega.Succeed())
})

// Tear down namespace, queue and flavor so each table entry starts clean.
ginkgo.AfterEach(func() {
gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, cq, true)
util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, onDemandFlavor, true)
})

// Each entry creates a LimitRange (min/max CPU at container or pod scope)
// and a single-container workload with the given CPU request/limit, then
// asserts either admission or a specific validation message.
ginkgo.DescribeTable("", func(reqCPU, limCPU, minCPU, maxCPU string, limitType corev1.LimitType, status string, shouldBeAdmited bool) {
lr := testing.MakeLimitRange("limit", ns.Name).
WithType(limitType).
WithValue("Min", corev1.ResourceCPU, minCPU).
WithValue("Max", corev1.ResourceCPU, maxCPU).
Obj()
gomega.Expect(k8sClient.Create(ctx, lr)).To(gomega.Succeed())

wl := testing.MakeWorkload("workload", ns.Name).
Queue(queue.Name).
Request(corev1.ResourceCPU, reqCPU).
Limit(corev1.ResourceCPU, limCPU).
Obj()
gomega.Expect(k8sClient.Create(ctx, wl)).To(gomega.Succeed())

if shouldBeAdmited {
util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, cq.Name, wl)
} else {
// Poll the workload's Admitted condition until its message contains
// the expected validation error substring. An empty string is
// returned (and retried) while the workload or condition is absent.
gomega.Eventually(func() string {
rwl := kueue.Workload{}
if err := k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &rwl); err != nil {
return ""
}

ci := workload.FindConditionIndex(&rwl.Status, kueue.WorkloadAdmitted)
if ci < 0 {
return ""
}
return rwl.Status.Conditions[ci].Message
}, util.Timeout, util.Interval).Should(gomega.ContainSubstring(status))
}
gomega.Expect(util.DeleteWorkload(ctx, k8sClient, wl)).To(gomega.Succeed())
gomega.Expect(k8sClient.Delete(ctx, lr)).To(gomega.Succeed())
},
// request > limit: rejected by validateRresources.
ginkgo.Entry("request more that limits", "3", "2", "1", "4", corev1.LimitTypeContainer, "resource validation failed:", false),
// request outside the container-scoped LimitRange: rejected by validateLimitRange.
ginkgo.Entry("request over container limits", "2", "3", "1", "1", corev1.LimitTypeContainer, "limits validation failed:", false),
ginkgo.Entry("request under container limits", "2", "3", "3", "4", corev1.LimitTypeContainer, "limits validation failed:", false),
// Same checks with a pod-scoped LimitRange.
ginkgo.Entry("request over pod limits", "2", "3", "1", "1", corev1.LimitTypePod, "limits validation failed:", false),
ginkgo.Entry("request under pod limits", "2", "3", "3", "4", corev1.LimitTypePod, "limits validation failed:", false),
// Requests within both the limit and the LimitRange: admitted.
ginkgo.Entry("valid", "2", "3", "1", "4", corev1.LimitTypeContainer, "", true),
)
})
})

0 comments on commit d1b04f1

Please sign in to comment.