Merge pull request #5131 from x13n/scaledown2
Allow simulator to persist changes in cluster snapshot
k8s-ci-robot authored Sep 15, 2022
2 parents b042aae + 2854870 commit ab08e9a
Showing 6 changed files with 387 additions and 291 deletions.
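The most visible call-site change below is that simulator.NewRemovalSimulator now takes a trailing boolean, and the legacy scale-down path passes false. Going by the commit title, that flag presumably decides whether successful removal simulations are committed back into the shared cluster snapshot rather than reverted. A minimal, self-contained sketch of that fork/commit-or-revert idea follows; every name in it (snapshot, persistSuccessful, AddPod) is invented for illustration and is not the real simulator API.

package main

import "fmt"

// snapshot is a toy stand-in for the autoscaler's cluster snapshot; it only
// models the fork/commit/revert behaviour relevant to this commit.
type snapshot struct {
	scheduledPods []string
	forked        []string
	inFork        bool
}

func (s *snapshot) Fork()   { s.forked = append([]string(nil), s.scheduledPods...); s.inFork = true }
func (s *snapshot) Commit() { s.scheduledPods = s.forked; s.inFork = false }
func (s *snapshot) Revert() { s.forked = nil; s.inFork = false }

func (s *snapshot) AddPod(p string) {
	if s.inFork {
		s.forked = append(s.forked, p)
	} else {
		s.scheduledPods = append(s.scheduledPods, p)
	}
}

// simulateNodeRemoval reschedules a node's pods inside a fork and then either
// keeps or discards the result, mirroring the boolean that NewRemovalSimulator
// now receives (parameter name assumed).
func simulateNodeRemoval(s *snapshot, pods []string, persistSuccessful bool) {
	s.Fork()
	for _, p := range pods {
		s.AddPod(p) // in this sketch, rescheduling always succeeds
	}
	if persistSuccessful {
		s.Commit() // later simulations see the rescheduled pods
	} else {
		s.Revert() // legacy behaviour: the snapshot is left untouched
	}
}

func main() {
	s := &snapshot{}
	simulateNodeRemoval(s, []string{"pod-a", "pod-b"}, false)
	fmt.Println("without persisting:", s.scheduledPods) // []
	simulateNodeRemoval(s, []string{"pod-a", "pod-b"}, true)
	fmt.Println("with persisting:", s.scheduledPods) // [pod-a pod-b]
}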
213 changes: 16 additions & 197 deletions cluster-autoscaler/core/scaledown/legacy/legacy.go
@@ -24,14 +24,12 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/actuation"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/deletiontracker"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/eligibility"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/resource"
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/unremovable"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
@@ -47,187 +45,6 @@ import (
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

type scaleDownResourcesLimits map[string]int64
type scaleDownResourcesDelta map[string]int64

// used as a value in scaleDownResourcesLimits if actual limit could not be obtained due to errors talking to cloud provider
const scaleDownLimitUnknown = math.MinInt64

func (sd *ScaleDown) computeScaleDownResourcesLeftLimits(nodes []*apiv1.Node, resourceLimiter *cloudprovider.ResourceLimiter, cp cloudprovider.CloudProvider, timestamp time.Time) scaleDownResourcesLimits {
totalCores, totalMem := calculateScaleDownCoresMemoryTotal(nodes, timestamp)

var totalResources map[string]int64
var totalResourcesErr error
if cloudprovider.ContainsCustomResources(resourceLimiter.GetResources()) {
totalResources, totalResourcesErr = sd.calculateScaleDownCustomResourcesTotal(nodes, cp, timestamp)
}

resultScaleDownLimits := make(scaleDownResourcesLimits)
for _, resource := range resourceLimiter.GetResources() {
min := resourceLimiter.GetMin(resource)

// we put only actual limits into final map. No entry means no limit.
if min > 0 {
switch {
case resource == cloudprovider.ResourceNameCores:
resultScaleDownLimits[resource] = computeAboveMin(totalCores, min)
case resource == cloudprovider.ResourceNameMemory:
resultScaleDownLimits[resource] = computeAboveMin(totalMem, min)
case cloudprovider.IsCustomResource(resource):
if totalResourcesErr != nil {
resultScaleDownLimits[resource] = scaleDownLimitUnknown
} else {
resultScaleDownLimits[resource] = computeAboveMin(totalResources[resource], min)
}
default:
klog.Errorf("Scale down limits defined for unsupported resource '%s'", resource)
}
}
}
return resultScaleDownLimits
}

func computeAboveMin(total int64, min int64) int64 {
if total > min {
return total - min
}
return 0

}

func calculateScaleDownCoresMemoryTotal(nodes []*apiv1.Node, timestamp time.Time) (int64, int64) {
var coresTotal, memoryTotal int64
for _, node := range nodes {
if actuation.IsNodeBeingDeleted(node, timestamp) {
// Nodes being deleted do not count towards total cluster resources
continue
}
cores, memory := core_utils.GetNodeCoresAndMemory(node)

coresTotal += cores
memoryTotal += memory
}

return coresTotal, memoryTotal
}

func (sd *ScaleDown) calculateScaleDownCustomResourcesTotal(nodes []*apiv1.Node, cp cloudprovider.CloudProvider, timestamp time.Time) (map[string]int64, error) {
result := make(map[string]int64)
ngCache := make(map[string][]customresources.CustomResourceTarget)
for _, node := range nodes {
if actuation.IsNodeBeingDeleted(node, timestamp) {
// Nodes being deleted do not count towards total cluster resources
continue
}
nodeGroup, err := cp.NodeGroupForNode(node)
if err != nil {
return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("can not get node group for node %v when calculating cluster gpu usage", node.Name)
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
// We do not trust cloud providers to return properly constructed nil for interface type - hence the reflection check.
// See https://golang.org/doc/faq#nil_error
// TODO[lukaszos] consider creating cloud_provider sanitizer which will wrap cloud provider and ensure sane behaviour.
nodeGroup = nil
}

var resourceTargets []customresources.CustomResourceTarget
var cacheHit bool

if nodeGroup != nil {
resourceTargets, cacheHit = ngCache[nodeGroup.Id()]
}
if !cacheHit {
resourceTargets, err = sd.processors.CustomResourcesProcessor.GetNodeResourceTargets(sd.context, node, nodeGroup)
if err != nil {
return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("can not get gpu count for node %v when calculating cluster gpu usage")
}
if nodeGroup != nil {
ngCache[nodeGroup.Id()] = resourceTargets
}
}

for _, resourceTarget := range resourceTargets {
if resourceTarget.ResourceType == "" || resourceTarget.ResourceCount == 0 {
continue
}
result[resourceTarget.ResourceType] += resourceTarget.ResourceCount
}
}

return result, nil
}

func noScaleDownLimitsOnResources() scaleDownResourcesLimits {
return nil
}

func copyScaleDownResourcesLimits(source scaleDownResourcesLimits) scaleDownResourcesLimits {
copy := scaleDownResourcesLimits{}
for k, v := range source {
copy[k] = v
}
return copy
}

func (sd *ScaleDown) computeScaleDownResourcesDelta(cp cloudprovider.CloudProvider, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup, resourcesWithLimits []string) (scaleDownResourcesDelta, errors.AutoscalerError) {
resultScaleDownDelta := make(scaleDownResourcesDelta)

nodeCPU, nodeMemory := core_utils.GetNodeCoresAndMemory(node)
resultScaleDownDelta[cloudprovider.ResourceNameCores] = nodeCPU
resultScaleDownDelta[cloudprovider.ResourceNameMemory] = nodeMemory

if cloudprovider.ContainsCustomResources(resourcesWithLimits) {
resourceTargets, err := sd.processors.CustomResourcesProcessor.GetNodeResourceTargets(sd.context, node, nodeGroup)
if err != nil {
return scaleDownResourcesDelta{}, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get node %v custom resources: %v", node.Name)
}
for _, resourceTarget := range resourceTargets {
resultScaleDownDelta[resourceTarget.ResourceType] = resourceTarget.ResourceCount
}
}
return resultScaleDownDelta, nil
}

type scaleDownLimitsCheckResult struct {
exceeded bool
exceededResources []string
}

func scaleDownLimitsNotExceeded() scaleDownLimitsCheckResult {
return scaleDownLimitsCheckResult{false, []string{}}
}

func (limits *scaleDownResourcesLimits) checkScaleDownDeltaWithinLimits(delta scaleDownResourcesDelta) scaleDownLimitsCheckResult {
exceededResources := sets.NewString()
for resource, resourceDelta := range delta {
resourceLeft, found := (*limits)[resource]
if found {
if (resourceDelta > 0) && (resourceLeft == scaleDownLimitUnknown || resourceDelta > resourceLeft) {
exceededResources.Insert(resource)
}
}
}
if len(exceededResources) > 0 {
return scaleDownLimitsCheckResult{true, exceededResources.List()}
}

return scaleDownLimitsNotExceeded()
}

func (limits *scaleDownResourcesLimits) tryDecrementLimitsByDelta(delta scaleDownResourcesDelta) scaleDownLimitsCheckResult {
result := limits.checkScaleDownDeltaWithinLimits(delta)
if result.exceeded {
return result
}
for resource, resourceDelta := range delta {
resourceLeft, found := (*limits)[resource]
if found {
(*limits)[resource] = resourceLeft - resourceDelta
}
}
return scaleDownLimitsNotExceeded()
}

// ScaleDown is responsible for maintaining the state needed to perform unneeded node removals.
type ScaleDown struct {
context *context.AutoscalingContext
Expand All @@ -242,12 +59,13 @@ type ScaleDown struct {
nodeDeletionTracker *deletiontracker.NodeDeletionTracker
removalSimulator *simulator.RemovalSimulator
eligibilityChecker *eligibility.Checker
resourceLimitsFinder *resource.LimitsFinder
}

// NewScaleDown builds new ScaleDown object.
func NewScaleDown(context *context.AutoscalingContext, processors *processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, ndt *deletiontracker.NodeDeletionTracker) *ScaleDown {
usageTracker := simulator.NewUsageTracker()
removalSimulator := simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, usageTracker)
removalSimulator := simulator.NewRemovalSimulator(context.ListerRegistry, context.ClusterSnapshot, context.PredicateChecker, usageTracker, false)
unremovableNodes := unremovable.NewNodes()
return &ScaleDown{
context: context,
Expand All @@ -262,6 +80,7 @@ func NewScaleDown(context *context.AutoscalingContext, processors *processors.Au
nodeDeletionTracker: ndt,
removalSimulator: removalSimulator,
eligibilityChecker: eligibility.NewChecker(processors.NodeGroupConfigProcessor),
resourceLimitsFinder: resource.NewLimitsFinder(processors.CustomResourcesProcessor),
}
}

@@ -513,7 +332,7 @@ func (sd *ScaleDown) NodesToDelete(currentTime time.Time, pdbs []*policyv1.PodDi
return nil, nil, status.ScaleDownError, errors.ToAutoscalerError(errors.CloudProviderError, errCP)
}

scaleDownResourcesLeft := sd.computeScaleDownResourcesLeftLimits(nodesWithoutMaster, resourceLimiter, sd.context.CloudProvider, currentTime)
scaleDownResourcesLeft := sd.resourceLimitsFinder.LimitsLeft(sd.context, nodesWithoutMaster, resourceLimiter, currentTime)

nodeGroupSize := utils.GetNodeGroupSizeMap(sd.context.CloudProvider)
resourcesWithLimits := resourceLimiter.GetResources()
@@ -588,18 +407,18 @@ func (sd *ScaleDown) NodesToDelete(currentTime time.Time, pdbs []*policyv1.PodDi
continue
}

scaleDownResourcesDelta, err := sd.computeScaleDownResourcesDelta(sd.context.CloudProvider, node, nodeGroup, resourcesWithLimits)
scaleDownResourcesDelta, err := sd.resourceLimitsFinder.DeltaForNode(sd.context, node, nodeGroup, resourcesWithLimits)
if err != nil {
klog.Errorf("Error getting node resources: %v", err)
sd.unremovableNodes.AddReason(node, simulator.UnexpectedError)
continue
}

checkResult := scaleDownResourcesLeft.checkScaleDownDeltaWithinLimits(scaleDownResourcesDelta)
if checkResult.exceeded {
klog.V(4).Infof("Skipping %s - minimal limit exceeded for %v", node.Name, checkResult.exceededResources)
checkResult := scaleDownResourcesLeft.CheckDeltaWithinLimits(scaleDownResourcesDelta)
if checkResult.Exceeded() {
klog.V(4).Infof("Skipping %s - minimal limit exceeded for %v", node.Name, checkResult.ExceededResources)
sd.unremovableNodes.AddReason(node, simulator.MinimalResourceLimitExceeded)
for _, resource := range checkResult.exceededResources {
for _, resource := range checkResult.ExceededResources {
switch resource {
case cloudprovider.ResourceNameCores:
metrics.RegisterSkippedScaleDownCPU()
@@ -674,18 +493,18 @@ func updateScaleDownMetrics(scaleDownStart time.Time, findNodesToRemoveDuration
}

func (sd *ScaleDown) getEmptyNodesToRemoveNoResourceLimits(candidates []string, timestamp time.Time) []simulator.NodeToBeRemoved {
return sd.getEmptyNodesToRemove(candidates, noScaleDownLimitsOnResources(), timestamp)
return sd.getEmptyNodesToRemove(candidates, resource.NoLimits(), timestamp)
}

// This functions finds empty nodes among passed candidates and returns a list of empty nodes
// that can be deleted at the same time.
func (sd *ScaleDown) getEmptyNodesToRemove(candidates []string, resourcesLimits scaleDownResourcesLimits,
func (sd *ScaleDown) getEmptyNodesToRemove(candidates []string, resourcesLimits resource.Limits,
timestamp time.Time) []simulator.NodeToBeRemoved {

emptyNodes := sd.removalSimulator.FindEmptyNodesToRemove(candidates, timestamp)
availabilityMap := make(map[string]int)
nodesToRemove := make([]simulator.NodeToBeRemoved, 0)
resourcesLimitsCopy := copyScaleDownResourcesLimits(resourcesLimits) // we do not want to modify input parameter
resourcesLimitsCopy := resourcesLimits.DeepCopy() // we do not want to modify input parameter
resourcesNames := sets.StringKeySet(resourcesLimits).List()
for _, nodeName := range emptyNodes {
nodeInfo, err := sd.context.ClusterSnapshot.NodeInfos().Get(nodeName)
@@ -719,13 +538,13 @@ func (sd *ScaleDown) getEmptyNodesToRemove(candidates []string, resourcesLimits
availabilityMap[nodeGroup.Id()] = available
}
if available > 0 {
resourcesDelta, err := sd.computeScaleDownResourcesDelta(sd.context.CloudProvider, node, nodeGroup, resourcesNames)
resourcesDelta, err := sd.resourceLimitsFinder.DeltaForNode(sd.context, node, nodeGroup, resourcesNames)
if err != nil {
klog.Errorf("Error: %v", err)
continue
}
checkResult := resourcesLimitsCopy.tryDecrementLimitsByDelta(resourcesDelta)
if checkResult.exceeded {
checkResult := resourcesLimitsCopy.TryDecrementBy(resourcesDelta)
if checkResult.Exceeded() {
continue
}
available--
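Most of the deletions above are the resource-limit bookkeeping helpers (computeScaleDownResourcesLeftLimits, checkScaleDownDeltaWithinLimits, tryDecrementLimitsByDelta, and their helpers), which the remaining call sites now reach through resource.NewLimitsFinder and resource.Limits (LimitsLeft, DeltaForNode, CheckDeltaWithinLimits, TryDecrementBy, NoLimits, DeepCopy). The sketch below reconstructs that accounting from the deleted code as a self-contained example; the real resource package types and names are assumed to behave the same way but may differ in detail (for instance, the original sorted the exceeded-resource list).

package main

import (
	"fmt"
	"math"
)

// limitUnknown mirrors scaleDownLimitUnknown from the deleted code: the
// remaining quota could not be read from the cloud provider, so any positive
// delta counts as exceeding it.
const limitUnknown = math.MinInt64

// Limits and Delta stand in for the types the diff moves into the resource
// package: remaining allowance per resource, and one node's contribution.
type Limits map[string]int64
type Delta map[string]int64

type CheckResult struct {
	ExceededResources []string
}

func (r CheckResult) Exceeded() bool { return len(r.ExceededResources) > 0 }

// CheckDeltaWithinLimits reproduces the deleted checkScaleDownDeltaWithinLimits:
// a resource is exceeded when removing the node would use more than is left.
func (l Limits) CheckDeltaWithinLimits(d Delta) CheckResult {
	var exceeded []string
	for res, delta := range d {
		left, found := l[res]
		if found && delta > 0 && (left == limitUnknown || delta > left) {
			exceeded = append(exceeded, res)
		}
	}
	return CheckResult{ExceededResources: exceeded}
}

// TryDecrementBy reproduces the deleted tryDecrementLimitsByDelta: quota is
// only consumed when the whole delta fits within the remaining limits.
func (l Limits) TryDecrementBy(d Delta) CheckResult {
	result := l.CheckDeltaWithinLimits(d)
	if result.Exceeded() {
		return result
	}
	for res, delta := range d {
		if left, found := l[res]; found {
			l[res] = left - delta
		}
	}
	return CheckResult{}
}

func main() {
	left := Limits{"cpu": 10, "memory": 20}
	fmt.Println(left.TryDecrementBy(Delta{"cpu": 4}).Exceeded(), left) // false map[cpu:6 memory:20]
	fmt.Println(left.TryDecrementBy(Delta{"cpu": 8}).Exceeded(), left) // true map[cpu:6 memory:20]
}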
80 changes: 0 additions & 80 deletions cluster-autoscaler/core/scaledown/legacy/legacy_test.go
@@ -1249,37 +1249,6 @@ func getCountOfChan(c chan string) int {
}
}

func TestCalculateCoresAndMemoryTotal(t *testing.T) {
nodeConfigs := []NodeConfig{
{"n1", 2000, 7500 * utils.MiB, 0, true, "ng1"},
{"n2", 2000, 7500 * utils.MiB, 0, true, "ng1"},
{"n3", 2000, 7500 * utils.MiB, 0, true, "ng1"},
{"n4", 12000, 8000 * utils.MiB, 0, true, "ng1"},
{"n5", 16000, 7500 * utils.MiB, 0, true, "ng1"},
{"n6", 8000, 6000 * utils.MiB, 0, true, "ng1"},
{"n7", 6000, 16000 * utils.MiB, 0, true, "ng1"},
}
nodes := make([]*apiv1.Node, len(nodeConfigs))
for i, n := range nodeConfigs {
node := BuildTestNode(n.Name, n.Cpu, n.Memory)
SetNodeReadyState(node, n.Ready, time.Now())
nodes[i] = node
}

nodes[6].Spec.Taints = []apiv1.Taint{
{
Key: deletetaint.ToBeDeletedTaint,
Value: fmt.Sprint(time.Now().Unix()),
Effect: apiv1.TaintEffectNoSchedule,
},
}

coresTotal, memoryTotal := calculateScaleDownCoresMemoryTotal(nodes, time.Now())

assert.Equal(t, int64(42), coresTotal)
assert.Equal(t, int64(44000*utils.MiB), memoryTotal)
}

func TestFilterOutMasters(t *testing.T) {
nodeConfigs := []NodeConfig{
{"n1", 2000, 4000, 0, false, "ng1"},
@@ -1333,55 +1302,6 @@ func TestFilterOutMasters(t *testing.T) {
assertEqualSet(t, []string{"n1", "n2", "n4", "n5", "n6"}, withoutMastersNames)
}

func TestCheckScaleDownDeltaWithinLimits(t *testing.T) {
type testcase struct {
limits scaleDownResourcesLimits
delta scaleDownResourcesDelta
exceededResources []string
}
tests := []testcase{
{
limits: scaleDownResourcesLimits{"a": 10},
delta: scaleDownResourcesDelta{"a": 10},
exceededResources: []string{},
},
{
limits: scaleDownResourcesLimits{"a": 10},
delta: scaleDownResourcesDelta{"a": 11},
exceededResources: []string{"a"},
},
{
limits: scaleDownResourcesLimits{"a": 10},
delta: scaleDownResourcesDelta{"b": 10},
exceededResources: []string{},
},
{
limits: scaleDownResourcesLimits{"a": scaleDownLimitUnknown},
delta: scaleDownResourcesDelta{"a": 0},
exceededResources: []string{},
},
{
limits: scaleDownResourcesLimits{"a": scaleDownLimitUnknown},
delta: scaleDownResourcesDelta{"a": 1},
exceededResources: []string{"a"},
},
{
limits: scaleDownResourcesLimits{"a": 10, "b": 20, "c": 30},
delta: scaleDownResourcesDelta{"a": 11, "b": 20, "c": 31},
exceededResources: []string{"a", "c"},
},
}

for _, test := range tests {
checkResult := test.limits.checkScaleDownDeltaWithinLimits(test.delta)
if len(test.exceededResources) == 0 {
assert.Equal(t, scaleDownLimitsNotExceeded(), checkResult)
} else {
assert.Equal(t, scaleDownLimitsCheckResult{true, test.exceededResources}, checkResult)
}
}
}

func generateReplicaSets() []*appsv1.ReplicaSet {
replicas := int32(5)
return []*appsv1.ReplicaSet{
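The removed table test above (TestCheckScaleDownDeltaWithinLimits) exercised exactly the limit/delta semantics reconstructed in the earlier sketch; presumably an equivalent test now lives alongside the resource package. A compact version against the sketched Limits and Delta types from above follows (it assumes those definitions plus import "testing", and only asserts the Exceeded() flag to stay independent of result ordering):

// Assumes the Limits, Delta and limitUnknown definitions from the earlier sketch.
func TestCheckDeltaWithinLimits(t *testing.T) {
	cases := []struct {
		limits   Limits
		delta    Delta
		exceeded []string
	}{
		{Limits{"a": 10}, Delta{"a": 10}, nil},
		{Limits{"a": 10}, Delta{"a": 11}, []string{"a"}},
		{Limits{"a": 10}, Delta{"b": 10}, nil},
		{Limits{"a": limitUnknown}, Delta{"a": 0}, nil},
		{Limits{"a": limitUnknown}, Delta{"a": 1}, []string{"a"}},
		{Limits{"a": 10, "b": 20, "c": 30}, Delta{"a": 11, "b": 20, "c": 31}, []string{"a", "c"}},
	}
	for i, tc := range cases {
		got := tc.limits.CheckDeltaWithinLimits(tc.delta)
		if got.Exceeded() != (len(tc.exceeded) > 0) {
			t.Errorf("case %d: Exceeded() = %v, want %v", i, got.Exceeded(), len(tc.exceeded) > 0)
		}
	}
}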
(4 additional changed files not shown)
