Add provreqOrchestrator that handles ProvReq classes (#6627)
* Add provreqOrchestrator that handles ProvReq classes

* Review remarks

* Review remarks
yaroslava-serdiuk authored Apr 17, 2024
1 parent af1e610 commit 5f94f2c
Showing 12 changed files with 375 additions and 214 deletions.
5 changes: 4 additions & 1 deletion cluster-autoscaler/main.go
@@ -60,6 +60,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/emptycandidates"
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/previouscandidates"
provreqorchestrator "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/orchestrator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
@@ -494,10 +495,12 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
	podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager()))

	restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
	scaleUpOrchestrator, err := orchestrator.NewWrapperOrchestrator(restConfig)
	provreqOrchestrator, err := provreqorchestrator.New(restConfig)
	if err != nil {
		return nil, err
	}
	scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator)

	opts.ScaleUpOrchestrator = scaleUpOrchestrator
	provreqProcesor, err := provreq.NewCombinedProvReqProcessor(restConfig, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor()})
	if err != nil {
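The new wiring builds the ProvisioningRequest orchestrator first, then wraps it together with the default scale-up path. Below is a minimal sketch of what such a wrapper can look like; the Pod and ScaleUpOrchestrator types and the annotation key are simplified, hypothetical stand-ins, not the actual cluster-autoscaler API, which works with *apiv1.Pod and its own interfaces.

package main

import "fmt"

// provReqPodAnnotation is a hypothetical marker for pods created from a
// ProvisioningRequest; the real key lives in the provreq package.
const provReqPodAnnotation = "example.com/provisioning-request"

// Pod and ScaleUpOrchestrator are simplified stand-ins for *apiv1.Pod
// and the cluster-autoscaler scale-up interface.
type Pod struct {
	Name        string
	Annotations map[string]string
}

type ScaleUpOrchestrator interface {
	ScaleUp(pods []Pod) error
}

// WrapperOrchestrator routes ProvisioningRequest pods to the ProvReq
// orchestrator and all remaining pods to the default scale-up path.
type WrapperOrchestrator struct {
	provReqOrchestrator ScaleUpOrchestrator
	scaleUpOrchestrator ScaleUpOrchestrator
}

func (w *WrapperOrchestrator) ScaleUp(pods []Pod) error {
	var provReqPods, regularPods []Pod
	for _, p := range pods {
		if _, ok := p.Annotations[provReqPodAnnotation]; ok {
			provReqPods = append(provReqPods, p)
		} else {
			regularPods = append(regularPods, p)
		}
	}
	if len(provReqPods) > 0 {
		return w.provReqOrchestrator.ScaleUp(provReqPods)
	}
	return w.scaleUpOrchestrator.ScaleUp(regularPods)
}

type logOrchestrator struct{ name string }

func (l logOrchestrator) ScaleUp(pods []Pod) error {
	fmt.Println(l.name, "handles", len(pods), "pods")
	return nil
}

func main() {
	w := &WrapperOrchestrator{
		provReqOrchestrator: logOrchestrator{"provreq"},
		scaleUpOrchestrator: logOrchestrator{"default"},
	}
	w.ScaleUp([]Pod{
		{Name: "a"},
		{Name: "b", Annotations: map[string]string{provReqPodAnnotation: "pr-1"}},
	})
}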
@@ -42,7 +42,7 @@ var SupportedProvisioningClasses = []string{v1beta1.ProvisioningClassCheckCapaci

// ProvisioningRequestPodsInjector creates in-memory pods from a ProvisioningRequest and injects them into the unscheduled pods list.
type ProvisioningRequestPodsInjector struct {
	client provisioningRequestClient
	client *provreqclient.ProvisioningRequestClient
	clock clock.PassiveClock
}
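Per the comment above, the injector's job is to expand a ProvisioningRequest's pod templates into in-memory pods and append them to the unscheduled list so the rest of the loop can reason about them. A rough sketch of that expansion follows, under heavily simplified, hypothetical types (PodSet, injectPods); the real injector builds *apiv1.Pod objects via the ProvisioningRequest client.

package main

import "fmt"

// PodSet is a hypothetical stand-in for one pod set of a
// ProvisioningRequest: a pod template plus a replica count.
type PodSet struct {
	Template string
	Count    int
}

// injectPods clones each template Count times into the unscheduled pods
// list, which is how in-memory ProvReq pods reach the scale-up logic.
func injectPods(unscheduled []string, sets []PodSet) []string {
	for _, s := range sets {
		for i := 0; i < s.Count; i++ {
			unscheduled = append(unscheduled, fmt.Sprintf("%s-%d", s.Template, i))
		}
	}
	return unscheduled
}

func main() {
	fmt.Println(injectPods(nil, []PodSet{{Template: "worker", Count: 3}}))
	// Output: [worker-0 worker-1 worker-2]
}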
@@ -30,15 +30,10 @@ type ProvisioningRequestProcessor interface {
	CleanUp()
}

type provisioningRequestClient interface {
	ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error)
	ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error)
}

// CombinedProvReqProcessor is responsible for processing ProvisioningRequests for each ProvisioningClass
// every CA loop and for updating conditions on expired ProvisioningRequests.
type CombinedProvReqProcessor struct {
	client provisioningRequestClient
	client *provreqclient.ProvisioningRequestClient
	processors []ProvisioningRequestProcessor
}

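CombinedProvReqProcessor is, in effect, a fan-out: one listing of ProvisioningRequests per loop, handed to one processor per provisioning class. A minimal sketch of that shape, with hypothetical stand-in types and an illustrative class name:

package main

import "fmt"

// ProvReq and Processor are simplified stand-ins for the wrapped
// ProvisioningRequest type and the per-class processor interface.
type ProvReq struct {
	Name  string
	Class string
}

type Processor interface {
	Process(reqs []*ProvReq)
	CleanUp()
}

// CombinedProcessor fans a single listing of ProvisioningRequests out
// to every registered per-class processor once per autoscaler loop.
type CombinedProcessor struct {
	processors []Processor
}

func (c *CombinedProcessor) Process(reqs []*ProvReq) {
	for _, p := range c.processors {
		p.Process(reqs) // each processor picks out its own class
	}
}

// checkCapacityProcessor acts only on check-capacity requests; the
// class name here is illustrative, not the real API constant.
type checkCapacityProcessor struct{}

func (checkCapacityProcessor) Process(reqs []*ProvReq) {
	for _, r := range reqs {
		if r.Class == "check-capacity" {
			fmt.Println("updating conditions for", r.Name)
		}
	}
}

func (checkCapacityProcessor) CleanUp() {}

func main() {
	c := &CombinedProcessor{processors: []Processor{checkCapacityProcessor{}}}
	c.Process([]*ProvReq{{Name: "pr-1", Class: "check-capacity"}})
}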
180 changes: 0 additions & 180 deletions cluster-autoscaler/provisioningrequest/checkcapacity/orchestrator.go

This file was deleted.

@@ -0,0 +1,104 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package checkcapacity

import (
	appsv1 "k8s.io/api/apps/v1"
	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
	"k8s.io/autoscaler/cluster-autoscaler/context"
	"k8s.io/autoscaler/cluster-autoscaler/estimator"
	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"

	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

type checkCapacityProvClass struct {
	context  *context.AutoscalingContext
	client   *provreqclient.ProvisioningRequestClient
	injector *scheduling.HintingSimulator
}

// New creates a check-capacity scale-up mode.
func New(
	client *provreqclient.ProvisioningRequestClient,
) *checkCapacityProvClass {
	return &checkCapacityProvClass{client: client}
}

func (o *checkCapacityProvClass) Initialize(
	autoscalingContext *context.AutoscalingContext,
	processors *ca_processors.AutoscalingProcessors,
	clusterStateRegistry *clusterstate.ClusterStateRegistry,
	estimatorBuilder estimator.EstimatorBuilder,
	taintConfig taints.TaintConfig,
	injector *scheduling.HintingSimulator,
) {
	o.context = autoscalingContext
	o.injector = injector
}

// Provision returns whether there is capacity in the cluster for pods from the ProvisioningRequest.
func (o *checkCapacityProvClass) Provision(
	unschedulablePods []*apiv1.Pod,
	nodes []*apiv1.Node,
	daemonSets []*appsv1.DaemonSet,
	nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
	if len(unschedulablePods) == 0 {
		return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil
	}
	pr, err := provreqclient.ProvisioningRequestForPods(o.client, unschedulablePods)
	if err != nil {
		return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, err.Error()))
	}
	if pr.Spec.ProvisioningClassName != v1beta1.ProvisioningClassCheckCapacity {
		return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil
	}

	o.context.ClusterSnapshot.Fork()
	defer o.context.ClusterSnapshot.Revert()

	scaleUpIsSuccessful, err := o.checkcapacity(unschedulablePods, pr)
	if err != nil {
		return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "error during ScaleUp: %s", err.Error()))
	}
	if scaleUpIsSuccessful {
		return &status.ScaleUpStatus{Result: status.ScaleUpSuccessful}, nil
	}
	return &status.ScaleUpStatus{Result: status.ScaleUpNoOptionsAvailable}, nil
}

// checkcapacity assumes that all unschedulable pods come from one ProvisioningRequest.
func (o *checkCapacityProvClass) checkcapacity(unschedulablePods []*apiv1.Pod, provReq *provreqwrapper.ProvisioningRequest) (bool, error) {
	st, _, err := o.injector.TrySchedulePods(o.context.ClusterSnapshot, unschedulablePods, scheduling.ScheduleAnywhere, true)
	if len(st) < len(unschedulablePods) || err != nil {
		conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionFalse, conditions.CapacityIsNotFoundReason, "Capacity is not found, CA will try to find it later.", metav1.Now())
		return false, err
	}
	conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionTrue, conditions.CapacityIsFoundReason, conditions.CapacityIsFoundMsg, metav1.Now())
	return true, nil
}
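Provision answers the capacity question without touching real cluster state: it forks the snapshot, tries to schedule the pods, and always reverts, leaving only the ProvisioningRequest condition as output. A small sketch of that Fork -> try -> Revert shape, with a hypothetical Snapshot type standing in for the simulator's cluster snapshot:

package main

import "fmt"

// Snapshot is a hypothetical stand-in for the autoscaler's cluster
// snapshot; Fork/Revert bracket a what-if scheduling experiment.
type Snapshot struct {
	free, saved int // free pod slots, and a saved copy for Revert
}

func (s *Snapshot) Fork()   { s.saved = s.free }
func (s *Snapshot) Revert() { s.free = s.saved }

// Place schedules up to n pods against the forked state and reports how
// many actually fit, loosely mimicking TrySchedulePods.
func (s *Snapshot) Place(n int) int {
	placed := n
	if placed > s.free {
		placed = s.free
	}
	s.free -= placed
	return placed
}

// checkCapacity mirrors the Fork -> try -> Revert shape of Provision:
// the answer is yes only if every pod fits, and no real state changes.
func checkCapacity(s *Snapshot, pods int) bool {
	s.Fork()
	defer s.Revert()
	return s.Place(pods) == pods
}

func main() {
	s := &Snapshot{free: 5}
	fmt.Println(checkCapacity(s, 3)) // true: capacity exists
	fmt.Println(checkCapacity(s, 8)) // false: condition set to retry later
	fmt.Println(s.free)              // 5: the snapshot was reverted both times
}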
