Pass nodeGroup->NodeInfo map to ClusterStateRegistry
Change-Id: Ie2a51694b5731b39c8a4135355a3b4c832c26801
losipiuk committed Jan 8, 2019
1 parent f9455cc commit 6a760d2
Showing 4 changed files with 45 additions and 31 deletions.
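
This change moves construction of the nodeGroup → NodeInfo map out of ScaleUp: RunOnce now builds it once per iteration with GetNodeInfosForGroups and passes it to both ClusterStateRegistry.UpdateNodes and ScaleUp. The sketch below is illustrative only — it uses simplified stand-in types rather than the real apiv1.Node / schedulercache.NodeInfo structures — but shows the resulting data flow.

// Illustrative sketch (not part of the commit): simplified stand-in types showing
// how the node-group -> NodeInfo map is built once per autoscaler iteration and
// then shared by the cluster state registry and the scale-up path.
package main

import (
	"fmt"
	"time"
)

// NodeInfo stands in for schedulercache.NodeInfo.
type NodeInfo struct{ AllocatableMilliCPU int64 }

// Node stands in for *apiv1.Node; Group names the owning node group.
type Node struct{ Name, Group string }

// buildNodeInfosForGroups mimics GetNodeInfosForGroups: one template NodeInfo per node group.
func buildNodeInfosForGroups(nodes []Node) map[string]*NodeInfo {
	infos := make(map[string]*NodeInfo)
	for _, n := range nodes {
		if _, ok := infos[n.Group]; !ok {
			infos[n.Group] = &NodeInfo{AllocatableMilliCPU: 1000}
		}
	}
	return infos
}

// Registry stands in for ClusterStateRegistry; UpdateNodes now also receives the map.
type Registry struct {
	nodes              []Node
	nodeInfosForGroups map[string]*NodeInfo
}

func (r *Registry) UpdateNodes(nodes []Node, nodeInfosForGroups map[string]*NodeInfo, _ time.Time) {
	r.nodes = nodes
	r.nodeInfosForGroups = nodeInfosForGroups
}

// scaleUp stands in for core.ScaleUp; it consumes the prebuilt map instead of rebuilding it.
func scaleUp(r *Registry, nodeInfos map[string]*NodeInfo) {
	fmt.Printf("registry tracks %d nodes; scale-up sees %d node-group templates\n",
		len(r.nodes), len(nodeInfos))
}

func main() {
	nodes := []Node{{Name: "n1", Group: "ng1"}, {Name: "n2", Group: "ng2"}}
	// Built once per RunOnce iteration...
	nodeInfos := buildNodeInfosForGroups(nodes)
	r := &Registry{}
	// ...then passed to the cluster state registry and to the scale-up logic.
	r.UpdateNodes(nodes, nodeInfos, time.Now())
	scaleUp(r, nodeInfos)
}
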
5 changes: 4 additions & 1 deletion cluster-autoscaler/clusterstate/clusterstate.go
@@ -34,6 +34,7 @@ import (
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"

"k8s.io/klog"
)
@@ -119,6 +120,7 @@ type ClusterStateRegistry struct {
scaleUpRequests map[string]*ScaleUpRequest // nodeGroupName -> ScaleUpRequest
scaleDownRequests []*ScaleDownRequest
nodes []*apiv1.Node
nodeInfosForGroups map[string]*schedulercache.NodeInfo
cloudProvider cloudprovider.CloudProvider
perNodeGroupReadiness map[string]Readiness
totalReadiness Readiness
@@ -267,7 +269,7 @@ func (csr *ClusterStateRegistry) registerFailedScaleUpNoLock(nodeGroup cloudprov
}

// UpdateNodes updates the state of the nodes in the ClusterStateRegistry and recalculates the stats
func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, currentTime time.Time) error {
func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, nodeInfosForGroups map[string]*schedulercache.NodeInfo, currentTime time.Time) error {
csr.updateNodeGroupMetrics()
targetSizes, err := getTargetSizes(csr.cloudProvider)
if err != nil {
@@ -284,6 +286,7 @@ func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, currentTime ti
defer csr.Unlock()

csr.nodes = nodes
csr.nodeInfosForGroups = nodeInfosForGroups
csr.previousCloudProviderNodeInstances = csr.cloudProviderNodeInstances
csr.cloudProviderNodeInstances = cloudProviderNodeInstances

7 changes: 1 addition & 6 deletions cluster-autoscaler/core/scale_up.go
@@ -244,7 +244,7 @@ var (
// false if it didn't and error if an error occurred. Assumes that all nodes in the cluster are
// ready and in sync with instance groups.
func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, unschedulablePods []*apiv1.Pod,
nodes []*apiv1.Node, daemonSets []*appsv1.DaemonSet) (*status.ScaleUpStatus, errors.AutoscalerError) {
nodes []*apiv1.Node, daemonSets []*appsv1.DaemonSet, nodeInfos map[string]*schedulercache.NodeInfo) (*status.ScaleUpStatus, errors.AutoscalerError) {
// From now on we only care about unschedulable pods that were marked after the newest
// node became available for the scheduler.
if len(unschedulablePods) == 0 {
@@ -263,11 +263,6 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
podsRemainUnschedulable[pod] = make(map[string]status.Reasons)
}
glogx.V(1).Over(loggingQuota).Infof("%v other pods are also unschedulable", -loggingQuota.Left())
nodeInfos, err := GetNodeInfosForGroups(nodes, context.CloudProvider, context.ListerRegistry,
daemonSets, context.PredicateChecker)
if err != nil {
return &status.ScaleUpStatus{Result: status.ScaleUpError}, err.AddPrefix("failed to build node infos for node groups: ")
}

nodesFromNotAutoscaledGroups, err := FilterOutNodesFromNotAutoscaledGroups(nodes, context.CloudProvider)
if err != nil {
36 changes: 23 additions & 13 deletions cluster-autoscaler/core/scale_up_test.go
@@ -414,8 +414,9 @@ func simpleScaleUpTest(t *testing.T, config *scaleTestConfig) {
}
context.ExpanderStrategy = expander

nodeInfos, _ := GetNodeInfosForGroups(nodes, provider, fakeClient, []*appsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, time.Now())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())

extraPods := make([]*apiv1.Pod, len(config.extraPods))
for i, p := range config.extraPods {
@@ -425,7 +426,7 @@ func simpleScaleUpTest(t *testing.T, config *scaleTestConfig) {

processors := ca_processors.TestProcessors()

scaleUpStatus, err := ScaleUp(&context, processors, clusterState, extraPods, nodes, []*appsv1.DaemonSet{})
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)
assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful())
@@ -499,19 +500,21 @@ func TestScaleUpNodeComingNoScale(t *testing.T) {
}
context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider)

nodes := []*apiv1.Node{n1, n2}
nodeInfos, _ := GetNodeInfosForGroups(nodes, provider, fakeClient, []*appsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(
provider,
clusterstate.ClusterStateRegistryConfig{MaxNodeProvisionTime: 5 * time.Minute},
context.LogRecorder,
newBackoff())
clusterState.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng2"), 1, time.Now())
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())

p3 := BuildTestPod("p-new", 550, 0)

processors := ca_processors.TestProcessors()

scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1, n2}, []*appsv1.DaemonSet{})
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
assert.NoError(t, err)
// A node is already coming - no need for scale up.
assert.False(t, scaleUpStatus.WasSuccessful())
@@ -543,6 +546,8 @@ func TestScaleUpNodeComingHasScale(t *testing.T) {

context := NewScaleTestAutoscalingContext(defaultOptions, &fake.Clientset{}, listers, provider)

nodes := []*apiv1.Node{n1, n2}
nodeInfos, _ := GetNodeInfosForGroups(nodes, provider, fakeClient, []*appsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(
provider,
clusterstate.ClusterStateRegistryConfig{
@@ -551,13 +556,13 @@
context.LogRecorder,
newBackoff())
clusterState.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng2"), 1, time.Now())
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())

p3 := BuildTestPod("p-new", 550, 0)
p4 := BuildTestPod("p-new", 550, 0)

processors := ca_processors.TestProcessors()
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3, p4}, []*apiv1.Node{n1, n2}, []*appsv1.DaemonSet{})
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3, p4}, nodes, []*appsv1.DaemonSet{}, nodeInfos)

assert.NoError(t, err)
// Two nodes needed but one node is already coming, so it should increase by one.
@@ -595,12 +600,14 @@ func TestScaleUpUnhealthy(t *testing.T) {
}
context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider)

nodes := []*apiv1.Node{n1, n2}
nodeInfos, _ := GetNodeInfosForGroups(nodes, provider, fakeClient, []*appsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 550, 0)

processors := ca_processors.TestProcessors()
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1, n2}, []*appsv1.DaemonSet{})
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)

assert.NoError(t, err)
// Node group is unhealthy.
@@ -632,12 +639,14 @@ func TestScaleUpNoHelp(t *testing.T) {
}
context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider)

nodes := []*apiv1.Node{n1}
nodeInfos, _ := GetNodeInfosForGroups(nodes, provider, fakeClient, []*appsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes([]*apiv1.Node{n1}, time.Now())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())
p3 := BuildTestPod("p-new", 500, 0)

processors := ca_processors.TestProcessors()
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, []*apiv1.Node{n1}, []*appsv1.DaemonSet{})
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)

assert.NoError(t, err)
@@ -695,16 +704,17 @@ func TestScaleUpBalanceGroups(t *testing.T) {
}
context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider)

nodeInfos, _ := GetNodeInfosForGroups(nodes, provider, fakeClient, []*appsv1.DaemonSet{}, context.PredicateChecker)
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
clusterState.UpdateNodes(nodes, time.Now())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())

pods := make([]*apiv1.Pod, 0)
for i := 0; i < 2; i++ {
pods = append(pods, BuildTestPod(fmt.Sprintf("test-pod-%v", i), 80, 0))
}

processors := ca_processors.TestProcessors()
scaleUpStatus, typedErr := ScaleUp(&context, processors, clusterState, pods, nodes, []*appsv1.DaemonSet{})
scaleUpStatus, typedErr := ScaleUp(&context, processors, clusterState, pods, nodes, []*appsv1.DaemonSet{}, nodeInfos)

assert.NoError(t, typedErr)
assert.True(t, scaleUpStatus.WasSuccessful())
@@ -758,7 +768,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
processors.NodeGroupListProcessor = &mockAutoprovisioningNodeGroupListProcessor{t}
processors.NodeGroupManager = &mockAutoprovisioningNodeGroupManager{t}

scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1}, []*apiv1.Node{}, []*appsv1.DaemonSet{})
scaleUpStatus, err := ScaleUp(&context, processors, clusterState, []*apiv1.Pod{p1}, []*apiv1.Node{}, []*appsv1.DaemonSet{}, nil)
assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful())
assert.Equal(t, "autoprovisioned-T1", getStringFromChan(createdGroups))
28 changes: 17 additions & 11 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -39,6 +39,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/utils/tpu"

"k8s.io/klog"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
)

const (
@@ -137,7 +138,19 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
return nil
}

typedErr = a.updateClusterState(allNodes, currentTime)
daemonsets, err := a.ListerRegistry.DaemonSetLister().List(labels.Everything())
if err != nil {
klog.Errorf("Failed to get daemonset list")
return errors.ToAutoscalerError(errors.ApiCallError, err)
}

nodeInfosForGroups, autoscalerError := GetNodeInfosForGroups(readyNodes, autoscalingContext.CloudProvider, autoscalingContext.ListerRegistry,
daemonsets, autoscalingContext.PredicateChecker)
if autoscalerError != nil {
return autoscalerError.AddPrefix("failed to build node infos for node groups: ")
}

typedErr = a.updateClusterState(allNodes, nodeInfosForGroups, currentTime)
if typedErr != nil {
return typedErr
}
@@ -293,17 +306,10 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
scaleUpStatus.Result = status.ScaleUpInCooldown
klog.V(1).Info("Unschedulable pods are very new, waiting one iteration for more")
} else {
daemonsets, err := a.ListerRegistry.DaemonSetLister().List(labels.Everything())
if err != nil {
klog.Errorf("Failed to get daemonset list")
scaleUpStatus.Result = status.ScaleUpError
return errors.ToAutoscalerError(errors.ApiCallError, err)
}

scaleUpStart := time.Now()
metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart)

scaleUpStatus, typedErr = ScaleUp(autoscalingContext, a.processors, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets)
scaleUpStatus, typedErr = ScaleUp(autoscalingContext, a.processors, a.clusterStateRegistry, unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups)

metrics.UpdateDurationFromStart(metrics.ScaleUp, scaleUpStart)

@@ -511,14 +517,14 @@ func (a *StaticAutoscaler) actOnEmptyCluster(allNodes, readyNodes []*apiv1.Node,
return false
}

func (a *StaticAutoscaler) updateClusterState(allNodes []*apiv1.Node, currentTime time.Time) errors.AutoscalerError {
func (a *StaticAutoscaler) updateClusterState(allNodes []*apiv1.Node, nodeInfosForGroups map[string]*schedulercache.NodeInfo, currentTime time.Time) errors.AutoscalerError {
err := a.AutoscalingContext.CloudProvider.Refresh()
if err != nil {
klog.Errorf("Failed to refresh cloud provider config: %v", err)
return errors.ToAutoscalerError(errors.CloudProviderError, err)
}

err = a.clusterStateRegistry.UpdateNodes(allNodes, currentTime)
err = a.clusterStateRegistry.UpdateNodes(allNodes, nodeInfosForGroups, currentTime)
if err != nil {
klog.Errorf("Failed to update node registry: %v", err)
a.scaleDown.CleanUpUnneededNodes()
