Capacity prediction based on physical memory #1643

Merged
merged 2 commits on Feb 11, 2019
Changes from 1 commit
14 changes: 13 additions & 1 deletion cluster-autoscaler/cloudprovider/test/test_cloud_provider.go
@@ -187,6 +187,14 @@ func (tcp *TestCloudProvider) AddAutoprovisionedNodeGroup(id string, min int, ma
return nodeGroup
}

+ // DeleteNodeGroup removes node group from test cloud provider.
+ func (tcp *TestCloudProvider) DeleteNodeGroup(id string) {
+ tcp.Lock()
+ defer tcp.Unlock()
+
+ delete(tcp.groups, id)
+ }

// AddNode adds the given node to the group.
func (tcp *TestCloudProvider) AddNode(nodeGroupId string, node *apiv1.Node) {
tcp.Lock()
@@ -296,7 +304,11 @@ func (tng *TestNodeGroup) Create() (cloudprovider.NodeGroup, error) {
// Delete deletes the node group on the cloud provider side.
// This will be executed only for autoprovisioned node groups, once their size drops to 0.
func (tng *TestNodeGroup) Delete() error {
- return tng.cloudProvider.onNodeGroupDelete(tng.id)
+ err := tng.cloudProvider.onNodeGroupDelete(tng.id)
+ if err == nil {
+ tng.cloudProvider.DeleteNodeGroup(tng.Id())
+ }
+ return err
}

// DecreaseTargetSize decreases the target size of the node group. This function
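The updated Delete removes the group from the test provider's map only after the provider-side callback succeeds, so a failed deletion leaves the provider's bookkeeping untouched. Below is a minimal, self-contained sketch of that pattern; the types and helper names are hypothetical and not the actual TestCloudProvider API.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeProvider mirrors the locking and map bookkeeping of the test cloud
// provider; all names here are illustrative only.
type fakeProvider struct {
	sync.Mutex
	groups            map[string]bool
	onNodeGroupDelete func(id string) error
}

// deleteNodeGroup drops the group from the local map under the lock.
func (p *fakeProvider) deleteNodeGroup(id string) {
	p.Lock()
	defer p.Unlock()
	delete(p.groups, id)
}

// deleteGroup deletes on the "cloud" side first and only updates local
// state when that call succeeded, matching the new TestNodeGroup.Delete.
func (p *fakeProvider) deleteGroup(id string) error {
	err := p.onNodeGroupDelete(id)
	if err == nil {
		p.deleteNodeGroup(id)
	}
	return err
}

func main() {
	p := &fakeProvider{
		groups:            map[string]bool{"ng-1": true},
		onNodeGroupDelete: func(id string) error { return nil },
	}
	err := p.deleteGroup("ng-1")
	fmt.Println(err, p.groups) // <nil> map[]
}
```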
14 changes: 7 additions & 7 deletions cluster-autoscaler/core/scale_up_test.go
@@ -413,7 +413,7 @@ func simpleScaleUpTest(t *testing.T, config *scaleTestConfig) {
}
context.ExpanderStrategy = expander

- nodeInfos, _ := getNodeInfosForGroups(nodes, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
+ nodeInfos, _ := getNodeInfosForGroups(nodes, nil, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())

@@ -500,7 +500,7 @@ func TestScaleUpNodeComingNoScale(t *testing.T) {
context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider)

nodes := []*apiv1.Node{n1, n2}
- nodeInfos, _ := getNodeInfosForGroups(nodes, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
+ nodeInfos, _ := getNodeInfosForGroups(nodes, nil, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(
provider,
clusterstate.ClusterStateRegistryConfig{MaxNodeProvisionTime: 5 * time.Minute},
@@ -546,7 +546,7 @@ func TestScaleUpNodeComingHasScale(t *testing.T) {
context := NewScaleTestAutoscalingContext(defaultOptions, &fake.Clientset{}, listers, provider)

nodes := []*apiv1.Node{n1, n2}
- nodeInfos, _ := getNodeInfosForGroups(nodes, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
+ nodeInfos, _ := getNodeInfosForGroups(nodes, nil, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(
provider,
clusterstate.ClusterStateRegistryConfig{
@@ -600,7 +600,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider)

nodes := []*apiv1.Node{n1, n2}
- nodeInfos, _ := getNodeInfosForGroups(nodes, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
+ nodeInfos, _ := getNodeInfosForGroups(nodes, nil, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 550, 0)
@@ -639,7 +639,7 @@ func TestScaleUpNoHelp(t *testing.T) {
context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider)

nodes := []*apiv1.Node{n1}
- nodeInfos, _ := getNodeInfosForGroups(nodes, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
+ nodeInfos, _ := getNodeInfosForGroups(nodes, nil, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 500, 0)
@@ -703,7 +703,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
}
context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider)

- nodeInfos, _ := getNodeInfosForGroups(nodes, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
+ nodeInfos, _ := getNodeInfosForGroups(nodes, nil, provider, listers, []*appsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())

@@ -768,7 +768,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t}

nodes := []*apiv1.Node{}
- nodeInfos, _ := getNodeInfosForGroups(nodes, provider, context.ListerRegistry, []*appsv1.DaemonSet{}, context.PredicateChecker)
+ nodeInfos, _ := getNodeInfosForGroups(nodes, nil, provider, context.ListerRegistry, []*appsv1.DaemonSet{}, context.PredicateChecker)

scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
assert.NoError(t, err)
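All of the test call sites above pass nil for the new cache parameter, which disables caching: getNodeInfosForGroups guards every cache write with a nil check, and iterating or reading a nil map is a safe no-op in Go. A small standalone sketch of that optional-map convention (hypothetical names, not the real function):

```go
package main

import "fmt"

// buildInfos mimics the optional-cache parameter: callers that do not care
// about caching (such as the tests above) simply pass nil.
func buildInfos(groups []string, cache map[string]string) map[string]string {
	result := make(map[string]string)
	for _, g := range groups {
		info := "node-info-for-" + g
		result[g] = info
		if cache != nil { // writing to a nil map panics, so guard the write
			cache[g] = info
		}
	}
	return result
}

func main() {
	fmt.Println(buildInfos([]string{"ng-1"}, nil)) // cache disabled, as in the tests

	cache := map[string]string{}
	buildInfos([]string{"ng-1"}, cache) // cache populated, as in StaticAutoscaler
	fmt.Println(cache)
}
```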
7 changes: 5 additions & 2 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -67,6 +67,8 @@ type StaticAutoscaler struct
scaleDown *ScaleDown
processors *ca_processors.AutoscalingProcessors
initialized bool
+ // Caches nodeInfo computed for previously seen nodes
+ nodeInfoCache map[string]*schedulercache.NodeInfo
}

// NewStaticAutoscaler creates an instance of Autoscaler filled with provided parameters
@@ -99,6 +101,7 @@ func NewStaticAutoscaler(
scaleDown: scaleDown,
processors: processors,
clusterStateRegistry: clusterStateRegistry,
+ nodeInfoCache: make(map[string]*schedulercache.NodeInfo),
}
}

@@ -149,8 +152,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
return errors.ToAutoscalerError(errors.ApiCallError, err)
}

- nodeInfosForGroups, autoscalerError := getNodeInfosForGroups(readyNodes, autoscalingContext.CloudProvider, autoscalingContext.ListerRegistry,
- daemonsets, autoscalingContext.PredicateChecker)
+ nodeInfosForGroups, autoscalerError := getNodeInfosForGroups(
+ readyNodes, a.nodeInfoCache, autoscalingContext.CloudProvider, autoscalingContext.ListerRegistry, daemonsets, autoscalingContext.PredicateChecker)
if err != nil {
return autoscalerError.AddPrefix("failed to build node infos for node groups: ")
}
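Because nodeInfoCache is a field on StaticAutoscaler and is created once in NewStaticAutoscaler, it persists across RunOnce iterations: a node info computed while a group still had a ready node stays available in later loops after that node disappears. A rough sketch of that lifetime under simplified, hypothetical types:

```go
package main

import "fmt"

// autoscaler stands in for StaticAutoscaler: the cache is created once and
// reused by every runOnce call on the same instance.
type autoscaler struct {
	nodeInfoCache map[string]string
}

func newAutoscaler() *autoscaler {
	return &autoscaler{nodeInfoCache: make(map[string]string)}
}

// runOnce records whatever node infos were built in this iteration.
func (a *autoscaler) runOnce(builtThisLoop map[string]string) {
	for id, info := range builtThisLoop {
		a.nodeInfoCache[id] = info
	}
}

func main() {
	a := newAutoscaler()
	a.runOnce(map[string]string{"ng-1": "template-from-real-node"}) // group had a ready node
	a.runOnce(map[string]string{})                                  // node is gone this loop
	fmt.Println(a.nodeInfoCache["ng-1"])                            // still available
}
```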
61 changes: 50 additions & 11 deletions cluster-autoscaler/core/utils.go
@@ -18,12 +18,13 @@ package core

import (
"fmt"
"k8s.io/kubernetes/pkg/scheduler/util"
"math/rand"
"reflect"
"sort"
"time"

"k8s.io/kubernetes/pkg/scheduler/util"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
@@ -247,52 +248,69 @@ func checkPodsSchedulableOnNode(context *context.AutoscalingContext, pods []*api
// TODO(mwielgus): This returns map keyed by url, while most code (including scheduler) uses node.Name for a key.
//
// TODO(mwielgus): Review error policy - sometimes we may continue with partial errors.
- func getNodeInfosForGroups(nodes []*apiv1.Node, cloudProvider cloudprovider.CloudProvider, listers kube_util.ListerRegistry,
+ func getNodeInfosForGroups(nodes []*apiv1.Node, nodeInfoCache map[string]*schedulercache.NodeInfo, cloudProvider cloudprovider.CloudProvider, listers kube_util.ListerRegistry,
daemonsets []*appsv1.DaemonSet, predicateChecker *simulator.PredicateChecker) (map[string]*schedulercache.NodeInfo, errors.AutoscalerError) {
result := make(map[string]*schedulercache.NodeInfo)
+ seenGroups := make(map[string]bool)

// processNode returns whether a node template was generated, the id of the node group it was generated for, and an error if one occurred.
- processNode := func(node *apiv1.Node) (bool, errors.AutoscalerError) {
+ processNode := func(node *apiv1.Node) (bool, string, errors.AutoscalerError) {
nodeGroup, err := cloudProvider.NodeGroupForNode(node)
if err != nil {
- return false, errors.ToAutoscalerError(errors.CloudProviderError, err)
+ return false, "", errors.ToAutoscalerError(errors.CloudProviderError, err)
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
- return false, nil
+ return false, "", nil
}
id := nodeGroup.Id()
if _, found := result[id]; !found {
// Build nodeInfo.
nodeInfo, err := simulator.BuildNodeInfoForNode(node, listers)
if err != nil {
- return false, err
+ return false, "", err
}
sanitizedNodeInfo, err := sanitizeNodeInfo(nodeInfo, id)
if err != nil {
- return false, err
+ return false, "", err
}
result[id] = sanitizedNodeInfo
- return true, nil
+ return true, id, nil
}
- return false, nil
+ return false, "", nil
}

for _, node := range nodes {
// Broken nodes might have some stuff missing. Skipping.
if !kube_util.IsNodeReadyAndSchedulable(node) {
continue
}
- _, typedErr := processNode(node)
+ added, id, typedErr := processNode(node)
if typedErr != nil {
return map[string]*schedulercache.NodeInfo{}, typedErr
}
+ if added && nodeInfoCache != nil {
+ if nodeInfoCopy, err := deepCopyNodeInfo(result[id]); err == nil {
+ nodeInfoCache[id] = nodeInfoCopy
+ }
+ }
}
for _, nodeGroup := range cloudProvider.NodeGroups() {
id := nodeGroup.Id()
+ seenGroups[id] = true
if _, found := result[id]; found {
continue
}

+ // No good template, check cache of previously running nodes.
+ if nodeInfoCache != nil {
+ if nodeInfo, found := nodeInfoCache[id]; found {
+ if nodeInfoCopy, err := deepCopyNodeInfo(nodeInfo); err == nil {
+ result[id] = nodeInfoCopy
+ continue
+ }
+ }
+ }

// No good template, trying to generate one. This is called only if there are no
// working nodes in the node groups. By default CA tries to use a real-world example.
nodeInfo, err := getNodeInfoFromTemplate(nodeGroup, daemonsets, predicateChecker)
@@ -307,11 +325,18 @@ func getNodeInfosForGroups(nodes []*apiv1.Node, cloudProvider cloudprovider.Clou
result[id] = nodeInfo
}

+ // Remove invalid node groups from cache
+ for id := range nodeInfoCache {
+ if _, ok := seenGroups[id]; !ok {
+ delete(nodeInfoCache, id)
+ }
+ }

// Last resort - unready/unschedulable nodes.
for _, node := range nodes {
// Allowing broken nodes
if !kube_util.IsNodeReadyAndSchedulable(node) {
- added, typedErr := processNode(node)
+ added, _, typedErr := processNode(node)
if typedErr != nil {
return map[string]*schedulercache.NodeInfo{}, typedErr
}
@@ -365,6 +390,20 @@ func filterOutNodesFromNotAutoscaledGroups(nodes []*apiv1.Node, cloudProvider cl
return result, nil
}

+ func deepCopyNodeInfo(nodeInfo *schedulercache.NodeInfo) (*schedulercache.NodeInfo, errors.AutoscalerError) {
+ newPods := make([]*apiv1.Pod, 0)
+ for _, pod := range nodeInfo.Pods() {
+ newPods = append(newPods, pod.DeepCopy())
+ }
+
+ // Build a new node info.
+ newNodeInfo := schedulercache.NewNodeInfo(newPods...)
+ if err := newNodeInfo.SetNode(nodeInfo.Node().DeepCopy()); err != nil {
+ return nil, errors.ToAutoscalerError(errors.InternalError, err)
+ }
+ return newNodeInfo, nil
+ }

func sanitizeNodeInfo(nodeInfo *schedulercache.NodeInfo, nodeGroupName string) (*schedulercache.NodeInfo, errors.AutoscalerError) {
// Sanitize node name.
sanitizedNode, err := sanitizeTemplateNode(nodeInfo.Node(), nodeGroupName)
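One design point worth calling out: the cache never shares pointers with the returned map. Entries are deep-copied both when they are stored (after processNode succeeds) and when a group falls back to the cache, so later sanitization or simulated pod placement on the returned NodeInfo cannot mutate the cached template. A self-contained illustration of why the copy matters, using plain structs rather than the scheduler's NodeInfo:

```go
package main

import "fmt"

// nodeInfo is a stand-in for the scheduler's NodeInfo: a pointer-y value
// whose contents may be mutated by later simulation steps.
type nodeInfo struct {
	pods []string
}

// deepCopy returns an independent copy, as deepCopyNodeInfo does above.
func deepCopy(ni *nodeInfo) *nodeInfo {
	return &nodeInfo{pods: append([]string(nil), ni.pods...)}
}

func main() {
	cache := map[string]*nodeInfo{"ng-1": {pods: []string{"daemonset-pod"}}}

	// Handing out the cached pointer lets callers corrupt the cache...
	shared := cache["ng-1"]
	shared.pods = append(shared.pods, "simulated-pod")
	fmt.Println(cache["ng-1"].pods) // [daemonset-pod simulated-pod]: cache polluted

	// ...whereas returning a deep copy keeps the cached template pristine.
	cache["ng-1"] = &nodeInfo{pods: []string{"daemonset-pod"}}
	copied := deepCopy(cache["ng-1"])
	copied.pods = append(copied.pods, "simulated-pod")
	fmt.Println(cache["ng-1"].pods) // [daemonset-pod]
}
```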