
Commit

Add support for frequent loops when a ProvisioningRequest is encountered in the last iteration
Duke0404 committed Sep 24, 2024
1 parent a98d199 commit 7e2f1a7
Showing 4 changed files with 90 additions and 36 deletions.
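
To make the intent of the diff easier to scan: the new behaviour is that the autoscaler loop restarts immediately when a ProvisioningRequest was processed during the previous iteration, instead of waiting out the scan interval. Below is a minimal, self-contained sketch of that check; processTimesGetter, fakeInjector and shouldLoopImmediately are illustrative names standing in for provisioningRequestProcessingTimesGetter, ProvisioningRequestPodsInjector and LoopTrigger.provisioningRequestWasProcessed in the diff, not the actual trigger code.

package main

import (
    "fmt"
    "time"
)

// processTimesGetter mirrors the provisioningRequestProcessingTimesGetter
// interface added to trigger.go below.
type processTimesGetter interface {
    LastProvisioningRequestProcessTime() time.Time
}

// fakeInjector stands in for ProvisioningRequestPodsInjector, which now
// records a timestamp whenever it marks a ProvisioningRequest as accepted
// or failed.
type fakeInjector struct {
    lastProcess time.Time
}

func (f *fakeInjector) LastProvisioningRequestProcessTime() time.Time {
    return f.lastProcess
}

// shouldLoopImmediately reproduces the new provisioningRequestWasProcessed
// check: a nil getter disables the behaviour; otherwise any processing at or
// after lastRun starts the next iteration without waiting for the scan interval.
func shouldLoopImmediately(getter processTimesGetter, lastRun time.Time) bool {
    return getter != nil && !getter.LastProvisioningRequestProcessTime().Before(lastRun)
}

func main() {
    lastRun := time.Now()

    processed := &fakeInjector{lastProcess: lastRun.Add(2 * time.Second)}
    idle := &fakeInjector{lastProcess: lastRun.Add(-time.Minute)}

    fmt.Println(shouldLoopImmediately(processed, lastRun)) // true: loop again right away
    fmt.Println(shouldLoopImmediately(idle, lastRun))      // false: fall back to the usual wait
    fmt.Println(shouldLoopImmediately(nil, lastRun))       // false: feature off without an injector
}

When the check is false, Wait falls back to the existing triggers: a productive scale-up or scale-down, an unschedulable pod appearing, or the scan interval elapsing.
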
68 changes: 53 additions & 15 deletions cluster-autoscaler/loop/trigger.go
@@ -18,11 +18,13 @@ package loop

import (
"context"
"fmt"
"time"

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_client "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
@@ -43,38 +45,59 @@ type scalingTimesGetter interface {
LastScaleDownDeleteTime() time.Time
}

// provisioningRequestProcessingTimesGetter exposes recent provisioning request processing activity regardless of whether the
// ProvisioningRequest was marked as accepted or failed. This is because a ProvisioningRequest being processed indicates that
// there are other ProvisioningRequests that require processing regardless of the outcome of the current one. Thus, the next iteration
// should be started immediately.
type provisioningRequestProcessingTimesGetter interface {
LastProvisioningRequestProcessTime() time.Time
}

// LoopTrigger object implements criteria used to start new autoscaling iteration
type LoopTrigger struct {
podObserver *UnschedulablePodObserver
scanInterval time.Duration
scalingTimesGetter scalingTimesGetter
initialized bool
podObserver *UnschedulablePodObserver
scanInterval time.Duration
scalingTimesGetter scalingTimesGetter
provisioningRequestProcessTimeGetter provisioningRequestProcessingTimesGetter
}

// NewLoopTrigger creates a LoopTrigger object
func NewLoopTrigger(podObserver *UnschedulablePodObserver, scalingTimesGetter scalingTimesGetter, scanInterval time.Duration) *LoopTrigger {
func NewLoopTrigger(scalingTimesGetter scalingTimesGetter, provisioningRequestProcessTimeGetter provisioningRequestProcessingTimesGetter, scanInterval time.Duration) *LoopTrigger {
return &LoopTrigger{
podObserver: podObserver,
scanInterval: scanInterval,
scalingTimesGetter: scalingTimesGetter,
podObserver: nil,
scanInterval: scanInterval,
scalingTimesGetter: scalingTimesGetter,
provisioningRequestProcessTimeGetter: provisioningRequestProcessTimeGetter,
}
}

// Initialize initializes the LoopTrigger object by providing a pointer to the UnschedulablePodObserver
func (t *LoopTrigger) Initialize(podObserver *UnschedulablePodObserver) {
t.podObserver = podObserver
t.initialized = true
}

// Wait waits for the next autoscaling iteration
func (t *LoopTrigger) Wait(lastRun time.Time) {
func (t *LoopTrigger) Wait(lastRun time.Time) errors.AutoscalerError {
sleepStart := time.Now()
defer metrics.UpdateDurationFromStart(metrics.LoopWait, sleepStart)

// To improve scale-up throughput, Cluster Autoscaler starts new iteration
// immediately if the previous one was productive.
if !t.initialized {
return errors.ToAutoscalerError(errors.InternalError, fmt.Errorf("LoopTrigger not initialized"))
}

if !t.scalingTimesGetter.LastScaleUpTime().Before(lastRun) ||
!t.scalingTimesGetter.LastScaleDownDeleteTime().Before(lastRun) {
select {
case <-t.podObserver.unschedulablePodChan:
klog.Info("Autoscaler loop triggered by unschedulable pod appearing")
default:
klog.Infof("Autoscaler loop triggered immediately after a productive iteration")
}
return
t.triggerNextIteration("Autoscaler loop triggered immediately after a productive iteration")
return nil
}

if t.provisioningRequestWasProcessed(lastRun) {
t.triggerNextIteration("Autoscaler loop triggered immediately after a provisioning request was processed")
return nil
}

// Unschedulable pod triggers autoscaling immediately.
@@ -84,6 +107,7 @@ func (t *LoopTrigger) Wait(lastRun time.Time) {
case <-t.podObserver.unschedulablePodChan:
klog.Info("Autoscaler loop triggered by unschedulable pod appearing")
}
return nil
}

// UnschedulablePodObserver triggers a new loop if there are new unschedulable pods
@@ -118,6 +142,20 @@ func StartPodObserver(ctx context.Context, kubeClient kube_client.Interface) *Un
}
}

// triggerNextIteration logs the given message unless the next iteration was triggered by an unschedulable pod appearing, in which case it logs that instead
func (t *LoopTrigger) triggerNextIteration(message string) {
select {
case <-t.podObserver.unschedulablePodChan:
klog.Info("Autoscaler loop triggered by unschedulable pod appearing")
default:
klog.Infof(message)
}
}

func (t *LoopTrigger) provisioningRequestWasProcessed(lastRun time.Time) bool {
return t.provisioningRequestProcessTimeGetter != nil && !t.provisioningRequestProcessTimeGetter.LastProvisioningRequestProcessTime().Before(lastRun)
}

// isRecentUnschedulablePod checks if the object is an unschedulable pod observed recently.
func isRecentUnschedulablePod(obj any) bool {
pod, ok := obj.(*apiv1.Pod)
30 changes: 19 additions & 11 deletions cluster-autoscaler/main.go
@@ -471,7 +471,7 @@ func registerSignalHandlers(autoscaler core.Autoscaler) {
}()
}

func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, error) {
func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) (core.Autoscaler, *loop.LoopTrigger, error) {
// Create basic config from flags.
autoscalingOptions := createAutoscalingOptions()

@@ -490,7 +490,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter

predicateChecker, err := predicatechecker.NewSchedulerBasedPredicateChecker(informerFactory, autoscalingOptions.SchedulerConfig)
if err != nil {
return nil, err
return nil, nil, err
}
deleteOptions := options.NewNodeDeleteOptions(autoscalingOptions)
drainabilityRules := rules.Default(deleteOptions)
@@ -511,13 +511,14 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
opts.Processors.TemplateNodeInfoProvider = nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nodeInfoCacheExpireTime, *forceDaemonSets)
podListProcessor := podlistprocessor.NewDefaultPodListProcessor(opts.PredicateChecker, scheduling.ScheduleAnywhere)

var ProvisioningRequestInjector *provreq.ProvisioningRequestPodsInjector
if autoscalingOptions.ProvisioningRequestEnabled {
podListProcessor.AddProcessor(provreq.NewProvisioningRequestPodsFilter(provreq.NewDefautlEventManager()))

restConfig := kube_util.GetKubeConfig(autoscalingOptions.KubeClientOpts)
client, err := provreqclient.NewProvisioningRequestClient(restConfig)
if err != nil {
return nil, err
return nil, nil, err
}
provreqOrchestrator := provreqorchestrator.New(client, []provreqorchestrator.ProvisioningClass{
checkcapacity.New(client),
@@ -528,11 +529,11 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
opts.ScaleUpOrchestrator = scaleUpOrchestrator
provreqProcesor := provreq.NewProvReqProcessor(client, opts.PredicateChecker)
opts.LoopStartNotifier = loopstart.NewObserversList([]loopstart.Observer{provreqProcesor})
injector, err := provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize)
ProvisioningRequestInjector, err := provreq.NewProvisioningRequestPodsInjector(restConfig, opts.ProvisioningRequestInitialBackoffTime, opts.ProvisioningRequestMaxBackoffTime, opts.ProvisioningRequestMaxBackoffCacheSize)
if err != nil {
return nil, err
return nil, nil, err
}
podListProcessor.AddProcessor(injector)
podListProcessor.AddProcessor(ProvisioningRequestInjector)
podListProcessor.AddProcessor(provreqProcesor)
}

@@ -597,21 +598,26 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
// Create autoscaler.
autoscaler, err := core.NewAutoscaler(opts, informerFactory)
if err != nil {
return nil, err
return nil, nil, err
}

// Start informers. This must come after fully constructing the autoscaler because
// additional informers might have been registered in the factory during NewAutoscaler.
stop := make(chan struct{})
informerFactory.Start(stop)

return autoscaler, nil
// A ProvisioningRequestPodsInjector is used as provisioningRequestProcessingTimesGetter here to obtain the last time a
// ProvisioningRequest was processed. This is because the ProvisioningRequestPodsInjector in addition to injecting pods
// also marks the ProvisioningRequest as accepted or failed.
trigger := loop.NewLoopTrigger(autoscaler, ProvisioningRequestInjector, *scanInterval)

return autoscaler, trigger, nil
}

func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) {
metrics.RegisterAll(*emitPerNodeGroupMetrics)

autoscaler, err := buildAutoscaler(debuggingSnapshotter)
autoscaler, trigger, err := buildAutoscaler(debuggingSnapshotter)
if err != nil {
klog.Fatalf("Failed to create autoscaler: %v", err)
}
@@ -632,10 +638,12 @@ func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapsho
defer cancel()
if *frequentLoopsEnabled {
podObserver := loop.StartPodObserver(context, kube_util.CreateKubeClient(createAutoscalingOptions().KubeClientOpts))
trigger := loop.NewLoopTrigger(podObserver, autoscaler, *scanInterval)
trigger.Initialize(podObserver)
lastRun := time.Now()
for {
trigger.Wait(lastRun)
if err := trigger.Wait(lastRun); err != nil {
klog.Fatalf("Failed to wait for next loop: %v", err)
}
lastRun = time.Now()
loop.RunAutoscalerOnce(autoscaler, healthCheck, lastRun)
}
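
The main.go hunks above show the other half of the change: the trigger is now built in buildAutoscaler, before the unschedulable-pod observer exists, and completed later in run via Initialize, with Wait returning an error if that step is skipped. Below is a stripped-down, runnable sketch of that lifecycle, using simplified stand-in types rather than the real loop package.

package main

import (
    "fmt"
    "time"
)

// Simplified stand-ins for the loop package types wired together in run():
// the trigger must be completed via Initialize before Wait may be called.

type podObserver struct {
    unschedulablePodChan chan struct{}
}

type loopTrigger struct {
    podObserver  *podObserver
    initialized  bool
    scanInterval time.Duration
}

func newLoopTrigger(scanInterval time.Duration) *loopTrigger {
    // As in NewLoopTrigger above, the observer is deliberately left nil here...
    return &loopTrigger{scanInterval: scanInterval}
}

func (t *loopTrigger) Initialize(p *podObserver) {
    // ...and supplied later, once StartPodObserver has created it.
    t.podObserver = p
    t.initialized = true
}

func (t *loopTrigger) Wait(lastRun time.Time) error {
    if !t.initialized {
        // Mirrors the new error path: Wait refuses to run on a half-built
        // trigger instead of dereferencing a nil podObserver.
        return fmt.Errorf("LoopTrigger not initialized")
    }
    select {
    case <-t.podObserver.unschedulablePodChan:
        // an unschedulable pod appeared
    case <-time.After(t.scanInterval):
        // scan interval elapsed
    }
    return nil
}

func main() {
    trigger := newLoopTrigger(100 * time.Millisecond)

    // Calling Wait before Initialize now surfaces an error.
    fmt.Println(trigger.Wait(time.Now()))

    trigger.Initialize(&podObserver{unschedulablePodChan: make(chan struct{}, 1)})
    fmt.Println(trigger.Wait(time.Now())) // <nil> once the scan interval elapses
}

The split matters because buildAutoscaler is the only place that knows about the ProvisioningRequestPodsInjector, while the pod observer can only be started once the kube client and context exist in run.
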
26 changes: 17 additions & 9 deletions cluster-autoscaler/processors/provreq/injector.go
@@ -24,7 +24,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest"
provreqconditions "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
provreqpods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
@@ -38,11 +37,12 @@ import (

// ProvisioningRequestPodsInjector creates in-memory pods from ProvisioningRequests and injects them into the unscheduled pods list.
type ProvisioningRequestPodsInjector struct {
initialRetryTime time.Duration
maxBackoffTime time.Duration
backoffDuration *lru.Cache
clock clock.PassiveClock
client *provreqclient.ProvisioningRequestClient
initialRetryTime time.Duration
maxBackoffTime time.Duration
backoffDuration *lru.Cache
clock clock.PassiveClock
client *provreqclient.ProvisioningRequestClient
lastProvisioningRequestProcessTime time.Time
}

// IsAvailableForProvisioning checks whether the provisioning request is in the correct state for processing and provisioning has not been attempted recently.
@@ -78,6 +78,7 @@ func (p *ProvisioningRequestPodsInjector) MarkAsAccepted(pr *provreqwrapper.Prov
klog.Errorf("failed add Accepted condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
return err
}
p.lastProvisioningRequestProcessTime = p.clock.Now()
return nil
}

@@ -87,6 +88,7 @@ func (p *ProvisioningRequestPodsInjector) MarkAsFailed(pr *provreqwrapper.Provis
if _, err := p.client.UpdateProvisioningRequest(pr.ProvisioningRequest); err != nil {
klog.Errorf("failed add Failed condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, err)
}
p.lastProvisioningRequestProcessTime = p.clock.Now()
}

// GetPodsFromNextRequest picks one ProvisioningRequest that meets the condition checked by the passed isSupportedClass function, marks it as accepted, and returns pods from it.
@@ -112,7 +114,7 @@ func (p *ProvisioningRequestPodsInjector) GetPodsFromNextRequest(
continue
}

provreqpods, err := provreqpods.PodsForProvisioningRequest(pr)
podsFromProvReq, err := provreqpods.PodsForProvisioningRequest(pr)
if err != nil {
klog.Errorf("Failed to get pods for ProvisioningRequest %v", pr.Name)
p.MarkAsFailed(pr, provreqconditions.FailedToCreatePodsReason, err.Error())
@@ -121,7 +123,8 @@
if err := p.MarkAsAccepted(pr); err != nil {
continue
}
return provreqpods, nil

return podsFromProvReq, nil
}
return nil, nil
}
@@ -151,7 +154,7 @@ func (p *ProvisioningRequestPodsInjector) Process(
func (p *ProvisioningRequestPodsInjector) CleanUp() {}

// NewProvisioningRequestPodsInjector creates a ProvisioningRequest filter processor.
func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffTime, maxBackoffTime time.Duration, maxCacheSize int) (pods.PodListProcessor, error) {
func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffTime, maxBackoffTime time.Duration, maxCacheSize int) (*ProvisioningRequestPodsInjector, error) {
client, err := provreqclient.NewProvisioningRequestClient(kubeConfig)
if err != nil {
return nil, err
@@ -162,3 +165,8 @@ func NewProvisioningRequestPodsInjector(kubeConfig *rest.Config, initialBackoffT
func key(pr *provreqwrapper.ProvisioningRequest) string {
return string(pr.UID)
}

// LastProvisioningRequestProcessTime returns the time when the last provisioning request was processed.
func (p *ProvisioningRequestPodsInjector) LastProvisioningRequestProcessTime() time.Time {
return p.lastProvisioningRequestProcessTime
}
2 changes: 1 addition & 1 deletion cluster-autoscaler/processors/provreq/injector_test.go
@@ -131,7 +131,7 @@ func TestProvisioningRequestPodsInjector(t *testing.T) {
client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
backoffTime := lru.New(100)
backoffTime.Add(key(notProvisionedRecentlyProvReqB), 2*time.Minute)
injector := ProvisioningRequestPodsInjector{1 * time.Minute, 10 * time.Minute, backoffTime, clock.NewFakePassiveClock(now), client}
injector := ProvisioningRequestPodsInjector{1 * time.Minute, 10 * time.Minute, backoffTime, clock.NewFakePassiveClock(now), client, now}
getUnscheduledPods, err := injector.Process(nil, provreqwrapper.BuildTestPods("ns", "pod", tc.existingUnsUnschedulablePodCount))
if err != nil {
t.Errorf("%s failed: injector.Process return error %v", tc.name, err)
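
One detail worth noting from the test change above: the injector stamps lastProvisioningRequestProcessTime from its own clock.PassiveClock rather than time.Now(), which is why the struct literal in the test gains a trailing now argument and stays deterministic. Below is a stdlib-only sketch of that pattern; passiveClock, fakeClock and injector are hypothetical stand-ins, not the k8s.io/utils/clock types or the real processor.

package main

import (
    "fmt"
    "time"
)

// passiveClock is a hypothetical stand-in for clock.PassiveClock: the injector
// only ever asks "what time is it now?".
type passiveClock interface {
    Now() time.Time
}

// fakeClock lets tests pin the timestamp that MarkAsAccepted/MarkAsFailed record.
type fakeClock struct{ t time.Time }

func (f fakeClock) Now() time.Time { return f.t }

// injector mirrors the new state flow: stamp on accept/fail, read back via
// LastProvisioningRequestProcessTime.
type injector struct {
    clock                              passiveClock
    lastProvisioningRequestProcessTime time.Time
}

func (p *injector) markAsAccepted() {
    p.lastProvisioningRequestProcessTime = p.clock.Now()
}

func (p *injector) LastProvisioningRequestProcessTime() time.Time {
    return p.lastProvisioningRequestProcessTime
}

func main() {
    now := time.Date(2024, 9, 24, 12, 0, 0, 0, time.UTC)
    inj := &injector{clock: fakeClock{t: now}}

    inj.markAsAccepted()
    fmt.Println(inj.LastProvisioningRequestProcessTime().Equal(now)) // true: deterministic in tests
}
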
