Define ClusterQueueSnapshot and CohortSnapshot types #2519
pkg/cache/clusterqueue.go
Outdated
@@ -150,7 +139,7 @@ func (c *Cohort) CalculateLendable() map[corev1.ResourceName]int64 {
 	return lendable
 }

-func (c *ClusterQueue) FitInCohort(q resources.FlavorResourceQuantities) bool {
+func (c ClusterQueueSnapshot) FitInCohort(q resources.FlavorResourceQuantities) bool {
Suggested change:
-func (c ClusterQueueSnapshot) FitInCohort(q resources.FlavorResourceQuantities) bool {
+func (c *ClusterQueueSnapshot) FitInCohort(q resources.FlavorResourceQuantities) bool {
Is this exclusively for efficiency reasons, as ClusterQueueSnapshot is rather large?
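For readers following the receiver question, here is a minimal Go sketch of what differs between the two forms. The `snapshot` type and its fields are hypothetical stand-ins, not the real ClusterQueueSnapshot. A value receiver works on a copy of the struct on every call, so in addition to the copying cost, writes to its plain fields are lost; a pointer receiver shares the caller's instance.

```go
package main

import "fmt"

// snapshot is a hypothetical stand-in for a large struct such as
// ClusterQueueSnapshot; the fields are illustrative only.
type snapshot struct {
	Name  string
	Usage int64
}

// addUsageByValue has a value receiver: s is a copy of the caller's
// struct, so the caller's Usage field is left untouched.
func (s snapshot) addUsageByValue(v int64) {
	s.Usage += v
}

// addUsageByPointer has a pointer receiver: no copy is made and the
// caller's struct is updated in place.
func (s *snapshot) addUsageByPointer(v int64) {
	s.Usage += v
}

func main() {
	cq := snapshot{Name: "cq-a"}

	cq.addUsageByValue(5)
	fmt.Println(cq.Usage) // prints 0: the write went to a copy

	cq.addUsageByPointer(5)
	fmt.Println(cq.Usage) // prints 5: the write reached cq
}
```

So the choice is not only about efficiency: mixing value and pointer receivers on one type also makes it easy to lose updates silently.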
pkg/cache/clusterqueue.go
Outdated
@@ -626,7 +604,7 @@ func workloadBelongsToLocalQueue(wl *kueue.Workload, q *kueue.LocalQueue) bool {
 // LendingLimit will also be counted here if feature LendingLimit enabled.
 // Please note that for different clusterQueues, the requestable quota is different,
 // they should be calculated dynamically.
-func (c *ClusterQueue) RequestableCohortQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName) (val int64) {
+func (c ClusterQueueSnapshot) RequestableCohortQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName) (val int64) {
Suggested change:
-func (c ClusterQueueSnapshot) RequestableCohortQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName) (val int64) {
+func (c *ClusterQueueSnapshot) RequestableCohortQuota(fName kueue.ResourceFlavorReference, rName corev1.ResourceName) (val int64) {
same for others
pkg/resources/resource.go
Outdated
@@ -40,3 +40,11 @@ func (f FlavorResourceQuantitiesFlat) Unflatten() FlavorResourceQuantities {
 	}
 	return out
 }
+
+// SafeGet attempts to access nested value, returning 0 if absent.
+func (f FlavorResourceQuantities) SafeGet(fr FlavorResource) int64 {
Suggested change:
-func (f FlavorResourceQuantities) SafeGet(fr FlavorResource) int64 {
+func (f FlavorResourceQuantities) For(fr FlavorResource) int64 {
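A rough sketch of how such an accessor can behave, assuming FlavorResourceQuantities is a two-level map (flavor, then resource). The `FlavorName` and `ResourceName` types below are local stand-ins so the example compiles on its own; the real code uses kueue.ResourceFlavorReference and corev1.ResourceName. In Go, indexing a nil or missing inner map yields the zero value, so no explicit presence check is required.

```go
package main

import "fmt"

// Local stand-ins (assumptions) for kueue.ResourceFlavorReference and
// corev1.ResourceName, so the sketch is self-contained.
type FlavorName string
type ResourceName string

type FlavorResource struct {
	Flavor   FlavorName
	Resource ResourceName
}

// FlavorResourceQuantities is assumed here to be a nested map.
type FlavorResourceQuantities map[FlavorName]map[ResourceName]int64

// For returns the quantity stored for fr, or 0 if either the flavor or
// the resource is absent. Reading from a nil map is safe in Go.
func (f FlavorResourceQuantities) For(fr FlavorResource) int64 {
	return f[fr.Flavor][fr.Resource]
}

func main() {
	q := FlavorResourceQuantities{
		"default": {"cpu": 1000},
	}
	fmt.Println(q.For(FlavorResource{Flavor: "default", Resource: "cpu"})) // present
	fmt.Println(q.For(FlavorResource{Flavor: "spot", Resource: "cpu"}))    // absent flavor
}
```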
pkg/cache/clusterqueue.go
Outdated
@@ -674,41 +652,67 @@ func (c *ClusterQueue) UsedCohortQuota(fName kueue.ResourceFlavorReference, rNam
 	return cohortUsage
 }

+func (c ClusterQueue) hasCohort() bool {
Add a comment that these methods just implement the interface.
Otherwise they look unnecessary.
Done, also in clusterqueue_snapshot
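To illustrate why such small methods exist, here is a hedged sketch of the pattern: two distinct types satisfy one small interface so that shared helper logic can be written once. The interface name `cohortMember`, the helper `describe`, and the struct fields are hypothetical and not taken from the PR.

```go
package main

import "fmt"

// cohortMember is a hypothetical interface; the PR's real interface and
// method set may differ.
type cohortMember interface {
	hasCohort() bool
}

type Cohort struct{ Name string }
type CohortSnapshot struct{ Name string }

type ClusterQueue struct{ Cohort *Cohort }
type ClusterQueueSnapshot struct{ Cohort *CohortSnapshot }

// The methods below exist only to implement cohortMember, which lets
// describe operate on both the cache type and the snapshot type.
func (c *ClusterQueue) hasCohort() bool         { return c.Cohort != nil }
func (c *ClusterQueueSnapshot) hasCohort() bool { return c.Cohort != nil }

// Compile-time checks that both types satisfy the interface.
var (
	_ cohortMember = (*ClusterQueue)(nil)
	_ cohortMember = (*ClusterQueueSnapshot)(nil)
)

// describe is shared logic written once against the interface.
func describe(m cohortMember) string {
	if m.hasCohort() {
		return "in cohort"
	}
	return "standalone"
}

func main() {
	fmt.Println(describe(&ClusterQueue{}))
	fmt.Println(describe(&ClusterQueueSnapshot{Cohort: &CohortSnapshot{Name: "team"}}))
}
```

A one-line comment above the methods, as requested in the review, saves readers from wondering why trivial accessors are needed.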
pkg/cache/clusterqueue_test.go
Outdated
@@ -1052,7 +1062,7 @@ func TestDominantResourceShare(t *testing.T) {
 	}
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
-			drValue, drName := tc.cq.DominantResourceShareWith(tc.flvResQ)
+			drValue, drName := dominantResourceShare(&tc.cq, tc.flvResQ, 1)
In general, try to test exported functions or methods
pkg/cache/resource.go
Outdated
// note: this is copy heavy, but we don't care - the structure of ResourceGroup will change
// soon, which will have these values precomputed (or cheap to access).
func flavorResources(node resourceGroupNode) []resources.FlavorResource {
	flavorResources := make([]resources.FlavorResource, 0)
Suggested change:
-	flavorResources := make([]resources.FlavorResource, 0)
+	var flavorResources []resources.FlavorResource
append can allocate as necessary.
Turned flavorResources into a named return value
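A small sketch of the named-return approach, under the assumption that the function only needs to build and return a slice. The function name `flavorResourcePairs` and its parameters are hypothetical. A named slice result starts out as nil, and `append` allocates backing storage on first use, so no `make` call is needed.

```go
package main

import "fmt"

type FlavorResource struct {
	Flavor   string
	Resource string
}

// flavorResourcePairs is a hypothetical helper. The named result frs is
// a nil slice on entry; append allocates as necessary.
func flavorResourcePairs(flavors, resources []string) (frs []FlavorResource) {
	for _, f := range flavors {
		for _, r := range resources {
			frs = append(frs, FlavorResource{Flavor: f, Resource: r})
		}
	}
	return frs
}

func main() {
	frs := flavorResourcePairs([]string{"default", "spot"}, []string{"cpu", "memory"})
	fmt.Println(len(frs)) // prints 4

	// With no input the result stays nil, which is still safe to range
	// over and has length 0.
	none := flavorResourcePairs(nil, nil)
	fmt.Println(none == nil, len(none)) // prints true 0
}
```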
pkg/cache/resource.go
Outdated
// negative value implies that the node is borrowing.
func getNetQuota(node netQuotaNode) resources.FlavorResourceQuantitiesFlat {
	netQuota := make(resources.FlavorResourceQuantitiesFlat)
	for _, fr := range flavorResources(node) {
It looks like creating the intermediate flavorResources slice should not be necessary. The resourceQuota function also does a linear search again throughout the resource groups and the flavors within them.
I think we can iterate once over the resourceGroups to calculate the net quotas.
My intention was to allow this to be easily substituted once I move capacity/limits out of ResourceGroup (into some flat data structures, maybe FlavorResourceQuantitiesFlat), without concern for the complexity, as the inefficient part of this code is expected to be short-lived.
I updated it so that it is now O(n), and it should still be easy to update with my upcoming changes.
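The single-pass idea from this thread could look roughly like the sketch below. It is not the PR's implementation: the `FlavorQuotas` and `ResourceGroup` shapes are simplified stand-ins, and net quota is assumed to be nominal quota minus usage, consistent with the comment that a negative value implies borrowing.

```go
package main

import "fmt"

type FlavorResource struct {
	Flavor   string
	Resource string
}

type FlavorResourceQuantitiesFlat map[FlavorResource]int64

// FlavorQuotas and ResourceGroup are simplified, hypothetical shapes.
type FlavorQuotas struct {
	Name    string
	Nominal map[string]int64 // resource name -> nominal quota
}

type ResourceGroup struct {
	Flavors []FlavorQuotas
}

// netQuota visits every (flavor, resource) pair exactly once, so no
// intermediate slice and no repeated linear search is needed.
// Assumption: net quota = nominal - usage.
func netQuota(groups []ResourceGroup, usage FlavorResourceQuantitiesFlat) FlavorResourceQuantitiesFlat {
	net := make(FlavorResourceQuantitiesFlat)
	for _, rg := range groups {
		for _, fl := range rg.Flavors {
			for res, nominal := range fl.Nominal {
				fr := FlavorResource{Flavor: fl.Name, Resource: res}
				net[fr] = nominal - usage[fr] // missing usage reads as 0
			}
		}
	}
	return net
}

func main() {
	groups := []ResourceGroup{{
		Flavors: []FlavorQuotas{{
			Name:    "default",
			Nominal: map[string]int64{"cpu": 10, "gpu": 4},
		}},
	}}
	usage := FlavorResourceQuantitiesFlat{
		{Flavor: "default", Resource: "cpu"}: 12,
	}
	net := netQuota(groups, usage)
	fmt.Println(net[FlavorResource{Flavor: "default", Resource: "cpu"}]) // negative: borrowing
	fmt.Println(net[FlavorResource{Flavor: "default", Resource: "gpu"}]) // unused quota
}
```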
pkg/cache/snapshot.go
Outdated
@@ -111,7 +111,7 @@ func (c *Cache) Snapshot() Snapshot {
 		snap.ResourceFlavors[name] = rf
 	}
 	for _, cohort := range c.cohorts {
-		cohortCopy := newCohort(cohort.Name, cohort.Members.Len())
+		cohortCopy := cohort.snapshot()
 		cohortCopy.AllocatableResourceGeneration = 0
Why not put all of this inside cohort.snapshot? Otherwise the logic is split between two places.
Done. Thank you.
I don't love the signature I created (as it has no return value, and cohort is only accessible via CQ), but once Cohorts become first class, we'll likely update it to return CohortSnapshot.
pkg/cache/snapshot_test.go
Outdated
@@ -633,7 +631,7 @@ func TestSnapshot(t *testing.T) {
 	for i := range cq.ResourceGroups {
 		rg := &cq.ResourceGroups[i]
 		for rName := range rg.CoveredResources {
-			if cq.RGByResource[rName] != rg {
+			if resourceGroupForResource(cq, rName) != rg {
 				t.Errorf("RGByResource[%s] does not point to its resource group", rName)
Update the error message.
Done. And updated test to use the exported method.
pkg/cache/clusterqueue_snapshot.go
Outdated
}

func (c ClusterQueueSnapshot) RGByResource(resource corev1.ResourceName) *ResourceGroup {
	return resourceGroupForResource(c, resource)
Do we need this function to also work for non-snapshot CQ?
No, it is only used by flavorassigner and scheduler
Then implement it here, instead of using this secondary function.
Ahh, I misunderstood your question. It was also being used in the cache test, but after your comment below, I deleted that test.
So I'll move it in here.
pkg/cache/cache_test.go
Outdated
@@ -1261,8 +1261,8 @@ func TestCacheClusterQueueOperations(t *testing.T) {
 	for i := range cq.ResourceGroups {
 		rg := &cq.ResourceGroups[i]
 		for rName := range rg.CoveredResources {
-			if cq.RGByResource[rName] != rg {
-				t.Errorf("RGByResource[%s] does not point to its resource group", rName)
+			if resourceGroupForResource(cq, rName) != rg {
I think this check is only relevant for the snapshot, so maybe worth removing it here.
pkg/cache/snapshot.go
Outdated
@@ -156,7 +144,25 @@ func (c *ClusterQueue) snapshot() *ClusterQueue {
 	return cc
 }

-func (c *ClusterQueue) accumulateResources(cohort *Cohort) {
+func (c *Cohort) snapshot(cqs map[string]*ClusterQueueSnapshot) {
Suggested change:
-func (c *Cohort) snapshot(cqs map[string]*ClusterQueueSnapshot) {
+func (c *Cohort) snapshotInto(cqs map[string]*ClusterQueueSnapshot) {
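The name snapshotInto signals that the method writes its result into the argument instead of returning it. A minimal sketch of that shape follows; the `Members` field holding ClusterQueue names and the snapshot struct fields are assumptions for illustration, not the PR's actual definitions.

```go
package main

import "fmt"

// Simplified, hypothetical types for illustration only.
type Cohort struct {
	Name    string
	Members []string // names of member ClusterQueues
}

type CohortSnapshot struct {
	Name    string
	Members []*ClusterQueueSnapshot
}

type ClusterQueueSnapshot struct {
	Name   string
	Cohort *CohortSnapshot
}

// snapshotInto builds one CohortSnapshot and attaches it to every member
// found in cqs. It returns nothing: the cohort snapshot is reachable
// only through the ClusterQueueSnapshots it was written into.
func (c *Cohort) snapshotInto(cqs map[string]*ClusterQueueSnapshot) {
	cs := &CohortSnapshot{Name: c.Name}
	for _, name := range c.Members {
		cq, ok := cqs[name]
		if !ok {
			continue // member has no snapshot; skip it
		}
		cq.Cohort = cs
		cs.Members = append(cs.Members, cq)
	}
}

func main() {
	cqs := map[string]*ClusterQueueSnapshot{
		"a": {Name: "a"},
		"b": {Name: "b"},
		"c": {Name: "c"},
	}
	cohort := &Cohort{Name: "team", Members: []string{"a", "b"}}
	cohort.snapshotInto(cqs)

	fmt.Println(cqs["a"].Cohort == cqs["b"].Cohort) // members share one snapshot
	fmt.Println(cqs["c"].Cohort == nil)             // non-member is untouched
}
```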
@@ -72,8 +71,6 @@ type ClusterQueue struct {
 	// Lendable holds the total lendable quota for the resources of the ClusterQueue, independent of the flavor.
 	Lendable map[corev1.ResourceName]int64

-	// The following fields are not populated in a snapshot.
-
 	AdmittedUsage resources.FlavorResourceQuantities
Are all of the fields still needed in this struct?
FairWeight: oneQuantity,
Usage: resources.FlavorResourceQuantitiesFlat{
	{Flavor: "default", Resource: corev1.ResourceCPU}: 1_000,
	{Flavor: "default", Resource: "example.com/gpu"}: 2,
}.Unflatten(),
ResourceGroups: []ResourceGroup{
	{
		CoveredResources: sets.New(corev1.ResourceCPU, "example.com/gpu"),
The setup for snapshot seems to require extra field(s) which seem redundant.
Wouldn't it be better to still use ClusterQueue in test cases setup and generate the snapshots based on the ClusterQueues inside t.Run?
Yes, but that can be done in a follow-up.
I like the direction of the changes, thanks
/lgtm
LGTM label has been added. Git tree hash: bd1d46818c682fcf6721ec4be56300a0d8cbd825
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: alculquicondor, gabesaba
/test pull-kueue-test-e2e-main-1-28
What type of PR is this?
/kind cleanup
What this PR does / why we need it:
Preparation for #79. Create Snapshot types for CQ and Cohort, to better differentiate between Cache types. Until now, we relied on some fields being properly set/copied and on other fields being left unset.
This change will make the code easier to understand and less error prone.
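As a rough illustration of the motivation (not the PR's actual code), the sketch below gives the snapshot its own type. Fields that only make sense in the cache simply do not exist on the snapshot, so nothing depends on remembering which fields were copied or left unset. All field names and the string-keyed maps are simplified assumptions.

```go
package main

import "fmt"

// ClusterQueue is the cache-side type in this sketch.
type ClusterQueue struct {
	Name          string
	Usage         map[string]int64
	AdmittedUsage map[string]int64 // cache-only in this sketch
}

// ClusterQueueSnapshot carries only what a scheduling cycle needs.
// There is no AdmittedUsage field to forget to populate.
type ClusterQueueSnapshot struct {
	Name  string
	Usage map[string]int64
}

// snapshot copies the usage map so that changes made to the snapshot
// during scheduling do not leak back into the cache.
func (c *ClusterQueue) snapshot() *ClusterQueueSnapshot {
	usage := make(map[string]int64, len(c.Usage))
	for k, v := range c.Usage {
		usage[k] = v
	}
	return &ClusterQueueSnapshot{Name: c.Name, Usage: usage}
}

func main() {
	cq := &ClusterQueue{Name: "cq-a", Usage: map[string]int64{"cpu": 2}}
	snap := cq.snapshot()
	snap.Usage["cpu"] += 3 // simulate tentative admission in the snapshot

	fmt.Println(cq.Usage["cpu"], snap.Usage["cpu"]) // cache unchanged, snapshot updated
}
```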
Special notes for your reviewer:
Unexporting Cohort/ClusterQueue works trivially. I will send this out in a subsequent PR.
Additionally, I didn't move methods that operate on ClusterQueueSnapshot to the new file, to keep the diff smaller. These can be moved in a subsequent PR if we wish.
Also folding #2489 into this change.
Does this PR introduce a user-facing change?