Define ClusterQueueSnapshot and CohortSnapshot types
gabesaba committed Jul 5, 2024
1 parent 61cfdc7 commit 4d590a7
Showing 15 changed files with 347 additions and 164 deletions.
2 changes: 1 addition & 1 deletion pkg/cache/cache.go
@@ -608,7 +608,7 @@ func (c *Cache) Usage(cqObj *kueue.ClusterQueue) (*ClusterQueueUsageStats, error
}

if c.fairSharingEnabled {
weightedShare, _ := cq.DominantResourceShare()
weightedShare, _ := dominantResourceShare(cq, nil, 0)
stats.WeightedShare = int64(weightedShare)
}

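Note on the new call: the trailing arguments are a workload request map and a multiplier, matching the wrappers added later in this commit (nil and 0 for the queue's current share, +1 for "with this workload", -1 for "without it"). A minimal, self-contained sketch of that multiplier convention, using stand-in names rather than the cache package's types:

```go
package main

import "fmt"

// usageWith mimics the multiplier convention used by dominantResourceShare:
// m = 0 leaves usage untouched, +1 adds the workload's requests, -1 subtracts them.
func usageWith(currentUsage, workloadRequest, m int64) int64 {
	return currentUsage + m*workloadRequest
}

func main() {
	var cpuUsage, wlCPU int64 = 6, 2
	fmt.Println(usageWith(cpuUsage, wlCPU, 0))  // 6: the queue's current usage
	fmt.Println(usageWith(cpuUsage, wlCPU, 1))  // 8: as if the workload were admitted
	fmt.Println(usageWith(cpuUsage, wlCPU, -1)) // 4: as if the workload were removed
}
```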
4 changes: 2 additions & 2 deletions pkg/cache/cache_test.go
@@ -1251,7 +1251,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
t.Errorf("Unexpected error during test operation: %s", err)
}
if diff := cmp.Diff(tc.wantClusterQueues, cache.clusterQueues,
cmpopts.IgnoreFields(ClusterQueue{}, "Cohort", "RGByResource", "ResourceGroups"),
cmpopts.IgnoreFields(ClusterQueue{}, "Cohort", "ResourceGroups"),
cmpopts.IgnoreFields(workload.Info{}, "Obj", "LastAssignment"),
cmpopts.IgnoreUnexported(ClusterQueue{}),
cmpopts.EquateEmpty()); diff != "" {
@@ -1261,7 +1261,7 @@ func TestCacheClusterQueueOperations(t *testing.T) {
for i := range cq.ResourceGroups {
rg := &cq.ResourceGroups[i]
for rName := range rg.CoveredResources {
if cq.RGByResource[rName] != rg {
if resourceGroupForResource(cq, rName) != rg {
t.Errorf("RGByResource[%s] does not point to its resource group", rName)
}
}
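The test now calls resourceGroupForResource instead of reading the removed RGByResource map. Its implementation is not part of this hunk; assuming it simply scans the queue's resource groups for the one covering the named resource (the same relation UpdateRGByResource used to precompute), a simplified, self-contained sketch might look like:

```go
package main

import "fmt"

// Simplified stand-ins for the cache package's types.
type ResourceGroup struct {
	CoveredResources map[string]struct{}
}

type clusterQueue struct {
	ResourceGroups []ResourceGroup
}

// resourceGroupForResource-style lookup: return the group whose
// CoveredResources contains rName, or nil if none does.
func resourceGroupForResource(cq *clusterQueue, rName string) *ResourceGroup {
	for i := range cq.ResourceGroups {
		if _, ok := cq.ResourceGroups[i].CoveredResources[rName]; ok {
			return &cq.ResourceGroups[i]
		}
	}
	return nil
}

func main() {
	cq := &clusterQueue{ResourceGroups: []ResourceGroup{
		{CoveredResources: map[string]struct{}{"cpu": {}, "memory": {}}},
		{CoveredResources: map[string]struct{}{"nvidia.com/gpu": {}}},
	}}
	fmt.Println(resourceGroupForResource(cq, "nvidia.com/gpu") == &cq.ResourceGroups[1]) // true
	fmt.Println(resourceGroupForResource(cq, "storage") == nil)                          // true
}
```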
100 changes: 50 additions & 50 deletions pkg/cache/clusterqueue.go
@@ -49,7 +49,6 @@ type ClusterQueue struct {
Name string
Cohort *Cohort
ResourceGroups []ResourceGroup
RGByResource map[corev1.ResourceName]*ResourceGroup
Usage resources.FlavorResourceQuantities
Workloads map[string]*workload.Info
WorkloadsNotReady sets.Set[string]
@@ -91,16 +90,6 @@ type ClusterQueue struct {
type Cohort struct {
Name string
Members sets.Set[*ClusterQueue]

// The next fields are only populated for a snapshot.

// RequestableResources equals to the sum of LendingLimit when feature LendingLimit enabled.
RequestableResources resources.FlavorResourceQuantities
Usage resources.FlavorResourceQuantities
Lendable map[corev1.ResourceName]int64
// AllocatableResourceGeneration equals to
// the sum of allocatable generation among its members.
AllocatableResourceGeneration int64
}

type ResourceGroup struct {
@@ -150,7 +139,7 @@ func (c *Cohort) CalculateLendable() map[corev1.ResourceName]int64 {
return lendable
}

func (c *ClusterQueue) FitInCohort(q resources.FlavorResourceQuantities) bool {
func (c ClusterQueueSnapshot) FitInCohort(q resources.FlavorResourceQuantities) bool {
for flavor, qResources := range q {
if _, flavorFound := c.Cohort.RequestableResources[flavor]; flavorFound {
for resource, value := range qResources {
@@ -313,17 +302,6 @@ func (c *ClusterQueue) updateResourceGroups(in []kueue.ResourceGroup) {
if c.AllocatableResourceGeneration == 0 || !equality.Semantic.DeepEqual(oldRG, c.ResourceGroups) {
c.AllocatableResourceGeneration++
}
c.UpdateRGByResource()
}

func (c *ClusterQueue) UpdateRGByResource() {
c.RGByResource = make(map[corev1.ResourceName]*ResourceGroup)
for i := range c.ResourceGroups {
rg := &c.ResourceGroups[i]
for rName := range rg.CoveredResources {
c.RGByResource[rName] = rg
}
}
}

func (c *ClusterQueue) updateQueueStatus() {
@@ -530,7 +508,7 @@ func updateFlavorUsage(wi *workload.Info, flvUsage resources.FlavorResourceQuant
}
}

func updateCohortUsage(wi *workload.Info, cq *ClusterQueue, m int64) {
func updateCohortUsage(wi *workload.Info, cq *ClusterQueueSnapshot, m int64) {
for _, ps := range wi.TotalRequests {
for wlRes, wlResFlv := range ps.Flavors {
v, wlResExist := ps.Requests[wlRes]
@@ -626,7 +604,7 @@ func workloadBelongsToLocalQueue(wl *kueue.Workload, q *kueue.LocalQueue) bool {
// LendingLimit will also be counted here if feature LendingLimit enabled.
// Please note that for different clusterQueues, the requestable quota is different,
// they should be calculated dynamically.
func (c *ClusterQueue) RequestableCohortQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName) (val int64) {
func (c ClusterQueueSnapshot) RequestableCohortQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName) (val int64) {
if c.Cohort.RequestableResources == nil || c.Cohort.RequestableResources[fName] == nil {
return 0
}
@@ -639,7 +617,7 @@ func (c *ClusterQueue) RequestableCohortQuota(fName kueue.ResourceFlavorReferenc
return requestableCohortQuota
}

func (c *ClusterQueue) guaranteedQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName) (val int64) {
func (c ClusterQueueSnapshot) guaranteedQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName) (val int64) {
if !features.Enabled(features.LendingLimit) {
return 0
}
@@ -652,7 +630,7 @@ func (c *ClusterQueue) guaranteedQuota(fName kueue.ResourceFlavorReference, rNam
// UsedCohortQuota returns the used quota by the flavor and resource name in the cohort.
// Note that when LendingLimit enabled, the usage is not equal to the total used quota but the one
// minus the guaranteed resources, this is only for judging whether workloads fit in the cohort.
func (c *ClusterQueue) UsedCohortQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName) (val int64) {
func (c ClusterQueueSnapshot) UsedCohortQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName) (val int64) {
if c.Cohort.Usage == nil || c.Cohort.Usage[fName] == nil {
return 0
}
@@ -674,41 +652,67 @@ func (c *ClusterQueue) UsedCohortQuota(fName kueue.ResourceFlavorReferenc
return cohortUsage
}

func (c ClusterQueue) hasCohort() bool {
return c.Cohort != nil
}

func (c ClusterQueue) getFairWeight() *resource.Quantity {
return &c.FairWeight
}

func (c ClusterQueue) getLendable() map[corev1.ResourceName]int64 {
return c.Cohort.CalculateLendable()
}

func (c ClusterQueue) usage(fr resources.FlavorResource) int64 {
return c.Usage.SafeGet(fr)
}

func (c ClusterQueue) getResourceGroups() *[]ResourceGroup {
return &c.ResourceGroups
}

// DominantResourceShare returns a value from 0 to 1,000,000 representing the maximum of the ratios
// of usage above nominal quota to the lendable resources in the cohort, among all the resources
// provided by the ClusterQueue, and divided by the weight.
// If zero, it means that the usage of the ClusterQueue is below the nominal quota.
// The function also returns the resource name that yielded this value.
// Also for a weight of zero, this will return 9223372036854775807.
func (c *ClusterQueue) DominantResourceShare() (int, corev1.ResourceName) {
return c.dominantResourceShare(nil, 0)
func (c ClusterQueueSnapshot) DominantResourceShare() (int, corev1.ResourceName) {
return dominantResourceShare(c, nil, 0)
}

func (c *ClusterQueue) DominantResourceShareWith(wlReq resources.FlavorResourceQuantities) (int, corev1.ResourceName) {
return c.dominantResourceShare(wlReq, 1)
func (c ClusterQueueSnapshot) DominantResourceShareWith(wlReq resources.FlavorResourceQuantities) (int, corev1.ResourceName) {
return dominantResourceShare(c, wlReq, 1)
}

func (c *ClusterQueue) DominantResourceShareWithout(w *workload.Info) (int, corev1.ResourceName) {
return c.dominantResourceShare(w.FlavorResourceUsage(), -1)
func (c ClusterQueueSnapshot) DominantResourceShareWithout(w *workload.Info) (int, corev1.ResourceName) {
return dominantResourceShare(c, w.FlavorResourceUsage(), -1)
}

func (c *ClusterQueue) dominantResourceShare(wlReq resources.FlavorResourceQuantities, m int64) (int, corev1.ResourceName) {
if c.Cohort == nil {
type dominantResourceShareNode interface {
hasCohort() bool
getFairWeight() *resource.Quantity
getLendable() map[corev1.ResourceName]int64

netQuotaNode
}

func dominantResourceShare(node dominantResourceShareNode, wlReq resources.FlavorResourceQuantities, m int64) (int, corev1.ResourceName) {
if !node.hasCohort() {
return 0, ""
}
if c.FairWeight.IsZero() {
if node.getFairWeight().IsZero() {
return math.MaxInt, ""
}

netQuota := getNetQuota(node)
borrowing := make(map[corev1.ResourceName]int64)
for _, rg := range c.ResourceGroups {
for _, flv := range rg.Flavors {
for rName, quotas := range flv.Resources {
b := c.Usage[flv.Name][rName] + m*wlReq[flv.Name][rName] - quotas.Nominal
if b > 0 {
borrowing[rName] += b
}
}

for fr, quota := range netQuota {
b := m*wlReq[fr.Flavor][fr.Resource] - quota
if b > 0 {
borrowing[fr.Resource] += b
}
}
if len(borrowing) == 0 {
@@ -718,11 +722,7 @@ func (c *ClusterQueue) dominantResourceShare(wlReq resources.FlavorResourceQuant
var drs int64 = -1
var dRes corev1.ResourceName

// If we are running from snapshot the c.Cohort.Lendable should be pre-calculated.
lendable := c.Cohort.Lendable
if lendable == nil {
lendable = c.Cohort.CalculateLendable()
}
lendable := node.getLendable()
for rName, b := range borrowing {
if lr := lendable[rName]; lr > 0 {
ratio := b * 1000 / lr
@@ -733,6 +733,6 @@ func (c *ClusterQueue) dominantResourceShare(wlReq resources.FlavorResourceQuant
}
}
}
dws := drs * 1000 / c.FairWeight.MilliValue()
dws := drs * 1000 / node.getFairWeight().MilliValue()
return int(dws), dRes
}
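As the doc comment above states, the weighted share is the largest ratio of usage above nominal quota to the cohort's lendable amount, scaled to a 0-1000 range and then divided by the fair-sharing weight in milli-units. A self-contained numeric sketch of that arithmetic, with made-up quotas and weight (not values from the commit):

```go
package main

import "fmt"

func main() {
	// Hypothetical numbers, not taken from the commit: the queue uses 8 CPUs
	// against a nominal quota of 6, the cohort can lend 10 CPUs in total, and
	// the queue's FairWeight is 2 (2000 in milli-units).
	var usage, nominal, lendable, weightMilli int64 = 8, 6, 10, 2000

	borrowed := usage - nominal // 2 CPUs above nominal quota
	if borrowed < 0 {
		borrowed = 0 // below nominal quota: the share is 0
	}
	drs := borrowed * 1000 / lendable // 2 * 1000 / 10 = 200
	dws := drs * 1000 / weightMilli   // 200 * 1000 / 2000 = 100

	fmt.Println(dws) // 100: the weighted share these inputs would yield
}
```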
62 changes: 62 additions & 0 deletions pkg/cache/clusterqueue_snapshot.go
@@ -0,0 +1,62 @@
package cache

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/resources"
"sigs.k8s.io/kueue/pkg/workload"
)

type ClusterQueueSnapshot struct {
Name string
Cohort *CohortSnapshot
ResourceGroups []ResourceGroup
Usage resources.FlavorResourceQuantities
Workloads map[string]*workload.Info
WorkloadsNotReady sets.Set[string]
NamespaceSelector labels.Selector
Preemption kueue.ClusterQueuePreemption
FairWeight resource.Quantity
FlavorFungibility kueue.FlavorFungibility
// Aggregates AdmissionChecks from both .spec.AdmissionChecks and .spec.AdmissionCheckStrategy
// Sets hold ResourceFlavors to which an AdmissionCheck should apply.
// In case it is empty, it means an AdmissionCheck should apply to all ResourceFlavors
AdmissionChecks map[string]sets.Set[kueue.ResourceFlavorReference]
Status metrics.ClusterQueueStatus
// GuaranteedQuota records how much resource quota the ClusterQueue reserved
// when feature LendingLimit is enabled and flavor's lendingLimit is not nil.
GuaranteedQuota resources.FlavorResourceQuantities
// AllocatableResourceGeneration will be increased when some admitted workloads are
// deleted, or the resource groups are changed.
AllocatableResourceGeneration int64

// Lendable holds the total lendable quota for the resources of the ClusterQueue, independent of the flavor.
Lendable map[corev1.ResourceName]int64
}

func (c ClusterQueueSnapshot) RGByResource(resource corev1.ResourceName) *ResourceGroup {
return resourceGroupForResource(c, resource)
}

func (c ClusterQueueSnapshot) hasCohort() bool {
return c.Cohort != nil
}
func (c ClusterQueueSnapshot) getFairWeight() *resource.Quantity {
return &c.FairWeight
}
func (c ClusterQueueSnapshot) getLendable() map[corev1.ResourceName]int64 {
return c.Cohort.Lendable
}

func (c ClusterQueueSnapshot) usage(fr resources.FlavorResource) int64 {
return c.Usage.SafeGet(fr)
}

func (c ClusterQueueSnapshot) getResourceGroups() *[]ResourceGroup {
return &c.ResourceGroups
}
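Both the live ClusterQueue and the new ClusterQueueSnapshot expose the same small accessors (hasCohort, getFairWeight, getLendable, usage, getResourceGroups), so the fair-sharing math in dominantResourceShare is written once against an interface and runs on either. A pared-down, self-contained illustration of that pattern; the names below (shareNode, fairWeightMilli, and the two stand-in structs) are invented for the sketch, not Kueue identifiers:

```go
package main

import "fmt"

// shareNode and the two structs below only illustrate how one shared
// function can serve both the live queue and its snapshot.
type shareNode interface {
	hasCohort() bool
	fairWeightMilli() int64
}

type liveQueue struct {
	cohort      bool
	weightMilli int64
}

func (q liveQueue) hasCohort() bool        { return q.cohort }
func (q liveQueue) fairWeightMilli() int64 { return q.weightMilli }

type queueSnapshot struct {
	cohort      bool
	weightMilli int64
}

func (q queueSnapshot) hasCohort() bool        { return q.cohort }
func (q queueSnapshot) fairWeightMilli() int64 { return q.weightMilli }

// share stands in for dominantResourceShare: the same body handles both types.
func share(n shareNode) string {
	if !n.hasCohort() {
		return "no cohort: share is 0"
	}
	return fmt.Sprintf("cohort member, fair weight (milli) = %d", n.fairWeightMilli())
}

func main() {
	fmt.Println(share(liveQueue{cohort: true, weightMilli: 1000}))
	fmt.Println(share(queueSnapshot{cohort: false}))
}
```

One visible payoff of the split in this diff: the snapshot's getLendable returns the precomputed Cohort.Lendable, while the live ClusterQueue's getLendable calls Cohort.CalculateLendable() on demand, so the shared helper no longer needs the nil-check fallback that the old dominantResourceShare method carried.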
