DRA: handle duplicating unschedulable pods using DRA
towca committed Oct 7, 2024
1 parent 16e995d commit 4d499a1
Showing 10 changed files with 141 additions and 48 deletions.
4 changes: 4 additions & 0 deletions cluster-autoscaler/core/static_autoscaler_dra_test.go
@@ -384,6 +384,10 @@ func TestStaticAutoscalerDynamicResources(t *testing.T) {
//"pods with nominatedNodeName using ResourceClaims are correctly scheduled": {
//
//},
// TODO(DRA): Write.
//"duplicating unschedulabel pods using ResourceClaims works correctly": {
//
//},
}

for tcName, tc := range testCases {
6 changes: 5 additions & 1 deletion cluster-autoscaler/dynamicresources/sanitize.go
@@ -53,7 +53,7 @@ func SanitizeNodeResourceSlices(nodeLocalSlices []*resourceapi.ResourceSlice, ne
// and nameSuffix is appended to all pool names in allocation results, to match the pool names of the new, duplicated node.
// - Returns an error if any of the allocated claims is not node-local on oldNodeName. Such allocations can't be sanitized; the only
// option is to clear the allocation and run scheduler filters&reserve to get a new allocation when duplicating a pod.
func SanitizePodResourceClaims(newOwner, oldOwner *v1.Pod, claims []*resourceapi.ResourceClaim, nameSuffix, oldNodeName, newNodeName string, oldNodePoolNames map[string]bool) ([]*resourceapi.ResourceClaim, error) {
func SanitizePodResourceClaims(newOwner, oldOwner *v1.Pod, claims []*resourceapi.ResourceClaim, clearAllocations bool, nameSuffix, oldNodeName, newNodeName string, oldNodePoolNames map[string]bool) ([]*resourceapi.ResourceClaim, error) {
var newResourceClaims []*resourceapi.ResourceClaim
for _, claim := range claims {
if ownerName, ownerUid := ClaimOwningPod(claim); ownerName != oldOwner.Name || ownerUid != oldOwner.UID {
@@ -67,6 +67,10 @@ func SanitizePodResourceClaims(newOwner, oldOwner *v1.Pod, claims []*resourceapi
claimCopy.Name = fmt.Sprintf("%s-%s", claim.Name, nameSuffix)
claimCopy.OwnerReferences = []metav1.OwnerReference{podClaimOwnerReference(newOwner)}

if clearAllocations {
claimCopy.Status.Allocation = nil
}

if claimCopy.Status.Allocation == nil {
// Unallocated claim - just clear the consumer reservations to be sure, and we're done.
claimCopy.Status.ReservedFor = []resourceapi.ResourceClaimConsumerReference{}
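For orientation, here is a minimal sketch of how a caller might invoke the extended SanitizePodResourceClaims when duplicating an unschedulable pod. The wrapper function, pod variables, and claim list are illustrative; only the signature and the clearAllocations semantics come from the change above.

```go
package example

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	resourceapi "k8s.io/api/resource/v1alpha3"

	"k8s.io/autoscaler/cluster-autoscaler/dynamicresources"
)

// duplicateClaimsForPodCopy is a hypothetical helper showing the intended call pattern.
func duplicateClaimsForPodCopy(podCopy, original *v1.Pod, claims []*resourceapi.ResourceClaim, copyIndex int) ([]*resourceapi.ResourceClaim, error) {
	// When duplicating an unschedulable pod there is no old/new node to translate between,
	// so node and pool names are left empty and clearAllocations=true wipes any stale
	// allocation; scheduler Filter/Reserve later produces a fresh one for the copy.
	return dynamicresources.SanitizePodResourceClaims(
		podCopy, original, claims,
		true,                              // clearAllocations
		fmt.Sprintf("copy-%d", copyIndex), // nameSuffix appended to claim names
		"", "",                            // oldNodeName, newNodeName: unused here
		nil,                               // oldNodePoolNames: unused when allocations are cleared
	)
}
```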
@@ -40,55 +40,55 @@ func TestEnforceInjectedPodsLimitProcessor(t *testing.T) {
{
name: "Real pods = 0 && fake pods < PodLimit",
podLimit: 10,
unschedulablePods: makeFakePods(ownerUid, samplePod, 5),
unschedulablePods: makeFakePodsIgnoreClaims(ownerUid, samplePod, 5),
expectedNumberOfResultedUnschedulablePods: 5,
expectedNumberOfResultedUnschedulableFakePods: 5,
expectedNumberOfResultedUnschedulableRealPods: 0,
},
{
name: "Real pods = 0 && fake pods > PodLimit",
podLimit: 10,
unschedulablePods: makeFakePods(ownerUid, samplePod, 15),
unschedulablePods: makeFakePodsIgnoreClaims(ownerUid, samplePod, 15),
expectedNumberOfResultedUnschedulablePods: 10,
expectedNumberOfResultedUnschedulableFakePods: 10,
expectedNumberOfResultedUnschedulableRealPods: 0,
},
{
name: "Real pods > PodLimit && some fake pods",
podLimit: 10,
unschedulablePods: append(makeTestingPods(11), makeFakePods(ownerUid, samplePod, 5)...),
unschedulablePods: append(makeTestingPods(11), makeFakePodsIgnoreClaims(ownerUid, samplePod, 5)...),
expectedNumberOfResultedUnschedulablePods: 11,
expectedNumberOfResultedUnschedulableFakePods: 0,
expectedNumberOfResultedUnschedulableRealPods: 11,
},
{
name: "Real pods = PodLimit && some fake pods",
podLimit: 10,
unschedulablePods: append(makeTestingPods(10), makeFakePods(ownerUid, samplePod, 5)...),
unschedulablePods: append(makeTestingPods(10), makeFakePodsIgnoreClaims(ownerUid, samplePod, 5)...),
expectedNumberOfResultedUnschedulablePods: 10,
expectedNumberOfResultedUnschedulableFakePods: 0,
expectedNumberOfResultedUnschedulableRealPods: 10,
},
{
name: "Real pods < PodLimit && real pods + fake pods > PodLimit",
podLimit: 10,
unschedulablePods: append(makeTestingPods(3), makeFakePods(ownerUid, samplePod, 10)...),
unschedulablePods: append(makeTestingPods(3), makeFakePodsIgnoreClaims(ownerUid, samplePod, 10)...),
expectedNumberOfResultedUnschedulablePods: 10,
expectedNumberOfResultedUnschedulableFakePods: 7,
expectedNumberOfResultedUnschedulableRealPods: 3,
},
{
name: "Real pods < PodLimit && real pods + fake pods < PodLimit",
podLimit: 10,
unschedulablePods: append(makeTestingPods(3), makeFakePods(ownerUid, samplePod, 4)...),
unschedulablePods: append(makeTestingPods(3), makeFakePodsIgnoreClaims(ownerUid, samplePod, 4)...),
expectedNumberOfResultedUnschedulablePods: 7,
expectedNumberOfResultedUnschedulableFakePods: 4,
expectedNumberOfResultedUnschedulableRealPods: 3,
},
{
name: "Real pods < PodLimit && real pods + fake pods = PodLimit",
podLimit: 10,
unschedulablePods: append(makeTestingPods(3), makeFakePods(ownerUid, samplePod, 7)...),
unschedulablePods: append(makeTestingPods(3), makeFakePodsIgnoreClaims(ownerUid, samplePod, 7)...),
expectedNumberOfResultedUnschedulablePods: 10,
expectedNumberOfResultedUnschedulableFakePods: 7,
expectedNumberOfResultedUnschedulableRealPods: 3,
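The makeFakePodsIgnoreClaims helper these test cases now call is not part of this diff. A rough sketch of what it could look like, assuming it wraps the new makeFakePods for tests that don't exercise DRA and discards the ResourceClaim side of the result (the helper body is an assumption, not taken from the commit):

```go
func makeFakePodsIgnoreClaims(ownerUid types.UID, samplePod *apiv1.Pod, podCount int) []*apiv1.Pod {
	// Wrap the bare sample pod in a PodInfo with no claims; sanitizing an empty
	// claim list cannot fail, so an error is not expected here.
	fakePodInfos, err := makeFakePods(ownerUid, &framework.PodInfo{Pod: samplePod}, podCount)
	if err != nil {
		panic(err)
	}
	var fakePods []*apiv1.Pod
	for _, podInfo := range fakePodInfos {
		fakePods = append(fakePods, podInfo.Pod)
	}
	return fakePods
}
```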
31 changes: 25 additions & 6 deletions cluster-autoscaler/processors/podinjection/pod_group.go
@@ -18,35 +18,39 @@ package podinjection

import (
apiv1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1alpha3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/autoscaler/cluster-autoscaler/dynamicresources"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)

type podGroup struct {
podCount int
desiredReplicas int
sample *apiv1.Pod
sample *framework.PodInfo
ownerUid types.UID
}

// groupPods creates a map of controller uids and podGroups.
// If a controller for some pods is not found, such pods are ignored and not grouped
func groupPods(pods []*apiv1.Pod, controllers []controller) map[types.UID]podGroup {
func groupPods(pods []*apiv1.Pod, controllers []controller, snapshot clustersnapshot.ClusterSnapshot, draEnabled bool) map[types.UID]podGroup {
podGroups := map[types.UID]podGroup{}
for _, con := range controllers {
podGroups[con.uid] = makePodGroup(con.desiredReplicas)
}

for _, pod := range pods {
for _, ownerRef := range pod.OwnerReferences {
podGroups = updatePodGroups(pod, ownerRef, podGroups)
podGroups = updatePodGroups(pod, ownerRef, podGroups, snapshot, draEnabled)
}
}
return podGroups
}

// updatePodGroups updates the pod group if ownerRef is the controller of the pod
func updatePodGroups(pod *apiv1.Pod, ownerRef metav1.OwnerReference, podGroups map[types.UID]podGroup) map[types.UID]podGroup {
func updatePodGroups(pod *apiv1.Pod, ownerRef metav1.OwnerReference, podGroups map[types.UID]podGroup, snapshot clustersnapshot.ClusterSnapshot, draEnabled bool) map[types.UID]podGroup {
if ownerRef.Controller == nil {
return podGroups
}
@@ -58,8 +62,23 @@ func updatePodGroups(pod *apiv1.Pod, ownerRef metav1.OwnerReference, podGroups m
return podGroups
}
if group.sample == nil && pod.Spec.NodeName == "" {
group.sample = pod
group.ownerUid = ownerRef.UID
validSample := true

var podClaims []*resourceapi.ResourceClaim
if draEnabled && dynamicresources.PodNeedsResourceClaims(pod) {
claims, err := snapshot.GetPodResourceClaims(pod)
if err != nil {
// An error means that not all claims for this Pod have been created yet, so it won't be able to schedule anyway - count it, but don't use it as a sample.
validSample = false
} else {
podClaims = claims
}
}

if validSample {
group.sample = &framework.PodInfo{Pod: pod, NeededResourceClaims: podClaims}
group.ownerUid = ownerRef.UID
}
}
group.podCount += 1
podGroups[ownerRef.UID] = group
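The fakePodCount helper used by the processor below is defined elsewhere in pod_group.go and is not shown in this commit. A hypothetical sketch of its contract, assuming it only injects pods up to the controller's desired replica count and only when a usable sample was captured:

```go
// Hypothetical sketch; the real implementation is not part of this diff.
func (p podGroup) fakePodCount() int {
	// Without a schedulable sample there is nothing to duplicate, and controllers that
	// already have at least desiredReplicas pods need no fake pods injected.
	if p.sample == nil || p.podCount >= p.desiredReplicas {
		return 0
	}
	return p.desiredReplicas - p.podCount
}
```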
@@ -23,6 +23,7 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/dynamicresources"
podinjectionbackoff "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection/backoff"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/klog/v2"
@@ -67,13 +68,27 @@ func (p *PodInjectionPodListProcessor) Process(ctx *context.AutoscalingContext,
}
scheduledPods := podsFromNodeInfos(nodeInfos)

groupedPods := groupPods(append(scheduledPods, unschedulablePods...), controllers)
groupedPods := groupPods(append(scheduledPods, unschedulablePods...), controllers, ctx.ClusterSnapshot, ctx.EnableDynamicResources)
var podsToInject []*apiv1.Pod

for _, groupedPod := range groupedPods {
var fakePodCount = groupedPod.fakePodCount()
fakePods := makeFakePods(groupedPod.ownerUid, groupedPod.sample, fakePodCount)
podsToInject = append(podsToInject, fakePods...)
fakePods, err := makeFakePods(groupedPod.ownerUid, groupedPod.sample, fakePodCount)
if err != nil {
return nil, err
}
for _, podInfo := range fakePods {
podsToInject = append(podsToInject, podInfo.Pod)

if ctx.EnableDynamicResources && dynamicresources.PodNeedsResourceClaims(podInfo.Pod) {
// If the pod we're duplicating owns any ResourceClaims, we need to duplicate and inject them into the cluster snapshot in order
// for the pods to be able to schedule.
err := ctx.ClusterSnapshot.AddResourceClaims(podInfo.NeededResourceClaims)
if err != nil {
return nil, err
}
}
}
}

unschedulablePodsAfterProcessing := append(unschedulablePods, podsToInject...)
@@ -87,15 +102,24 @@ func (p *PodInjectionPodListProcessor) CleanUp() {

// makeFakePods creates podCount copies of the sample pod
// makeFakePods also adds an annotation marking each copy as "fake"
func makeFakePods(ownerUid types.UID, samplePod *apiv1.Pod, podCount int) []*apiv1.Pod {
var fakePods []*apiv1.Pod
func makeFakePods(ownerUid types.UID, samplePod *framework.PodInfo, podCount int) ([]*framework.PodInfo, error) {
var fakePods []*framework.PodInfo
for i := 1; i <= podCount; i++ {
newPod := withFakePodAnnotation(samplePod.DeepCopy())
newPod.Name = fmt.Sprintf("%s-copy-%d", samplePod.Name, i)
nameSuffix := fmt.Sprintf("copy-%d", i)
newPod.Name = fmt.Sprintf("%s-%s", samplePod.Name, nameSuffix)
newPod.UID = types.UID(fmt.Sprintf("%s-%d", string(ownerUid), i))
fakePods = append(fakePods, newPod)

// Keep only the claims owned by the duplicated pod, clear their allocations to be safe, and sanitize them. These claims will need to be added to the cluster snapshot
// in order for the fake pod to be able to schedule.
newResourceClaims, err := dynamicresources.SanitizePodResourceClaims(newPod, samplePod.Pod, samplePod.NeededResourceClaims, true, nameSuffix, "", "", nil)
if err != nil {
return nil, err
}

fakePods = append(fakePods, &framework.PodInfo{Pod: newPod, NeededResourceClaims: newResourceClaims})
}
return fakePods
return fakePods, nil
}

// withFakePodAnnotation adds annotation of key `FakePodAnnotationKey` with value `FakePodAnnotationValue` to passed pod.
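A small, self-contained illustration of the naming scheme the updated makeFakePods produces; the pod and claim names are made up, only the "copy-%d" suffixing mirrors the code above:

```go
package main

import "fmt"

func main() {
	samplePodName := "web-0"       // hypothetical sample pod
	sampleClaimName := "web-0-gpu" // hypothetical claim owned by the sample pod
	for i := 1; i <= 2; i++ {
		nameSuffix := fmt.Sprintf("copy-%d", i)
		fakePod := fmt.Sprintf("%s-%s", samplePodName, nameSuffix)
		fakeClaim := fmt.Sprintf("%s-%s", sampleClaimName, nameSuffix)
		// Each fake pod owns a sanitized copy of the claim with its allocation cleared,
		// so the scheduler can allocate it on whichever node the fake pod lands on.
		fmt.Printf("fake pod %q -> sanitized claim %q\n", fakePod, fakeClaim)
	}
}
```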