diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go
index 74e0b01a8409..c5ff21aaa4e7 100644
--- a/cluster-autoscaler/main.go
+++ b/cluster-autoscaler/main.go
@@ -33,6 +33,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
 	"k8s.io/autoscaler/cluster-autoscaler/loop"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
+	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
 
 	kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"
@@ -502,7 +503,11 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 		scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator)
 
 		opts.ScaleUpOrchestrator = scaleUpOrchestrator
-		provreqProcesor, err := provreq.NewCombinedProvReqProcessor(restConfig, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor()})
+		client, err := provreqclient.NewProvisioningRequestClient(restConfig)
+		if err != nil {
+			return nil, err
+		}
+		provreqProcesor := provreq.NewCombinedProvReqProcessor(client, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor(client)})
 		if err != nil {
 			return nil, err
 		}
diff --git a/cluster-autoscaler/processors/provreq/provisioning_request_injector.go b/cluster-autoscaler/processors/provreq/provisioning_request_injector.go
index 2c8394846a98..c5d48f2b1267 100644
--- a/cluster-autoscaler/processors/provreq/provisioning_request_injector.go
+++ b/cluster-autoscaler/processors/provreq/provisioning_request_injector.go
@@ -78,10 +78,17 @@ func (p *ProvisioningRequestPodsInjector) Process(
 		if err != nil {
 			klog.Errorf("Failed to get pods for ProvisioningRequest %v", pr.Name)
 			provreqconditions.AddOrUpdateCondition(pr, v1beta1.Failed, metav1.ConditionTrue, provreqconditions.FailedToCreatePodsReason, err.Error(), metav1.NewTime(p.clock.Now()))
+			if _, err := p.client.UpdateProvisioningRequest(pr.ProvisioningRequest); err != nil {
+				klog.Errorf("failed to add Failed condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
+			}
 			continue
 		}
-		unschedulablePods := append(unschedulablePods, provreqpods...)
 		provreqconditions.AddOrUpdateCondition(pr, v1beta1.Accepted, metav1.ConditionTrue, provreqconditions.AcceptedReason, provreqconditions.AcceptedMsg, metav1.NewTime(p.clock.Now()))
+		if _, err := p.client.UpdateProvisioningRequest(pr.ProvisioningRequest); err != nil {
+			klog.Errorf("failed to add Accepted condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
+			continue
+		}
+		unschedulablePods := append(unschedulablePods, provreqpods...)
 		return unschedulablePods, nil
 	}
 }
diff --git a/cluster-autoscaler/processors/provreq/provisioning_request_processor.go b/cluster-autoscaler/processors/provreq/provisioning_request_processor.go
index 06cb7af6b546..74dc64e5529b 100644
--- a/cluster-autoscaler/processors/provreq/provisioning_request_processor.go
+++ b/cluster-autoscaler/processors/provreq/provisioning_request_processor.go
@@ -20,7 +20,6 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
-	"k8s.io/client-go/rest"
 	"k8s.io/klog/v2"
 )
 
@@ -38,12 +37,8 @@ type CombinedProvReqProcessor struct {
 }
 
 // NewCombinedProvReqProcessor return new CombinedProvReqProcessor.
-func NewCombinedProvReqProcessor(kubeConfig *rest.Config, processors []ProvisioningRequestProcessor) (loopstart.Observer, error) {
-	client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
-	if err != nil {
-		return nil, err
-	}
-	return &CombinedProvReqProcessor{client: client, processors: processors}, nil
+func NewCombinedProvReqProcessor(client *provreqclient.ProvisioningRequestClient, processors []ProvisioningRequestProcessor) loopstart.Observer {
+	return &CombinedProvReqProcessor{client: client, processors: processors}
 }
 
 // Refresh iterates over ProvisioningRequests and updates its conditions/state.
diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/processor.go b/cluster-autoscaler/provisioningrequest/checkcapacity/processor.go
index 3d336e698d56..9cd18b99b7cf 100644
--- a/cluster-autoscaler/provisioningrequest/checkcapacity/processor.go
+++ b/cluster-autoscaler/provisioningrequest/checkcapacity/processor.go
@@ -23,7 +23,9 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
+	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
+	"k8s.io/klog/v2"
 )
 
 const (
@@ -36,11 +38,12 @@ const (
 type checkCapacityProcessor struct {
 	now        func() time.Time
 	maxUpdated int
+	client     *provreqclient.ProvisioningRequestClient
 }
 
 // NewCheckCapacityProcessor return ProvisioningRequestProcessor for Check-capacity ProvisioningClass.
-func NewCheckCapacityProcessor() *checkCapacityProcessor {
-	return &checkCapacityProcessor{now: time.Now, maxUpdated: defaultMaxUpdated}
+func NewCheckCapacityProcessor(client *provreqclient.ProvisioningRequestClient) *checkCapacityProcessor {
+	return &checkCapacityProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client}
 }
 
 // Process iterates over ProvisioningRequests and apply:
@@ -71,20 +74,21 @@ func (p *checkCapacityProcessor) Process(provReqs []*provreqwrapper.Provisioning
 			}
 		}
 	}
-	updated := 0
 	for _, provReq := range expiredProvReq {
-		if updated >= p.maxUpdated {
-			break
-		}
 		conditions.AddOrUpdateCondition(provReq, v1beta1.BookingExpired, metav1.ConditionTrue, conditions.CapacityReservationTimeExpiredReason, conditions.CapacityReservationTimeExpiredMsg, metav1.NewTime(p.now()))
-		updated++
+		_, updErr := p.client.UpdateProvisioningRequest(provReq.ProvisioningRequest)
+		if updErr != nil {
+			klog.Errorf("failed to add BookingExpired condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, updErr)
+			continue
+		}
 	}
 	for _, provReq := range failedProvReq {
-		if updated >= p.maxUpdated {
-			break
-		}
 		conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.ExpiredReason, conditions.ExpiredMsg, metav1.NewTime(p.now()))
-		updated++
+		_, updErr := p.client.UpdateProvisioningRequest(provReq.ProvisioningRequest)
+		if updErr != nil {
+			klog.Errorf("failed to add Failed condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, updErr)
+			continue
+		}
 	}
 }
 
diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/processor_test.go b/cluster-autoscaler/provisioningrequest/checkcapacity/processor_test.go
index 8a899f7537b3..93620fe9a986 100644
--- a/cluster-autoscaler/provisioningrequest/checkcapacity/processor_test.go
+++ b/cluster-autoscaler/provisioningrequest/checkcapacity/processor_test.go
@@ -146,7 +146,7 @@ func TestProcess(t *testing.T) {
 		additionalPr := provreqclient.ProvisioningRequestWrapperForTesting("namespace", "additional")
 		additionalPr.CreationTimestamp = metav1.NewTime(weekAgo)
 		additionalPr.Spec.ProvisioningClassName = v1beta1.ProvisioningClassCheckCapacity
-		processor := checkCapacityProcessor{func() time.Time { return now }, 1}
+		processor := checkCapacityProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr)}
 		processor.Process([]*provreqwrapper.ProvisioningRequest{pr, additionalPr})
 		assert.ElementsMatch(t, test.wantConditions, pr.Status.Conditions)
 		if len(test.conditions) == len(test.wantConditions) {
diff --git a/cluster-autoscaler/provisioningrequest/checkcapacity/provisioningclass.go b/cluster-autoscaler/provisioningrequest/checkcapacity/provisioningclass.go
index ad03b1656f85..d535d433a648 100644
--- a/cluster-autoscaler/provisioningrequest/checkcapacity/provisioningclass.go
+++ b/cluster-autoscaler/provisioningrequest/checkcapacity/provisioningclass.go
@@ -17,6 +17,8 @@ limitations under the License.
 package checkcapacity
 
 import (
+	"fmt"
+
 	appsv1 "k8s.io/api/apps/v1"
 	apiv1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -93,12 +95,18 @@ func (o *checkCapacityProvClass) Provision(
 }
 
 // Assuming that all unschedulable pods comes from one ProvisioningRequest.
-func (o *checkCapacityProvClass) checkcapacity(unschedulablePods []*apiv1.Pod, provReq *provreqwrapper.ProvisioningRequest) (bool, error) {
+func (o *checkCapacityProvClass) checkcapacity(unschedulablePods []*apiv1.Pod, provReq *provreqwrapper.ProvisioningRequest) (capacityAvailable bool, err error) {
+	capacityAvailable = true
 	st, _, err := o.injector.TrySchedulePods(o.context.ClusterSnapshot, unschedulablePods, scheduling.ScheduleAnywhere, true)
 	if len(st) < len(unschedulablePods) || err != nil {
 		conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionFalse, conditions.CapacityIsNotFoundReason, "Capacity is not found, CA will try to find it later.", metav1.Now())
-		return false, err
+		capacityAvailable = false
+	} else {
+		conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionTrue, conditions.CapacityIsFoundReason, conditions.CapacityIsFoundMsg, metav1.Now())
+	}
+	_, updErr := o.client.UpdateProvisioningRequest(provReq.ProvisioningRequest)
+	if updErr != nil {
+		return false, fmt.Errorf("failed to update Provisioned condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, updErr)
 	}
-	conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionTrue, conditions.CapacityIsFoundReason, conditions.CapacityIsFoundMsg, metav1.Now())
-	return true, nil
+	return capacityAvailable, err
 }
diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go
index d4d31ff05528..beadd9acd1d3 100644
--- a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go
+++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go
@@ -132,6 +132,9 @@ func (o *provReqOrchestrator) bookCapacity() error {
 			// If there is an error, mark PR as invalid, because we won't be able to book capacity
 			// for it anyway.
 			conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
+			if _, err := o.client.UpdateProvisioningRequest(provReq.ProvisioningRequest); err != nil {
+				klog.Errorf("failed to add Failed condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, err)
+			}
 			continue
 		}
 		podsToCreate = append(podsToCreate, pods...)
diff --git a/cluster-autoscaler/provisioningrequest/provreqclient/client.go b/cluster-autoscaler/provisioningrequest/provreqclient/client.go
index 9d135bb3f60f..d9612613b091 100644
--- a/cluster-autoscaler/provisioningrequest/provreqclient/client.go
+++ b/cluster-autoscaler/provisioningrequest/provreqclient/client.go
@@ -17,11 +17,13 @@ limitations under the License.
 package provreqclient
 
 import (
+	"context"
 	"fmt"
 	"time"
 
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 
 	"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
@@ -123,6 +125,24 @@ func (c *ProvisioningRequestClient) FetchPodTemplates(pr *v1beta1.ProvisioningRe
 	return podTemplates, nil
 }
 
+// UpdateProvisioningRequest updates the given ProvisioningRequest CR by propagating the changes using the ProvisioningRequestInterface and returns the updated instance, or the original one in case of an error.
+func (c *ProvisioningRequestClient) UpdateProvisioningRequest(pr *v1beta1.ProvisioningRequest) (*v1beta1.ProvisioningRequest, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), provisioningRequestClientCallTimeout)
+	defer cancel()
+
+	// Calling UpdateStatus on a copy of the PR with a cleared Spec field ensures that
+	// the default null template.metadata.creationTimestamp field of PodTemplateSpec
+	// does not generate false error logs as a side effect.
+	prCopy := pr.DeepCopy()
+	prCopy.Spec = v1beta1.ProvisioningRequestSpec{}
+	updatedPr, err := c.client.AutoscalingV1beta1().ProvisioningRequests(prCopy.Namespace).UpdateStatus(ctx, prCopy, metav1.UpdateOptions{})
+	if err != nil {
+		return pr, err
+	}
+	klog.V(4).Infof("Updated ProvisioningRequest %s/%s, status: %q", updatedPr.Namespace, updatedPr.Name, updatedPr.Status)
+	return updatedPr, nil
+}
+
 // newPRClient creates a new Provisioning Request client from the given config.
 func newPRClient(kubeConfig *rest.Config) (*versioned.Clientset, error) {
 	return versioned.NewForConfig(kubeConfig)
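Note (not part of the patch): every call site in this diff follows the same flow — set the condition on the in-memory wrapper, then persist it through the status subresource with the new UpdateProvisioningRequest, logging rather than aborting when the API call fails. A minimal Go sketch of that flow, using only identifiers that appear in the diff above; the package and helper name are hypothetical:

// Hypothetical package and helper, for illustration of the update pattern only.
package example

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
	"k8s.io/klog/v2"
)

// markAccepted sets the Accepted condition on the in-memory wrapper first, then persists it
// via UpdateProvisioningRequest; on failure it only logs, so the caller's loop keeps going.
func markAccepted(client *provreqclient.ProvisioningRequestClient, pr *provreqwrapper.ProvisioningRequest) {
	conditions.AddOrUpdateCondition(pr, v1beta1.Accepted, metav1.ConditionTrue,
		conditions.AcceptedReason, conditions.AcceptedMsg, metav1.Now())
	if _, err := client.UpdateProvisioningRequest(pr.ProvisioningRequest); err != nil {
		klog.Errorf("failed to add Accepted condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
	}
}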