From 9a3da049cf6b7d606ee0164a5b4270efda1bfa1a Mon Sep 17 00:00:00 2001 From: Yaroslava Serdiuk Date: Thu, 2 May 2024 14:25:24 +0000 Subject: [PATCH] Review remarks --- cluster-autoscaler/main.go | 7 ++++++- .../provreq/provisioning_request_processor.go | 13 ++++--------- .../provisioningrequest/checkcapacity/processor.go | 11 ++++++----- .../checkcapacity/processor_test.go | 4 ++-- .../checkcapacity/provisioningclass.go | 5 +++-- .../provisioningrequest/provreqclient/client.go | 3 +++ 6 files changed, 24 insertions(+), 19 deletions(-) diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 74e0b01a8409..c5ff21aaa4e7 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -33,6 +33,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" "k8s.io/autoscaler/cluster-autoscaler/loop" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config" @@ -502,7 +503,11 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator) opts.ScaleUpOrchestrator = scaleUpOrchestrator - provreqProcesor, err := provreq.NewCombinedProvReqProcessor(restConfig, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor()}) + client, err := provreqclient.NewProvisioningRequestClient(restConfig) + if err != nil { + return nil, err + } + provreqProcesor := provreq.NewCombinedProvReqProcessor(client, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor(client)}) if err != nil { return nil, err } diff --git a/cluster-autoscaler/processors/provreq/provisioning_request_processor.go b/cluster-autoscaler/processors/provreq/provisioning_request_processor.go index 451519c7a1f7..74dc64e5529b 100644 --- a/cluster-autoscaler/processors/provreq/provisioning_request_processor.go +++ b/cluster-autoscaler/processors/provreq/provisioning_request_processor.go @@ -20,13 +20,12 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/observers/loopstart" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" - "k8s.io/client-go/rest" "k8s.io/klog/v2" ) // ProvisioningRequestProcessor process ProvisioningRequests in the cluster. type ProvisioningRequestProcessor interface { - Process(*provreqclient.ProvisioningRequestClient, []*provreqwrapper.ProvisioningRequest) + Process([]*provreqwrapper.ProvisioningRequest) CleanUp() } @@ -38,12 +37,8 @@ type CombinedProvReqProcessor struct { } // NewCombinedProvReqProcessor return new CombinedProvReqProcessor. -func NewCombinedProvReqProcessor(kubeConfig *rest.Config, processors []ProvisioningRequestProcessor) (loopstart.Observer, error) { - client, err := provreqclient.NewProvisioningRequestClient(kubeConfig) - if err != nil { - return nil, err - } - return &CombinedProvReqProcessor{client: client, processors: processors}, nil +func NewCombinedProvReqProcessor(client *provreqclient.ProvisioningRequestClient, processors []ProvisioningRequestProcessor) loopstart.Observer { + return &CombinedProvReqProcessor{client: client, processors: processors} } // Refresh iterates over ProvisioningRequests and updates its conditions/state. @@ -54,7 +49,7 @@ func (cp *CombinedProvReqProcessor) Refresh() { return } for _, p := range cp.processors { - p.Process(cp.client, provReqs) + p.Process(provReqs) } } diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/processor.go b/cluster-autoscaler/provisioningrequest/checkcapacity/processor.go index d11c7ecade80..db69af505118 100644 --- a/cluster-autoscaler/provisioningrequest/checkcapacity/processor.go +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/processor.go @@ -38,18 +38,19 @@ const ( type checkCapacityProcessor struct { now func() time.Time maxUpdated int + client *provreqclient.ProvisioningRequestClient } // NewCheckCapacityProcessor return ProvisioningRequestProcessor for Check-capacity ProvisioningClass. -func NewCheckCapacityProcessor() *checkCapacityProcessor { - return &checkCapacityProcessor{now: time.Now, maxUpdated: defaultMaxUpdated} +func NewCheckCapacityProcessor(client *provreqclient.ProvisioningRequestClient) *checkCapacityProcessor { + return &checkCapacityProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client} } // Process iterates over ProvisioningRequests and apply: // -BookingExpired condition for Provisioned ProvisioningRequest if capacity reservation time is expired. // -Failed condition for ProvisioningRequest that were not provisioned during defaultExpirationTime. // TODO(yaroslava): fetch reservation and expiration time from ProvisioningRequest -func (p *checkCapacityProcessor) Process(client *provreqclient.ProvisioningRequestClient, provReqs []*provreqwrapper.ProvisioningRequest) { +func (p *checkCapacityProcessor) Process(provReqs []*provreqwrapper.ProvisioningRequest) { expiredProvReq := []*provreqwrapper.ProvisioningRequest{} failedProvReq := []*provreqwrapper.ProvisioningRequest{} for _, provReq := range provReqs { @@ -79,7 +80,7 @@ func (p *checkCapacityProcessor) Process(client *provreqclient.ProvisioningReque break } conditions.AddOrUpdateCondition(provReq, v1beta1.BookingExpired, metav1.ConditionTrue, conditions.CapacityReservationTimeExpiredReason, conditions.CapacityReservationTimeExpiredMsg, metav1.NewTime(p.now())) - _, updErr := client.UpdateProvisioningRequest(provReq.ProvisioningRequest) + _, updErr := p.client.UpdateProvisioningRequest(provReq.ProvisioningRequest) if updErr != nil { klog.Errorf("failed to add BookingExpired condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, updErr) continue @@ -91,7 +92,7 @@ func (p *checkCapacityProcessor) Process(client *provreqclient.ProvisioningReque break } conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.ExpiredReason, conditions.ExpiredMsg, metav1.NewTime(p.now())) - _, updErr := client.UpdateProvisioningRequest(provReq.ProvisioningRequest) + _, updErr := p.client.UpdateProvisioningRequest(provReq.ProvisioningRequest) if updErr != nil { klog.Errorf("failed to add Failed condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, updErr) continue diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/processor_test.go b/cluster-autoscaler/provisioningrequest/checkcapacity/processor_test.go index 610a69342ee5..93620fe9a986 100644 --- a/cluster-autoscaler/provisioningrequest/checkcapacity/processor_test.go +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/processor_test.go @@ -146,8 +146,8 @@ func TestProcess(t *testing.T) { additionalPr := provreqclient.ProvisioningRequestWrapperForTesting("namespace", "additional") additionalPr.CreationTimestamp = metav1.NewTime(weekAgo) additionalPr.Spec.ProvisioningClassName = v1beta1.ProvisioningClassCheckCapacity - processor := checkCapacityProcessor{func() time.Time { return now }, 1} - processor.Process(provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr), []*provreqwrapper.ProvisioningRequest{pr, additionalPr}) + processor := checkCapacityProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr)} + processor.Process([]*provreqwrapper.ProvisioningRequest{pr, additionalPr}) assert.ElementsMatch(t, test.wantConditions, pr.Status.Conditions) if len(test.conditions) == len(test.wantConditions) { assert.ElementsMatch(t, []metav1.Condition{ diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/provisioningclass.go b/cluster-autoscaler/provisioningrequest/checkcapacity/provisioningclass.go index 9dd3c1b1043b..d535d433a648 100644 --- a/cluster-autoscaler/provisioningrequest/checkcapacity/provisioningclass.go +++ b/cluster-autoscaler/provisioningrequest/checkcapacity/provisioningclass.go @@ -17,6 +17,8 @@ limitations under the License. package checkcapacity import ( + "fmt" + appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -31,7 +33,6 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" - "k8s.io/klog/v2" ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" @@ -105,7 +106,7 @@ func (o *checkCapacityProvClass) checkcapacity(unschedulablePods []*apiv1.Pod, p } _, updErr := o.client.UpdateProvisioningRequest(provReq.ProvisioningRequest) if updErr != nil { - klog.Errorf("failed to update Provisioned condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, err) + return false, fmt.Errorf("failed to update Provisioned condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, updErr) } return capacityAvailable, err } diff --git a/cluster-autoscaler/provisioningrequest/provreqclient/client.go b/cluster-autoscaler/provisioningrequest/provreqclient/client.go index 7496b408daae..d9612613b091 100644 --- a/cluster-autoscaler/provisioningrequest/provreqclient/client.go +++ b/cluster-autoscaler/provisioningrequest/provreqclient/client.go @@ -130,6 +130,9 @@ func (c *ProvisioningRequestClient) UpdateProvisioningRequest(pr *v1beta1.Provis ctx, cancel := context.WithTimeout(context.Background(), provisioningRequestClientCallTimeout) defer cancel() + // UpdateStatus API call on a copy of the PR with cleared Spec field ensures that + // the default null template.metadata.creationTimestamp field of PodTemplateSpec + // will not generate false error logs as a side effect. prCopy := pr.DeepCopy() prCopy.Spec = v1beta1.ProvisioningRequestSpec{} updatedPr, err := c.client.AutoscalingV1beta1().ProvisioningRequests(prCopy.Namespace).UpdateStatus(ctx, prCopy, metav1.UpdateOptions{})