Cherry-pick of kubernetes#1550: Pass nodeGroup->NodeInfo map to ClusterStateRegistry

Change-Id: Ie2a51694b5731b39c8a4135355a3b4c832c26801
losipiuk authored and jkaniuk committed Feb 14, 2019
1 parent 7a60cf6 commit 3421046
Showing 7 changed files with 117 additions and 75 deletions.
5 changes: 4 additions & 1 deletion cluster-autoscaler/clusterstate/clusterstate.go
@@ -33,6 +33,7 @@ import (
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"

"github.com/golang/glog"
)
@@ -118,6 +119,7 @@ type ClusterStateRegistry struct {
scaleUpRequests []*ScaleUpRequest
scaleDownRequests []*ScaleDownRequest
nodes []*apiv1.Node
+nodeInfosForGroups map[string]*schedulercache.NodeInfo
cloudProvider cloudprovider.CloudProvider
perNodeGroupReadiness map[string]Readiness
totalReadiness Readiness
@@ -235,7 +237,7 @@ func (csr *ClusterStateRegistry) RegisterFailedScaleUp(nodeGroupName string, rea
}

// UpdateNodes updates the state of the nodes in the ClusterStateRegistry and recalculates the stats
-func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, currentTime time.Time) error {
+func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, nodeInfosForGroups map[string]*schedulercache.NodeInfo, currentTime time.Time) error {
csr.updateNodeGroupMetrics()
targetSizes, err := getTargetSizes(csr.cloudProvider)
if err != nil {
@@ -250,6 +252,7 @@ func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, currentTime ti
defer csr.Unlock()

csr.nodes = nodes
+csr.nodeInfosForGroups = nodeInfosForGroups

csr.updateUnregisteredNodes(notRegistered)
csr.updateReadinessStats(currentTime)
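The change above threads a nodeGroup->NodeInfo map into ClusterStateRegistry alongside the node list. Below is a minimal caller-side sketch of the new UpdateNodes shape; the wrapper name and import paths are assumptions for illustration, while UpdateNodes, ClusterStateRegistry, and schedulercache.NodeInfo come from this diff.

package example

import (
	"time"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
	schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
)

// refreshClusterState is a hypothetical helper showing the new call shape:
// the per-node-group NodeInfo map (keyed by node group id) is passed in by
// the caller rather than being recomputed inside the registry. Passing nil
// preserves the previous behaviour, which is what the updated unit tests do.
func refreshClusterState(
	csr *clusterstate.ClusterStateRegistry,
	nodes []*apiv1.Node,
	nodeInfosForGroups map[string]*schedulercache.NodeInfo,
	now time.Time,
) error {
	return csr.UpdateNodes(nodes, nodeInfosForGroups, now)
}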
39 changes: 20 additions & 19 deletions cluster-autoscaler/clusterstate/clusterstate_test.go
@@ -60,7 +60,7 @@ func TestOKWithScaleUp(t *testing.T) {
Time: now,
ExpectedAddTime: now.Add(time.Minute),
})
-err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, now)
+err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())

@@ -99,7 +99,7 @@ func TestEmptyOK(t *testing.T) {
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder)
-err := clusterstate.UpdateNodes([]*apiv1.Node{}, now.Add(-5*time.Second))
+err := clusterstate.UpdateNodes([]*apiv1.Node{}, nil, now.Add(-5*time.Second))
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
assert.True(t, clusterstate.IsNodeGroupHealthy("ng1"))
@@ -112,7 +112,7 @@ func TestEmptyOK(t *testing.T) {
Time: now.Add(-3 * time.Second),
ExpectedAddTime: now.Add(1 * time.Minute),
})
-err = clusterstate.UpdateNodes([]*apiv1.Node{}, now)
+err = clusterstate.UpdateNodes([]*apiv1.Node{}, nil, now)

assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
@@ -141,7 +141,7 @@ func TestOKOneUnreadyNode(t *testing.T) {
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder)
-err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, now)
+err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
assert.True(t, clusterstate.IsNodeGroupHealthy("ng1"))
@@ -178,7 +178,7 @@ func TestNodeWithoutNodeGroupDontCrash(t *testing.T) {
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder)
-err := clusterstate.UpdateNodes([]*apiv1.Node{noNgNode}, now)
+err := clusterstate.UpdateNodes([]*apiv1.Node{noNgNode}, nil, now)
assert.NoError(t, err)
clusterstate.UpdateScaleDownCandidates([]*apiv1.Node{noNgNode}, now)
}
@@ -204,7 +204,7 @@ func TestOKOneUnreadyNodeWithScaleDownCandidate(t *testing.T) {
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder)
-err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, now)
+err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
clusterstate.UpdateScaleDownCandidates([]*apiv1.Node{ng1_1}, now)

assert.NoError(t, err)
@@ -268,7 +268,7 @@ func TestMissingNodes(t *testing.T) {
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder)
-err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, now)
+err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
assert.False(t, clusterstate.IsNodeGroupHealthy("ng1"))
@@ -309,7 +309,7 @@ func TestTooManyUnready(t *testing.T) {
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder)
-err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, now)
+err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1}, nil, now)
assert.NoError(t, err)
assert.False(t, clusterstate.IsClusterHealthy())
assert.True(t, clusterstate.IsNodeGroupHealthy("ng1"))
@@ -331,14 +331,15 @@ func TestExpiredScaleUp(t *testing.T) {
clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
+MaxNodeProvisionTime: 2 * time.Minute,
}, fakeLogRecorder)
clusterstate.RegisterScaleUp(&ScaleUpRequest{
NodeGroupName: "ng1",
Increase: 4,
Time: now.Add(-3 * time.Minute),
ExpectedAddTime: now.Add(-1 * time.Minute),
})
-err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, now)
+err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
assert.False(t, clusterstate.IsNodeGroupHealthy("ng1"))
@@ -409,7 +410,7 @@ func TestUpcomingNodes(t *testing.T) {
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder)
-err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1, ng3_1, ng4_1}, now)
+err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1, ng3_1, ng4_1}, nil, now)
assert.NoError(t, err)

upcomingNodes := clusterstate.GetUpcomingNodes()
@@ -432,19 +433,19 @@ func TestIncorrectSize(t *testing.T) {
OkTotalUnreadyCount: 1,
}, fakeLogRecorder)
now := time.Now()
-clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, now.Add(-5*time.Minute))
+clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now.Add(-5*time.Minute))
incorrect := clusterstate.incorrectNodeGroupSizes["ng1"]
assert.Equal(t, 5, incorrect.ExpectedSize)
assert.Equal(t, 1, incorrect.CurrentSize)
assert.Equal(t, now.Add(-5*time.Minute), incorrect.FirstObserved)

-clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, now.Add(-4*time.Minute))
+clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now.Add(-4*time.Minute))
incorrect = clusterstate.incorrectNodeGroupSizes["ng1"]
assert.Equal(t, 5, incorrect.ExpectedSize)
assert.Equal(t, 1, incorrect.CurrentSize)
assert.Equal(t, now.Add(-5*time.Minute), incorrect.FirstObserved)

-clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_1}, now.Add(-3*time.Minute))
+clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_1}, nil, now.Add(-3*time.Minute))
incorrect = clusterstate.incorrectNodeGroupSizes["ng1"]
assert.Equal(t, 5, incorrect.ExpectedSize)
assert.Equal(t, 2, incorrect.CurrentSize)
@@ -468,7 +469,7 @@ func TestUnregisteredNodes(t *testing.T) {
OkTotalUnreadyCount: 1,
MaxNodeProvisionTime: 10 * time.Second,
}, fakeLogRecorder)
-err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, time.Now().Add(-time.Minute))
+err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, time.Now().Add(-time.Minute))

assert.NoError(t, err)
assert.Equal(t, 1, len(clusterstate.GetUnregisteredNodes()))
@@ -478,14 +479,14 @@

// The node didn't come up in MaxNodeProvisionTime, it should no longer be
// counted as upcoming (but it is still an unregistered node)
-err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, time.Now().Add(time.Minute))
+err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, time.Now().Add(time.Minute))
assert.NoError(t, err)
assert.Equal(t, 1, len(clusterstate.GetUnregisteredNodes()))
assert.Equal(t, "ng1-2", clusterstate.GetUnregisteredNodes()[0].Node.Name)
upcomingNodes = clusterstate.GetUpcomingNodes()
assert.Equal(t, 0, len(upcomingNodes))

-err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2}, time.Now().Add(time.Minute))
+err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2}, nil, time.Now().Add(time.Minute))
assert.NoError(t, err)
assert.Equal(t, 0, len(clusterstate.GetUnregisteredNodes()))
}
@@ -626,7 +627,7 @@ func TestScaleUpBackoff(t *testing.T) {
Time: now.Add(-3 * time.Minute),
ExpectedAddTime: now.Add(-1 * time.Minute),
})
-err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, now)
+err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
assert.True(t, clusterstate.IsNodeGroupHealthy("ng1"))
@@ -645,7 +646,7 @@ func TestScaleUpBackoff(t *testing.T) {
Time: now.Add(-2 * time.Minute),
ExpectedAddTime: now.Add(-1 * time.Second),
})
-err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, now)
+err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
assert.True(t, clusterstate.IsNodeGroupHealthy("ng1"))
@@ -664,7 +665,7 @@ func TestScaleUpBackoff(t *testing.T) {
ng1_4 := BuildTestNode("ng1-4", 1000, 1000)
SetNodeReadyState(ng1_4, true, now.Add(-1*time.Minute))
provider.AddNode("ng1", ng1_4)
-err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3, ng1_4}, now)
+err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, ng1_3, ng1_4}, nil, now)
assert.NoError(t, err)
assert.True(t, clusterstate.IsClusterHealthy())
assert.True(t, clusterstate.IsNodeGroupHealthy("ng1"))
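Every updated test simply passes nil for the new parameter, so the existing assertions are unchanged. If a test wanted to exercise a real map, one way to build it is sketched below; this assumes the vendored scheduler cache exposes schedulercache.NewNodeInfo and NodeInfo.SetNode, and the helper name is made up.

package clusterstate

import (
	apiv1 "k8s.io/api/core/v1"
	schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
)

// nodeInfosForTest is a hypothetical test helper: it wraps one representative
// node per node group in a scheduler NodeInfo, roughly mirroring what the
// autoscaler builds for real node groups.
func nodeInfosForTest(nodesByGroup map[string]*apiv1.Node) map[string]*schedulercache.NodeInfo {
	result := make(map[string]*schedulercache.NodeInfo, len(nodesByGroup))
	for group, node := range nodesByGroup {
		nodeInfo := schedulercache.NewNodeInfo() // no pods on the template node
		if err := nodeInfo.SetNode(node); err != nil {
			continue // skip groups whose node cannot be attached
		}
		result[group] = nodeInfo
	}
	return result
}

A test could then pass nodeInfosForTest(map[string]*apiv1.Node{"ng1": ng1_1}) as the second argument to UpdateNodes instead of nil.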
7 changes: 1 addition & 6 deletions cluster-autoscaler/core/scale_up.go
@@ -231,7 +231,7 @@ func getNodeInfoCoresAndMemory(nodeInfo *schedulercache.NodeInfo) (int64, int64)
// false if it didn't and error if an error occurred. Assumes that all nodes in the cluster are
// ready and in sync with instance groups.
func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.AutoscalingProcessors, clusterStateRegistry *clusterstate.ClusterStateRegistry, unschedulablePods []*apiv1.Pod,
-nodes []*apiv1.Node, daemonSets []*extensionsv1.DaemonSet) (*status.ScaleUpStatus, errors.AutoscalerError) {
+nodes []*apiv1.Node, daemonSets []*extensionsv1.DaemonSet, nodeInfos map[string]*schedulercache.NodeInfo) (*status.ScaleUpStatus, errors.AutoscalerError) {
// From now on we only care about unschedulable pods that were marked after the newest
// node became available for the scheduler.
if len(unschedulablePods) == 0 {
@@ -250,11 +250,6 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
podsRemainUnschedulable[pod] = true
}
glogx.V(1).Over(loggingQuota).Infof("%v other pods are also unschedulable", -loggingQuota.Left())
-nodeInfos, err := GetNodeInfosForGroups(nodes, context.CloudProvider, context.ClientSet,
-daemonSets, context.PredicateChecker)
-if err != nil {
-return nil, err.AddPrefix("failed to build node infos for node groups: ")
-}

nodesFromNotAutoscaledGroups, err := FilterOutNodesFromNotAutoscaledGroups(nodes, context.CloudProvider)
if err != nil {
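With the block above removed, ScaleUp no longer builds the node infos itself; the caller computes them once and shares the same map with the ClusterStateRegistry. The sketch below shows that caller-side wiring. The real version lives in the static autoscaler's loop, which is not part of this excerpt; the wrapper name, error handling, and import paths are assumptions, while GetNodeInfosForGroups, UpdateNodes, and ScaleUp are taken from this diff.

package core

import (
	"time"

	apiv1 "k8s.io/api/core/v1"
	extensionsv1 "k8s.io/api/extensions/v1beta1"

	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)

// runScaleUpIteration is a hypothetical wrapper: node infos are built once per
// iteration, and the same map feeds both the cluster state bookkeeping and the
// scale-up decision instead of being recomputed inside ScaleUp.
func runScaleUpIteration(
	ctx *context.AutoscalingContext,
	processors *ca_processors.AutoscalingProcessors,
	csr *clusterstate.ClusterStateRegistry,
	unschedulablePods []*apiv1.Pod,
	nodes []*apiv1.Node,
	daemonSets []*extensionsv1.DaemonSet,
	now time.Time,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
	nodeInfos, err := GetNodeInfosForGroups(
		nodes, ctx.CloudProvider, ctx.ClientSet, daemonSets, ctx.PredicateChecker)
	if err != nil {
		return nil, err.AddPrefix("failed to build node infos for node groups: ")
	}

	// Shared with the registry for readiness/health accounting...
	if updateErr := csr.UpdateNodes(nodes, nodeInfos, now); updateErr != nil {
		return nil, errors.ToAutoscalerError(errors.CloudProviderError, updateErr)
	}

	// ...and passed straight through to the scale-up logic.
	return ScaleUp(ctx, processors, csr, unschedulablePods, nodes, daemonSets, nodeInfos)
}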
(Diffs for the remaining 4 changed files are not loaded in this view.)
