Merge pull request #1550 from losipiuk/lukaszos/pass-nodegroup-nodeinfo-map-to-clusterstateregistry-b698e

Pass nodeGroup->NodeInfo map to ClusterStateRegistry
k8s-ci-robot authored Jan 8, 2019
2 parents f9455cc + 85a83b6 commit 14ed6ec
Showing 7 changed files with 102 additions and 77 deletions.
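In short, the change threads a nodeGroup-name->NodeInfo map through ClusterStateRegistry.UpdateNodes instead of having ScaleUp rebuild it internally. A minimal sketch of the new registry call, assuming the map was built elsewhere (the variable names here are illustrative, not from this diff):

// Sketch only: csr is a *clusterstate.ClusterStateRegistry; nodeInfosForGroups
// maps node group names to their representative schedulercache.NodeInfo.
if err := csr.UpdateNodes(nodes, nodeInfosForGroups, time.Now()); err != nil {
	klog.Errorf("failed to update cluster state: %v", err)
}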
5 changes: 4 additions & 1 deletion cluster-autoscaler/clusterstate/clusterstate.go
@@ -34,6 +34,7 @@ import (
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
+ schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"

"k8s.io/klog"
)
@@ -119,6 +120,7 @@ type ClusterStateRegistry struct {
scaleUpRequests map[string]*ScaleUpRequest // nodeGroupName -> ScaleUpRequest
scaleDownRequests []*ScaleDownRequest
nodes []*apiv1.Node
+ nodeInfosForGroups map[string]*schedulercache.NodeInfo
cloudProvider cloudprovider.CloudProvider
perNodeGroupReadiness map[string]Readiness
totalReadiness Readiness
@@ -267,7 +269,7 @@ func (csr *ClusterStateRegistry) registerFailedScaleUpNoLock(nodeGroup cloudprov
}

// UpdateNodes updates the state of the nodes in the ClusterStateRegistry and recalculates the stats
- func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, currentTime time.Time) error {
+ func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, nodeInfosForGroups map[string]*schedulercache.NodeInfo, currentTime time.Time) error {
csr.updateNodeGroupMetrics()
targetSizes, err := getTargetSizes(csr.cloudProvider)
if err != nil {
@@ -284,6 +286,7 @@ func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, currentTime ti
defer csr.Unlock()

csr.nodes = nodes
+ csr.nodeInfosForGroups = nodeInfosForGroups
csr.previousCloudProviderNodeInstances = csr.cloudProviderNodeInstances
csr.cloudProviderNodeInstances = cloudProviderNodeInstances

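Note that this file only stores the map; no reader of csr.nodeInfosForGroups is added in this PR. Any future consumer would presumably take the registry lock, mirroring how UpdateNodes holds it while writing — a hypothetical accessor, not part of this diff:

// Hypothetical accessor (not in this PR): reads the stored NodeInfo for one
// node group under the same lock that UpdateNodes holds when writing.
func (csr *ClusterStateRegistry) getNodeInfoForGroup(nodeGroupName string) (*schedulercache.NodeInfo, bool) {
	csr.Lock()
	defer csr.Unlock()
	nodeInfo, found := csr.nodeInfosForGroups[nodeGroupName]
	return nodeInfo, found
}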
46 changes: 23 additions & 23 deletions cluster-autoscaler/clusterstate/clusterstate_test.go
@@ -57,7 +57,7 @@ func TestOKWithScaleUp(t *testing.T) {
MaxNodeProvisionTime: time.Minute,
}, fakeLogRecorder, newBackoff())
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 4, time.Now())
- err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, now)
+ err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())

@@ -97,7 +97,7 @@ func TestEmptyOK(t *testing.T) {
OkTotalUnreadyCount: 1,
MaxNodeProvisionTime: time.Minute,
}, fakeLogRecorder, newBackoff())
- err := clusterstate.UpdateNodes([]*apiv1.Node{}, now.Add(-5*time.Second))
+ err := clusterstate.UpdateNodes([]*apiv1.Node{}, nil, now.Add(-5*time.Second))
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
assert.True(t, clusterstate.IsNodeGroupHealthy("ng1"))
@@ -107,7 +107,7 @@ func TestEmptyOK(t *testing.T) {
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 3, now.Add(-3*time.Second))
// clusterstate.scaleUpRequests["ng1"].Time = now.Add(-3 * time.Second)
// clusterstate.scaleUpRequests["ng1"].ExpectedAddTime = now.Add(1 * time.Minute)
- err = clusterstate.UpdateNodes([]*apiv1.Node{}, now)
+ err = clusterstate.UpdateNodes([]*apiv1.Node{}, nil, now)

assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
@@ -136,7 +136,7 @@ func TestOKOneUnreadyNode(t *testing.T) {
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder, newBackoff())
- err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, now)
+ err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
assert.True(t, clusterstate.IsNodeGroupHealthy("ng1"))
@@ -173,7 +173,7 @@ func TestNodeWithoutNodeGroupDontCrash(t *testing.T) {
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder, newBackoff())
- err := clusterstate.UpdateNodes([]*apiv1.Node{noNgNode}, now)
+ err := clusterstate.UpdateNodes([]*apiv1.Node{noNgNode}, nil, now)
assert.NoError(t, err)
clusterstate.UpdateScaleDownCandidates([]*apiv1.Node{noNgNode}, now)
}
@@ -199,7 +199,7 @@ func TestOKOneUnreadyNodeWithScaleDownCandidate(t *testing.T) {
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder, newBackoff())
- err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, now)
+ err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
clusterstate.UpdateScaleDownCandidates([]*apiv1.Node{ng1_1}, now)

assert.NoError(t, err)
@@ -263,7 +263,7 @@ func TestMissingNodes(t *testing.T) {
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder, newBackoff())
- err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, now)
+ err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
assert.False(t, clusterstate.IsNodeGroupHealthy("ng1"))
@@ -304,7 +304,7 @@ func TestTooManyUnready(t *testing.T) {
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder, newBackoff())
- err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, now)
+ err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.False(t, clusterstate.IsClusterHealthy())
assert.True(t, clusterstate.IsNodeGroupHealthy("ng1"))
@@ -329,7 +329,7 @@ func TestExpiredScaleUp(t *testing.T) {
MaxNodeProvisionTime: 2 * time.Minute,
}, fakeLogRecorder, newBackoff())
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 4, now.Add(-3*time.Minute))
- err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, now)
+ err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
assert.False(t, clusterstate.IsNodeGroupHealthy("ng1"))
@@ -400,7 +400,7 @@ func TestUpcomingNodes(t *testing.T) {
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder, newBackoff())
- err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1, ng3_1, ng4_1}, now)
+ err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1, ng3_1, ng4_1}, nil, now)
assert.NoError(t, err)

upcomingNodes := clusterstate.GetUpcomingNodes()
@@ -423,19 +423,19 @@ func TestIncorrectSize(t *testing.T) {
OkTotalUnreadyCount: 1,
}, fakeLogRecorder, newBackoff())
now := time.Now()
- clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, now.Add(-5*time.Minute))
+ clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now.Add(-5*time.Minute))
incorrect := clusterstate.incorrectNodeGroupSizes["ng1"]
assert.Equal(t, 5, incorrect.ExpectedSize)
assert.Equal(t, 1, incorrect.CurrentSize)
assert.Equal(t, now.Add(-5*time.Minute), incorrect.FirstObserved)

- clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, now.Add(-4*time.Minute))
+ clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now.Add(-4*time.Minute))
incorrect = clusterstate.incorrectNodeGroupSizes["ng1"]
assert.Equal(t, 5, incorrect.ExpectedSize)
assert.Equal(t, 1, incorrect.CurrentSize)
assert.Equal(t, now.Add(-5*time.Minute), incorrect.FirstObserved)

- clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_1}, now.Add(-3*time.Minute))
+ clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_1}, nil, now.Add(-3*time.Minute))
incorrect = clusterstate.incorrectNodeGroupSizes["ng1"]
assert.Equal(t, 5, incorrect.ExpectedSize)
assert.Equal(t, 2, incorrect.CurrentSize)
@@ -459,7 +459,7 @@ func TestUnregisteredNodes(t *testing.T) {
OkTotalUnreadyCount: 1,
MaxNodeProvisionTime: 10 * time.Second,
}, fakeLogRecorder, newBackoff())
- err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, time.Now().Add(-time.Minute))
+ err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, time.Now().Add(-time.Minute))

assert.NoError(t, err)
assert.Equal(t, 1, len(clusterstate.GetUnregisteredNodes()))
@@ -469,14 +469,14 @@ func TestUnregisteredNodes(t *testing.T) {

// The node didn't come up in MaxNodeProvisionTime, it should no longer be
// counted as upcoming (but it is still an unregistered node)
- err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, time.Now().Add(time.Minute))
+ err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, time.Now().Add(time.Minute))
assert.NoError(t, err)
assert.Equal(t, 1, len(clusterstate.GetUnregisteredNodes()))
assert.Equal(t, "ng1-2", clusterstate.GetUnregisteredNodes()[0].Node.Name)
upcomingNodes = clusterstate.GetUpcomingNodes()
assert.Equal(t, 0, len(upcomingNodes))

- err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2}, time.Now().Add(time.Minute))
+ err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2}, nil, time.Now().Add(time.Minute))
assert.NoError(t, err)
assert.Equal(t, 0, len(clusterstate.GetUnregisteredNodes()))
}
@@ -614,7 +614,7 @@ func TestScaleUpBackoff(t *testing.T) {

// After failed scale-up, node group should be still healthy, but should backoff from scale-ups
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-180*time.Second))
- err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, now)
+ err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
assert.True(t, clusterstate.IsNodeGroupHealthy("ng1"))
@@ -629,7 +629,7 @@ func TestScaleUpBackoff(t *testing.T) {
// Another failed scale up should cause longer backoff
clusterstate.RegisterOrUpdateScaleUp(provider.GetNodeGroup("ng1"), 1, now.Add(-121*time.Second))

- err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, now)
+ err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
assert.True(t, clusterstate.IsNodeGroupHealthy("ng1"))
@@ -643,7 +643,7 @@ func TestScaleUpBackoff(t *testing.T) {
ng1_4 := BuildTestNode("ng1-4", 1000, 1000)
SetNodeReadyState(ng1_4, true, now.Add(-1*time.Minute))
provider.AddNode("ng1", ng1_4)
- err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3, ng1_4}, now)
+ err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3, ng1_4}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
assert.True(t, clusterstate.IsNodeGroupHealthy("ng1"))
@@ -674,20 +674,20 @@ func TestGetClusterSize(t *testing.T) {
}, fakeLogRecorder, newBackoff())

// There are 2 actual nodes in 2 node groups with target sizes of 5 and 1.
- clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, now)
+ clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
currentSize, targetSize := clusterstate.GetClusterSize()
assert.Equal(t, 2, currentSize)
assert.Equal(t, 6, targetSize)

// Current size should increase after a new node is added.
- clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_1, ng2_1}, now.Add(time.Minute))
+ clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_1, ng2_1}, nil, now.Add(time.Minute))
currentSize, targetSize = clusterstate.GetClusterSize()
assert.Equal(t, 3, currentSize)
assert.Equal(t, 6, targetSize)

// Target size should increase after a new node group is added.
provider.AddNodeGroup("ng3", 1, 10, 1)
- clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_1, ng2_1}, now.Add(2*time.Minute))
+ clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_1, ng2_1}, nil, now.Add(2*time.Minute))
currentSize, targetSize = clusterstate.GetClusterSize()
assert.Equal(t, 3, currentSize)
assert.Equal(t, 7, targetSize)
@@ -696,7 +696,7 @@ func TestGetClusterSize(t *testing.T) {
for _, ng := range provider.NodeGroups() {
ng.(*testprovider.TestNodeGroup).SetTargetSize(10)
}
- clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_1, ng2_1}, now.Add(3*time.Minute))
+ clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_1, ng2_1}, nil, now.Add(3*time.Minute))
currentSize, targetSize = clusterstate.GetClusterSize()
assert.Equal(t, 3, currentSize)
assert.Equal(t, 30, targetSize)
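Every call site in these tests passes nil for the new parameter; the assertions cover readiness and size bookkeeping, which never read the stored map, so nil is safe here. A test that did exercise the map could pass a real one — a sketch, assuming schedulercache.NewNodeInfo() builds an empty NodeInfo:

// Sketch: supplying a real map instead of nil (NewNodeInfo with no pods assumed).
nodeInfos := map[string]*schedulercache.NodeInfo{
	"ng1": schedulercache.NewNodeInfo(),
}
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nodeInfos, now)
assert.NoError(t, err)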
7 changes: 1 addition & 6 deletions cluster-autoscaler/core/scale_up.go
@@ -244,7 +244,7 @@ var (
// false if it didn't and error if an error occurred. Assumes that all nodes in the cluster are
// ready and in sync with instance groups.
func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, unschedulablePods []*apiv1.Pod,
- nodes []*apiv1.Node, daemonSets []*appsv1.DaemonSet) (*status.ScaleUpStatus, errors.AutoscalerError) {
+ nodes []*apiv1.Node, daemonSets []*appsv1.DaemonSet, nodeInfos map[string]*schedulercache.NodeInfo) (*status.ScaleUpStatus, errors.AutoscalerError) {
// From now on we only care about unschedulable pods that were marked after the newest
// node became available for the scheduler.
if len(unschedulablePods) == 0 {
@@ -263,11 +263,6 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
podsRemainUnschedulable[pod] = make(map[string]status.Reasons)
}
glogx.V(1).Over(loggingQuota).Infof("%v other pods are also unschedulable", -loggingQuota.Left())
- nodeInfos, err := GetNodeInfosForGroups(nodes, context.CloudProvider, context.ListerRegistry,
- daemonSets, context.PredicateChecker)
- if err != nil {
- return &status.ScaleUpStatus{Result: status.ScaleUpError}, err.AddPrefix("failed to build node infos for node groups: ")
- }

nodesFromNotAutoscaledGroups, err := FilterOutNodesFromNotAutoscaledGroups(nodes, context.CloudProvider)
if err != nil {
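Taken together with the clusterstate.go hunk, the deletion above implies the caller now builds the map once per loop and threads it into both UpdateNodes and ScaleUp. A sketch of that caller-side flow (the function framing and error handling are assumptions; only the two signatures come from this diff):

// Sketch of one autoscaler iteration after this change; names are illustrative.
func runIteration(ctx *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors,
	csr *clusterstate.ClusterStateRegistry, nodes []*apiv1.Node, unschedulablePods []*apiv1.Pod,
	daemonSets []*appsv1.DaemonSet, now time.Time) errors.AutoscalerError {
	// Build the nodeGroup->NodeInfo map once, via the helper that ScaleUp
	// previously called internally (arguments as in the removed lines above).
	nodeInfos, caErr := GetNodeInfosForGroups(nodes, ctx.CloudProvider, ctx.ListerRegistry,
		daemonSets, ctx.PredicateChecker)
	if caErr != nil {
		return caErr.AddPrefix("failed to build node infos for node groups: ")
	}
	// Hand the same map to the cluster state registry...
	if err := csr.UpdateNodes(nodes, nodeInfos, now); err != nil {
		return errors.ToAutoscalerError(errors.ApiCallError, err)
	}
	// ...and reuse it for scale-up instead of rebuilding it there.
	_, typedErr := ScaleUp(ctx, processors, csr, unschedulablePods, nodes, daemonSets, nodeInfos)
	return typedErr
}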
