Skip to content

Commit

Permalink
Add backoff mechanism for ProvReq retry
Browse files Browse the repository at this point in the history
  • Loading branch information
yaroslava-serdiuk committed Aug 20, 2024
1 parent 616930b commit 242edc5
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 7 deletions.
27 changes: 22 additions & 5 deletions cluster-autoscaler/processors/provreq/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,34 @@ import (
provreqconditions "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
provreqpods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
)

const (
// NOTE(review): defaultRetryTime appears twice because this view shows both the
// removed (10 min) and the added (1 min) line of the diff; presumably only the
// second declaration exists in the resulting file. Confirm.
defaultRetryTime = 10 * time.Minute
// defaultRetryTime is the retry interval Process falls back to when a
// ProvisioningRequest has no entry in the backoffDuration map.
defaultRetryTime = 1 * time.Minute
// maxBackoffTime is combined with the doubled retry time when Process records the
// next backoff for a ProvisioningRequest.
// NOTE(review): Process stores max(2*retryTime, maxBackoffTime), which makes this
// value a floor (every stored backoff is at least 10 min) rather than a cap;
// min(...) was presumably intended. Confirm.
maxBackoffTime = 10 * time.Minute
// maxCacheSize is the number of backoffDuration entries at which Process discards
// the whole map and starts with an empty one.
// TODO: replace with timeout for element rather than max size of cache.
maxCacheSize = 1000
)

// ProvisioningRequestPodsInjector creates in-memory pods from a ProvisioningRequest and injects them into the unscheduled pods list.
type ProvisioningRequestPodsInjector struct {
// NOTE(review): the next two fields look like the pre-change field order rendered
// by the diff view; presumably only the three fields below them exist in the
// resulting file. Confirm.
client *provreqclient.ProvisioningRequestClient
clock clock.PassiveClock
// clock supplies the current time used to decide whether a ProvisioningRequest's
// retry interval has elapsed.
clock clock.PassiveClock
// client lists the ProvisioningRequests that Process iterates over.
client *provreqclient.ProvisioningRequestClient
// backoffDuration maps a ProvisioningRequest key (its UID, see key) to the retry
// interval to apply to it. Process deletes the entry once the request is Failed or
// Provisioned, and replaces the whole map when it reaches maxCacheSize entries.
// NOTE(review): NewProvisioningRequestPodsInjector does not set this field, and
// Process only re-creates the map when len >= maxCacheSize, so the first write
// would hit a nil map and panic. Confirm the map is initialised somewhere.
backoffDuration map[string]time.Duration
}

// Process picks one ProvisioningRequest, updates its Accepted condition and injects its pods into the unscheduled pods list.
func (p *ProvisioningRequestPodsInjector) Process(
_ *context.AutoscalingContext,
unschedulablePods []*apiv1.Pod,
) ([]*apiv1.Pod, error) {
if len(p.backoffDuration) >= maxCacheSize {
p.backoffDuration = make(map[string]time.Duration)
}
provReqs, err := p.client.ProvisioningRequests()
if err != nil {
return nil, err
Expand All @@ -60,16 +68,20 @@ func (p *ProvisioningRequestPodsInjector) Process(
}
conditions := pr.Status.Conditions
if apimeta.IsStatusConditionTrue(conditions, v1beta1.Failed) || apimeta.IsStatusConditionTrue(conditions, v1beta1.Provisioned) {
delete(p.backoffDuration, key(pr))
continue
}

provisioned := apimeta.FindStatusCondition(conditions, v1beta1.Provisioned)
retryTime, found := p.backoffDuration[key(pr)]
if !found {
retryTime = defaultRetryTime
}

//TODO(yaroslava): support exponential backoff
// Inject pods if ProvReq wasn't scaled up before or it has had the Provisioned == False condition for longer than its retry time (defaultRetryTime when no backoff is recorded).
inject := true
if provisioned != nil {
if provisioned.Status == metav1.ConditionFalse && provisioned.LastTransitionTime.Add(defaultRetryTime).Before(p.clock.Now()) {
if provisioned.Status == metav1.ConditionFalse && provisioned.LastTransitionTime.Add(retryTime).Before(p.clock.Now()) {
inject = true
} else {
inject = false
Expand All @@ -91,6 +103,7 @@ func (p *ProvisioningRequestPodsInjector) Process(
continue
}
unschedulablePods := append(unschedulablePods, provreqpods...)
p.backoffDuration[key(pr)] = max(2*retryTime, maxBackoffTime)
return unschedulablePods, nil
}
}
Expand All @@ -108,3 +121,7 @@ func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config) (pods.PodListPr
}
return &ProvisioningRequestPodsInjector{client: client, clock: clock.RealClock{}}, nil
}

// key returns the identifier under which a ProvisioningRequest is tracked in the
// backoff map: its UID converted to a plain string.
func key(pr *provreqwrapper.ProvisioningRequest) string {
	uid := pr.UID
	return string(uid)
}
5 changes: 3 additions & 2 deletions cluster-autoscaler/processors/provreq/injector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (

func TestProvisioningRequestPodsInjector(t *testing.T) {
now := time.Now()
minAgo := now.Add(-1 * time.Minute)
minAgo := now.Add(-1 * time.Minute).Add(-1 * time.Second)
hourAgo := now.Add(-1 * time.Hour)

accepted := metav1.Condition{
Expand Down Expand Up @@ -117,7 +117,8 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
}
for _, tc := range testCases {
client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
injector := ProvisioningRequestPodsInjector{client, clock.NewFakePassiveClock(now)}
backoffTime := map[string]time.Duration{key(notProvisionedRecentlyProvReqB): 2 * time.Minute}
injector := ProvisioningRequestPodsInjector{clock.NewFakePassiveClock(now), client, backoffTime}
getUnscheduledPods, err := injector.Process(nil, []*v1.Pod{})
if err != nil {
t.Errorf("%s failed: injector.Process return error %v", tc.name, err)
Expand Down

0 comments on commit 242edc5

Please sign in to comment.