Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cache] Add more details in the clusterQueues inactive message. #3127

Merged
merged 3 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions pkg/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3280,31 +3280,31 @@ func TestClusterQueueReadiness(t *testing.T) {
clusterQueueName: "queue1",
wantStatus: metav1.ConditionFalse,
wantReason: "FlavorNotFound",
wantMessage: "Can't admit new workloads: FlavorNotFound",
wantMessage: "Can't admit new workloads: References missing ResourceFlavor [flavor1].",
},
"check not found": {
clusterQueues: []*kueue.ClusterQueue{baseQueue},
resourceFlavors: []*kueue.ResourceFlavor{baseFlavor},
clusterQueueName: "queue1",
wantStatus: metav1.ConditionFalse,
wantReason: "CheckNotFoundOrInactive",
wantMessage: "Can't admit new workloads: CheckNotFoundOrInactive",
wantReason: "AdmissionCheckNotFound",
wantMessage: "Can't admit new workloads: References missing AdmissionChecks [check1].",
},
"check inactive": {
clusterQueues: []*kueue.ClusterQueue{baseQueue},
resourceFlavors: []*kueue.ResourceFlavor{baseFlavor},
admissionChecks: []*kueue.AdmissionCheck{utiltesting.MakeAdmissionCheck("check1").Obj()},
clusterQueueName: "queue1",
wantStatus: metav1.ConditionFalse,
wantReason: "CheckNotFoundOrInactive",
wantMessage: "Can't admit new workloads: CheckNotFoundOrInactive",
wantReason: "AdmissionCheckInactive",
wantMessage: "Can't admit new workloads: References inactive AdmissionChecks [check1].",
},
"flavor and check not found": {
clusterQueues: []*kueue.ClusterQueue{baseQueue},
clusterQueueName: "queue1",
wantStatus: metav1.ConditionFalse,
wantReason: "FlavorNotFound",
wantMessage: "Can't admit new workloads: FlavorNotFound, CheckNotFoundOrInactive",
wantMessage: "Can't admit new workloads: References missing ResourceFlavor [flavor1]. References missing AdmissionChecks [check1].",
},
"terminating": {
clusterQueues: []*kueue.ClusterQueue{baseQueue},
Expand Down Expand Up @@ -3332,7 +3332,7 @@ func TestClusterQueueReadiness(t *testing.T) {
clusterQueueName: "queue1",
wantStatus: metav1.ConditionFalse,
wantReason: "Stopped",
wantMessage: "Can't admit new workloads: Stopped",
wantMessage: "Can't admit new workloads: Is stopped.",
},
}

Expand Down
119 changes: 76 additions & 43 deletions pkg/cache/clusterqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ package cache

import (
"errors"
"fmt"
"maps"
"math"
"slices"
"strings"

corev1 "k8s.io/api/core/v1"
Expand All @@ -35,6 +38,7 @@ import (
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/resources"
utilac "sigs.k8s.io/kueue/pkg/util/admissioncheck"
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
"sigs.k8s.io/kueue/pkg/workload"
)

Expand Down Expand Up @@ -65,15 +69,16 @@ type clusterQueue struct {

AdmittedUsage resources.FlavorResourceQuantities
// localQueues by (namespace/name).
localQueues map[string]*queue
podsReadyTracking bool
hasMissingFlavors bool
hasMissingOrInactiveAdmissionChecks bool
hasMultipleSingleInstanceControllersChecks bool
hasFlavorIndependentAdmissionCheckAppliedPerFlavor bool
admittedWorkloadsCount int
isStopped bool
workloadInfoOptions []workload.InfoOption
localQueues map[string]*queue
podsReadyTracking bool
missingFlavors []kueue.ResourceFlavorReference
missingAdmissionChecks []string
inactiveAdmissionChecks []string
multipleSingleInstanceControllersChecks map[string][]string // key = controllerName
flavorIndependentAdmissionCheckAppliedPerFlavor []string
admittedWorkloadsCount int
isStopped bool
workloadInfoOptions []workload.InfoOption

resourceNode ResourceNode
hierarchy.ClusterQueue[*cohort]
Expand Down Expand Up @@ -219,7 +224,12 @@ func (c *clusterQueue) updateQuotasAndResourceGroups(in []kueue.ResourceGroup) b

func (c *clusterQueue) updateQueueStatus() {
status := active
if c.hasMissingFlavors || c.hasMissingOrInactiveAdmissionChecks || c.isStopped || c.hasMultipleSingleInstanceControllersChecks || c.hasFlavorIndependentAdmissionCheckAppliedPerFlavor {
if c.isStopped ||
len(c.missingFlavors) > 0 ||
len(c.missingAdmissionChecks) > 0 ||
len(c.inactiveAdmissionChecks) > 0 ||
len(c.multipleSingleInstanceControllersChecks) > 0 ||
len(c.flavorIndependentAdmissionCheckAppliedPerFlavor) > 0 {
status = pending
}
if c.Status == terminating {
Expand All @@ -237,42 +247,53 @@ func (c *clusterQueue) inactiveReason() (string, string) {
return "Terminating", "Can't admit new workloads; clusterQueue is terminating"
case pending:
reasons := make([]string, 0, 3)
messages := make([]string, 0, 3)
if c.isStopped {
reasons = append(reasons, "Stopped")
messages = append(messages, "Is stopped.")
trasc marked this conversation as resolved.
Show resolved Hide resolved
}
if c.hasMissingFlavors {
if len(c.missingFlavors) > 0 {
reasons = append(reasons, "FlavorNotFound")
messages = append(messages, fmt.Sprintf("References missing ResourceFlavor %v.", c.missingFlavors))
trasc marked this conversation as resolved.
Show resolved Hide resolved
}
if c.hasMissingOrInactiveAdmissionChecks {
reasons = append(reasons, "CheckNotFoundOrInactive")
if len(c.missingAdmissionChecks) > 0 {
reasons = append(reasons, "AdmissionCheckNotFound")
trasc marked this conversation as resolved.
Show resolved Hide resolved
trasc marked this conversation as resolved.
Show resolved Hide resolved
messages = append(messages, fmt.Sprintf("References missing AdmissionChecks %v.", c.missingAdmissionChecks))
trasc marked this conversation as resolved.
Show resolved Hide resolved
}

if c.hasMultipleSingleInstanceControllersChecks {
reasons = append(reasons, "MultipleSingleInstanceControllerChecks")
if len(c.inactiveAdmissionChecks) > 0 {
reasons = append(reasons, "AdmissionCheckInactive")
messages = append(messages, fmt.Sprintf("References inactive AdmissionChecks %v.", c.inactiveAdmissionChecks))
trasc marked this conversation as resolved.
Show resolved Hide resolved
}
if len(c.multipleSingleInstanceControllersChecks) > 0 {
reasons = append(reasons, "MultipleSingleInstanceControllerAdmissionChecks")
for _, controller := range utilmaps.SortedKeys(c.multipleSingleInstanceControllersChecks) {
messages = append(messages, fmt.Sprintf("Only one AdmissionChecks of %v can be reference for controller %q.", c.multipleSingleInstanceControllersChecks[controller], controller))
trasc marked this conversation as resolved.
Show resolved Hide resolved
}
}

if c.hasFlavorIndependentAdmissionCheckAppliedPerFlavor {
if len(c.flavorIndependentAdmissionCheckAppliedPerFlavor) > 0 {
reasons = append(reasons, "FlavorIndependentAdmissionCheckAppliedPerFlavor")
messages = append(messages, fmt.Sprintf("AdmissionChecks %v cannot be set at flavor level.", c.flavorIndependentAdmissionCheckAppliedPerFlavor))
trasc marked this conversation as resolved.
Show resolved Hide resolved
}

if len(reasons) == 0 {
return "Unknown", "Can't admit new workloads."
}

return reasons[0], strings.Join([]string{"Can't admit new workloads:", strings.Join(reasons, ", ")}, " ")
return reasons[0], strings.Join([]string{"Can't admit new workloads:", strings.Join(messages, " ")}, " ")
trasc marked this conversation as resolved.
Show resolved Hide resolved
trasc marked this conversation as resolved.
Show resolved Hide resolved
}
return "Ready", "Can admit new flavors"
return "Ready", "Can admit new workloads"
}

// UpdateWithFlavors updates a ClusterQueue based on the passed ResourceFlavors set.
// Exported only for testing.
func (c *clusterQueue) UpdateWithFlavors(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor) {
c.hasMissingFlavors = c.updateLabelKeys(flavors)
c.updateLabelKeys(flavors)
c.updateQueueStatus()
}

func (c *clusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor) bool {
var flavorNotFound bool
func (c *clusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor) {
c.missingFlavors = nil
for i := range c.ResourceGroups {
rg := &c.ResourceGroups[i]
if len(rg.Flavors) == 0 {
Expand All @@ -286,61 +307,73 @@ func (c *clusterQueue) updateLabelKeys(flavors map[kueue.ResourceFlavorReference
keys.Insert(k)
}
} else {
flavorNotFound = true
c.missingFlavors = append(c.missingFlavors, fName)
}
}

if keys.Len() > 0 {
rg.LabelKeys = keys
}
}

return flavorNotFound
}

// updateWithAdmissionChecks updates a ClusterQueue based on the passed AdmissionChecks set.
func (c *clusterQueue) updateWithAdmissionChecks(checks map[string]AdmissionCheck) {
hasMissing := false
hasSpecificChecks := false
checksPerController := make(map[string]int, len(c.AdmissionChecks))
checksPerController := make(map[string][]string, len(c.AdmissionChecks))
singleInstanceControllers := sets.New[string]()
var missing []string
var inactive []string
var flavorIndependentCheckOnFlavors []string
for acName, flavors := range c.AdmissionChecks {
if ac, found := checks[acName]; !found {
hasMissing = true
missing = append(missing, acName)
} else {
if !ac.Active {
hasMissing = true
inactive = append(inactive, acName)
}
checksPerController[ac.Controller]++
checksPerController[ac.Controller] = append(checksPerController[ac.Controller], acName)
if ac.SingleInstanceInClusterQueue {
singleInstanceControllers.Insert(ac.Controller)
}
if ac.FlavorIndependent && flavors.Len() != 0 {
hasSpecificChecks = true
flavorIndependentCheckOnFlavors = append(flavorIndependentCheckOnFlavors, acName)
}
}
}

// sort the lists since c.AdmissionChecks is a map
slices.Sort(missing)
slices.Sort(inactive)
slices.Sort(flavorIndependentCheckOnFlavors)

update := false
if hasMissing != c.hasMissingOrInactiveAdmissionChecks {
c.hasMissingOrInactiveAdmissionChecks = hasMissing
if !slices.Equal(c.missingAdmissionChecks, missing) {
c.missingAdmissionChecks = missing
update = true
}

hasMultipleSICC := false
for controller, checks := range checksPerController {
if singleInstanceControllers.Has(controller) && checks > 1 {
hasMultipleSICC = true
}
if !slices.Equal(c.inactiveAdmissionChecks, inactive) {
c.inactiveAdmissionChecks = inactive
update = true
}

// remove the controllers which don't have more then one AC or are not single instance.
maps.DeleteFunc(checksPerController, func(controller string, acs []string) bool {
return len(acs) < 2 || !singleInstanceControllers.Has(controller)
})

// sort the remaining set
for c := range checksPerController {
slices.Sort(checksPerController[c])
}

if c.hasMultipleSingleInstanceControllersChecks != hasMultipleSICC {
c.hasMultipleSingleInstanceControllersChecks = hasMultipleSICC
if !maps.EqualFunc(checksPerController, c.multipleSingleInstanceControllersChecks, slices.Equal) {
c.multipleSingleInstanceControllersChecks = checksPerController
update = true
}

if c.hasFlavorIndependentAdmissionCheckAppliedPerFlavor != hasSpecificChecks {
c.hasFlavorIndependentAdmissionCheckAppliedPerFlavor = hasSpecificChecks
if !slices.Equal(c.flavorIndependentAdmissionCheckAppliedPerFlavor, flavorIndependentCheckOnFlavors) {
c.flavorIndependentAdmissionCheckAppliedPerFlavor = flavorIndependentCheckOnFlavors
update = true
}

Expand Down
Loading