diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 74e0b01a8409..c5ff21aaa4e7 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -33,6 +33,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" "k8s.io/autoscaler/cluster-autoscaler/loop" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config" @@ -502,7 +503,11 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator) opts.ScaleUpOrchestrator = scaleUpOrchestrator - provreqProcesor, err := provreq.NewCombinedProvReqProcessor(restConfig, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor()}) + client, err := provreqclient.NewProvisioningRequestClient(restConfig) + if err != nil { + return nil, err + } + provreqProcesor := provreq.NewCombinedProvReqProcessor(client, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor(client)}) if err != nil { return nil, err } diff --git a/cluster-autoscaler/processors/provreq/provisioning_request_processor.go b/cluster-autoscaler/processors/provreq/provisioning_request_processor.go index 451519c7a1f7..74dc64e5529b 100644 --- a/cluster-autoscaler/processors/provreq/provisioning_request_processor.go +++ b/cluster-autoscaler/processors/provreq/provisioning_request_processor.go @@ -20,13 +20,12 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/observers/loopstart" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" - "k8s.io/client-go/rest" "k8s.io/klog/v2" ) // ProvisioningRequestProcessor process ProvisioningRequests in the cluster. type ProvisioningRequestProcessor interface { - Process(*provreqclient.ProvisioningRequestClient, []*provreqwrapper.ProvisioningRequest) + Process([]*provreqwrapper.ProvisioningRequest) CleanUp() } @@ -38,12 +37,8 @@ type CombinedProvReqProcessor struct { } // NewCombinedProvReqProcessor return new CombinedProvReqProcessor. -func NewCombinedProvReqProcessor(kubeConfig *rest.Config, processors []ProvisioningRequestProcessor) (loopstart.Observer, error) { - client, err := provreqclient.NewProvisioningRequestClient(kubeConfig) - if err != nil { - return nil, err - } - return &CombinedProvReqProcessor{client: client, processors: processors}, nil +func NewCombinedProvReqProcessor(client *provreqclient.ProvisioningRequestClient, processors []ProvisioningRequestProcessor) loopstart.Observer { + return &CombinedProvReqProcessor{client: client, processors: processors} } // Refresh iterates over ProvisioningRequests and updates its conditions/state. @@ -54,7 +49,7 @@ func (cp *CombinedProvReqProcessor) Refresh() { return } for _, p := range cp.processors { - p.Process(cp.client, provReqs) + p.Process(provReqs) } } diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/processor.go b/cluster-autoscaler/provisioningrequest/checkcapacity/processor.go index d11c7ecade80..db69af505118 100644 --- a/cluster-autoscaler/provisioningrequest/checkcapacity/processor.go +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/processor.go @@ -38,18 +38,19 @@ const ( type checkCapacityProcessor struct { now func() time.Time maxUpdated int + client *provreqclient.ProvisioningRequestClient } // NewCheckCapacityProcessor return ProvisioningRequestProcessor for Check-capacity ProvisioningClass. -func NewCheckCapacityProcessor() *checkCapacityProcessor { - return &checkCapacityProcessor{now: time.Now, maxUpdated: defaultMaxUpdated} +func NewCheckCapacityProcessor(client *provreqclient.ProvisioningRequestClient) *checkCapacityProcessor { + return &checkCapacityProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client} } // Process iterates over ProvisioningRequests and apply: // -BookingExpired condition for Provisioned ProvisioningRequest if capacity reservation time is expired. // -Failed condition for ProvisioningRequest that were not provisioned during defaultExpirationTime. // TODO(yaroslava): fetch reservation and expiration time from ProvisioningRequest -func (p *checkCapacityProcessor) Process(client *provreqclient.ProvisioningRequestClient, provReqs []*provreqwrapper.ProvisioningRequest) { +func (p *checkCapacityProcessor) Process(provReqs []*provreqwrapper.ProvisioningRequest) { expiredProvReq := []*provreqwrapper.ProvisioningRequest{} failedProvReq := []*provreqwrapper.ProvisioningRequest{} for _, provReq := range provReqs { @@ -79,7 +80,7 @@ func (p *checkCapacityProcessor) Process(client *provreqclient.ProvisioningReque break } conditions.AddOrUpdateCondition(provReq, v1beta1.BookingExpired, metav1.ConditionTrue, conditions.CapacityReservationTimeExpiredReason, conditions.CapacityReservationTimeExpiredMsg, metav1.NewTime(p.now())) - _, updErr := client.UpdateProvisioningRequest(provReq.ProvisioningRequest) + _, updErr := p.client.UpdateProvisioningRequest(provReq.ProvisioningRequest) if updErr != nil { klog.Errorf("failed to add BookingExpired condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, updErr) continue @@ -91,7 +92,7 @@ func (p *checkCapacityProcessor) Process(client *provreqclient.ProvisioningReque break } conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.ExpiredReason, conditions.ExpiredMsg, metav1.NewTime(p.now())) - _, updErr := client.UpdateProvisioningRequest(provReq.ProvisioningRequest) + _, updErr := p.client.UpdateProvisioningRequest(provReq.ProvisioningRequest) if updErr != nil { klog.Errorf("failed to add Failed condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, updErr) continue diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/processor_test.go b/cluster-autoscaler/provisioningrequest/checkcapacity/processor_test.go index 610a69342ee5..93620fe9a986 100644 --- a/cluster-autoscaler/provisioningrequest/checkcapacity/processor_test.go +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/processor_test.go @@ -146,8 +146,8 @@ func TestProcess(t *testing.T) { additionalPr := provreqclient.ProvisioningRequestWrapperForTesting("namespace", "additional") additionalPr.CreationTimestamp = metav1.NewTime(weekAgo) additionalPr.Spec.ProvisioningClassName = v1beta1.ProvisioningClassCheckCapacity - processor := checkCapacityProcessor{func() time.Time { return now }, 1} - processor.Process(provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr), []*provreqwrapper.ProvisioningRequest{pr, additionalPr}) + processor := checkCapacityProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr)} + processor.Process([]*provreqwrapper.ProvisioningRequest{pr, additionalPr}) assert.ElementsMatch(t, test.wantConditions, pr.Status.Conditions) if len(test.conditions) == len(test.wantConditions) { assert.ElementsMatch(t, []metav1.Condition{