Skip to content

Commit

Permalink
Refactor: Move AutoscalingContext to Estimate func
Browse files Browse the repository at this point in the history
  • Loading branch information
azylinski committed Feb 15, 2024
1 parent 5286b3f commit 74e3b69
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 34 deletions.
4 changes: 1 addition & 3 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,11 +494,9 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption(

estimateStart := time.Now()
expansionEstimator := o.estimatorBuilder(
o.autoscalingContext.PredicateChecker,
o.autoscalingContext.ClusterSnapshot,
estimator.NewEstimationContext(o.autoscalingContext.MaxNodesTotal, option.SimilarNodeGroups, currentNodeCount),
)
option.NodeCount, option.Pods = expansionEstimator.Estimate(pods, nodeInfo, nodeGroup)
option.NodeCount, option.Pods = expansionEstimator.Estimate(o.autoscalingContext, pods, nodeInfo, nodeGroup)
metrics.UpdateDurationFromStart(metrics.Estimate, estimateStart)

autoscalingOptions, err := nodeGroup.GetOptions(o.autoscalingContext.NodeGroupDefaults)
Expand Down
36 changes: 16 additions & 20 deletions cluster-autoscaler/estimator/binpacking_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,38 +21,32 @@ import (

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
cactx "k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
klog "k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// BinpackingNodeEstimator estimates the number of needed nodes to handle the given amount of pods.
type BinpackingNodeEstimator struct {
predicateChecker predicatechecker.PredicateChecker
clusterSnapshot clustersnapshot.ClusterSnapshot
limiter EstimationLimiter
podOrderer EstimationPodOrderer
context EstimationContext
estimationContext EstimationContext
estimationAnalyserFunc EstimationAnalyserFunc // optional
}

// NewBinpackingNodeEstimator builds a new BinpackingNodeEstimator.
func NewBinpackingNodeEstimator(
predicateChecker predicatechecker.PredicateChecker,
clusterSnapshot clustersnapshot.ClusterSnapshot,
limiter EstimationLimiter,
podOrderer EstimationPodOrderer,
context EstimationContext,
estimationContext EstimationContext,
estimationAnalyserFunc EstimationAnalyserFunc,
) *BinpackingNodeEstimator {
return &BinpackingNodeEstimator{
predicateChecker: predicateChecker,
clusterSnapshot: clusterSnapshot,
limiter: limiter,
podOrderer: podOrderer,
context: context,
estimationContext: estimationContext,
estimationAnalyserFunc: estimationAnalyserFunc,
}
}
Expand All @@ -68,21 +62,22 @@ func NewBinpackingNodeEstimator(
// It is assumed that all pods from the given list can fit to nodeTemplate.
// Returns the number of nodes needed to accommodate all pods from the list.
func (e *BinpackingNodeEstimator) Estimate(
ctx *cactx.AutoscalingContext,
pods []*apiv1.Pod,
nodeTemplate *schedulerframework.NodeInfo,
nodeGroup cloudprovider.NodeGroup) (int, []*apiv1.Pod) {

e.limiter.StartEstimation(pods, nodeGroup, e.context)
e.limiter.StartEstimation(pods, nodeGroup, e.estimationContext)
defer e.limiter.EndEstimation()

pods = e.podOrderer.Order(pods, nodeTemplate, nodeGroup)

newNodeNames := make(map[string]bool)
newNodesWithPods := make(map[string]bool)

e.clusterSnapshot.Fork()
ctx.ClusterSnapshot.Fork()
defer func() {
e.clusterSnapshot.Revert()
ctx.ClusterSnapshot.Revert()
}()

newNodeNameIndex := 0
Expand All @@ -92,12 +87,12 @@ func (e *BinpackingNodeEstimator) Estimate(
for _, pod := range pods {
found := false

nodeName, err := e.predicateChecker.FitsAnyNodeMatching(e.clusterSnapshot, pod, func(nodeInfo *schedulerframework.NodeInfo) bool {
nodeName, err := ctx.PredicateChecker.FitsAnyNodeMatching(ctx.ClusterSnapshot, pod, func(nodeInfo *schedulerframework.NodeInfo) bool {
return newNodeNames[nodeInfo.Node().Name]
})
if err == nil {
found = true
if err := e.clusterSnapshot.AddPod(pod, nodeName); err != nil {
if err := ctx.ClusterSnapshot.AddPod(pod, nodeName); err != nil {
klog.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", pod.Namespace, pod.Name, nodeName, err)
return 0, nil
}
Expand All @@ -124,7 +119,7 @@ func (e *BinpackingNodeEstimator) Estimate(
}

// Add new node
newNodeName, err := e.addNewNodeToSnapshot(nodeTemplate, newNodeNameIndex)
newNodeName, err := e.addNewNodeToSnapshot(ctx.ClusterSnapshot, nodeTemplate, newNodeNameIndex)
if err != nil {
klog.Errorf("Error while adding new node for template to ClusterSnapshot; %v", err)
return 0, nil
Expand All @@ -137,10 +132,10 @@ func (e *BinpackingNodeEstimator) Estimate(
// Note that this may still fail (ex. if topology spreading with zonal topologyKey is used);
// in this case we can't help the pending pod. We keep the node in clusterSnapshot to avoid
// adding and removing node to snapshot for each such pod.
if err := e.predicateChecker.CheckPredicates(e.clusterSnapshot, pod, newNodeName); err != nil {
if err := ctx.PredicateChecker.CheckPredicates(ctx.ClusterSnapshot, pod, newNodeName); err != nil {
continue
}
if err := e.clusterSnapshot.AddPod(pod, newNodeName); err != nil {
if err := ctx.ClusterSnapshot.AddPod(pod, newNodeName); err != nil {
klog.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", pod.Namespace, pod.Name, newNodeName, err)
return 0, nil
}
Expand All @@ -150,13 +145,14 @@ func (e *BinpackingNodeEstimator) Estimate(
}

if e.estimationAnalyserFunc != nil {
e.estimationAnalyserFunc(e.clusterSnapshot, nodeGroup, newNodesWithPods)
e.estimationAnalyserFunc(ctx, nodeGroup, newNodesWithPods)
}

return len(newNodesWithPods), scheduledPods
}

func (e *BinpackingNodeEstimator) addNewNodeToSnapshot(
clusterSnapshot clustersnapshot.ClusterSnapshot,
template *schedulerframework.NodeInfo,
nameIndex int) (string, error) {

Expand All @@ -165,7 +161,7 @@ func (e *BinpackingNodeEstimator) addNewNodeToSnapshot(
for _, podInfo := range newNodeInfo.Pods {
pods = append(pods, podInfo.Pod)
}
if err := e.clusterSnapshot.AddNodeWithPods(newNodeInfo.Node(), pods); err != nil {
if err := clusterSnapshot.AddNodeWithPods(newNodeInfo.Node(), pods); err != nil {
return "", err
}
return newNodeInfo.Node().Name, nil
Expand Down
9 changes: 7 additions & 2 deletions cluster-autoscaler/estimator/binpacking_estimator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
cactx "k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
Expand Down Expand Up @@ -187,12 +188,16 @@ func TestBinpackingEstimate(t *testing.T) {
assert.NoError(t, err)
limiter := NewThresholdBasedEstimationLimiter([]Threshold{NewStaticThreshold(tc.maxNodes, time.Duration(0))})
processor := NewDecreasingPodOrderer()
estimator := NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, limiter, processor, nil /* EstimationContext */, nil /* EstimationAnalyserFunc */)
ctx := &cactx.AutoscalingContext{
ClusterSnapshot: clusterSnapshot,
PredicateChecker: predicateChecker,
}
estimator := NewBinpackingNodeEstimator(limiter, processor, nil /* EstimationContext */, nil /* EstimationAnalyserFunc */)
node := makeNode(tc.millicores, tc.memory, "template", "zone-mars")
nodeInfo := schedulerframework.NewNodeInfo()
nodeInfo.SetNode(node)

estimatedNodes, estimatedPods := estimator.Estimate(tc.pods, nodeInfo, nil)
estimatedNodes, estimatedPods := estimator.Estimate(ctx, tc.pods, nodeInfo, nil)
assert.Equal(t, tc.expectNodeCount, estimatedNodes)
assert.Equal(t, tc.expectPodCount, len(estimatedPods))
if tc.expectProcessedPods != nil {
Expand Down
15 changes: 6 additions & 9 deletions cluster-autoscaler/estimator/estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import (

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
cactx "k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/kubernetes/pkg/scheduler/framework"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)
Expand All @@ -39,24 +38,22 @@ var AvailableEstimators = []string{BinpackingEstimatorName}
// It returns the number of new nodes needed as well as the list of pods it managed
// to schedule on those nodes.
type Estimator interface {
Estimate([]*apiv1.Pod, *schedulerframework.NodeInfo, cloudprovider.NodeGroup) (int, []*apiv1.Pod)
Estimate(*cactx.AutoscalingContext, []*apiv1.Pod, *schedulerframework.NodeInfo, cloudprovider.NodeGroup) (int, []*apiv1.Pod)
}

// EstimatorBuilder creates a new estimator object.
type EstimatorBuilder func(predicatechecker.PredicateChecker, clustersnapshot.ClusterSnapshot, EstimationContext) Estimator
type EstimatorBuilder func(EstimationContext) Estimator

// EstimationAnalyserFunc to be run at the end of the estimation logic.
type EstimationAnalyserFunc func(clustersnapshot.ClusterSnapshot, cloudprovider.NodeGroup, map[string]bool)
type EstimationAnalyserFunc func(*cactx.AutoscalingContext, cloudprovider.NodeGroup, map[string]bool)

// NewEstimatorBuilder creates a new estimator object from flag.
func NewEstimatorBuilder(name string, limiter EstimationLimiter, orderer EstimationPodOrderer, estimationAnalyserFunc EstimationAnalyserFunc) (EstimatorBuilder, error) {
switch name {
case BinpackingEstimatorName:
return func(
predicateChecker predicatechecker.PredicateChecker,
clusterSnapshot clustersnapshot.ClusterSnapshot,
context EstimationContext) Estimator {
return NewBinpackingNodeEstimator(predicateChecker, clusterSnapshot, limiter, orderer, context, estimationAnalyserFunc)
ectx EstimationContext) Estimator {
return NewBinpackingNodeEstimator(limiter, orderer, ectx, estimationAnalyserFunc)
}, nil
}
return nil, fmt.Errorf("unknown estimator: %s", name)
Expand Down

0 comments on commit 74e3b69

Please sign in to comment.