
Commit

Cherry-pick of kubernetes#1643: Cache exemplar ready node for each node group
jkaniuk committed Feb 15, 2019
1 parent 2e6319d commit c7e9815
Showing 5 changed files with 234 additions and 59 deletions.
14 changes: 13 additions & 1 deletion cluster-autoscaler/cloudprovider/test/test_cloud_provider.go
@@ -187,6 +187,14 @@ func (tcp *TestCloudProvider) AddAutoprovisionedNodeGroup(id string, min int, ma
return nodeGroup
}

// DeleteNodeGroup removes node group from test cloud provider.
func (tcp *TestCloudProvider) DeleteNodeGroup(id string) {
tcp.Lock()
defer tcp.Unlock()

delete(tcp.groups, id)
}

// AddNode adds the given node to the group.
func (tcp *TestCloudProvider) AddNode(nodeGroupId string, node *apiv1.Node) {
tcp.Lock()
@@ -296,7 +304,11 @@ func (tng *TestNodeGroup) Create() (cloudprovider.NodeGroup, error) {
// Delete deletes the node group on the cloud provider side.
// This will be executed only for autoprovisioned node groups, once their size drops to 0.
func (tng *TestNodeGroup) Delete() error {
return tng.cloudProvider.onNodeGroupDelete(tng.id)
err := tng.cloudProvider.onNodeGroupDelete(tng.id)
if err == nil {
tng.cloudProvider.DeleteNodeGroup(tng.Id())
}
return err
}

// DecreaseTargetSize decreases the target size of the node group. This function
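Taken together, the two hunks above make the fake cloud provider forget a node group once its delete hook succeeds, so later NodeGroups() or NodeGroupForNode() calls no longer see it. The snippet below is a minimal, self-contained sketch of that delete-then-evict pattern; the names (fakeProvider, onDelete, deleteGroup) are illustrative only and are not the real test_cloud_provider.go API.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeProvider is a stand-in for the test cloud provider: a locked map of
// node groups plus a delete hook supplied by the test.
type fakeProvider struct {
	sync.Mutex
	groups   map[string]int // node group id -> target size
	onDelete func(id string) error
}

// deleteNodeGroup removes the group from the provider's bookkeeping,
// mirroring the new TestCloudProvider.DeleteNodeGroup above.
func (p *fakeProvider) deleteNodeGroup(id string) {
	p.Lock()
	defer p.Unlock()
	delete(p.groups, id)
}

// deleteGroup mirrors TestNodeGroup.Delete: run the hook first and drop the
// provider's entry only when the hook reports success.
func (p *fakeProvider) deleteGroup(id string) error {
	err := p.onDelete(id)
	if err == nil {
		p.deleteNodeGroup(id)
	}
	return err
}

func main() {
	p := &fakeProvider{
		groups:   map[string]int{"autoprovisioned-ng1": 0},
		onDelete: func(id string) error { return nil },
	}
	_ = p.deleteGroup("autoprovisioned-ng1")
	fmt.Println(len(p.groups)) // 0: the group is gone from the provider as well
}
```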
14 changes: 7 additions & 7 deletions cluster-autoscaler/core/scale_up_test.go
@@ -423,7 +423,7 @@ func simpleScaleUpTest(t *testing.T, config *scaleTestConfig) {
}
context.ExpanderStrategy = expander

nodeInfos, _ := GetNodeInfosForGroups(nodes, provider, fakeClient, []*extensionsv1.DaemonSet{}, context.PredicateChecker)
nodeInfos, _ := GetNodeInfosForGroups(nodes, nil, provider, fakeClient, []*extensionsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())

@@ -520,7 +520,7 @@ func TestScaleUpNodeComingNoScale(t *testing.T) {
context := NewScaleTestAutoscalingContext(options, fakeClient, provider)

nodes := []*apiv1.Node{n1, n2}
nodeInfos, _ := GetNodeInfosForGroups(nodes, provider, fakeClient, []*extensionsv1.DaemonSet{}, context.PredicateChecker)
nodeInfos, _ := GetNodeInfosForGroups(nodes, nil, provider, fakeClient, []*extensionsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.RegisterScaleUp(&clusterstate.ScaleUpRequest{
NodeGroup: provider.GetNodeGroup("ng2"),
@@ -577,7 +577,7 @@ func TestScaleUpNodeComingHasScale(t *testing.T) {
context := NewScaleTestAutoscalingContext(defaultOptions, fakeClient, provider)

nodes := []*apiv1.Node{n1, n2}
nodeInfos, _ := GetNodeInfosForGroups(nodes, provider, fakeClient, []*extensionsv1.DaemonSet{}, context.PredicateChecker)
nodeInfos, _ := GetNodeInfosForGroups(nodes, nil, provider, fakeClient, []*extensionsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.RegisterScaleUp(&clusterstate.ScaleUpRequest{
NodeGroup: provider.GetNodeGroup("ng2"),
@@ -640,7 +640,7 @@ func TestScaleUpUnhealthy(t *testing.T) {
context := NewScaleTestAutoscalingContext(options, fakeClient, provider)

nodes := []*apiv1.Node{n1, n2}
nodeInfos, _ := GetNodeInfosForGroups(nodes, provider, fakeClient, []*extensionsv1.DaemonSet{}, context.PredicateChecker)
nodeInfos, _ := GetNodeInfosForGroups(nodes, nil, provider, fakeClient, []*extensionsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 550, 0)
@@ -686,7 +686,7 @@ func TestScaleUpNoHelp(t *testing.T) {
context := NewScaleTestAutoscalingContext(options, fakeClient, provider)

nodes := []*apiv1.Node{n1}
nodeInfos, _ := GetNodeInfosForGroups(nodes, provider, fakeClient, []*extensionsv1.DaemonSet{}, context.PredicateChecker)
nodeInfos, _ := GetNodeInfosForGroups(nodes, nil, provider, fakeClient, []*extensionsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes([]*apiv1.Node{n1}, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 500, 0)
@@ -762,7 +762,7 @@ func TestScaleUpBalanceGroups(t *testing.T) {
}
context := NewScaleTestAutoscalingContext(options, fakeClient, provider)

nodeInfos, _ := GetNodeInfosForGroups(nodes, provider, fakeClient, []*extensionsv1.DaemonSet{}, context.PredicateChecker)
nodeInfos, _ := GetNodeInfosForGroups(nodes, nil, provider, fakeClient, []*extensionsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())

@@ -827,7 +827,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t}

nodes := []*apiv1.Node{}
nodeInfos, _ := GetNodeInfosForGroups(nodes, provider, fakeClient, []*extensionsv1.DaemonSet{}, context.PredicateChecker)
nodeInfos, _ := GetNodeInfosForGroups(nodes, nil, provider, fakeClient, []*extensionsv1.DaemonSet{}, context.PredicateChecker)

scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1}, []*apiv1.Node{}, []*extensionsv1.DaemonSet{}, nodeInfos)
assert.NoError(t, err)
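Every change in this test file is the same mechanical edit: the new second argument to GetNodeInfosForGroups is the nodeInfo cache, and the tests pass nil to keep the previous, uncached behavior. That is safe because the cache reads and writes in utils.go are guarded by nodeInfoCache != nil checks, and ranging over a nil map is a no-op in Go. The toy snippet below (made-up names, not autoscaler code) illustrates the nil-map-as-disabled-cache convention the tests rely on.

```go
package main

import "fmt"

// lookup reads from a possibly-nil cache: reads on a nil map are legal in Go
// and simply never find anything.
func lookup(cache map[string]string, key string) (string, bool) {
	v, ok := cache[key]
	return v, ok
}

// store writes to the cache only if it exists; an unguarded write to a nil
// map would panic, which is why the real code checks nodeInfoCache != nil.
func store(cache map[string]string, key, value string) {
	if cache != nil {
		cache[key] = value
	}
}

func main() {
	var disabled map[string]string // nil: caching effectively off, as in the tests
	store(disabled, "ng1", "template")
	_, ok := lookup(disabled, "ng1")
	fmt.Println(ok) // false

	enabled := make(map[string]string) // a long-lived caller passes a real map
	store(enabled, "ng1", "template")
	_, ok = lookup(enabled, "ng1")
	fmt.Println(ok) // true
}
```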
7 changes: 5 additions & 2 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -65,6 +65,8 @@ type StaticAutoscaler struct {
scaleDown *ScaleDown
processors *ca_processors.AutoscalingProcessors
initialized bool
// Caches nodeInfo computed for previously seen nodes
nodeInfoCache map[string]*schedulercache.NodeInfo
}

// NewStaticAutoscaler creates an instance of Autoscaler filled with provided parameters
@@ -97,6 +99,7 @@ func NewStaticAutoscaler(
scaleDown: scaleDown,
processors: processors,
clusterStateRegistry: clusterStateRegistry,
nodeInfoCache: make(map[string]*schedulercache.NodeInfo),
}
}

@@ -143,8 +146,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
return errors.ToAutoscalerError(errors.ApiCallError, err)
}

nodeInfosForGroups, autoscalerError := GetNodeInfosForGroups(readyNodes, autoscalingContext.CloudProvider, autoscalingContext.ClientSet,
daemonsets, autoscalingContext.PredicateChecker)
nodeInfosForGroups, autoscalerError := GetNodeInfosForGroups(
readyNodes, a.nodeInfoCache, autoscalingContext.CloudProvider, autoscalingContext.ClientSet, daemonsets, autoscalingContext.PredicateChecker)
if err != nil {
return autoscalerError.AddPrefix("failed to build node infos for node groups: ")
}
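The cache is owned by StaticAutoscaler: it is created once in the constructor and handed to GetNodeInfosForGroups on every RunOnce, so an exemplar stored in one loop iteration is still available in the next. The sketch below shows only that ownership pattern; all names are hypothetical and the value type is a plain string instead of schedulercache.NodeInfo.

```go
package main

import "fmt"

type autoscaler struct {
	// templateCache plays the role of StaticAutoscaler.nodeInfoCache:
	// node group id -> last known exemplar for that group.
	templateCache map[string]string
}

func newAutoscaler() *autoscaler {
	// Created once, like nodeInfoCache in NewStaticAutoscaler.
	return &autoscaler{templateCache: make(map[string]string)}
}

// runOnce receives the currently ready nodes (group id -> node name, to keep
// the sketch small) and refreshes the cache from them.
func (a *autoscaler) runOnce(readyNodes map[string]string) {
	for group, node := range readyNodes {
		a.templateCache[group] = node // remember an exemplar for the group
	}
	for group, exemplar := range a.templateCache {
		if _, stillReady := readyNodes[group]; !stillReady {
			// The group has no ready node in this iteration, but the exemplar
			// cached earlier can still serve as its template.
			fmt.Printf("using cached exemplar %q for group %q\n", exemplar, group)
		}
	}
}

func main() {
	a := newAutoscaler()
	a.runOnce(map[string]string{"ng1": "ng1-node-0"}) // first loop: ng1 has a ready node
	a.runOnce(map[string]string{})                    // later loop: ng1 temporarily has none
}
```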
58 changes: 48 additions & 10 deletions cluster-autoscaler/core/utils.go
@@ -218,52 +218,69 @@ func CheckPodsSchedulableOnNode(context *context.AutoscalingContext, pods []*api
// TODO(mwielgus): This returns map keyed by url, while most code (including scheduler) uses node.Name for a key.
//
// TODO(mwielgus): Review error policy - sometimes we may continue with partial errors.
func GetNodeInfosForGroups(nodes []*apiv1.Node, cloudProvider cloudprovider.CloudProvider, kubeClient kube_client.Interface,
func GetNodeInfosForGroups(nodes []*apiv1.Node, nodeInfoCache map[string]*schedulercache.NodeInfo, cloudProvider cloudprovider.CloudProvider, kubeClient kube_client.Interface,
daemonsets []*extensionsv1.DaemonSet, predicateChecker *simulator.PredicateChecker) (map[string]*schedulercache.NodeInfo, errors.AutoscalerError) {
result := make(map[string]*schedulercache.NodeInfo)
seenGroups := make(map[string]bool)

// processNode returns information whether the nodeTemplate was generated and if there was an error.
processNode := func(node *apiv1.Node) (bool, errors.AutoscalerError) {
processNode := func(node *apiv1.Node) (bool, string, errors.AutoscalerError) {
nodeGroup, err := cloudProvider.NodeGroupForNode(node)
if err != nil {
return false, errors.ToAutoscalerError(errors.CloudProviderError, err)
return false, "", errors.ToAutoscalerError(errors.CloudProviderError, err)
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
return false, nil
return false, "", nil
}
id := nodeGroup.Id()
if _, found := result[id]; !found {
// Build nodeInfo.
nodeInfo, err := simulator.BuildNodeInfoForNode(node, kubeClient)
if err != nil {
return false, err
return false, "", err
}
sanitizedNodeInfo, err := sanitizeNodeInfo(nodeInfo, id)
if err != nil {
return false, err
return false, "", err
}
result[id] = sanitizedNodeInfo
return true, nil
return true, id, nil
}
return false, nil
return false, "", nil
}

for _, node := range nodes {
// Broken nodes might have some stuff missing. Skipping.
if !kube_util.IsNodeReadyAndSchedulable(node) {
continue
}
_, typedErr := processNode(node)
added, id, typedErr := processNode(node)
if typedErr != nil {
return map[string]*schedulercache.NodeInfo{}, typedErr
}
if added && nodeInfoCache != nil {
if nodeInfoCopy, err := deepCopyNodeInfo(result[id]); err == nil {
nodeInfoCache[id] = nodeInfoCopy
}
}
}
for _, nodeGroup := range cloudProvider.NodeGroups() {
id := nodeGroup.Id()
seenGroups[id] = true
if _, found := result[id]; found {
continue
}

// No good template, check cache of previously running nodes.
if nodeInfoCache != nil {
if nodeInfo, found := nodeInfoCache[id]; found {
if nodeInfoCopy, err := deepCopyNodeInfo(nodeInfo); err == nil {
result[id] = nodeInfoCopy
continue
}
}
}

// No good template, trying to generate one. This is called only if there are no
// working nodes in the node groups. By default CA tries to use a real-world example.
nodeInfo, err := GetNodeInfoFromTemplate(nodeGroup, daemonsets, predicateChecker)
@@ -278,11 +295,18 @@ func GetNodeInfosForGroups(nodes []*apiv1.Node, cloudProvider cloudprovider.Clou
result[id] = nodeInfo
}

// Remove invalid node groups from cache
for id := range nodeInfoCache {
if _, ok := seenGroups[id]; !ok {
delete(nodeInfoCache, id)
}
}

// Last resort - unready/unschedulable nodes.
for _, node := range nodes {
// Allowing broken nodes
if !kube_util.IsNodeReadyAndSchedulable(node) {
added, typedErr := processNode(node)
added, _, typedErr := processNode(node)
if typedErr != nil {
return map[string]*schedulercache.NodeInfo{}, typedErr
}
@@ -336,6 +360,20 @@ func FilterOutNodesFromNotAutoscaledGroups(nodes []*apiv1.Node, cloudProvider cl
return result, nil
}

func deepCopyNodeInfo(nodeInfo *schedulercache.NodeInfo) (*schedulercache.NodeInfo, errors.AutoscalerError) {
newPods := make([]*apiv1.Pod, 0)
for _, pod := range nodeInfo.Pods() {
newPods = append(newPods, pod.DeepCopy())
}

// Build a new node info.
newNodeInfo := schedulercache.NewNodeInfo(newPods...)
if err := newNodeInfo.SetNode(nodeInfo.Node().DeepCopy()); err != nil {
return nil, errors.ToAutoscalerError(errors.InternalError, err)
}
return newNodeInfo, nil
}

func sanitizeNodeInfo(nodeInfo *schedulercache.NodeInfo, nodeGroupName string) (*schedulercache.NodeInfo, errors.AutoscalerError) {
// Sanitize node name.
sanitizedNode, err := sanitizeTemplateNode(nodeInfo.Node(), nodeGroupName)
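With these changes, the per-node-group template lookup proceeds roughly as follows: build a template from a ready node in the group and store a deep copy of it in the cache; otherwise reuse a deep copy of the cached exemplar; otherwise fall back to a cloud-provider template and, as a last resort, to unready or unschedulable nodes; finally, cache entries for node groups the provider no longer reports are evicted. Deep-copying on both store and reuse keeps later scheduling simulations from mutating the cached exemplar in place. The simplified sketch below models that flow with a toy nodeTemplate type instead of schedulercache.NodeInfo; every identifier in it is illustrative, not the actual autoscaler code.

```go
package main

import "fmt"

type nodeTemplate struct {
	Node string
	Pods []string
}

// deepCopy plays the role of deepCopyNodeInfo: cached entries are copied both
// when stored and when reused, so callers never share memory with the cache.
func deepCopy(t *nodeTemplate) *nodeTemplate {
	pods := append([]string(nil), t.Pods...)
	return &nodeTemplate{Node: t.Node, Pods: pods}
}

func buildTemplates(readyByGroup map[string]*nodeTemplate, allGroups []string,
	cache map[string]*nodeTemplate) map[string]*nodeTemplate {

	result := make(map[string]*nodeTemplate)
	seen := make(map[string]bool)

	// 1. Prefer a template built from a real ready node; remember a copy of it.
	for group, tmpl := range readyByGroup {
		result[group] = tmpl
		if cache != nil {
			cache[group] = deepCopy(tmpl)
		}
	}

	// 2. For groups with no ready node, fall back to a copy of the cached exemplar.
	for _, group := range allGroups {
		seen[group] = true
		if _, ok := result[group]; ok {
			continue
		}
		if cache != nil {
			if tmpl, ok := cache[group]; ok {
				result[group] = deepCopy(tmpl)
			}
		}
		// (The real code would next try a cloud-provider template and then
		// unready/unschedulable nodes; omitted here.)
	}

	// 3. Evict cache entries for node groups that no longer exist.
	for group := range cache {
		if !seen[group] {
			delete(cache, group)
		}
	}
	return result
}

func main() {
	cache := make(map[string]*nodeTemplate)
	// First pass: ng1 has a ready node; ng2 has none and is not cached yet.
	buildTemplates(map[string]*nodeTemplate{"ng1": {Node: "ng1-node-0"}}, []string{"ng1", "ng2"}, cache)
	// Second pass: ng1 has no ready node anymore, so its cached exemplar is reused.
	result := buildTemplates(map[string]*nodeTemplate{}, []string{"ng1", "ng2"}, cache)
	fmt.Println(result["ng1"].Node) // ng1-node-0, served from the cache
}
```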
