WIP: BookCapacity for ProvisioningRequest pods
yaroslava-serdiuk committed May 29, 2024
1 parent b3a501c commit 0b8fdab
Showing 7 changed files with 94 additions and 67 deletions.
5 changes: 4 additions & 1 deletion cluster-autoscaler/core/static_autoscaler.go
@@ -532,7 +532,10 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr

 	// finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable)
 	unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime)
-
+	err = a.processors.CapacityReservation.BookCapacity(a.AutoscalingContext)
+	if err != nil {
+		klog.Warningf("Failed to reserve capacity: %v", err)
+	}
 	preScaleUp := func() time.Time {
 		scaleUpStart := time.Now()
 		metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart)
14 changes: 8 additions & 6 deletions cluster-autoscaler/main.go
@@ -32,6 +32,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/loop"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/besteffortatomic"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
Expand Down Expand Up @@ -496,22 +497,23 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 	podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager()))

 	restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
-	provreqOrchestrator, err := provreqorchestrator.New(restConfig)
+	client, err := provreqclient.NewProvisioningRequestClient(restConfig)
 	if err != nil {
 		return nil, err
 	}
+	provreqOrchestrator := provreqorchestrator.New(client, []provreqorchestrator.ProvisioningClass{
+		checkcapacity.New(client),
+		besteffortatomic.New(client),
+	})
 	scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator)

 	opts.ScaleUpOrchestrator = scaleUpOrchestrator
-	client, err := provreqclient.NewProvisioningRequestClient(restConfig)
-	if err != nil {
-		return nil, err
-	}
-	provreqProcesor := provreq.NewCombinedProvReqProcessor(client, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor(client)})
+	provreqProcesor := provreq.NewCombinedProvReqProcessor(client, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor(client)}, opts.PredicateChecker)
 	if err != nil {
 		return nil, err
 	}
 	opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
+	opts.Processors.CapacityReservation = provreqProcesor
 	injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig)
 	if err != nil {
 		return nil, err
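Taken together, this hunk inverts the construction order: the ProvisioningRequest client is built once up front and shared, the orchestrator gets its provisioning classes from the caller instead of hard-coding them, and the combined processor is registered both as a loop-start observer and as the new CapacityReservation processor. A minimal sketch of the resulting wiring, with error handling elided (all names taken from the diff above):

	client, _ := provreqclient.NewProvisioningRequestClient(restConfig)

	// The orchestrator no longer builds its classes; the caller injects them.
	provreqOrchestrator := provreqorchestrator.New(client, []provreqorchestrator.ProvisioningClass{
		checkcapacity.New(client),
		besteffortatomic.New(client),
	})
	opts.ScaleUpOrchestrator = provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator)

	// One processor now plays two roles: loop-start observer and capacity reservation.
	provreqProcesor := provreq.NewCombinedProvReqProcessor(client,
		[]provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor(client)},
		opts.PredicateChecker)
	opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
	opts.Processors.CapacityReservation = provreqProcesor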
26 changes: 26 additions & 0 deletions cluster-autoscaler/processors/capacityreservation (new file)
@@ -0,0 +1,26 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package capacityreservation
+
+import (
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+)
+
+// CapacityReservation is an interface to reserve capacity in the cluster.
+type CapacityReservation interface {
+	BookCapacity(ctx *context.AutoscalingContext) error
+}
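The new processor interface is deliberately small: one method, called once per loop from RunOnce, whose error is logged rather than treated as fatal. As an illustration, a hypothetical no-op implementation (not part of this commit) would satisfy it like so:

	package capacityreservation

	import (
		"k8s.io/autoscaler/cluster-autoscaler/context"
	)

	// noOpCapacityReservation is a hypothetical implementation that books nothing,
	// shown only to illustrate the contract of the interface above.
	type noOpCapacityReservation struct{}

	// BookCapacity implements CapacityReservation.
	func (noOpCapacityReservation) BookCapacity(ctx *context.AutoscalingContext) error {
		return nil
	}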
4 changes: 3 additions & 1 deletion cluster-autoscaler/processors/processors.go
@@ -21,6 +21,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
"k8s.io/autoscaler/cluster-autoscaler/processors/actionablecluster"
"k8s.io/autoscaler/cluster-autoscaler/processors/binpacking"
"k8s.io/autoscaler/cluster-autoscaler/processors/capacityreservation"
"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups"
Expand Down Expand Up @@ -70,7 +71,8 @@ type AutoscalingProcessors struct {
 	// * scale-downs per nodegroup
 	// * scale-up failures per nodegroup
 	// * scale-down failures per nodegroup
-	ScaleStateNotifier *nodegroupchange.NodeGroupChangeObserversList
+	ScaleStateNotifier  *nodegroupchange.NodeGroupChangeObserversList
+	CapacityReservation capacityreservation.CapacityReservation
 }

 // DefaultProcessors returns default set of processors.
48 changes: 45 additions & 3 deletions cluster-autoscaler/processors/provreq
@@ -17,9 +17,18 @@ limitations under the License.
 package provreq

 import (
-	"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
+	"fmt"
+
+	apiv1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
+	provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
 	"k8s.io/klog/v2"
 )

@@ -34,11 +43,12 @@ type ProvisioningRequestProcessor interface {
 type CombinedProvReqProcessor struct {
 	client     *provreqclient.ProvisioningRequestClient
 	processors []ProvisioningRequestProcessor
+	injector   *scheduling.HintingSimulator
 }

 // NewCombinedProvReqProcessor returns a new CombinedProvReqProcessor.
-func NewCombinedProvReqProcessor(client *provreqclient.ProvisioningRequestClient, processors []ProvisioningRequestProcessor) loopstart.Observer {
-	return &CombinedProvReqProcessor{client: client, processors: processors}
+func NewCombinedProvReqProcessor(client *provreqclient.ProvisioningRequestClient, processors []ProvisioningRequestProcessor, predicateChecker predicatechecker.PredicateChecker) *CombinedProvReqProcessor {
+	return &CombinedProvReqProcessor{client: client, processors: processors, injector: scheduling.NewHintingSimulator(predicateChecker)}
 }

 // Refresh iterates over ProvisioningRequests and updates their conditions/state.
@@ -55,3 +65,35 @@ func (cp *CombinedProvReqProcessor) Refresh() {

 // CleanUp cleans up internal state
 func (cp *CombinedProvReqProcessor) CleanUp() {}
+
+func (cp *CombinedProvReqProcessor) BookCapacity(ctx *context.AutoscalingContext) error {
+	provReqs, err := cp.client.ProvisioningRequests()
+	if err != nil {
+		return fmt.Errorf("couldn't fetch ProvisioningRequests in the cluster: %v", err)
+	}
+	podsToCreate := []*apiv1.Pod{}
+	for _, provReq := range provReqs {
+		if conditions.ShouldCapacityBeBooked(provReq) {
+			pods, err := provreq_pods.PodsForProvisioningRequest(provReq)
+			if err != nil {
+				// ClusterAutoscaler was able to create pods before, so we shouldn't have an error here.
+				// If there is an error, mark the ProvisioningRequest as invalid, because we won't be able
+				// to book capacity for it anyway.
+				conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
+				if _, err := cp.client.UpdateProvisioningRequest(provReq.ProvisioningRequest); err != nil {
+					klog.Errorf("failed to add Failed condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, err)
+				}
+				continue
+			}
+			podsToCreate = append(podsToCreate, pods...)
+		}
+	}
+	if len(podsToCreate) == 0 {
+		return nil
+	}
+	// Schedule the pods in the snapshot to reserve capacity for ProvisioningRequests with booked capacity.
+	if _, _, err = cp.injector.TrySchedulePods(ctx.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil {
+		klog.Warningf("Error during capacity booking: %v", err)
+	}
+	return nil
+}
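BookCapacity reserves capacity without touching the real cluster: pods built from each booked ProvisioningRequest are scheduled into the in-memory cluster snapshot, so the rest of the iteration sees that capacity as occupied. Previously this booking ran on the orchestrator's forked snapshot (removed below); moving it into a loop-level processor makes the reservation visible to the whole iteration. A toy model of the idea, independent of cluster-autoscaler's types (all names hypothetical):

	package main

	import "fmt"

	// snapshot is a toy stand-in for the cluster snapshot: free CPU per node.
	type snapshot struct{ freeCPU map[string]int }

	// tryPlace books cpu on the first node that fits, mimicking simulated scheduling.
	func (s *snapshot) tryPlace(cpu int) bool {
		for node, free := range s.freeCPU {
			if free >= cpu {
				s.freeCPU[node] = free - cpu
				return true
			}
		}
		return false
	}

	func main() {
		s := &snapshot{freeCPU: map[string]int{"node-a": 4, "node-b": 2}}
		// Pods derived from a booked ProvisioningRequest consume snapshot capacity
		// up front, so later scale-up logic cannot hand the same room to other pods.
		for i, podCPU := range []int{2, 2, 2, 2} {
			if s.tryPlace(podCPU) {
				fmt.Printf("pod %d: capacity booked\n", i)
			} else {
				fmt.Printf("pod %d: no room in snapshot\n", i)
			}
		}
	}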
62 changes: 7 additions & 55 deletions cluster-autoscaler/provisioningrequest/orchestrator
@@ -21,28 +21,21 @@

 	appsv1 "k8s.io/api/apps/v1"
 	apiv1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
 	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/estimator"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
-	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/besteffortatomic"
-	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
-	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
-	provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
 	ca_errors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
-	"k8s.io/client-go/rest"
-	"k8s.io/klog/v2"

 	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
 	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
 )

-type provisioningClass interface {
+// ProvisioningClass is an interface for provisioning a given class of ProvisioningRequests.
+type ProvisioningClass interface {
 	Provision([]*apiv1.Pod, []*apiv1.Node, []*appsv1.DaemonSet,
 		map[string]*schedulerframework.NodeInfo) (*status.ScaleUpStatus, ca_errors.AutoscalerError)
 	Initialize(*context.AutoscalingContext, *ca_processors.AutoscalingProcessors, *clusterstate.ClusterStateRegistry,
@@ -55,23 +48,15 @@ type provReqOrchestrator struct {
 	context             *context.AutoscalingContext
 	client              *provreqclient.ProvisioningRequestClient
 	injector            *scheduling.HintingSimulator
-	provisioningClasses []provisioningClass
+	provisioningClasses []ProvisioningClass
 }

 // New returns a new orchestrator.
-func New(kubeConfig *rest.Config) (*provReqOrchestrator, error) {
-	client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
-	if err != nil {
-		return nil, err
-	}
-
+func New(client *provreqclient.ProvisioningRequestClient, classes []ProvisioningClass) *provReqOrchestrator {
 	return &provReqOrchestrator{
-		client: client,
-		provisioningClasses: []provisioningClass{
-			checkcapacity.New(client),
-			besteffortatomic.New(client),
-		},
-	}, nil
+		client:              client,
+		provisioningClasses: classes,
+	}
 }

 // Initialize initializes the orchestrator.
@@ -106,7 +91,6 @@ func (o *provReqOrchestrator) ScaleUp(

 	o.context.ClusterSnapshot.Fork()
 	defer o.context.ClusterSnapshot.Revert()
-	o.bookCapacity()

 	// unschedulablePods should all belong to one ProvisioningClass, so only one provClass should try to ScaleUp.
 	for _, provClass := range o.provisioningClasses {
@@ -125,35 +109,3 @@ func (o *provReqOrchestrator) ScaleUpToNodeGroupMinSize(
 ) (*status.ScaleUpStatus, ca_errors.AutoscalerError) {
 	return nil, nil
 }
-
-func (o *provReqOrchestrator) bookCapacity() error {
-	provReqs, err := o.client.ProvisioningRequests()
-	if err != nil {
-		return fmt.Errorf("couldn't fetch ProvisioningRequests in the cluster: %v", err)
-	}
-	podsToCreate := []*apiv1.Pod{}
-	for _, provReq := range provReqs {
-		if conditions.ShouldCapacityBeBooked(provReq) {
-			pods, err := provreq_pods.PodsForProvisioningRequest(provReq)
-			if err != nil {
-				// ClusterAutoscaler was able to create pods before, so we shouldn't have error here.
-				// If there is an error, mark PR as invalid, because we won't be able to book capacity
-				// for it anyway.
-				conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
-				if _, err := o.client.UpdateProvisioningRequest(provReq.ProvisioningRequest); err != nil {
-					klog.Errorf("failed to add Accepted condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, err)
-				}
-				continue
-			}
-			podsToCreate = append(podsToCreate, pods...)
-		}
-	}
-	if len(podsToCreate) == 0 {
-		return nil
-	}
-	// scheduling the pods to reserve capacity for provisioning request with BookCapacity condition
-	if _, _, err = o.injector.TrySchedulePods(o.context.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil {
-		klog.Warningf("Error during capacity booking: %v", err)
-	}
-	return nil
-}
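With the interface exported and the classes injected through New, a custom class can be plugged in from main.go without modifying the orchestrator. A hypothetical skeleton (the Initialize parameters hidden by the folded diff are inferred from the test's Initialize call below, and the ScaleUpNotNeeded result is an assumption about the status package):

	package myprovclass

	import (
		appsv1 "k8s.io/api/apps/v1"
		apiv1 "k8s.io/api/core/v1"
		"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
		"k8s.io/autoscaler/cluster-autoscaler/context"
		"k8s.io/autoscaler/cluster-autoscaler/estimator"
		ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
		"k8s.io/autoscaler/cluster-autoscaler/processors/status"
		ca_errors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
		"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
		schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
	)

	// noOpClass is a hypothetical ProvisioningClass that never scales up.
	type noOpClass struct{}

	// Provision reports that no scale-up was needed for the given pods.
	func (c *noOpClass) Provision([]*apiv1.Pod, []*apiv1.Node, []*appsv1.DaemonSet,
		map[string]*schedulerframework.NodeInfo) (*status.ScaleUpStatus, ca_errors.AutoscalerError) {
		return &status.ScaleUpStatus{Result: status.ScaleUpNotNeeded}, nil
	}

	// Initialize stores nothing; a real class would keep the context and helpers here.
	func (c *noOpClass) Initialize(*context.AutoscalingContext, *ca_processors.AutoscalingProcessors,
		*clusterstate.ClusterStateRegistry, estimator.EstimatorBuilder, taints.TaintConfig) {
	}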
2 changes: 1 addition & 1 deletion cluster-autoscaler/provisioningrequest/orchestrator (test)
@@ -334,7 +334,7 @@ func setupTest(t *testing.T, nodes []*apiv1.Node, prs []*provreqwrapper.Provisio

 	orchestrator := &provReqOrchestrator{
 		client:              client,
-		provisioningClasses: []provisioningClass{checkcapacity.New(client), besteffortatomic.New(client)},
+		provisioningClasses: []ProvisioningClass{checkcapacity.New(client), besteffortatomic.New(client)},
 	}
 	orchestrator.Initialize(&autoscalingContext, processors, clusterState, estimatorBuilder, taints.TaintConfig{})
 	return orchestrator, nodeInfos