Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cluster-autoscaler-release-1.30] Add support for all-or-nothing scale-up strategy #7015

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cluster-autoscaler/core/scaleup/equivalence/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ import (

// PodGroup contains a group of pods that are equivalent in terms of schedulability.
type PodGroup struct {
Pods []*apiv1.Pod
SchedulingErrors map[string]status.Reasons
Schedulable bool
Pods []*apiv1.Pod
SchedulingErrors map[string]status.Reasons
SchedulableGroups []string
Schedulable bool
}

// BuildPodGroups prepares pod groups with equivalent scheduling properties.
Expand Down
25 changes: 20 additions & 5 deletions cluster-autoscaler/core/scaleup/orchestrator/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,20 @@ func (e *scaleUpExecutor) ExecuteScaleUps(
scaleUpInfos []nodegroupset.ScaleUpInfo,
nodeInfos map[string]*schedulerframework.NodeInfo,
now time.Time,
atomic bool,
) (errors.AutoscalerError, []cloudprovider.NodeGroup) {
options := e.autoscalingContext.AutoscalingOptions
if options.ParallelScaleUp {
return e.executeScaleUpsParallel(scaleUpInfos, nodeInfos, now)
return e.executeScaleUpsParallel(scaleUpInfos, nodeInfos, now, atomic)
}
return e.executeScaleUpsSync(scaleUpInfos, nodeInfos, now)
return e.executeScaleUpsSync(scaleUpInfos, nodeInfos, now, atomic)
}

func (e *scaleUpExecutor) executeScaleUpsSync(
scaleUpInfos []nodegroupset.ScaleUpInfo,
nodeInfos map[string]*schedulerframework.NodeInfo,
now time.Time,
atomic bool,
) (errors.AutoscalerError, []cloudprovider.NodeGroup) {
availableGPUTypes := e.autoscalingContext.CloudProvider.GetAvailableGPUTypes()
for _, scaleUpInfo := range scaleUpInfos {
Expand All @@ -81,7 +83,7 @@ func (e *scaleUpExecutor) executeScaleUpsSync(
klog.Errorf("ExecuteScaleUp: failed to get node info for node group %s", scaleUpInfo.Group.Id())
continue
}
if aErr := e.executeScaleUp(scaleUpInfo, nodeInfo, availableGPUTypes, now); aErr != nil {
if aErr := e.executeScaleUp(scaleUpInfo, nodeInfo, availableGPUTypes, now, atomic); aErr != nil {
return aErr, []cloudprovider.NodeGroup{scaleUpInfo.Group}
}
}
Expand All @@ -92,6 +94,7 @@ func (e *scaleUpExecutor) executeScaleUpsParallel(
scaleUpInfos []nodegroupset.ScaleUpInfo,
nodeInfos map[string]*schedulerframework.NodeInfo,
now time.Time,
atomic bool,
) (errors.AutoscalerError, []cloudprovider.NodeGroup) {
if err := checkUniqueNodeGroups(scaleUpInfos); err != nil {
return err, extractNodeGroups(scaleUpInfos)
Expand All @@ -113,7 +116,7 @@ func (e *scaleUpExecutor) executeScaleUpsParallel(
klog.Errorf("ExecuteScaleUp: failed to get node info for node group %s", info.Group.Id())
return
}
if aErr := e.executeScaleUp(info, nodeInfo, availableGPUTypes, now); aErr != nil {
if aErr := e.executeScaleUp(info, nodeInfo, availableGPUTypes, now, atomic); aErr != nil {
errResults <- errResult{err: aErr, info: &info}
}
}(scaleUpInfo)
Expand All @@ -136,19 +139,31 @@ func (e *scaleUpExecutor) executeScaleUpsParallel(
return nil, nil
}

func (e *scaleUpExecutor) increaseSize(nodeGroup cloudprovider.NodeGroup, increase int, atomic bool) error {
if atomic {
if err := nodeGroup.AtomicIncreaseSize(increase); err != cloudprovider.ErrNotImplemented {
return err
}
// If error is cloudprovider.ErrNotImplemented, fall back to non-atomic
// increase - cloud provider doesn't support it.
}
return nodeGroup.IncreaseSize(increase)
}

func (e *scaleUpExecutor) executeScaleUp(
info nodegroupset.ScaleUpInfo,
nodeInfo *schedulerframework.NodeInfo,
availableGPUTypes map[string]struct{},
now time.Time,
atomic bool,
) errors.AutoscalerError {
gpuConfig := e.autoscalingContext.CloudProvider.GetNodeGpuConfig(nodeInfo.Node())
gpuResourceName, gpuType := gpu.GetGpuInfoForMetrics(gpuConfig, availableGPUTypes, nodeInfo.Node(), nil)
klog.V(0).Infof("Scale-up: setting group %s size to %d", info.Group.Id(), info.NewSize)
e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaledUpGroup",
"Scale-up: setting group %s size to %d instead of %d (max: %d)", info.Group.Id(), info.NewSize, info.CurrentSize, info.MaxSize)
increase := info.NewSize - info.CurrentSize
if err := info.Group.IncreaseSize(increase); err != nil {
if err := e.increaseSize(info.Group, increase, atomic); err != nil {
e.autoscalingContext.LogRecorder.Eventf(apiv1.EventTypeWarning, "FailedToScaleUpGroup", "Scale-up failed for group %s: %v", info.Group.Id(), err)
aerr := errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("failed to increase node group size: ")
e.scaleStateNotifier.RegisterFailedScaleUp(info.Group, string(aerr.Type()), aerr.Error(), gpuResourceName, gpuType, now)
Expand Down
79 changes: 75 additions & 4 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
allOrNothing bool, // Either request enough capacity for all unschedulablePods, or don't request it at all.
) (*status.ScaleUpStatus, errors.AutoscalerError) {
if !o.initialized {
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
Expand Down Expand Up @@ -146,11 +147,13 @@ func (o *ScaleUpOrchestrator) ScaleUp(
}

for _, nodeGroup := range validNodeGroups {
option := o.ComputeExpansionOption(nodeGroup, schedulablePodGroups, nodeInfos, len(nodes)+len(upcomingNodes), now)
option := o.ComputeExpansionOption(nodeGroup, schedulablePodGroups, nodeInfos, len(nodes)+len(upcomingNodes), now, allOrNothing)
o.processors.BinpackingLimiter.MarkProcessed(o.autoscalingContext, nodeGroup.Id())

if len(option.Pods) == 0 || option.NodeCount == 0 {
klog.V(4).Infof("No pod can fit to %s", nodeGroup.Id())
} else if allOrNothing && len(option.Pods) < len(unschedulablePods) {
klog.V(4).Infof("Some pods can't fit to %s, giving up due to all-or-nothing scale-up strategy", nodeGroup.Id())
} else {
options = append(options, option)
}
Expand Down Expand Up @@ -211,9 +214,26 @@ func (o *ScaleUpOrchestrator) ScaleUp(
aErr)
}

if newNodes < bestOption.NodeCount {
klog.V(1).Infof("Only %d nodes can be added to %s due to cluster-wide limits", newNodes, bestOption.NodeGroup.Id())
if allOrNothing {
// Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable.
klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated")
markedEquivalenceGroups := markAllGroupsAsUnschedulable(podEquivalenceGroups, AllOrNothingReason)
return buildNoOptionsAvailableStatus(markedEquivalenceGroups, skippedNodeGroups, nodeGroups), nil
}
}

// If necessary, create the node group. This is no longer simulation, an empty node group will be created by cloud provider if supported.
createNodeGroupResults := make([]nodegroups.CreateNodeGroupResult, 0)
if !bestOption.NodeGroup.Exist() {
if allOrNothing && bestOption.NodeGroup.MaxSize() < newNodes {
klog.V(1).Infof("Can only create a new node group with max %d nodes, need %d nodes", bestOption.NodeGroup.MaxSize(), newNodes)
// Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable.
klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated")
markedEquivalenceGroups := markAllGroupsAsUnschedulable(podEquivalenceGroups, AllOrNothingReason)
return buildNoOptionsAvailableStatus(markedEquivalenceGroups, skippedNodeGroups, nodeGroups), nil
}
var scaleUpStatus *status.ScaleUpStatus
createNodeGroupResults, scaleUpStatus, aErr = o.CreateNodeGroup(bestOption, nodeInfos, schedulablePodGroups, podEquivalenceGroups, daemonSets)
if aErr != nil {
Expand Down Expand Up @@ -256,9 +276,24 @@ func (o *ScaleUpOrchestrator) ScaleUp(
aErr)
}

// Last check before scale-up. Node group capacity (both due to max size limits & current size) is only checked when balancing.
totalCapacity := 0
for _, sui := range scaleUpInfos {
totalCapacity += sui.NewSize - sui.CurrentSize
}
if totalCapacity < newNodes {
klog.V(1).Infof("Can only add %d nodes due to node group limits, need %d nodes", totalCapacity, newNodes)
if allOrNothing {
// Can't execute a scale-up that will accommodate all pods, so nothing is considered schedulable.
klog.V(1).Info("Not attempting scale-up due to all-or-nothing strategy: not all pods would be accommodated")
markedEquivalenceGroups := markAllGroupsAsUnschedulable(podEquivalenceGroups, AllOrNothingReason)
return buildNoOptionsAvailableStatus(markedEquivalenceGroups, skippedNodeGroups, nodeGroups), nil
}
}

// Execute scale up.
klog.V(1).Infof("Final scale-up plan: %v", scaleUpInfos)
aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now)
aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now, allOrNothing)
if aErr != nil {
return status.UpdateScaleUpError(
&status.ScaleUpStatus{
Expand Down Expand Up @@ -364,7 +399,7 @@ func (o *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
}

klog.V(1).Infof("ScaleUpToNodeGroupMinSize: final scale-up plan: %v", scaleUpInfos)
aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now)
aErr, failedNodeGroups := o.scaleUpExecutor.ExecuteScaleUps(scaleUpInfos, nodeInfos, now, false /* allOrNothing disabled */)
if aErr != nil {
return status.UpdateScaleUpError(
&status.ScaleUpStatus{
Expand Down Expand Up @@ -447,6 +482,7 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption(
nodeInfos map[string]*schedulerframework.NodeInfo,
currentNodeCount int,
now time.Time,
allOrNothing bool,
) expander.Option {
option := expander.Option{NodeGroup: nodeGroup}
podGroups := schedulablePodGroups[nodeGroup.Id()]
Expand All @@ -471,11 +507,22 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption(
if err != nil && err != cloudprovider.ErrNotImplemented {
klog.Errorf("Failed to get autoscaling options for node group %s: %v", nodeGroup.Id(), err)
}

// Special handling for groups that only scale from zero to max.
if autoscalingOptions != nil && autoscalingOptions.ZeroOrMaxNodeScaling {
if option.NodeCount > 0 && option.NodeCount != nodeGroup.MaxSize() {
// For zero-or-max scaling groups, the only valid value of node count is node group's max size.
if allOrNothing && option.NodeCount > nodeGroup.MaxSize() {
// We would have to cap the node count, which means not all pods will be
// accommodated. This violates the principle of all-or-nothing strategy.
option.Pods = nil
option.NodeCount = 0
}
if option.NodeCount > 0 {
// Cap or increase the number of nodes to the only valid value - node group's max size.
option.NodeCount = nodeGroup.MaxSize()
}
}

return option
}

Expand Down Expand Up @@ -564,6 +611,7 @@ func (o *ScaleUpOrchestrator) SchedulablePodGroups(
})
// Mark pod group as (theoretically) schedulable.
eg.Schedulable = true
eg.SchedulableGroups = append(eg.SchedulableGroups, nodeGroup.Id())
} else {
klog.V(2).Infof("Pod %s/%s can't be scheduled on %s, predicate checking error: %v", samplePod.Namespace, samplePod.Name, nodeGroup.Id(), err.VerboseMessage())
if podCount := len(eg.Pods); podCount > 1 {
Expand Down Expand Up @@ -709,6 +757,29 @@ func matchingSchedulablePodGroups(podGroups []estimator.PodEquivalenceGroup, sim
return true
}

func markAllGroupsAsUnschedulable(egs []*equivalence.PodGroup, reason status.Reasons) []*equivalence.PodGroup {
for _, eg := range egs {
if eg.Schedulable {
if eg.SchedulingErrors == nil {
eg.SchedulingErrors = map[string]status.Reasons{}
}
for _, sg := range eg.SchedulableGroups {
eg.SchedulingErrors[sg] = reason
}
eg.Schedulable = false
}
}
return egs
}

func buildNoOptionsAvailableStatus(egs []*equivalence.PodGroup, skipped map[string]status.Reasons, ngs []cloudprovider.NodeGroup) *status.ScaleUpStatus {
return &status.ScaleUpStatus{
Result: status.ScaleUpNoOptionsAvailable,
PodsRemainUnschedulable: GetRemainingPods(egs, skipped),
ConsideredNodeGroups: ngs,
}
}

// GetRemainingPods returns information about pods which CA is unable to help
// at this moment.
func GetRemainingPods(egs []*equivalence.PodGroup, skipped map[string]status.Reasons) []status.NoScaleUpInfo {
Expand Down
45 changes: 38 additions & 7 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,37 @@ func TestNoCreateNodeGroupMaxCoresLimitHit(t *testing.T) {
simpleNoScaleUpTest(t, config, results)
}

func TestAllOrNothing(t *testing.T) {
options := defaultOptions

extraPods := []PodConfig{}
extraPodNames := []string{}
for i := 0; i < 11; i++ {
podName := fmt.Sprintf("pod-%d", i)
extraPods = append(extraPods, PodConfig{Name: podName, Cpu: 1000, Memory: 100})
extraPodNames = append(extraPodNames, podName)
}

config := &ScaleUpTestConfig{
Nodes: []NodeConfig{
{Name: "n1", Cpu: 1000, Memory: 1000, Gpu: 0, Ready: true, Group: "ng"},
},
Pods: []PodConfig{},
ExtraPods: extraPods,
Options: &options,
AllOrNothing: true,
}

result := &ScaleTestResults{
NoScaleUpReason: "all-or-nothing",
ScaleUpStatus: ScaleUpStatusInfo{
PodsRemainUnschedulable: extraPodNames,
},
}

simpleNoScaleUpTest(t, config, result)
}

func simpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig, expectedResults *ScaleTestResults) {
results := runSimpleScaleUpTest(t, config)
assert.NotNil(t, results.GroupSizeChanges, "Expected scale up event")
Expand Down Expand Up @@ -1032,7 +1063,7 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR
context.ExpanderStrategy = expander

// scale up
scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, scaleUpErr := orchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos, config.AllOrNothing)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)

// aggregate group size changes
Expand Down Expand Up @@ -1131,7 +1162,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
processors := NewTestProcessors(&context)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)

assert.NoError(t, err)
// Node group is unhealthy.
Expand Down Expand Up @@ -1185,7 +1216,7 @@ func TestBinpackingLimiter(t *testing.T) {
expander := NewMockRepotingStrategy(t, nil)
context.ExpanderStrategy = expander

scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{extraPod}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{extraPod}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)
assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful())
Expand Down Expand Up @@ -1231,7 +1262,7 @@ func TestScaleUpNoHelp(t *testing.T) {
processors := NewTestProcessors(&context)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)

assert.NoError(t, err)
Expand Down Expand Up @@ -1453,7 +1484,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
processors := NewTestProcessors(&context)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, typedErr := suOrchestrator.ScaleUp(pods, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, typedErr := suOrchestrator.ScaleUp(pods, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)

assert.NoError(t, typedErr)
assert.True(t, scaleUpStatus.WasSuccessful())
Expand Down Expand Up @@ -1515,7 +1546,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {

suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful())
assert.Equal(t, "autoprovisioned-T1", utils.GetStringFromChan(createdGroups))
Expand Down Expand Up @@ -1570,7 +1601,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {

suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, newEstimatorBuilder(), taints.TaintConfig{})
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos, false)
assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful())
assert.Equal(t, "autoprovisioned-T1", utils.GetStringFromChan(createdGroups))
Expand Down
37 changes: 37 additions & 0 deletions cluster-autoscaler/core/scaleup/orchestrator/rejectedreasons.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package orchestrator

// RejectedReasons contains information why given node group was rejected as a scale-up option.
type RejectedReasons struct {
messages []string
}

// NewRejectedReasons creates new RejectedReason object.
func NewRejectedReasons(m string) *RejectedReasons {
return &RejectedReasons{[]string{m}}
}

// Reasons returns a slice of reasons why the node group was not considered for scale up.
func (sr *RejectedReasons) Reasons() []string {
return sr.messages
}

var (
// AllOrNothingReason means the node group was rejected because not all pods would fit it when using all-or-nothing strategy.
AllOrNothingReason = NewRejectedReasons("not all pods would fit and scale-up is using all-or-nothing strategy")
)
1 change: 1 addition & 0 deletions cluster-autoscaler/core/scaleup/scaleup.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Orchestrator interface {
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
allOrNothing bool,
) (*status.ScaleUpStatus, errors.AutoscalerError)
// ScaleUpToNodeGroupMinSize tries to scale up node groups that have less nodes
// than the configured min size. The source of truth for the current node group
Expand Down
Loading
Loading