diff --git a/cluster-autoscaler/processors/provreq/injector.go b/cluster-autoscaler/processors/provreq/injector.go index 9ac7855a582..fba4ea3af51 100644 --- a/cluster-autoscaler/processors/provreq/injector.go +++ b/cluster-autoscaler/processors/provreq/injector.go @@ -29,19 +29,24 @@ import ( provreqconditions "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions" provreqpods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods" "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient" + "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper" "k8s.io/client-go/rest" "k8s.io/klog/v2" "k8s.io/utils/clock" ) const ( - defaultRetryTime = 10 * time.Minute + defaultRetryTime = 1 * time.Minute + maxBackoffTime = 10 * time.Minute + // TODO: replace with timeout for element rather than max size of cache. + maxCacheSize = 1000 ) // ProvisioningRequestPodsInjector creates in-memory pods from ProvisioningRequest and inject them to unscheduled pods list. type ProvisioningRequestPodsInjector struct { - client *provreqclient.ProvisioningRequestClient - clock clock.PassiveClock + clock clock.PassiveClock + client *provreqclient.ProvisioningRequestClient + backoffDuration map[string]time.Duration } // Process pick one ProvisioningRequest, update Accepted condition and inject pods to unscheduled pods list. @@ -49,6 +54,9 @@ func (p *ProvisioningRequestPodsInjector) Process( _ *context.AutoscalingContext, unschedulablePods []*apiv1.Pod, ) ([]*apiv1.Pod, error) { + if len(p.backoffDuration) >= maxCacheSize { + p.backoffDuration = make(map[string]time.Duration) + } provReqs, err := p.client.ProvisioningRequests() if err != nil { return nil, err @@ -60,16 +68,20 @@ func (p *ProvisioningRequestPodsInjector) Process( } conditions := pr.Status.Conditions if apimeta.IsStatusConditionTrue(conditions, v1beta1.Failed) || apimeta.IsStatusConditionTrue(conditions, v1beta1.Provisioned) { + delete(p.backoffDuration, key(pr)) continue } provisioned := apimeta.FindStatusCondition(conditions, v1beta1.Provisioned) + retryTime, found := p.backoffDuration[key(pr)] + if !found { + retryTime = defaultRetryTime + } - //TODO(yaroslava): support exponential backoff // Inject pods if ProvReq wasn't scaled up before or it has Provisioned == False condition more than defaultRetryTime inject := true if provisioned != nil { - if provisioned.Status == metav1.ConditionFalse && provisioned.LastTransitionTime.Add(defaultRetryTime).Before(p.clock.Now()) { + if provisioned.Status == metav1.ConditionFalse && provisioned.LastTransitionTime.Add(retryTime).Before(p.clock.Now()) { inject = true } else { inject = false @@ -91,6 +103,7 @@ func (p *ProvisioningRequestPodsInjector) Process( continue } unschedulablePods := append(unschedulablePods, provreqpods...) + p.backoffDuration[key(pr)] = max(2*retryTime, maxBackoffTime) return unschedulablePods, nil } } @@ -108,3 +121,7 @@ func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config) (pods.PodListPr } return &ProvisioningRequestPodsInjector{client: client, clock: clock.RealClock{}}, nil } + +func key(pr *provreqwrapper.ProvisioningRequest) string { + return string(pr.UID) +} diff --git a/cluster-autoscaler/processors/provreq/injector_test.go b/cluster-autoscaler/processors/provreq/injector_test.go index 625b0a7be57..9d84d3e8dc6 100644 --- a/cluster-autoscaler/processors/provreq/injector_test.go +++ b/cluster-autoscaler/processors/provreq/injector_test.go @@ -32,7 +32,7 @@ import ( func TestProvisioningRequestPodsInjector(t *testing.T) { now := time.Now() - minAgo := now.Add(-1 * time.Minute) + minAgo := now.Add(-1 * time.Minute).Add(-1 * time.Second) hourAgo := now.Add(-1 * time.Hour) accepted := metav1.Condition{ @@ -117,7 +117,8 @@ func TestProvisioningRequestPodsInjector(t *testing.T) { } for _, tc := range testCases { client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...) - injector := ProvisioningRequestPodsInjector{client, clock.NewFakePassiveClock(now)} + backoffTime := map[string]time.Duration{key(notProvisionedRecentlyProvReqB): 2 * time.Minute} + injector := ProvisioningRequestPodsInjector{clock.NewFakePassiveClock(now), client, backoffTime} getUnscheduledPods, err := injector.Process(nil, []*v1.Pod{}) if err != nil { t.Errorf("%s failed: injector.Process return error %v", tc.name, err)