Skip to content

Commit

Permalink
[cluster-autoscaler-release-1.30] Fix ProvisioningRequest update (kub…
Browse files Browse the repository at this point in the history
…ernetes#6825)

* Fix ProvisioningRequest update

* Review remarks

---------

Co-authored-by: Yaroslava Serdiuk <[email protected]>
  • Loading branch information
1 parent 55a18a3 commit 79027cd
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 25 deletions.
7 changes: 6 additions & 1 deletion cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/loop"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
kubelet_config "k8s.io/kubernetes/pkg/kubelet/apis/config"

Expand Down Expand Up @@ -504,7 +505,11 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator)

opts.ScaleUpOrchestrator = scaleUpOrchestrator
provreqProcesor, err := provreq.NewCombinedProvReqProcessor(restConfig, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor()})
client, err := provreqclient.NewProvisioningRequestClient(restConfig)
if err != nil {
return nil, err
}
provreqProcesor := provreq.NewCombinedProvReqProcessor(client, []provreq.ProvisioningRequestProcessor{checkcapacity.NewCheckCapacityProcessor(client)})
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,17 @@ func (p *ProvisioningRequestPodsInjector) Process(
if err != nil {
klog.Errorf("Failed to get pods for ProvisioningRequest %v", pr.Name)
provreqconditions.AddOrUpdateCondition(pr, v1beta1.Failed, metav1.ConditionTrue, provreqconditions.FailedToCreatePodsReason, err.Error(), metav1.NewTime(p.clock.Now()))
if _, err := p.client.UpdateProvisioningRequest(pr.ProvisioningRequest); err != nil {
klog.Errorf("failed add Failed condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
}
continue
}
unschedulablePods := append(unschedulablePods, provreqpods...)
provreqconditions.AddOrUpdateCondition(pr, v1beta1.Accepted, metav1.ConditionTrue, provreqconditions.AcceptedReason, provreqconditions.AcceptedMsg, metav1.NewTime(p.clock.Now()))
if _, err := p.client.UpdateProvisioningRequest(pr.ProvisioningRequest); err != nil {
klog.Errorf("failed add Accepted condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
continue
}
unschedulablePods := append(unschedulablePods, provreqpods...)
return unschedulablePods, nil
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
)

Expand All @@ -38,12 +37,8 @@ type CombinedProvReqProcessor struct {
}

// NewCombinedProvReqProcessor return new CombinedProvReqProcessor.
func NewCombinedProvReqProcessor(kubeConfig *rest.Config, processors []ProvisioningRequestProcessor) (loopstart.Observer, error) {
client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
if err != nil {
return nil, err
}
return &CombinedProvReqProcessor{client: client, processors: processors}, nil
func NewCombinedProvReqProcessor(client *provreqclient.ProvisioningRequestClient, processors []ProvisioningRequestProcessor) loopstart.Observer {
return &CombinedProvReqProcessor{client: client, processors: processors}
}

// Refresh iterates over ProvisioningRequests and updates its conditions/state.
Expand Down
26 changes: 15 additions & 11 deletions cluster-autoscaler/provisioningrequest/checkcapacity/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/klog/v2"
)

const (
Expand All @@ -36,11 +38,12 @@ const (
type checkCapacityProcessor struct {
now func() time.Time
maxUpdated int
client *provreqclient.ProvisioningRequestClient
}

// NewCheckCapacityProcessor return ProvisioningRequestProcessor for Check-capacity ProvisioningClass.
func NewCheckCapacityProcessor() *checkCapacityProcessor {
return &checkCapacityProcessor{now: time.Now, maxUpdated: defaultMaxUpdated}
func NewCheckCapacityProcessor(client *provreqclient.ProvisioningRequestClient) *checkCapacityProcessor {
return &checkCapacityProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client}
}

// Process iterates over ProvisioningRequests and apply:
Expand Down Expand Up @@ -71,20 +74,21 @@ func (p *checkCapacityProcessor) Process(provReqs []*provreqwrapper.Provisioning
}
}
}
updated := 0
for _, provReq := range expiredProvReq {
if updated >= p.maxUpdated {
break
}
conditions.AddOrUpdateCondition(provReq, v1beta1.BookingExpired, metav1.ConditionTrue, conditions.CapacityReservationTimeExpiredReason, conditions.CapacityReservationTimeExpiredMsg, metav1.NewTime(p.now()))
updated++
_, updErr := p.client.UpdateProvisioningRequest(provReq.ProvisioningRequest)
if updErr != nil {
klog.Errorf("failed to add BookingExpired condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, updErr)
continue
}
}
for _, provReq := range failedProvReq {
if updated >= p.maxUpdated {
break
}
conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.ExpiredReason, conditions.ExpiredMsg, metav1.NewTime(p.now()))
updated++
_, updErr := p.client.UpdateProvisioningRequest(provReq.ProvisioningRequest)
if updErr != nil {
klog.Errorf("failed to add Failed condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, updErr)
continue
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestProcess(t *testing.T) {
additionalPr := provreqclient.ProvisioningRequestWrapperForTesting("namespace", "additional")
additionalPr.CreationTimestamp = metav1.NewTime(weekAgo)
additionalPr.Spec.ProvisioningClassName = v1beta1.ProvisioningClassCheckCapacity
processor := checkCapacityProcessor{func() time.Time { return now }, 1}
processor := checkCapacityProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr)}
processor.Process([]*provreqwrapper.ProvisioningRequest{pr, additionalPr})
assert.ElementsMatch(t, test.wantConditions, pr.Status.Conditions)
if len(test.conditions) == len(test.wantConditions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package checkcapacity

import (
"fmt"

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -93,12 +95,18 @@ func (o *checkCapacityProvClass) Provision(
}

// Assuming that all unschedulable pods comes from one ProvisioningRequest.
func (o *checkCapacityProvClass) checkcapacity(unschedulablePods []*apiv1.Pod, provReq *provreqwrapper.ProvisioningRequest) (bool, error) {
func (o *checkCapacityProvClass) checkcapacity(unschedulablePods []*apiv1.Pod, provReq *provreqwrapper.ProvisioningRequest) (capacityAvailable bool, err error) {
capacityAvailable = true
st, _, err := o.injector.TrySchedulePods(o.context.ClusterSnapshot, unschedulablePods, scheduling.ScheduleAnywhere, true)
if len(st) < len(unschedulablePods) || err != nil {
conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionFalse, conditions.CapacityIsNotFoundReason, "Capacity is not found, CA will try to find it later.", metav1.Now())
return false, err
capacityAvailable = false
} else {
conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionTrue, conditions.CapacityIsFoundReason, conditions.CapacityIsFoundMsg, metav1.Now())
}
_, updErr := o.client.UpdateProvisioningRequest(provReq.ProvisioningRequest)
if updErr != nil {
return false, fmt.Errorf("failed to update Provisioned condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, updErr)
}
conditions.AddOrUpdateCondition(provReq, v1beta1.Provisioned, metav1.ConditionTrue, conditions.CapacityIsFoundReason, conditions.CapacityIsFoundMsg, metav1.Now())
return true, nil
return capacityAvailable, err
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ func (o *provReqOrchestrator) bookCapacity() error {
// If there is an error, mark PR as invalid, because we won't be able to book capacity
// for it anyway.
conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
if _, err := o.client.UpdateProvisioningRequest(provReq.ProvisioningRequest); err != nil {
klog.Errorf("failed to add Accepted condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, err)
}
continue
}
podsToCreate = append(podsToCreate, pods...)
Expand Down
20 changes: 20 additions & 0 deletions cluster-autoscaler/provisioningrequest/provreqclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ limitations under the License.
package provreqclient

import (
"context"
"fmt"
"time"

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/labels"
"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
Expand Down Expand Up @@ -123,6 +125,24 @@ func (c *ProvisioningRequestClient) FetchPodTemplates(pr *v1beta1.ProvisioningRe
return podTemplates, nil
}

// UpdateProvisioningRequest updates the given ProvisioningRequest CR by propagating the changes using the ProvisioningRequestInterface and returns the updated instance or the original one in case of an error.
func (c *ProvisioningRequestClient) UpdateProvisioningRequest(pr *v1beta1.ProvisioningRequest) (*v1beta1.ProvisioningRequest, error) {
ctx, cancel := context.WithTimeout(context.Background(), provisioningRequestClientCallTimeout)
defer cancel()

// UpdateStatus API call on a copy of the PR with cleared Spec field ensures that
// the default null template.metadata.creationTimestamp field of PodTemplateSpec
// will not generate false error logs as a side effect.
prCopy := pr.DeepCopy()
prCopy.Spec = v1beta1.ProvisioningRequestSpec{}
updatedPr, err := c.client.AutoscalingV1beta1().ProvisioningRequests(prCopy.Namespace).UpdateStatus(ctx, prCopy, metav1.UpdateOptions{})
if err != nil {
return pr, err
}
klog.V(4).Infof("Updated ProvisioningRequest %s/%s, status: %q,", updatedPr.Namespace, updatedPr.Name, updatedPr.Status)
return updatedPr, nil
}

// newPRClient creates a new Provisioning Request client from the given config.
func newPRClient(kubeConfig *rest.Config) (*versioned.Clientset, error) {
return versioned.NewForConfig(kubeConfig)
Expand Down

0 comments on commit 79027cd

Please sign in to comment.