Commit

Review remarks
yaroslava-serdiuk committed Mar 27, 2024
1 parent 3876035 commit 507f9f7
Showing 12 changed files with 54 additions and 62 deletions.
2 changes: 1 addition & 1 deletion cluster-autoscaler/main.go
@@ -499,7 +499,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
      if err != nil {
          return nil, err
      }
-     scaleUpOrchestrator := orchestrator.NewWrapperOrchestrator(provreqOrchestrator)
+     scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator)

      opts.ScaleUpOrchestrator = scaleUpOrchestrator
      provreqProcesor, err := provreq.NewCombinedProvReqProcessor(restConfig, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor()})
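The rename on this line only resolves if main.go distinguishes two packages that are both named orchestrator. A minimal sketch of the import shape this implies; the provreqorchestrator alias itself sits outside this hunk, so treat the aliased declaration as an assumption:

    // Sketch, not the literal main.go import block.
    import (
        // Core scale-up orchestrator, still referred to by its package name.
        "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
        // ProvisioningRequest orchestrator (changed further down), aliased to
        // avoid clashing with the core package of the same name.
        provreqorchestrator "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/orchestrator"
    )

    // Reference both constructors so the sketch stands alone.
    var (
        _ = orchestrator.New
        _ = provreqorchestrator.NewWrapperOrchestrator
    )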
@@ -42,7 +42,7 @@ var SupportedProvisioningClasses = []string{v1beta1.ProvisioningClassCheckCapaci

  // ProvisioningRequestPodsInjector creates in-memory pods from a ProvisioningRequest and injects them into the unscheduled pods list.
  type ProvisioningRequestPodsInjector struct {
-     client provreqclient.ProvisioningRequestClient
+     client *provreqclient.ProvisioningRequestClient
      clock  clock.PassiveClock
  }

@@ -33,7 +33,7 @@ type ProvisioningRequestProcessor interface {
  // CombinedProvReqProcessor is responsible for processing ProvisioningRequests for each ProvisioningClass
  // every CA loop and for updating conditions of expired ProvisioningRequests.
  type CombinedProvReqProcessor struct {
-     client     provreqclient.ProvisioningRequestClient
+     client     *provreqclient.ProvisioningRequestClient
      processors []ProvisioningRequestProcessor
  }

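Both processor structs above switch the client field from the ProvisioningRequestClient interface to a pointer to the concrete struct; the interface itself is deleted in the client.go diff further down. A hedged wiring sketch — newInjector is a hypothetical helper, not code from this commit:

    // With the interface gone, production and test code both pass the same
    // concrete *provreqclient.ProvisioningRequestClient.
    func newInjector(client *provreqclient.ProvisioningRequestClient) *ProvisioningRequestPodsInjector {
        return &ProvisioningRequestPodsInjector{
            client: client,            // concrete struct pointer, no interface indirection
            clock:  clock.RealClock{}, // a clock.PassiveClock from k8s.io/utils/clock
        }
    }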
@@ -22,11 +22,11 @@ import (
      appsv1 "k8s.io/api/apps/v1"
      apiv1 "k8s.io/api/core/v1"
      metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+     "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
      "k8s.io/autoscaler/cluster-autoscaler/clusterstate"
      "k8s.io/autoscaler/cluster-autoscaler/context"
      "k8s.io/autoscaler/cluster-autoscaler/estimator"
      "k8s.io/autoscaler/cluster-autoscaler/processors/status"
-     "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
      "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
      "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
      "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
@@ -37,20 +37,20 @@ import (
      schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
  )

- type checkCapacityScaleUpMode struct {
+ type checkCapacityProvClass struct {
      context  *context.AutoscalingContext
-     client   provreqclient.ProvisioningRequestClient
+     client   *provreqclient.ProvisioningRequestClient
      injector *scheduling.HintingSimulator
  }

  // New creates a check-capacity provisioning class.
  func New(
-     client provreqclient.ProvisioningRequestClient,
- ) *checkCapacityScaleUpMode {
-     return &checkCapacityScaleUpMode{client: client}
+     client *provreqclient.ProvisioningRequestClient,
+ ) *checkCapacityProvClass {
+     return &checkCapacityProvClass{client: client}
  }

- func (o *checkCapacityScaleUpMode) Initialize(
+ func (o *checkCapacityProvClass) Initialize(
      autoscalingContext *context.AutoscalingContext,
      processors *ca_processors.AutoscalingProcessors,
      clusterStateRegistry *clusterstate.ClusterStateRegistry,
@@ -62,24 +62,28 @@ func (o *checkCapacityScaleUpMode) Initialize(
      o.injector = injector
  }

- // ScaleUp return if there is capacity in the cluster for pods from ProvisioningRequest.
- func (o *checkCapacityScaleUpMode) ScaleUp(
+ // Provision returns whether there is capacity in the cluster for pods from the ProvisioningRequest.
+ func (o *checkCapacityProvClass) Provision(
      unschedulablePods []*apiv1.Pod,
      nodes []*apiv1.Node,
      daemonSets []*appsv1.DaemonSet,
      nodeInfos map[string]*schedulerframework.NodeInfo,
  ) (*status.ScaleUpStatus, errors.AutoscalerError) {
-     if pr, err := provreqclient.VerifyProvisioningRequestClass(o.client, unschedulablePods, v1beta1.ProvisioningClassCheckCapacity); err != nil {
+
+     if len(unschedulablePods) == 0 {
+         return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil
+     }
+     pr, err := provreqclient.ProvisioningRequestForPods(o.client, unschedulablePods)
+     if err != nil {
          return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, err.Error()))
-     } else if pr == nil {
+     }
+     if pr.V1Beta1().Spec.ProvisioningClassName != v1beta1.ProvisioningClassCheckCapacity {
          return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil
      }

      o.context.ClusterSnapshot.Fork()
      defer o.context.ClusterSnapshot.Revert()

-     scaleUpIsSuccessful, err := o.scaleUp(unschedulablePods)
+     scaleUpIsSuccessful, err := o.checkcapacity(unschedulablePods)
      if err != nil {
          return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "error during ScaleUp: %s", err.Error()))
      }
@@ -90,7 +94,7 @@ func (o *checkCapacityScaleUpMode) ScaleUp(
  }

  // Assumes that all unschedulable pods come from one ProvisioningRequest.
- func (o *checkCapacityScaleUpMode) scaleUp(unschedulablePods []*apiv1.Pod) (bool, error) {
+ func (o *checkCapacityProvClass) checkcapacity(unschedulablePods []*apiv1.Pod) (bool, error) {
      provReq, err := o.client.ProvisioningRequest(unschedulablePods[0].Namespace, unschedulablePods[0].OwnerReferences[0].Name)
      if err != nil {
          return false, fmt.Errorf("failed to retrieve ProvisioningRequest from unscheduled pods, err: %v", err)
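Taken together, these hunks move the class filter out of the client helper and into the provisioning class itself. A condensed sketch of the resulting Provision guards, using only identifiers from the diff above (signature abridged, tail of the function elided):

    func (o *checkCapacityProvClass) Provision(unschedulablePods []*apiv1.Pod /* ... */) (*status.ScaleUpStatus, errors.AutoscalerError) {
        // An empty pod list is simply "nothing to do" at this level, not an error.
        if len(unschedulablePods) == 0 {
            return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil
        }
        // Resolve the single ProvisioningRequest owning all of these pods; mixed
        // owners or namespaces surface as an error (see the client.go diff below).
        pr, err := provreqclient.ProvisioningRequestForPods(o.client, unschedulablePods)
        if err != nil {
            return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, err.Error()))
        }
        // The class check is now the caller's job: a different class yields
        // ScaleUpNotTried so another provisioning class can pick the pods up.
        if pr.V1Beta1().Spec.ProvisioningClassName != v1beta1.ProvisioningClassCheckCapacity {
            return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil
        }
        // Capacity is probed against a forked snapshot that is always reverted.
        o.context.ClusterSnapshot.Fork()
        defer o.context.ClusterSnapshot.Revert()
        // ... o.checkcapacity(unschedulablePods) and status handling as in the full diff.
    }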
22 changes: 11 additions & 11 deletions cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go
@@ -42,20 +42,20 @@ import (
      schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
  )

- type scaleUpMode interface {
-     ScaleUp([]*apiv1.Pod, []*apiv1.Node, []*appsv1.DaemonSet,
+ type provisioningClass interface {
+     Provision([]*apiv1.Pod, []*apiv1.Node, []*appsv1.DaemonSet,
          map[string]*schedulerframework.NodeInfo) (*status.ScaleUpStatus, ca_errors.AutoscalerError)
      Initialize(*context.AutoscalingContext, *ca_processors.AutoscalingProcessors, *clusterstate.ClusterStateRegistry,
          estimator.EstimatorBuilder, taints.TaintConfig, *scheduling.HintingSimulator)
  }

  // provReqOrchestrator is an orchestrator that handles all supported Provisioning Classes.
  type provReqOrchestrator struct {
-     initialized  bool
-     context      *context.AutoscalingContext
-     client       provreqclient.ProvisioningRequestClient
-     injector     *scheduling.HintingSimulator
-     scaleUpModes []scaleUpMode
+     initialized         bool
+     context             *context.AutoscalingContext
+     client              *provreqclient.ProvisioningRequestClient
+     injector            *scheduling.HintingSimulator
+     provisioningClasses []provisioningClass
  }

// New returns a new orchestrator.
@@ -65,7 +65,7 @@ func New(kubeConfig *rest.Config) (*provReqOrchestrator, error) {
          return nil, err
      }

-     return &provReqOrchestrator{client: client, scaleUpModes: []scaleUpMode{checkcapacity.New(client)}}, nil
+     return &provReqOrchestrator{client: client, provisioningClasses: []provisioningClass{checkcapacity.New(client)}}, nil
  }

// Initialize initializes the orchestrator.
@@ -79,7 +79,7 @@ func (o *provReqOrchestrator) Initialize(
      o.initialized = true
      o.context = autoscalingContext
      o.injector = scheduling.NewHintingSimulator(autoscalingContext.PredicateChecker)
-     for _, mode := range o.scaleUpModes {
+     for _, mode := range o.provisioningClasses {
          mode.Initialize(autoscalingContext, processors, clusterStateRegistry, estimatorBuilder, taintConfig, o.injector)
      }
  }
@@ -98,8 +98,8 @@ func (o *provReqOrchestrator) ScaleUp(
      o.context.ClusterSnapshot.Fork()
      defer o.context.ClusterSnapshot.Revert()
      o.bookCapacity()
-     for _, scaleUpMode := range o.scaleUpModes {
-         st, err := scaleUpMode.ScaleUp(unschedulablePods, nodes, daemonSets, nodeInfos)
+     for _, provClass := range o.provisioningClasses {
+         st, err := provClass.Provision(unschedulablePods, nodes, daemonSets, nodeInfos)
          errors.Join(combinedError, err)
          if st != nil && st.Result != status.ScaleUpNotTried {
              orchestratorStatus = st
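The renamed provisioningClass interface defines the contract any future Provisioning Class has to meet; the orchestrator initializes every registered class and keeps the first Provision status that is not ScaleUpNotTried. A hypothetical no-op implementation to make the contract concrete (this type is not part of the commit):

    // noopProvClass satisfies provisioningClass but never acts.
    type noopProvClass struct{}

    func (n *noopProvClass) Initialize(*context.AutoscalingContext, *ca_processors.AutoscalingProcessors,
        *clusterstate.ClusterStateRegistry, estimator.EstimatorBuilder, taints.TaintConfig, *scheduling.HintingSimulator) {
    }

    func (n *noopProvClass) Provision([]*apiv1.Pod, []*apiv1.Node, []*appsv1.DaemonSet,
        map[string]*schedulerframework.NodeInfo) (*status.ScaleUpStatus, ca_errors.AutoscalerError) {
        // ScaleUpNotTried tells provReqOrchestrator.ScaleUp to move on to the next class.
        return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil
    }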
@@ -111,8 +111,8 @@ func TestScaleUp(t *testing.T) {
      assert.NoError(t, err)
      client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
      orchestrator := &provReqOrchestrator{
-         client:       client,
-         scaleUpModes: []scaleUpMode{checkcapacity.New(client)},
+         client:              client,
+         provisioningClasses: []provisioningClass{checkcapacity.New(client)},
      }
      orchestrator.Initialize(&autoscalingContext, nil, nil, nil, taints.TaintConfig{})
      st, err := orchestrator.ScaleUp(prPods, []*apiv1.Node{}, []*v1.DaemonSet{}, map[string]*framework.NodeInfo{})
@@ -22,6 +22,7 @@ import (
      "k8s.io/autoscaler/cluster-autoscaler/clusterstate"
      "k8s.io/autoscaler/cluster-autoscaler/context"
      "k8s.io/autoscaler/cluster-autoscaler/core/scaleup"
+     "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
      "k8s.io/autoscaler/cluster-autoscaler/estimator"
      ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
      "k8s.io/autoscaler/cluster-autoscaler/processors/provreq"
@@ -44,7 +45,7 @@ type WrapperOrchestrator struct {
  // NewWrapperOrchestrator returns a WrapperOrchestrator.
  func NewWrapperOrchestrator(provReqOrchestrator scaleup.Orchestrator) *WrapperOrchestrator {
      return &WrapperOrchestrator{
-         podsOrchestrator:    New(),
+         podsOrchestrator:    orchestrator.New(),
          provReqOrchestrator: provReqOrchestrator,
      }
  }
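With the wrapper living next to the other ProvisioningRequest code, a bare New() would resolve inside the wrapper's own package, hence the explicit orchestrator.New() for the core scale-up implementation. A hypothetical helper mirroring the buildAutoscaler wiring from the main.go hunk at the top:

    // buildScaleUpOrchestrator is a sketch, not code from this commit; it assumes
    // New and NewWrapperOrchestrator live in the package aliased as provreqorchestrator.
    func buildScaleUpOrchestrator(restConfig *rest.Config) (scaleup.Orchestrator, error) {
        provReqOrch, err := provreqorchestrator.New(restConfig)
        if err != nil {
            return nil, err
        }
        // The wrapper constructs the core pods orchestrator itself via orchestrator.New().
        return provreqorchestrator.NewWrapperOrchestrator(provReqOrch), nil
    }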
@@ -39,7 +39,7 @@ const (
      regularPodsErrorMsg = "regularPodsError"
  )

- func TestScaleUp(t *testing.T) {
+ func TestWrapperScaleUp(t *testing.T) {
      o := WrapperOrchestrator{
          provReqOrchestrator: &fakeScaleUp{provisioningRequestErrorMsg},
          podsOrchestrator:    &fakeScaleUp{regularPodsErrorMsg},
1 change: 0 additions & 1 deletion cluster-autoscaler/provisioningrequest/pods/pods.go
@@ -48,7 +48,6 @@ func PodsForProvisioningRequest(pr *provreqwrapper.ProvisioningRequest) ([]*v1.P
      for i, podSet := range podSets {
          for j := 0; j < int(podSet.Count); j++ {
              pod, err := controller.GetPodFromTemplate(&podSet.PodTemplate, pr.RuntimeObject(), ownerReference(pr))
-             pod.OwnerReferences = []metav1.OwnerReference{*ownerReference(pr)}
              if err != nil {
                  return nil, fmt.Errorf("while creating pod for pr: %s/%s podSet: %d, got error: %w", pr.Namespace(), pr.Name(), i, err)
              }
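The deleted assignment appears to have been redundant: controller.GetPodFromTemplate already receives ownerReference(pr) as its controllerRef argument and attaches it to the generated pod. It also dereferenced pod before the error check, which could panic when the template fails to render. After the deletion the loop body reads:

    pod, err := controller.GetPodFromTemplate(&podSet.PodTemplate, pr.RuntimeObject(), ownerReference(pr))
    if err != nil {
        return nil, fmt.Errorf("while creating pod for pr: %s/%s podSet: %d, got error: %w", pr.Namespace(), pr.Name(), i, err)
    }
    // pod.OwnerReferences is already populated via the controllerRef argument,
    // so no manual assignment is needed.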
29 changes: 10 additions & 19 deletions cluster-autoscaler/provisioningrequest/provreqclient/client.go
@@ -41,21 +41,15 @@ const (
      provisioningRequestClientCallTimeout = 4 * time.Second
  )

- // ProvisioningRequestClient is an interface with methods for v1beta1 ProvReq CRD client.
- type ProvisioningRequestClient interface {
-     ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error)
-     ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error)
- }
-
- // provisioningRequestClient represents client for v1beta1 ProvReq CRD.
- type provisioningRequestClient struct {
+ // ProvisioningRequestClient represents the client for the v1beta1 ProvReq CRD.
+ type ProvisioningRequestClient struct {
      client         versioned.Interface
      provReqLister  listers.ProvisioningRequestLister
      podTemplLister v1.PodTemplateLister
  }

  // NewProvisioningRequestClient configures and returns a ProvisioningRequestClient.
- func NewProvisioningRequestClient(kubeConfig *rest.Config) (*provisioningRequestClient, error) {
+ func NewProvisioningRequestClient(kubeConfig *rest.Config) (*ProvisioningRequestClient, error) {
      prClient, err := newPRClient(kubeConfig)
      if err != nil {
          return nil, fmt.Errorf("Failed to create Provisioning Request client: %v", err)
@@ -76,15 +70,15 @@ func NewProvisioningRequestClient(kubeConfig *rest.Config) (*provisioningRequest
          return nil, err
      }

-     return &provisioningRequestClient{
+     return &ProvisioningRequestClient{
          client:         prClient,
          provReqLister:  provReqLister,
          podTemplLister: podTemplLister,
      }, nil
  }

  // ProvisioningRequest gets a specific ProvisioningRequest CR.
- func (c *provisioningRequestClient) ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error) {
+ func (c *ProvisioningRequestClient) ProvisioningRequest(namespace, name string) (*provreqwrapper.ProvisioningRequest, error) {
      v1Beta1PR, err := c.provReqLister.ProvisioningRequests(namespace).Get(name)
      if err != nil {
          return nil, err
@@ -97,7 +91,7 @@ func (c *provisioningRequestClient) ProvisioningRequest(namespace, name string)
  }

  // ProvisioningRequests gets all ProvisioningRequest CRs.
- func (c *provisioningRequestClient) ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error) {
+ func (c *ProvisioningRequestClient) ProvisioningRequests() ([]*provreqwrapper.ProvisioningRequest, error) {
      v1Beta1PRs, err := c.provReqLister.List(labels.Everything())
      if err != nil {
          return nil, fmt.Errorf("error fetching provisioningRequests: %w", err)
@@ -114,7 +108,7 @@ func (c *provisioningRequestClient) ProvisioningRequests() ([]*provreqwrapper.Pr
  }

  // FetchPodTemplates fetches PodTemplates referenced by the Provisioning Request.
- func (c *provisioningRequestClient) FetchPodTemplates(pr *v1beta1.ProvisioningRequest) ([]*apiv1.PodTemplate, error) {
+ func (c *ProvisioningRequestClient) FetchPodTemplates(pr *v1beta1.ProvisioningRequest) ([]*apiv1.PodTemplate, error) {
      podTemplates := make([]*apiv1.PodTemplate, 0, len(pr.Spec.PodSets))
      for _, podSpec := range pr.Spec.PodSets {
          podTemplate, err := c.podTemplLister.PodTemplates(pr.Namespace).Get(podSpec.PodTemplateRef.Name)
@@ -164,10 +158,10 @@ func newPodTemplatesLister(client *kubernetes.Clientset, stopChannel <-chan stru
      return podTemplLister, nil
  }

- // VerifyProvisioningRequestClass check that all pods belong to one ProvisioningRequest that belongs to check-capacity ProvisioningRequst class.
- func VerifyProvisioningRequestClass(client ProvisioningRequestClient, unschedulablePods []*apiv1.Pod, className string) (*provreqwrapper.ProvisioningRequest, error) {
+ // ProvisioningRequestForPods checks that all pods belong to one ProvisioningRequest and returns it.
+ func ProvisioningRequestForPods(client *ProvisioningRequestClient, unschedulablePods []*apiv1.Pod) (*provreqwrapper.ProvisioningRequest, error) {
      if len(unschedulablePods) == 0 {
-         return nil, nil
+         return nil, fmt.Errorf("empty unschedulablePods list")
      }
      if unschedulablePods[0].OwnerReferences == nil || len(unschedulablePods[0].OwnerReferences) == 0 {
          return nil, fmt.Errorf("pod %s has no OwnerReference", unschedulablePods[0].Name)
@@ -176,9 +170,6 @@ func VerifyProvisioningRequestClass(client ProvisioningRequestClient, unschedula
      if err != nil {
          return nil, fmt.Errorf("failed to retrieve ProvisioningRequest from unscheduled pods, err: %v", err)
      }
-     if provReq.V1Beta1().Spec.ProvisioningClassName != className {
-         return nil, nil
-     }
      for _, pod := range unschedulablePods {
          if pod.Namespace != unschedulablePods[0].Namespace {
              return nil, fmt.Errorf("pods %s and %s are from different namespaces", pod.Name, unschedulablePods[0].Name)
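ProvisioningRequestForPods is therefore stricter than the VerifyProvisioningRequestClass it replaces: an empty pod list becomes an error instead of a silent nil, and the class check moves to the caller. A caller-side sketch using only identifiers from the diffs above:

    pr, err := provreqclient.ProvisioningRequestForPods(client, unschedulablePods)
    if err != nil {
        // Covers the empty list, a missing OwnerReference, and pods with mixed
        // namespaces or owners.
        return err
    }
    if pr.V1Beta1().Spec.ProvisioningClassName != v1beta1.ProvisioningClassCheckCapacity {
        // Class filtering is now each provisioning class's own responsibility.
        return nil
    }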
@@ -49,7 +49,7 @@ func TestFetchPodTemplates(t *testing.T) {
      }
  }

- func TestVerifyProvisioningRequestClass(t *testing.T) {
+ func TestProvisioningRequestForPods(t *testing.T) {
      checkCapacityProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "check-capacity", "1m", "100", "", int32(100), false, time.Now(), v1beta1.ProvisioningClassCheckCapacity)
      customProvReq := provreqwrapper.BuildTestProvisioningRequest("ns", "custom", "1m", "100", "", int32(100), false, time.Now(), "custom")
      checkCapacityPods, _ := pods.PodsForProvisioningRequest(checkCapacityProvReq)
@@ -67,6 +67,7 @@ func TestVerifyProvisioningRequestClass(t *testing.T) {
          name:      "no pods",
          pods:      []*apiv1.Pod{},
          className: "some-class",
+         err:       true,
      },
      {
          name: "pods from one Provisioning Class",
@@ -92,22 +93,18 @@ func TestVerifyProvisioningRequestClass(t *testing.T) {
          className: v1beta1.ProvisioningClassCheckCapacity,
          err:       true,
      },
-     {
-         name:      "wrong Provisioning Class name",
-         pods:      customProvReqPods,
-         className: v1beta1.ProvisioningClassCheckCapacity,
-     },
  }
  for _, tc := range testCases {
      tc := tc
      t.Run(tc.name, func(t *testing.T) {
          t.Parallel()
-         pr, err := VerifyProvisioningRequestClass(client, tc.pods, tc.className)
-         assert.Equal(t, pr, tc.pr)
+         pr, err := ProvisioningRequestForPods(client, tc.pods)
          if tc.err {
              assert.Error(t, err)
          } else {
              assert.NoError(t, err)
+             assert.Equal(t, pr, tc.pr)
+             assert.Equal(t, pr.V1Beta1().Spec.ProvisioningClassName, tc.className)
          }
      })
  }
@@ -35,7 +35,7 @@ import (
  )

  // NewFakeProvisioningRequestClient mocks ProvisioningRequestClient for tests.
- func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...*provreqwrapper.ProvisioningRequest) *provisioningRequestClient {
+ func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...*provreqwrapper.ProvisioningRequest) *ProvisioningRequestClient {
      t.Helper()
      provReqClient := fake.NewSimpleClientset()
      podTemplClient := fake_kubernetes.NewSimpleClientset()
@@ -60,7 +60,7 @@ func NewFakeProvisioningRequestClient(ctx context.Context, t *testing.T, prs ...
      if err != nil {
          t.Fatalf("Failed to create Provisioning Request lister. Error was: %v", err)
      }
-     return &provisioningRequestClient{
+     return &ProvisioningRequestClient{
          client:         provReqClient,
          provReqLister:  provReqLister,
          podTemplLister: podTemplLister,

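A consequence of deleting the interface: a test double can no longer be an arbitrary struct that implements ProvisioningRequestClient. It has to be the concrete client assembled over fake clientsets, which is exactly what NewFakeProvisioningRequestClient now returns. A usage sketch (testProvReqs is a hypothetical fixture):

    // The fake and the production constructor return the same concrete type, so
    // the fake drops into any field typed *provreqclient.ProvisioningRequestClient.
    client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, testProvReqs...)
    prs, err := client.ProvisioningRequests() // same methods as the real client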