diff --git a/cluster-autoscaler/loop/trigger.go b/cluster-autoscaler/loop/trigger.go
index 52ef962ba1b0..e5dd513c158b 100644
--- a/cluster-autoscaler/loop/trigger.go
+++ b/cluster-autoscaler/loop/trigger.go
@@ -43,19 +43,29 @@ type scalingTimesGetter interface {
 	LastScaleDownDeleteTime() time.Time
 }
 
+// provisioningRequestProcessingTimesGetter exposes recent provisioning request processing activity regardless of whether the
+// ProvisioningRequest was marked as accepted or failed. This is because a ProvisioningRequest being processed indicates that
+// there are other ProvisioningRequests that require processing regardless of the outcome of the current one. Thus, the next iteration
+// should be started immediately.
+type provisioningRequestProcessingTimesGetter interface {
+	LastProvisioningRequestProcessTime() time.Time
+}
+
 // LoopTrigger object implements criteria used to start new autoscaling iteration
 type LoopTrigger struct {
-	podObserver        *UnschedulablePodObserver
-	scanInterval       time.Duration
-	scalingTimesGetter scalingTimesGetter
+	podObserver                          *UnschedulablePodObserver
+	scanInterval                         time.Duration
+	scalingTimesGetter                   scalingTimesGetter
+	provisioningRequestProcessTimeGetter provisioningRequestProcessingTimesGetter
 }
 
 // NewLoopTrigger creates a LoopTrigger object
-func NewLoopTrigger(podObserver *UnschedulablePodObserver, scalingTimesGetter scalingTimesGetter, scanInterval time.Duration) *LoopTrigger {
+func NewLoopTrigger(scalingTimesGetter scalingTimesGetter, provisioningRequestProcessTimeGetter provisioningRequestProcessingTimesGetter, podObserver *UnschedulablePodObserver, scanInterval time.Duration) *LoopTrigger {
 	return &LoopTrigger{
-		podObserver:        podObserver,
-		scanInterval:       scanInterval,
-		scalingTimesGetter: scalingTimesGetter,
+		podObserver:                          podObserver,
+		scanInterval:                         scanInterval,
+		scalingTimesGetter:                   scalingTimesGetter,
+		provisioningRequestProcessTimeGetter: provisioningRequestProcessTimeGetter,
 	}
 }
 
@@ -66,14 +76,18 @@ func (t *LoopTrigger) Wait(lastRun time.Time) {
 
 	// To improve scale-up throughput, Cluster Autoscaler starts new iteration
 	// immediately if the previous one was productive.
-	if !t.scalingTimesGetter.LastScaleUpTime().Before(lastRun) ||
-		!t.scalingTimesGetter.LastScaleDownDeleteTime().Before(lastRun) {
-		select {
-		case <-t.podObserver.unschedulablePodChan:
-			klog.Info("Autoscaler loop triggered by unschedulable pod appearing")
-		default:
-			klog.Infof("Autoscaler loop triggered immediately after a productive iteration")
-		}
+	if !t.scalingTimesGetter.LastScaleUpTime().Before(lastRun) {
+		t.logTriggerReason("Autoscaler loop triggered immediately after a scale up")
+		return
+	}
+
+	if !t.scalingTimesGetter.LastScaleDownDeleteTime().Before(lastRun) {
+		t.logTriggerReason("Autoscaler loop triggered immediately after a scale down")
+		return
+	}
+
+	if t.provisioningRequestWasProcessed(lastRun) {
+		t.logTriggerReason("Autoscaler loop triggered immediately after a provisioning request was processed")
 		return
 	}
 
@@ -118,6 +132,20 @@ func StartPodObserver(ctx context.Context, kubeClient kube_client.Interface) *Un
 	}
 }
 
+// logTriggerReason logs the given message, unless the next iteration was triggered by unschedulable pods appearing, in which case it logs that instead.
+func (t *LoopTrigger) logTriggerReason(message string) {
+	select {
+	case <-t.podObserver.unschedulablePodChan:
+		klog.Info("Autoscaler loop triggered by unschedulable pod appearing")
+	default:
+		klog.Info(message)
+	}
+}
+
+func (t *LoopTrigger) provisioningRequestWasProcessed(lastRun time.Time) bool {
+	return t.provisioningRequestProcessTimeGetter != nil && !t.provisioningRequestProcessTimeGetter.LastProvisioningRequestProcessTime().Before(lastRun)
+}
+
 // isRecentUnschedulablePod checks if the object is an unschedulable pod observed recently.
 func isRecentUnschedulablePod(obj any) bool {
 	pod, ok := obj.(*apiv1.Pod)
diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go
index d56d581afb31..fbbba0d5b142 100644
--- a/cluster-autoscaler/main.go
+++ b/cluster-autoscaler/main.go
@@ -468,7 +468,7 @@ func registerSignalHandlers(autoscaler core.Autoscaler) {
 	}()
 }
 
-func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, error) {
+func buildAutoscaler(context ctx.Context, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, *loop.LoopTrigger, error) {
 	// Create basic config from flags.
 	autoscalingOptions := createAutoscalingOptions()
 
@@ -487,7 +487,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 
 	predicateChecker, err := predicatechecker.NewSchedulerBasedPredicateChecker(informerFactory, autoscalingOptions.SchedulerConfig)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 	deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
 	drainabilityRules := rules.Default(deleteOptions)
@@ -508,13 +508,14 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 	opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime, *forceDaemonSets)
 	podListProcessor := podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker, scheduling.ScheduleAnywhere)
 
+	var ProvisioningRequestInjector *provreq.ProvisioningRequestPodsInjector
 	if autoscalingOptions.ProvisioningRequestEnabled {
 		podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager()))
 		restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
 		client, err := provreqclient.NewProvisioningRequestClient(restConfig)
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
 		provreqOrchestrator := provreqorchestrator.New(client, []provreqorchestrator.ProvisioningClass{
 			checkcapacity.New(client),
@@ -525,11 +526,11 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 		opts.ScaleUpOrchestrator = scaleUpOrchestrator
 		provreqProcesor := provreq.NewProvReqProcessor(client, opts.PredicateChecker)
 		opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
-		injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize)
+		ProvisioningRequestInjector, err = provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize)
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
-		podListProcessor.AddProcessor(injector)
+		podListProcessor.AddProcessor(ProvisioningRequestInjector)
 		podListProcessor.AddProcessor(provreqProcesor)
 	}
 
@@ -594,7 +595,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 	// Create autoscaler.
 	autoscaler, err := core.NewAutoscaler(opts, informerFactory)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
 
 	// Start informers. This must come after fully constructing the autoscaler because
@@ -602,13 +603,22 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
 	stop := make(chan struct{})
 	informerFactory.Start(stop)
 
-	return autoscaler, nil
+	podObserver := loop.StartPodObserver(context, kube_util.CreateKubeClient(autoscalingOptions.KubeClientOpts))
+
+	// A ProvisioningRequestPodsInjector is used as provisioningRequestProcessingTimesGetter here to obtain the last time a
+	// ProvisioningRequest was processed. This is because the ProvisioningRequestPodsInjector, in addition to injecting pods,
+	// also marks the ProvisioningRequest as accepted or failed.
+	trigger := loop.NewLoopTrigger(autoscaler, ProvisioningRequestInjector, podObserver, *scanInterval)
+
+	return autoscaler, trigger, nil
 }
 
 func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) {
 	metrics.RegisterAll(*emitPerNodeGroupMetrics)
+	context, cancel := ctx.WithCancel(ctx.Background())
+	defer cancel()
 
-	autoscaler, err := buildAutoscaler(debuggingSnapshotter)
+	autoscaler, trigger, err := buildAutoscaler(context, debuggingSnapshotter)
 	if err != nil {
 		klog.Fatalf("Failed to create autoscaler: %v", err)
 	}
@@ -625,11 +635,7 @@ func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapsho
 	}
 
 	// Autoscale ad infinitum.
-	context, cancel := ctx.WithCancel(ctx.Background())
-	defer cancel()
 	if *frequentLoopsEnabled {
-		podObserver := loop.StartPodObserver(context, kube_util.CreateKubeClient(createAutoscalingOptions().KubeClientOpts))
-		trigger := loop.NewLoopTrigger(podObserver, autoscaler, *scanInterval)
 		lastRun := time.Now()
 		for {
 			trigger.Wait(lastRun)
diff --git a/cluster-autoscaler/processors/provreq/injector.go b/cluster-autoscaler/processors/provreq/injector.go
index 5fc88380bb0a..70dc78965afc 100644
--- a/cluster-autoscaler/processors/provreq/injector.go
+++ b/cluster-autoscaler/processors/provreq/injector.go
@@ -24,7 +24,6 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	v1 "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
 	"k8s.io/autoscaler/cluster-autoscaler/context"
-	"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest"
 	provreqconditions "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
 	provreqpods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
@@ -38,11 +37,12 @@ import (
 
 // ProvisioningRequestPodsInjector creates in-memory pods from ProvisioningRequest and inject them to unscheduled pods list.
 type ProvisioningRequestPodsInjector struct {
-	initialRetryTime time.Duration
-	maxBackoffTime   time.Duration
-	backoffDuration  *lru.Cache
-	clock            clock.PassiveClock
-	client           *provreqclient.ProvisioningRequestClient
+	initialRetryTime                   time.Duration
+	maxBackoffTime                     time.Duration
+	backoffDuration                    *lru.Cache
+	clock                              clock.PassiveClock
+	client                             *provreqclient.ProvisioningRequestClient
+	lastProvisioningRequestProcessTime time.Time
 }
 
 // IsAvailableForProvisioning checks if the provisioning request is the correct state for processing and provisioning has not been attempted recently.
@@ -78,6 +78,7 @@ func (p *ProvisioningRequestPodsInjector) MarkAsAccepted(pr *provreqwrapper.Prov
 		klog.Errorf("failed add Accepted condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
 		return err
 	}
+	p.lastProvisioningRequestProcessTime = p.clock.Now()
 	return nil
 }
 
@@ -87,6 +88,7 @@ func (p *ProvisioningRequestPodsInjector) MarkAsFailed(pr *provreqwrapper.Provis
 	if _, err := p.client.UpdateProvisioningRequest(pr.ProvisioningRequest); err != nil {
 		klog.Errorf("failed add Failed condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
 	}
+	p.lastProvisioningRequestProcessTime = p.clock.Now()
 }
 
 // GetPodsFromNextRequest picks one ProvisioningRequest meeting the condition passed using isSupportedClass function, marks it as accepted and returns pods from it.
@@ -112,7 +114,7 @@ func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest(
 			continue
 		}
 
-		provreqpods, err := provreqpods.PodsForProvisioningRequest(pr)
+		podsFromProvReq, err := provreqpods.PodsForProvisioningRequest(pr)
 		if err != nil {
 			klog.Errorf("Failed to get pods for ProvisioningRequest %v", pr.Name)
 			p.MarkAsFailed(pr, provreqconditions.FailedToCreatePodsReason, err.Error())
@@ -121,7 +123,8 @@ func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest(
 		if err := p.MarkAsAccepted(pr); err != nil {
 			continue
 		}
-		return provreqpods, nil
+
+		return podsFromProvReq, nil
 	}
 	return nil, nil
 }
@@ -151,7 +154,7 @@ func (p *ProvisioningRequestPodsInjector) Process(
 func (p *ProvisioningRequestPodsInjector) CleanUp() {}
 
 // NewProvisioningRequestPodsInjector creates a ProvisioningRequest filter processor.
-func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffTime, maxBackoffTime time.Duration, maxCacheSize int) (pods.PodListProcessor, error) {
+func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffTime, maxBackoffTime time.Duration, maxCacheSize int) (*ProvisioningRequestPodsInjector, error) {
 	client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
 	if err != nil {
 		return nil, err
@@ -162,3 +165,8 @@ func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffT
 func key(pr *provreqwrapper.ProvisioningRequest) string {
 	return string(pr.UID)
 }
+
+// LastProvisioningRequestProcessTime returns the time when the last provisioning request was processed.
+func (p *ProvisioningRequestPodsInjector) LastProvisioningRequestProcessTime() time.Time {
+	return p.lastProvisioningRequestProcessTime
+}
diff --git a/cluster-autoscaler/processors/provreq/injector_test.go b/cluster-autoscaler/processors/provreq/injector_test.go
index 37e4e720168d..0e51026c254c 100644
--- a/cluster-autoscaler/processors/provreq/injector_test.go
+++ b/cluster-autoscaler/processors/provreq/injector_test.go
@@ -131,7 +131,7 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
 		client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
 		backoffTime := lru.New(100)
 		backoffTime.Add(key(notProvisionedRecentlyProvReqB), 2*time.Minute)
-		injector := ProvisioningRequestPodsInjector{1 * time.Minute, 10 * time.Minute, backoffTime, clock.NewFakePassiveClock(now), client}
+		injector := ProvisioningRequestPodsInjector{1 * time.Minute, 10 * time.Minute, backoffTime, clock.NewFakePassiveClock(now), client, now}
 		getUnscheduledPods, err := injector.Process(nil, provreqwrapper.BuildTestPods("ns", "pod", tc.existingUnsUnschedulablePodCount))
 		if err != nil {
 			t.Errorf("%s failed: injector.Process return error %v", tc.name, err)
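
Reviewer note: the snippet below is an illustrative, standalone sketch of the contract this diff introduces, not code from the patch. It shows how the new provisioningRequestProcessingTimesGetter interface is consumed: the trigger only needs a LastProvisioningRequestProcessTime() accessor, and a nil getter (ProvisioningRequests disabled) never short-circuits the scan-interval wait. The fakeInjector type and the wasProcessedSince helper are hypothetical stand-ins for ProvisioningRequestPodsInjector and LoopTrigger.provisioningRequestWasProcessed.

```go
package main

import (
	"fmt"
	"time"
)

// Mirrors the interface added in trigger.go: anything that can report when it
// last processed a ProvisioningRequest, regardless of the accept/fail outcome.
type provisioningRequestProcessingTimesGetter interface {
	LastProvisioningRequestProcessTime() time.Time
}

// fakeInjector is a hypothetical stand-in for ProvisioningRequestPodsInjector,
// which in the patch records the processing time in MarkAsAccepted and MarkAsFailed.
type fakeInjector struct {
	lastProcessTime time.Time
}

func (f *fakeInjector) markProcessed(now time.Time) { f.lastProcessTime = now }

func (f *fakeInjector) LastProvisioningRequestProcessTime() time.Time { return f.lastProcessTime }

// wasProcessedSince reproduces the check from the patch: a nil getter
// (ProvisioningRequests disabled) never triggers an immediate next iteration.
func wasProcessedSince(g provisioningRequestProcessingTimesGetter, lastRun time.Time) bool {
	return g != nil && !g.LastProvisioningRequestProcessTime().Before(lastRun)
}

func main() {
	lastRun := time.Now()

	injector := &fakeInjector{}
	injector.markProcessed(lastRun.Add(2 * time.Second)) // processed after the last loop run

	fmt.Println(wasProcessedSince(injector, lastRun)) // true  -> start the next iteration immediately
	fmt.Println(wasProcessedSince(nil, lastRun))      // false -> fall back to the scan-interval wait
}
```

Recording the timestamp in both MarkAsAccepted and MarkAsFailed is what lets the trigger react to any ProvisioningRequest processing activity, not only successful scale-ups.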