Separate and refactor custom resources logic
BigDarkClown authored and aksentyev committed Apr 9, 2021
1 parent be3ea4c commit cd7369b
Showing 11 changed files with 414 additions and 275 deletions.
10 changes: 5 additions & 5 deletions cluster-autoscaler/cloudprovider/cloud_provider.go
@@ -273,17 +273,17 @@ const (
ResourceNameMemory = "memory"
)

// IsGpuResource checks if given resource name point denotes a gpu type
func IsGpuResource(resourceName string) bool {
// IsCustomResource checks if given resource name point denotes a gpu type
func IsCustomResource(resourceName string) bool {
// hack: we assume anything which is not cpu/memory to be a gpu.
// we are not getting anything more that a map string->limits from the user
return resourceName != ResourceNameCores && resourceName != ResourceNameMemory
}

// ContainsGpuResources returns true iff given list contains any resource name denoting a gpu type
func ContainsGpuResources(resources []string) bool {
// ContainsCustomResources returns true iff given list contains any custom resource name
func ContainsCustomResources(resources []string) bool {
for _, resource := range resources {
if IsGpuResource(resource) {
if IsCustomResource(resource) {
return true
}
}
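
For reference, a minimal sketch of how the renamed helpers behave (a hypothetical main package; the import path is the one used throughout the repo):

package main

import (
	"fmt"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
)

func main() {
	// Anything that is not cpu or memory is treated as a custom resource.
	fmt.Println(cloudprovider.IsCustomResource("nvidia.com/gpu"))                      // true
	fmt.Println(cloudprovider.IsCustomResource(cloudprovider.ResourceNameCores))       // false
	fmt.Println(cloudprovider.ContainsCustomResources([]string{"cpu", "memory"}))      // false
	fmt.Println(cloudprovider.ContainsCustomResources([]string{"cpu", "amd.com/gpu"})) // true
}
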
69 changes: 32 additions & 37 deletions cluster-autoscaler/core/scale_down.go
@@ -31,6 +31,7 @@ import (
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
@@ -171,13 +172,13 @@ type scaleDownResourcesDelta map[string]int64
// used as a value in scaleDownResourcesLimits if actual limit could not be obtained due to errors talking to cloud provider
const scaleDownLimitUnknown = math.MinInt64

func computeScaleDownResourcesLeftLimits(nodes []*apiv1.Node, resourceLimiter *cloudprovider.ResourceLimiter, cp cloudprovider.CloudProvider, timestamp time.Time) scaleDownResourcesLimits {
func (sd *ScaleDown) computeScaleDownResourcesLeftLimits(nodes []*apiv1.Node, resourceLimiter *cloudprovider.ResourceLimiter, cp cloudprovider.CloudProvider, timestamp time.Time) scaleDownResourcesLimits {
totalCores, totalMem := calculateScaleDownCoresMemoryTotal(nodes, timestamp)

var totalGpus map[string]int64
var totalGpusErr error
if cloudprovider.ContainsGpuResources(resourceLimiter.GetResources()) {
totalGpus, totalGpusErr = calculateScaleDownGpusTotal(nodes, cp, timestamp)
var totalResources map[string]int64
var totalResourcesErr error
if cloudprovider.ContainsCustomResources(resourceLimiter.GetResources()) {
totalResources, totalResourcesErr = sd.calculateScaleDownCustomResourcesTotal(nodes, cp, timestamp)
}

resultScaleDownLimits := make(scaleDownResourcesLimits)
@@ -191,11 +192,11 @@ func computeScaleDownResourcesLeftLimits(nodes []*apiv1.Node, resourceLimiter *c
resultScaleDownLimits[resource] = computeAboveMin(totalCores, min)
case resource == cloudprovider.ResourceNameMemory:
resultScaleDownLimits[resource] = computeAboveMin(totalMem, min)
case cloudprovider.IsGpuResource(resource):
if totalGpusErr != nil {
case cloudprovider.IsCustomResource(resource):
if totalResourcesErr != nil {
resultScaleDownLimits[resource] = scaleDownLimitUnknown
} else {
resultScaleDownLimits[resource] = computeAboveMin(totalGpus[resource], min)
resultScaleDownLimits[resource] = computeAboveMin(totalResources[resource], min)
}
default:
klog.Errorf("Scale down limits defined for unsupported resource '%s'", resource)
@@ -229,14 +230,9 @@ func calculateScaleDownCoresMemoryTotal(nodes []*apiv1.Node, timestamp time.Time
return coresTotal, memoryTotal
}

func calculateScaleDownGpusTotal(nodes []*apiv1.Node, cp cloudprovider.CloudProvider, timestamp time.Time) (map[string]int64, error) {
type gpuInfo struct {
name string
count int64
}

func (sd *ScaleDown) calculateScaleDownCustomResourcesTotal(nodes []*apiv1.Node, cp cloudprovider.CloudProvider, timestamp time.Time) (map[string]int64, error) {
result := make(map[string]int64)
ngCache := make(map[string]gpuInfo)
ngCache := make(map[string][]customresources.CustomResourceTarget)
for _, node := range nodes {
if isNodeBeingDeleted(node, timestamp) {
// Nodes being deleted do not count towards total cluster resources
@@ -253,31 +249,28 @@ func calculateScaleDownGpusTotal(nodes []*apiv1.Node, cp cloudprovider.CloudProv
nodeGroup = nil
}

var gpuType string
var gpuCount int64

var cached gpuInfo
var resourceTargets []customresources.CustomResourceTarget
var cacheHit bool

if nodeGroup != nil {
cached, cacheHit = ngCache[nodeGroup.Id()]
if cacheHit {
gpuType = cached.name
gpuCount = cached.count
}
resourceTargets, cacheHit = ngCache[nodeGroup.Id()]
}
if !cacheHit {
gpuType, gpuCount, err = gpu.GetNodeTargetGpus(cp.GPULabel(), node, nodeGroup)
resourceTargets, err = sd.processors.CustomResourcesProcessor.GetNodeResourceTargets(sd.context, node, nodeGroup)
if err != nil {
return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("can not get gpu count for node %v when calculating cluster gpu usage")
}
if nodeGroup != nil {
ngCache[nodeGroup.Id()] = gpuInfo{name: gpuType, count: gpuCount}
ngCache[nodeGroup.Id()] = resourceTargets
}
}
if gpuType == "" || gpuCount == 0 {
continue

for _, resourceTarget := range resourceTargets {
if resourceTarget.ResourceType == "" || resourceTarget.ResourceCount == 0 {
continue
}
result[resourceTarget.ResourceType] += resourceTarget.ResourceCount
}
result[gpuType] += gpuCount
}

return result, nil
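
The new customresources package itself is among the files whose diff did not load here; judging from the fields and calls used above, its target type presumably looks roughly like this (a sketch, with only the field names taken from the call sites):

package customresources

// CustomResourceTarget describes one custom resource (for example a GPU type)
// that a node from a given node group is expected to expose, and how much of it.
type CustomResourceTarget struct {
	ResourceType  string // resource name, e.g. "nvidia.com/gpu"
	ResourceCount int64  // expected amount of that resource per node
}
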
@@ -300,19 +293,21 @@ func copyScaleDownResourcesLimits(source scaleDownResourcesLimits) scaleDownReso
return copy
}

func computeScaleDownResourcesDelta(cp cloudprovider.CloudProvider, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup, resourcesWithLimits []string) (scaleDownResourcesDelta, errors.AutoscalerError) {
func (sd *ScaleDown) computeScaleDownResourcesDelta(cp cloudprovider.CloudProvider, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup, resourcesWithLimits []string) (scaleDownResourcesDelta, errors.AutoscalerError) {
resultScaleDownDelta := make(scaleDownResourcesDelta)

nodeCPU, nodeMemory := core_utils.GetNodeCoresAndMemory(node)
resultScaleDownDelta[cloudprovider.ResourceNameCores] = nodeCPU
resultScaleDownDelta[cloudprovider.ResourceNameMemory] = nodeMemory

if cloudprovider.ContainsGpuResources(resourcesWithLimits) {
gpuType, gpuCount, err := gpu.GetNodeTargetGpus(cp.GPULabel(), node, nodeGroup)
if cloudprovider.ContainsCustomResources(resourcesWithLimits) {
resourceTargets, err := sd.processors.CustomResourcesProcessor.GetNodeResourceTargets(sd.context, node, nodeGroup)
if err != nil {
return scaleDownResourcesDelta{}, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get node %v gpu: %v", node.Name)
return scaleDownResourcesDelta{}, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get node %v custom resources: %v", node.Name)
}
for _, resourceTarget := range resourceTargets {
resultScaleDownDelta[resourceTarget.ResourceType] = resourceTarget.ResourceCount
}
resultScaleDownDelta[gpuType] = gpuCount
}
return resultScaleDownDelta, nil
}
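
The delta computed above is later compared against the remaining per-resource budget in scaleDownResourcesLeft. A simplified, hypothetical version of that check (the real helper sits outside the loaded hunks, so names and details here are illustrative only):

// checkDeltaWithinLimits reports whether removing a node with the given
// per-resource delta still fits within the remaining scale-down limits.
func checkDeltaWithinLimits(left scaleDownResourcesLimits, delta scaleDownResourcesDelta) bool {
	for resource, resourceDelta := range delta {
		if resourceDelta <= 0 {
			continue
		}
		limit, defined := left[resource]
		if !defined {
			continue // no limit configured for this resource
		}
		if limit == scaleDownLimitUnknown || limit < resourceDelta {
			return false // limit unknown, or removal would drop below the configured minimum
		}
	}
	return true
}
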
@@ -825,7 +820,7 @@ func (sd *ScaleDown) TryToScaleDown(
return scaleDownStatus, errors.ToAutoscalerError(errors.CloudProviderError, errCP)
}

scaleDownResourcesLeft := computeScaleDownResourcesLeftLimits(nodesWithoutMaster, resourceLimiter, sd.context.CloudProvider, currentTime)
scaleDownResourcesLeft := sd.computeScaleDownResourcesLeftLimits(nodesWithoutMaster, resourceLimiter, sd.context.CloudProvider, currentTime)

nodeGroupSize := utils.GetNodeGroupSizeMap(sd.context.CloudProvider)
resourcesWithLimits := resourceLimiter.GetResources()
@@ -900,7 +895,7 @@ func (sd *ScaleDown) TryToScaleDown(
continue
}

scaleDownResourcesDelta, err := computeScaleDownResourcesDelta(sd.context.CloudProvider, node, nodeGroup, resourcesWithLimits)
scaleDownResourcesDelta, err := sd.computeScaleDownResourcesDelta(sd.context.CloudProvider, node, nodeGroup, resourcesWithLimits)
if err != nil {
klog.Errorf("Error getting node resources: %v", err)
sd.addUnremovableNodeReason(node, simulator.UnexpectedError)
@@ -1079,7 +1074,7 @@ func (sd *ScaleDown) getEmptyNodes(candidates []string, maxEmptyBulkDelete int,
availabilityMap[nodeGroup.Id()] = available
}
if available > 0 {
resourcesDelta, err := computeScaleDownResourcesDelta(sd.context.CloudProvider, node, nodeGroup, resourcesNames)
resourcesDelta, err := sd.computeScaleDownResourcesDelta(sd.context.CloudProvider, node, nodeGroup, resourcesNames)
if err != nil {
klog.Errorf("Error: %v", err)
continue
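
The CustomResourcesProcessor used throughout scale_down.go is defined in one of the files that did not load above; from its call sites in this commit, the interface presumably has roughly this shape (a sketch of the expected contract, not the committed file):

package customresources

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/context"
)

// CustomResourcesProcessor gathers the custom-resource logic that previously
// lived in utils/gpu behind a pluggable processor.
type CustomResourcesProcessor interface {
	// FilterOutNodesWithUnreadyResources marks nodes that advertise a custom
	// resource but do not expose it yet as unready.
	FilterOutNodesWithUnreadyResources(context *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node) ([]*apiv1.Node, []*apiv1.Node)
	// GetNodeResourceTargets returns the custom resources a node from the
	// given node group is expected to provide once fully started.
	GetNodeResourceTargets(context *context.AutoscalingContext, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup) ([]CustomResourceTarget, error)
	// CleanUp releases any state held by the processor (assumed, by analogy
	// with the other autoscaling processors).
	CleanUp()
}
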
2 changes: 2 additions & 0 deletions cluster-autoscaler/core/scale_test_common.go
@@ -32,6 +32,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/processors"
processor_callbacks "k8s.io/autoscaler/cluster-autoscaler/processors/callbacks"
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
@@ -142,6 +143,7 @@ func NewTestProcessors() *processors.AutoscalingProcessors {
NodeGroupManager: nodegroups.NewDefaultNodeGroupManager(),
NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(),
NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(),
CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(),
}
}

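NewDefaultCustomResourcesProcessor, used above when building the test processors, is likewise not in the loaded diff. Since this commit only moves the existing GPU handling behind the new interface, the default implementation presumably delegates to the pre-existing utils/gpu helpers, roughly as below (a sketch under that assumption, continuing the customresources package from the previous sketch and additionally importing k8s.io/autoscaler/cluster-autoscaler/utils/gpu; the concrete type name is made up):

// gpuCustomResourcesProcessor wraps the existing GPU helpers behind the
// CustomResourcesProcessor interface.
type gpuCustomResourcesProcessor struct{}

func (p *gpuCustomResourcesProcessor) GetNodeResourceTargets(ctx *context.AutoscalingContext, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup) ([]CustomResourceTarget, error) {
	gpuType, gpuCount, err := gpu.GetNodeTargetGpus(ctx.CloudProvider.GPULabel(), node, nodeGroup)
	if err != nil {
		return nil, err
	}
	// Callers already skip targets with an empty type or zero count.
	return []CustomResourceTarget{{ResourceType: gpuType, ResourceCount: gpuCount}}, nil
}

func (p *gpuCustomResourcesProcessor) FilterOutNodesWithUnreadyResources(ctx *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node) ([]*apiv1.Node, []*apiv1.Node) {
	return gpu.FilterOutNodesWithUnreadyGpus(ctx.CloudProvider.GPULabel(), allNodes, readyNodes)
}

func (p *gpuCustomResourcesProcessor) CleanUp() {}

// NewDefaultCustomResourcesProcessor returns the GPU-based default implementation.
func NewDefaultCustomResourcesProcessor() CustomResourcesProcessor {
	return &gpuCustomResourcesProcessor{}
}
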
65 changes: 39 additions & 26 deletions cluster-autoscaler/core/scale_up.go
@@ -53,17 +53,18 @@ type scaleUpResourcesDelta map[string]int64
const scaleUpLimitUnknown = math.MaxInt64

func computeScaleUpResourcesLeftLimits(
cp cloudprovider.CloudProvider,
context *context.AutoscalingContext,
processors *ca_processors.AutoscalingProcessors,
nodeGroups []cloudprovider.NodeGroup,
nodeInfos map[string]*schedulerframework.NodeInfo,
nodesFromNotAutoscaledGroups []*apiv1.Node,
resourceLimiter *cloudprovider.ResourceLimiter) (scaleUpResourcesLimits, errors.AutoscalerError) {
totalCores, totalMem, errCoresMem := calculateScaleUpCoresMemoryTotal(nodeGroups, nodeInfos, nodesFromNotAutoscaledGroups)

var totalGpus map[string]int64
var totalGpusErr error
if cloudprovider.ContainsGpuResources(resourceLimiter.GetResources()) {
totalGpus, totalGpusErr = calculateScaleUpGpusTotal(cp.GPULabel(), nodeGroups, nodeInfos, nodesFromNotAutoscaledGroups)
var totalResources map[string]int64
var totalResourcesErr error
if cloudprovider.ContainsCustomResources(resourceLimiter.GetResources()) {
totalResources, totalResourcesErr = calculateScaleUpCustomResourcesTotal(context, processors, nodeGroups, nodeInfos, nodesFromNotAutoscaledGroups)
}

resultScaleUpLimits := make(scaleUpResourcesLimits)
@@ -91,11 +92,11 @@ func computeScaleUpResourcesLeftLimits(
resultScaleUpLimits[resource] = computeBelowMax(totalMem, max)
}

case cloudprovider.IsGpuResource(resource):
if totalGpusErr != nil {
case cloudprovider.IsCustomResource(resource):
if totalResourcesErr != nil {
resultScaleUpLimits[resource] = scaleUpLimitUnknown
} else {
resultScaleUpLimits[resource] = computeBelowMax(totalGpus[resource], max)
resultScaleUpLimits[resource] = computeBelowMax(totalResources[resource], max)
}

default:
@@ -139,8 +140,9 @@ func calculateScaleUpCoresMemoryTotal(
return coresTotal, memoryTotal, nil
}

func calculateScaleUpGpusTotal(
GPULabel string,
func calculateScaleUpCustomResourcesTotal(
context *context.AutoscalingContext,
processors *ca_processors.AutoscalingProcessors,
nodeGroups []cloudprovider.NodeGroup,
nodeInfos map[string]*schedulerframework.NodeInfo,
nodesFromNotAutoscaledGroups []*apiv1.Node) (map[string]int64, errors.AutoscalerError) {
@@ -156,23 +158,30 @@ func calculateScaleUpGpusTotal(
return nil, errors.NewAutoscalerError(errors.CloudProviderError, "No node info for: %s", nodeGroup.Id())
}
if currentSize > 0 {
gpuType, gpuCount, err := gpu.GetNodeTargetGpus(GPULabel, nodeInfo.Node(), nodeGroup)
resourceTargets, err := processors.CustomResourcesProcessor.GetNodeResourceTargets(context, nodeInfo.Node(), nodeGroup)
if err != nil {
return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get target gpu for node group %v:", nodeGroup.Id())
}
if gpuType == "" {
continue
for _, resourceTarget := range resourceTargets {
if resourceTarget.ResourceType == "" || resourceTarget.ResourceCount == 0 {
continue
}
result[resourceTarget.ResourceType] += resourceTarget.ResourceCount * int64(currentSize)
}
result[gpuType] += gpuCount * int64(currentSize)
}
}

for _, node := range nodesFromNotAutoscaledGroups {
gpuType, gpuCount, err := gpu.GetNodeTargetGpus(GPULabel, node, nil)
resourceTargets, err := processors.CustomResourcesProcessor.GetNodeResourceTargets(context, node, nil)
if err != nil {
return nil, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get target gpu for node gpus count for node %v:", node.Name)
}
result[gpuType] += gpuCount
for _, resourceTarget := range resourceTargets {
if resourceTarget.ResourceType == "" || resourceTarget.ResourceCount == 0 {
continue
}
result[resourceTarget.ResourceType] += resourceTarget.ResourceCount
}
}

return result, nil
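
As a worked illustration of the aggregation above (numbers made up): a node group with currentSize 3 whose template node reports a single target {ResourceType: "nvidia.com/gpu", ResourceCount: 2} contributes 3 * 2 = 6 to result["nvidia.com/gpu"]; each node from a non-autoscaled group then adds its own per-node counts on top.
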
@@ -185,19 +194,22 @@ func computeBelowMax(total int64, max int64) int64 {
return 0
}

func computeScaleUpResourcesDelta(cp cloudprovider.CloudProvider, nodeInfo *schedulerframework.NodeInfo, nodeGroup cloudprovider.NodeGroup, resourceLimiter *cloudprovider.ResourceLimiter) (scaleUpResourcesDelta, errors.AutoscalerError) {
func computeScaleUpResourcesDelta(context *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors,
nodeInfo *schedulerframework.NodeInfo, nodeGroup cloudprovider.NodeGroup, resourceLimiter *cloudprovider.ResourceLimiter) (scaleUpResourcesDelta, errors.AutoscalerError) {
resultScaleUpDelta := make(scaleUpResourcesDelta)

nodeCPU, nodeMemory := getNodeInfoCoresAndMemory(nodeInfo)
resultScaleUpDelta[cloudprovider.ResourceNameCores] = nodeCPU
resultScaleUpDelta[cloudprovider.ResourceNameMemory] = nodeMemory

if cloudprovider.ContainsGpuResources(resourceLimiter.GetResources()) {
gpuType, gpuCount, err := gpu.GetNodeTargetGpus(cp.GPULabel(), nodeInfo.Node(), nodeGroup)
if cloudprovider.ContainsCustomResources(resourceLimiter.GetResources()) {
resourceTargets, err := processors.CustomResourcesProcessor.GetNodeResourceTargets(context, nodeInfo.Node(), nodeGroup)
if err != nil {
return scaleUpResourcesDelta{}, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get target gpu for node group %v:", nodeGroup.Id())
return scaleUpResourcesDelta{}, errors.ToAutoscalerError(errors.CloudProviderError, err).AddPrefix("Failed to get target custom resources for node group %v:", nodeGroup.Id())
}
for _, resourceTarget := range resourceTargets {
resultScaleUpDelta[resourceTarget.ResourceType] = resourceTarget.ResourceCount
}
resultScaleUpDelta[gpuType] = gpuCount
}

return resultScaleUpDelta, nil
@@ -343,7 +355,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
errCP)
}

scaleUpResourcesLeft, errLimits := computeScaleUpResourcesLeftLimits(context.CloudProvider, nodeGroups, nodeInfos, nodesFromNotAutoscaledGroups, resourceLimiter)
scaleUpResourcesLeft, errLimits := computeScaleUpResourcesLeftLimits(context, processors, nodeGroups, nodeInfos, nodesFromNotAutoscaledGroups, resourceLimiter)
if errLimits != nil {
return &status.ScaleUpStatus{Result: status.ScaleUpError}, errLimits.AddPrefix("Could not compute total resources: ")
}
@@ -409,7 +421,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
continue
}

scaleUpResourcesDelta, err := computeScaleUpResourcesDelta(context.CloudProvider, nodeInfo, nodeGroup, resourceLimiter)
scaleUpResourcesDelta, err := computeScaleUpResourcesDelta(context, processors, nodeInfo, nodeGroup, resourceLimiter)
if err != nil {
klog.Errorf("Skipping node group %s; error getting node group resources: %v", nodeGroup.Id(), err)
skippedNodeGroups[nodeGroup.Id()] = notReadyReason
@@ -533,7 +545,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
}

// apply upper limits for CPU and memory
newNodes, err = applyScaleUpResourcesLimits(context.CloudProvider, newNodes, scaleUpResourcesLeft, nodeInfo, bestOption.NodeGroup, resourceLimiter)
newNodes, err = applyScaleUpResourcesLimits(context, processors, newNodes, scaleUpResourcesLeft, nodeInfo, bestOption.NodeGroup, resourceLimiter)
if err != nil {
return &status.ScaleUpStatus{Result: status.ScaleUpError, CreateNodeGroupResults: createNodeGroupResults}, err
}
@@ -681,14 +693,15 @@ func executeScaleUp(context *context.AutoscalingContext, clusterStateRegistry *c
}

func applyScaleUpResourcesLimits(
cp cloudprovider.CloudProvider,
context *context.AutoscalingContext,
processors *ca_processors.AutoscalingProcessors,
newNodes int,
scaleUpResourcesLeft scaleUpResourcesLimits,
nodeInfo *schedulerframework.NodeInfo,
nodeGroup cloudprovider.NodeGroup,
resourceLimiter *cloudprovider.ResourceLimiter) (int, errors.AutoscalerError) {

delta, err := computeScaleUpResourcesDelta(cp, nodeInfo, nodeGroup, resourceLimiter)
delta, err := computeScaleUpResourcesDelta(context, processors, nodeInfo, nodeGroup, resourceLimiter)
if err != nil {
return 0, err
}
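
applyScaleUpResourcesLimits (partially shown above) caps the number of new nodes so that newNodes times the per-node delta stays within the remaining budget for every limited resource. A simplified, hypothetical version of that capping step (the committed implementation differs in structure and error handling):

// capNewNodesToLimits illustrates the capping performed by
// applyScaleUpResourcesLimits on the requested node count.
func capNewNodesToLimits(newNodes int, left scaleUpResourcesLimits, delta scaleUpResourcesDelta) int {
	for resource, resourceDelta := range delta {
		if resourceDelta <= 0 {
			continue
		}
		limit, defined := left[resource]
		if !defined || limit == scaleUpLimitUnknown {
			continue // no limit configured, or limit could not be computed
		}
		if maxNodes := int(limit / resourceDelta); maxNodes < newNodes {
			newNodes = maxNodes
		}
	}
	return newNodes
}
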
3 changes: 1 addition & 2 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -40,7 +40,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
scheduler_utils "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/autoscaler/cluster-autoscaler/utils/tpu"
@@ -719,7 +718,7 @@ func (a *StaticAutoscaler) obtainNodeLists(cp cloudprovider.CloudProvider) ([]*a
// Treat those nodes as unready until GPU actually becomes available and let
// our normal handling for booting up nodes deal with this.
// TODO: Remove this call when we handle dynamically provisioned resources.
allNodes, readyNodes = gpu.FilterOutNodesWithUnreadyGpus(cp.GPULabel(), allNodes, readyNodes)
allNodes, readyNodes = a.processors.CustomResourcesProcessor.FilterOutNodesWithUnreadyResources(a.AutoscalingContext, allNodes, readyNodes)
allNodes, readyNodes = taints.FilterOutNodesWithIgnoredTaints(a.ignoredTaints, allNodes, readyNodes)
return allNodes, readyNodes, nil
}
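
With the readiness hook moved behind the processor as well, a vendor that needs different custom-resource handling can now swap the implementation when assembling the processors, for example (the concrete type here is hypothetical; DefaultProcessors is the existing constructor in the processors package):

autoscalingProcessors := processors.DefaultProcessors()
// myAcceleratorResourcesProcessor is a hypothetical implementation of the
// CustomResourcesProcessor interface sketched earlier.
autoscalingProcessors.CustomResourcesProcessor = &myAcceleratorResourcesProcessor{}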
