DRA: attach DRA objects to unschedulable pods in ScaleUp and Estimator
towca committed Aug 28, 2024
1 parent 202899b commit 48be6fd
Showing 18 changed files with 152 additions and 136 deletions.
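The common thread in the files below is that the scale-up code paths stop handling bare *apiv1.Pod values and instead handle *clustersnapshot.PodResourceInfo wrappers, so that DRA objects can travel together with each unschedulable pod. The wrapper type itself is not among the files shown here; the following is only a minimal sketch inferred from its usage in this diff (the Pod field and the clustersnapshot.ToPods helper). The ResourceClaims field is an assumption for illustration, not something visible in this commit.

package clustersnapshot

import (
	apiv1 "k8s.io/api/core/v1"
	resourceapi "k8s.io/api/resource/v1alpha3"
)

// PodResourceInfo bundles a pod with the DRA objects it references, so that
// scale-up simulation can consider the pod together with its claims.
type PodResourceInfo struct {
	Pod *apiv1.Pod
	// ResourceClaims is assumed here for illustration; only Pod is visible in this diff.
	ResourceClaims []*resourceapi.ResourceClaim
}

// ToPods unwraps a slice of PodResourceInfo back into plain pods.
func ToPods(infos []*PodResourceInfo) []*apiv1.Pod {
	pods := make([]*apiv1.Pod, 0, len(infos))
	for _, info := range infos {
		pods = append(pods, info.Pod)
	}
	return pods
}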
19 changes: 10 additions & 9 deletions cluster-autoscaler/core/scaleup/equivalence/groups.go
@@ -17,6 +17,7 @@ limitations under the License.
package equivalence

import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"reflect"

"k8s.io/autoscaler/cluster-autoscaler/utils"
@@ -30,14 +31,14 @@ import (

// PodGroup contains a group of pods that are equivalent in terms of schedulability.
type PodGroup struct {
- Pods []*apiv1.Pod
+ Pods []*clustersnapshot.PodResourceInfo
SchedulingErrors map[string]status.Reasons
SchedulableGroups []string
Schedulable bool
}

// BuildPodGroups prepares pod groups with equivalent scheduling properties.
- func BuildPodGroups(pods []*apiv1.Pod) []*PodGroup {
+ func BuildPodGroups(pods []*clustersnapshot.PodResourceInfo) []*PodGroup {
podEquivalenceGroups := []*PodGroup{}
for _, pods := range groupPodsBySchedulingProperties(pods) {
podEquivalenceGroups = append(podEquivalenceGroups, &PodGroup{
@@ -52,28 +53,28 @@ func BuildPodGroups(pods []*apiv1.Pod) []*PodGroup {
type equivalenceGroupId int
type equivalenceGroup struct {
id equivalenceGroupId
- representant *apiv1.Pod
+ representant *clustersnapshot.PodResourceInfo
}

const maxEquivalenceGroupsByController = 10

// groupPodsBySchedulingProperties groups pods based on scheduling properties. Group ID is meaningless.
// TODO(x13n): refactor this to have shared logic with PodSchedulableMap.
- func groupPodsBySchedulingProperties(pods []*apiv1.Pod) map[equivalenceGroupId][]*apiv1.Pod {
- podEquivalenceGroups := map[equivalenceGroupId][]*apiv1.Pod{}
+ func groupPodsBySchedulingProperties(pods []*clustersnapshot.PodResourceInfo) map[equivalenceGroupId][]*clustersnapshot.PodResourceInfo {
+ podEquivalenceGroups := map[equivalenceGroupId][]*clustersnapshot.PodResourceInfo{}
equivalenceGroupsByController := make(map[types.UID][]equivalenceGroup)

var nextGroupId equivalenceGroupId
for _, pod := range pods {
- controllerRef := drain.ControllerRef(pod)
- if controllerRef == nil || pod_utils.IsDaemonSetPod(pod) {
- podEquivalenceGroups[nextGroupId] = []*apiv1.Pod{pod}
+ controllerRef := drain.ControllerRef(pod.Pod)
+ if controllerRef == nil || pod_utils.IsDaemonSetPod(pod.Pod) {
+ podEquivalenceGroups[nextGroupId] = []*clustersnapshot.PodResourceInfo{pod}
nextGroupId++
continue
}

egs := equivalenceGroupsByController[controllerRef.UID]
- if gid := match(egs, pod); gid != nil {
+ if gid := match(egs, pod.Pod); gid != nil {
podEquivalenceGroups[*gid] = append(podEquivalenceGroups[*gid], pod)
continue
}
55 changes: 30 additions & 25 deletions cluster-autoscaler/core/scaleup/equivalence/groups_test.go
@@ -18,6 +18,7 @@ package equivalence

import (
"fmt"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"testing"

. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -67,56 +68,56 @@ func TestGroupSchedulablePodsForNode(t *testing.T) {
}

projectedSAVol := BuildServiceTokenProjectedVolumeSource("path")
- p1 := BuildTestPod("p1", 1500, 200000)
- p2_1 := BuildTestPod("p2_1", 3000, 200000)
+ p1 := buildTestPod("p1", 1500, 200000)
+ p2_1 := buildTestPod("p2_1", 3000, 200000)
p2_1.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID)
- p2_2 := BuildTestPod("p2_2", 3000, 200000)
+ p2_2 := buildTestPod("p2_2", 3000, 200000)
p2_2.OwnerReferences = GenerateOwnerReferences(rc1.Name, "ReplicationController", "extensions/v1beta1", rc1.UID)
- p3_1 := BuildTestPod("p3_1", 100, 200000)
+ p3_1 := buildTestPod("p3_1", 100, 200000)
p3_1.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID)
- p3_2 := BuildTestPod("p3_2", 100, 200000)
+ p3_2 := buildTestPod("p3_2", 100, 200000)
p3_2.OwnerReferences = GenerateOwnerReferences(rc2.Name, "ReplicationController", "extensions/v1beta1", rc2.UID)
// Two pods with projected volume sources should be in the same equivalence group
- p4_1 := BuildTestPod("p4_1", 100, 200000)
+ p4_1 := buildTestPod("p4_1", 100, 200000)
p4_1.OwnerReferences = GenerateOwnerReferences(rc3.Name, "ReplicationController", "extensions/v1beta1", rc3.UID)
p4_1.Spec.Volumes = []apiv1.Volume{{Name: "kube-api-access-nz94b", VolumeSource: apiv1.VolumeSource{Projected: projectedSAVol}}}
- p4_2 := BuildTestPod("p4_2", 100, 200000)
+ p4_2 := buildTestPod("p4_2", 100, 200000)
p4_2.OwnerReferences = GenerateOwnerReferences(rc3.Name, "ReplicationController", "extensions/v1beta1", rc3.UID)
p4_2.Spec.Volumes = []apiv1.Volume{{Name: "kube-api-access-mo25i", VolumeSource: apiv1.VolumeSource{Projected: projectedSAVol}}}
// Two pods with flex volume sources should be in different equivalence groups
- p5_1 := BuildTestPod("p5_1", 100, 200000)
+ p5_1 := buildTestPod("p5_1", 100, 200000)
p5_1.Spec.Volumes = []apiv1.Volume{{Name: "volume-nz94b", VolumeSource: apiv1.VolumeSource{FlexVolume: &apiv1.FlexVolumeSource{Driver: "testDriver"}}}}
p5_1.OwnerReferences = GenerateOwnerReferences(rc4.Name, "ReplicationController", "extensions/v1beta1", rc4.UID)
- p5_2 := BuildTestPod("p5_2", 100, 200000)
+ p5_2 := buildTestPod("p5_2", 100, 200000)
p5_2.Spec.Volumes = []apiv1.Volume{{Name: "volume-mo25i", VolumeSource: apiv1.VolumeSource{FlexVolume: &apiv1.FlexVolumeSource{Driver: "testDriver"}}}}
p5_2.OwnerReferences = GenerateOwnerReferences(rc4.Name, "ReplicationController", "extensions/v1beta1", rc4.UID)
- unschedulablePods := []*apiv1.Pod{p1, p2_1, p2_2, p3_1, p3_2, p4_1, p4_2, p5_1, p5_2}
+ unschedulablePods := []*clustersnapshot.PodResourceInfo{p1, p2_1, p2_2, p3_1, p3_2, p4_1, p4_2, p5_1, p5_2}

podGroups := groupPodsBySchedulingProperties(unschedulablePods)
assert.Equal(t, 6, len(podGroups))

wantedGroups := []struct {
- pods []*apiv1.Pod
+ pods []*clustersnapshot.PodResourceInfo
found bool
}{
- {pods: []*apiv1.Pod{p1}},
- {pods: []*apiv1.Pod{p2_1, p2_2}},
- {pods: []*apiv1.Pod{p3_1, p3_2}},
- {pods: []*apiv1.Pod{p4_1, p4_2}},
- {pods: []*apiv1.Pod{p5_1}},
- {pods: []*apiv1.Pod{p5_2}},
+ {pods: []*clustersnapshot.PodResourceInfo{p1}},
+ {pods: []*clustersnapshot.PodResourceInfo{p2_1, p2_2}},
+ {pods: []*clustersnapshot.PodResourceInfo{p3_1, p3_2}},
+ {pods: []*clustersnapshot.PodResourceInfo{p4_1, p4_2}},
+ {pods: []*clustersnapshot.PodResourceInfo{p5_1}},
+ {pods: []*clustersnapshot.PodResourceInfo{p5_2}},
}

- equal := func(a, b []*apiv1.Pod) bool {
+ equal := func(a, b []*clustersnapshot.PodResourceInfo) bool {
if len(a) != len(b) {
return false
}
ma := map[*apiv1.Pod]bool{}
for _, ea := range a {
- ma[ea] = true
+ ma[ea.Pod] = true
}
for _, eb := range b {
- if !ma[eb] {
+ if !ma[eb.Pod] {
return false
}
}
@@ -149,9 +150,9 @@ func TestEquivalenceGroupSizeLimiting(t *testing.T) {
UID: "12345678-1234-1234-1234-123456789012",
},
}
- pods := make([]*apiv1.Pod, 0, maxEquivalenceGroupsByController+1)
+ pods := make([]*clustersnapshot.PodResourceInfo, 0, maxEquivalenceGroupsByController+1)
for i := 0; i < maxEquivalenceGroupsByController+1; i += 1 {
- p := BuildTestPod(fmt.Sprintf("p%d", i), 3000, 200000)
+ p := buildTestPod(fmt.Sprintf("p%d", i), 3000, 200000)
p.OwnerReferences = GenerateOwnerReferences(rc.Name, "ReplicationController", "extensions/v1beta1", rc.UID)
label := fmt.Sprintf("l%d", i)
if i > maxEquivalenceGroupsByController {
Expand All @@ -176,11 +177,15 @@ func TestEquivalenceGroupIgnoresDaemonSets(t *testing.T) {
UID: "12345678-1234-1234-1234-123456789012",
},
}
- pods := make([]*apiv1.Pod, 2)
- pods[0] = BuildTestPod("p1", 3000, 200000)
+ pods := make([]*clustersnapshot.PodResourceInfo, 2)
+ pods[0] = buildTestPod("p1", 3000, 200000)
pods[0].OwnerReferences = GenerateOwnerReferences(ds.Name, "DaemonSet", "apps/v1", ds.UID)
- pods[1] = BuildTestPod("p2", 3000, 200000)
+ pods[1] = buildTestPod("p2", 3000, 200000)
pods[1].OwnerReferences = GenerateOwnerReferences(ds.Name, "DaemonSet", "apps/v1", ds.UID)
podGroups := groupPodsBySchedulingProperties(pods)
assert.Equal(t, 2, len(podGroups))
}

+ func buildTestPod(name string, cpu, mem int64) *clustersnapshot.PodResourceInfo {
+ return &clustersnapshot.PodResourceInfo{Pod: BuildTestPod(name, cpu, mem)}
+ }
12 changes: 6 additions & 6 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
@@ -85,7 +85,7 @@ func (o *ScaleUpOrchestrator) Initialize(
// an unexpected error occurred. Assumes that all nodes in the cluster are ready
// and in sync with instance groups.
func (o *ScaleUpOrchestrator) ScaleUp(
- unschedulablePods []*apiv1.Pod,
+ unschedulablePods []*clustersnapshot.PodResourceInfo,
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
@@ -579,7 +579,7 @@ func (o *ScaleUpOrchestrator) SchedulablePodGroups(
var schedulablePodGroups []estimator.PodEquivalenceGroup
for _, eg := range podEquivalenceGroups {
samplePod := eg.Pods[0]
- if err := o.autoscalingContext.PredicateChecker.CheckPredicates(o.autoscalingContext.ClusterSnapshot, samplePod, nodeInfo.Node().Name); err == nil {
+ if err := o.autoscalingContext.PredicateChecker.CheckPredicates(o.autoscalingContext.ClusterSnapshot, samplePod.Pod, nodeInfo.Node().Name); err == nil {
// Add pods to option.
schedulablePodGroups = append(schedulablePodGroups, estimator.PodEquivalenceGroup{
Pods: eg.Pods,
@@ -758,10 +758,10 @@ func (o *ScaleUpOrchestrator) ComputeSimilarNodeGroups(
func matchingSchedulablePodGroups(podGroups []estimator.PodEquivalenceGroup, similarPodGroups []estimator.PodEquivalenceGroup) bool {
schedulableSamplePods := make(map[*apiv1.Pod]bool)
for _, podGroup := range similarPodGroups {
- schedulableSamplePods[podGroup.Exemplar()] = true
+ schedulableSamplePods[podGroup.Exemplar().Pod] = true
}
for _, podGroup := range podGroups {
- if _, found := schedulableSamplePods[podGroup.Exemplar()]; !found {
+ if _, found := schedulableSamplePods[podGroup.Exemplar().Pod]; !found {
return false
}
}
@@ -801,7 +801,7 @@ func GetRemainingPods(egs []*equivalence.PodGroup, skipped map[string]status.Reasons)
}
for _, pod := range eg.Pods {
noScaleUpInfo := status.NoScaleUpInfo{
- Pod: pod,
+ Pod: pod.Pod,
RejectedNodeGroups: eg.SchedulingErrors,
SkippedNodeGroups: skipped,
}
@@ -819,7 +819,7 @@ func GetPodsAwaitingEvaluation(egs []*equivalence.PodGroup, bestOption string) [
if eg.Schedulable {
if _, found := eg.SchedulingErrors[bestOption]; found {
// Schedulable, but not yet.
- awaitsEvaluation = append(awaitsEvaluation, eg.Pods...)
+ awaitsEvaluation = append(awaitsEvaluation, clustersnapshot.ToPods(eg.Pods)...)
}
}
}
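With the new ScaleUp signature above, callers have to hand in wrapped pods rather than bare *apiv1.Pod slices. A hypothetical caller-side helper (not part of this commit) might look like the sketch below; whoever builds the snapshot would be responsible for filling in any DRA objects.

package main // illustrative only, not part of this commit

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
)

// wrapPods is a hypothetical helper showing how a caller could adapt to the
// new ScaleUp signature: each bare pod is wrapped into a PodResourceInfo
// before the slice is passed on.
func wrapPods(pods []*apiv1.Pod) []*clustersnapshot.PodResourceInfo {
	infos := make([]*clustersnapshot.PodResourceInfo, 0, len(pods))
	for _, p := range pods {
		infos = append(infos, &clustersnapshot.PodResourceInfo{Pod: p})
	}
	return infos
}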
