Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cache] Add more details in the clusterQueues inactive message. #3127

Merged
merged 3 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions apis/kueue/v1beta1/clusterqueue_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ClusterQueue Active condition reasons.
//
// These values are reported as the reason of a ClusterQueue's Active
// condition. When several inactive reasons apply at once, the cache reports
// only the first one it collected as the reason, while the condition message
// lists the details of all of them.
const (
// ClusterQueueActiveReasonTerminating is reported while the ClusterQueue is
// terminating.
ClusterQueueActiveReasonTerminating = "Terminating"
// ClusterQueueActiveReasonStopped is reported when the ClusterQueue is
// stopped.
ClusterQueueActiveReasonStopped = "Stopped"
// ClusterQueueActiveReasonFlavorNotFound is reported when the ClusterQueue
// references one or more ResourceFlavors that are missing.
ClusterQueueActiveReasonFlavorNotFound = "FlavorNotFound"
// ClusterQueueActiveReasonAdmissionCheckNotFound is reported when the
// ClusterQueue references one or more AdmissionChecks that are missing.
ClusterQueueActiveReasonAdmissionCheckNotFound = "AdmissionCheckNotFound"
// ClusterQueueActiveReasonAdmissionCheckInactive is reported when the
// ClusterQueue references one or more AdmissionChecks that exist but are
// not active.
ClusterQueueActiveReasonAdmissionCheckInactive = "AdmissionCheckInactive"
// ClusterQueueActiveReasonMultipleSingleInstanceControllerAdmissionChecks is
// reported when the ClusterQueue references more than one AdmissionCheck
// for a controller that allows only a single instance per ClusterQueue.
ClusterQueueActiveReasonMultipleSingleInstanceControllerAdmissionChecks = "MultipleSingleInstanceControllerAdmissionChecks"
// ClusterQueueActiveReasonFlavorIndependentAdmissionCheckAppliedPerFlavor is
// reported when a flavor-independent AdmissionCheck is set at flavor level
// in the ClusterQueue.
ClusterQueueActiveReasonFlavorIndependentAdmissionCheckAppliedPerFlavor = "FlavorIndependentAdmissionCheckAppliedPerFlavor"
// ClusterQueueActiveReasonUnknown is reported when the ClusterQueue cannot
// admit new workloads but no more specific reason was identified.
ClusterQueueActiveReasonUnknown = "Unknown"
// ClusterQueueActiveReasonReady is reported when the ClusterQueue can admit
// new workloads.
ClusterQueueActiveReasonReady = "Ready"
)

// ClusterQueueSpec defines the desired state of ClusterQueue
// +kubebuilder:validation:XValidation:rule="!has(self.cohort) && has(self.resourceGroups) ? self.resourceGroups.all(rg, rg.flavors.all(f, f.resources.all(r, !has(r.borrowingLimit)))) : true", message="borrowingLimit must be nil when cohort is empty"
type ClusterQueueSpec struct {
Expand Down
14 changes: 7 additions & 7 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3280,31 +3280,31 @@ func TestClusterQueueReadiness(t *testing.T) {
clusterQueueName: "queue1",
wantStatus: metav1.ConditionFalse,
wantReason: "FlavorNotFound",
wantMessage: "Can't admit new workloads: FlavorNotFound",
wantMessage: "Can't admit new workloads: references missing ResourceFlavor(s): [flavor1].",
},
"check not found": {
clusterQueues: []*kueue.ClusterQueue{baseQueue},
resourceFlavors: []*kueue.ResourceFlavor{baseFlavor},
clusterQueueName: "queue1",
wantStatus: metav1.ConditionFalse,
wantReason: "CheckNotFoundOrInactive",
wantMessage: "Can't admit new workloads: CheckNotFoundOrInactive",
wantReason: "AdmissionCheckNotFound",
wantMessage: "Can't admit new workloads: references missing AdmissionCheck(s): [check1].",
},
"check inactive": {
clusterQueues: []*kueue.ClusterQueue{baseQueue},
resourceFlavors: []*kueue.ResourceFlavor{baseFlavor},
admissionChecks: []*kueue.AdmissionCheck{utiltesting.MakeAdmissionCheck("check1").Obj()},
clusterQueueName: "queue1",
wantStatus: metav1.ConditionFalse,
wantReason: "CheckNotFoundOrInactive",
wantMessage: "Can't admit new workloads: CheckNotFoundOrInactive",
wantReason: "AdmissionCheckInactive",
wantMessage: "Can't admit new workloads: references inactive AdmissionCheck(s): [check1].",
},
"flavor and check not found": {
clusterQueues: []*kueue.ClusterQueue{baseQueue},
clusterQueueName: "queue1",
wantStatus: metav1.ConditionFalse,
wantReason: "FlavorNotFound",
wantMessage: "Can't admit new workloads: FlavorNotFound, CheckNotFoundOrInactive",
wantMessage: "Can't admit new workloads: references missing ResourceFlavor(s): [flavor1], references missing AdmissionCheck(s): [check1].",
},
"terminating": {
clusterQueues: []*kueue.ClusterQueue{baseQueue},
Expand Down Expand Up @@ -3332,7 +3332,7 @@ func TestClusterQueueReadiness(t *testing.T) {
clusterQueueName: "queue1",
wantStatus: metav1.ConditionFalse,
wantReason: "Stopped",
wantMessage: "Can't admit new workloads: Stopped",
wantMessage: "Can't admit new workloads: is stopped.",
},
}

Expand Down
130 changes: 82 additions & 48 deletions pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ package cache

import (
"errors"
"fmt"
"maps"
"math"
"slices"
"strings"

corev1 "k8s.io/api/core/v1"
Expand All @@ -35,6 +38,8 @@ import (
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/resources"
utilac "sigs.k8s.io/kueue/pkg/util/admissioncheck"
"sigs.k8s.io/kueue/pkg/util/api"
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
"sigs.k8s.io/kueue/pkg/workload"
)

Expand Down Expand Up @@ -65,15 +70,16 @@ type clusterQueue struct {

AdmittedUsage resources.FlavorResourceQuantities
// localQueues by (namespace/name).
localQueues map[string]*queue
podsReadyTracking bool
hasMissingFlavors bool
hasMissingOrInactiveAdmissionChecks bool
hasMultipleSingleInstanceControllersChecks bool
hasFlavorIndependentAdmissionCheckAppliedPerFlavor bool
admittedWorkloadsCount int
isStopped bool
workloadInfoOptions []workload.InfoOption
localQueues map[string]*queue
podsReadyTracking bool
missingFlavors []kueue.ResourceFlavorReference
missingAdmissionChecks []string
inactiveAdmissionChecks []string
multipleSingleInstanceControllersChecks map[string][]string // key = controllerName
flavorIndependentAdmissionCheckAppliedPerFlavor []string
admittedWorkloadsCount int
isStopped bool
workloadInfoOptions []workload.InfoOption

resourceNode ResourceNode
hierarchy.ClusterQueue[*cohort]
Expand Down Expand Up @@ -219,7 +225,12 @@ func (c *clusterQueue) updateQuotasAndResourceGroups(in []kueue.ResourceGroup) b

func (c *clusterQueue) updateQueueStatus() {
status := active
if c.hasMissingFlavors || c.hasMissingOrInactiveAdmissionChecks || c.isStopped || c.hasMultipleSingleInstanceControllersChecks || c.hasFlavorIndependentAdmissionCheckAppliedPerFlavor {
if c.isStopped ||
len(c.missingFlavors) > 0 ||
len(c.missingAdmissionChecks) > 0 ||
len(c.inactiveAdmissionChecks) > 0 ||
len(c.multipleSingleInstanceControllersChecks) > 0 ||
len(c.flavorIndependentAdmissionCheckAppliedPerFlavor) > 0 {
status = pending
}
if c.Status == terminating {
Expand All @@ -234,45 +245,56 @@ func (c *clusterQueue) updateQueueStatus() {
func (c *clusterQueue) inactiveReason() (string, string) {
switch c.Status {
case terminating:
return "Terminating", "Can't admit new workloads; clusterQueue is terminating"
return kueue.ClusterQueueActiveReasonTerminating, "Can't admit new workloads; clusterQueue is terminating"
case pending:
reasons := make([]string, 0, 3)
messages := make([]string, 0, 3)
if c.isStopped {
reasons = append(reasons, "Stopped")
reasons = append(reasons, kueue.ClusterQueueActiveReasonStopped)
messages = append(messages, "is stopped")
}
if c.hasMissingFlavors {
reasons = append(reasons, "FlavorNotFound")
if len(c.missingFlavors) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonFlavorNotFound)
messages = append(messages, fmt.Sprintf("references missing ResourceFlavor(s): %v", c.missingFlavors))
}
if c.hasMissingOrInactiveAdmissionChecks {
reasons = append(reasons, "CheckNotFoundOrInactive")
if len(c.missingAdmissionChecks) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonAdmissionCheckNotFound)
messages = append(messages, fmt.Sprintf("references missing AdmissionCheck(s): %v", c.missingAdmissionChecks))
}

if c.hasMultipleSingleInstanceControllersChecks {
reasons = append(reasons, "MultipleSingleInstanceControllerChecks")
if len(c.inactiveAdmissionChecks) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonAdmissionCheckInactive)
messages = append(messages, fmt.Sprintf("references inactive AdmissionCheck(s): %v", c.inactiveAdmissionChecks))
}
if len(c.multipleSingleInstanceControllersChecks) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonMultipleSingleInstanceControllerAdmissionChecks)
for _, controller := range utilmaps.SortedKeys(c.multipleSingleInstanceControllersChecks) {
messages = append(messages, fmt.Sprintf("only one AdmissionCheck of %v can be referenced for controller %q", c.multipleSingleInstanceControllersChecks[controller], controller))
}
}

if c.hasFlavorIndependentAdmissionCheckAppliedPerFlavor {
reasons = append(reasons, "FlavorIndependentAdmissionCheckAppliedPerFlavor")
if len(c.flavorIndependentAdmissionCheckAppliedPerFlavor) > 0 {
reasons = append(reasons, kueue.ClusterQueueActiveReasonFlavorIndependentAdmissionCheckAppliedPerFlavor)
messages = append(messages, fmt.Sprintf("AdmissionCheck(s): %v cannot be set at flavor level", c.flavorIndependentAdmissionCheckAppliedPerFlavor))
}

if len(reasons) == 0 {
return "Unknown", "Can't admit new workloads."
return kueue.ClusterQueueActiveReasonUnknown, "Can't admit new workloads."
}

return reasons[0], strings.Join([]string{"Can't admit new workloads:", strings.Join(reasons, ", ")}, " ")
return reasons[0], api.TruncateConditionMessage(strings.Join([]string{"Can't admit new workloads: ", strings.Join(messages, ", "), "."}, ""))
}
return "Ready", "Can admit new flavors"
return kueue.ClusterQueueActiveReasonReady, "Can admit new workloads"
}

// UpdateWithFlavors updates a ClusterQueue based on the passed ResourceFlavors set.
// Exported only for testing.
func (c *clusterQueue) UpdateWithFlavors(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor) {
c.hasMissingFlavors = c.updateLabelKeys(flavors)
c.updateLabelKeys(flavors)
c.updateQueueStatus()
}

func (c *clusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor) bool {
var flavorNotFound bool
func (c *clusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor) {
c.missingFlavors = nil
for i := range c.ResourceGroups {
rg := &c.ResourceGroups[i]
if len(rg.Flavors) == 0 {
Expand All @@ -286,61 +308,73 @@ func (c *clusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference
keys.Insert(k)
}
} else {
flavorNotFound = true
c.missingFlavors = append(c.missingFlavors, fName)
}
}

if keys.Len() > 0 {
rg.LabelKeys = keys
}
}

return flavorNotFound
}

// updateWithAdmissionChecks updates a ClusterQueue based on the passed AdmissionChecks set.
func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionCheck) {
hasMissing := false
hasSpecificChecks := false
checksPerController := make(map[string]int, len(c.AdmissionChecks))
checksPerController := make(map[string][]string, len(c.AdmissionChecks))
singleInstanceControllers := sets.New[string]()
var missing []string
var inactive []string
var flavorIndependentCheckOnFlavors []string
for acName, flavors := range c.AdmissionChecks {
if ac, found := checks[acName]; !found {
hasMissing = true
missing = append(missing, acName)
} else {
if !ac.Active {
hasMissing = true
inactive = append(inactive, acName)
}
checksPerController[ac.Controller]++
checksPerController[ac.Controller] = append(checksPerController[ac.Controller], acName)
if ac.SingleInstanceInClusterQueue {
singleInstanceControllers.Insert(ac.Controller)
}
if ac.FlavorIndependent && flavors.Len() != 0 {
hasSpecificChecks = true
flavorIndependentCheckOnFlavors = append(flavorIndependentCheckOnFlavors, acName)
}
}
}

// sort the lists since c.AdmissionChecks is a map
slices.Sort(missing)
slices.Sort(inactive)
slices.Sort(flavorIndependentCheckOnFlavors)

update := false
if hasMissing != c.hasMissingOrInactiveAdmissionChecks {
c.hasMissingOrInactiveAdmissionChecks = hasMissing
if !slices.Equal(c.missingAdmissionChecks, missing) {
c.missingAdmissionChecks = missing
update = true
}

hasMultipleSICC := false
for controller, checks := range checksPerController {
if singleInstanceControllers.Has(controller) && checks > 1 {
hasMultipleSICC = true
}
if !slices.Equal(c.inactiveAdmissionChecks, inactive) {
c.inactiveAdmissionChecks = inactive
update = true
}

// remove the controllers which don't have more than one AC or are not single instance.
maps.DeleteFunc(checksPerController, func(controller string, acs []string) bool {
return len(acs) < 2 || !singleInstanceControllers.Has(controller)
})

// sort the remaining set
for c := range checksPerController {
slices.Sort(checksPerController[c])
}

if c.hasMultipleSingleInstanceControllersChecks != hasMultipleSICC {
c.hasMultipleSingleInstanceControllersChecks = hasMultipleSICC
if !maps.EqualFunc(checksPerController, c.multipleSingleInstanceControllersChecks, slices.Equal) {
c.multipleSingleInstanceControllersChecks = checksPerController
update = true
}

if c.hasFlavorIndependentAdmissionCheckAppliedPerFlavor != hasSpecificChecks {
c.hasFlavorIndependentAdmissionCheckAppliedPerFlavor = hasSpecificChecks
if !slices.Equal(c.flavorIndependentAdmissionCheckAppliedPerFlavor, flavorIndependentCheckOnFlavors) {
c.flavorIndependentAdmissionCheckAppliedPerFlavor = flavorIndependentCheckOnFlavors
update = true
}

Expand Down
Loading