[cluster-autoscaler-release-1.30] BookCapacity for ProvisioningRequest pods #7057

Merged
This backport moves ProvisioningRequest capacity booking out of the scale-up orchestrator and into the provreq pod-list processor. The processor now injects fake pods into the cluster snapshot on every loop iteration for each Provisioned request whose reservation is still valid, so booked capacity stays protected from ScaleDown instead of being reserved only while a scale-up runs.
1 change: 0 additions & 1 deletion cluster-autoscaler/core/static_autoscaler.go
@@ -532,7 +532,6 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr

// finally, filter out pods that are too "young" to safely be considered for a scale-up (delay is configurable)
unschedulablePodsToHelp = a.filterOutYoungPods(unschedulablePodsToHelp, currentTime)

preScaleUp := func() time.Time {
scaleUpStart := time.Now()
metrics.UpdateLastTime(metrics.ScaleUp, scaleUpStart)
3 changes: 2 additions & 1 deletion cluster-autoscaler/main.go
@@ -508,7 +508,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
scaleUpOrchestrator := provreqorchestrator.NewWrapperOrchestrator(provreqOrchestrator)

opts.ScaleUpOrchestrator = scaleUpOrchestrator
provreqProcesor := provreq.NewProvReqProcessor(client)
provreqProcesor := provreq.NewProvReqProcessor(client, opts.PredicateChecker)
if err != nil {
return nil, err
}
@@ -518,6 +518,7 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
return nil, err
}
podListProcessor.AddProcessor(injector)
podListProcessor.AddProcessor(provreqProcesor)
}
opts.Processors.PodListProcessor = podListProcessor
scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{}
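Read together, the two main.go hunks register the processor twice: Refresh keeps running as a loop observer to expire or fail requests, while Process now also runs in the PodListProcessor chain on every iteration. A minimal sketch of the resulting wiring, with the surrounding buildAutoscaler plumbing elided (all names follow the diff; ordering is as shown above):

```go
// The processor needs a predicate checker so it can simulate scheduling
// fake pods into the cluster snapshot (see processor.go below).
provreqProcesor := provreq.NewProvReqProcessor(client, opts.PredicateChecker)

// The pod-injection processor runs first; the ProvisioningRequest
// processor then books capacity for Provisioned requests each iteration.
podListProcessor.AddProcessor(injector)
podListProcessor.AddProcessor(provreqProcesor)
opts.Processors.PodListProcessor = podListProcessor
```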
cluster-autoscaler/processors/provreq/processor.go
@@ -17,16 +17,24 @@
package provreq

import (
"fmt"
"time"

apiv1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
)

const (
@@ -36,15 +44,20 @@ const (
defaultMaxUpdated = 20
)

type injector interface {
TrySchedulePods(clusterSnapshot clustersnapshot.ClusterSnapshot, pods []*apiv1.Pod, isNodeAcceptable func(*framework.NodeInfo) bool, breakOnFailure bool) ([]scheduling.Status, int, error)
}

type provReqProcessor struct {
now func() time.Time
maxUpdated int
client *provreqclient.ProvisioningRequestClient
injector injector
}

// NewProvReqProcessor returns a ProvisioningRequestProcessor.
func NewProvReqProcessor(client *provreqclient.ProvisioningRequestClient) *provReqProcessor {
return &provReqProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client}
func NewProvReqProcessor(client *provreqclient.ProvisioningRequestClient, predicateChecker predicatechecker.PredicateChecker) *provReqProcessor {
return &provReqProcessor{now: time.Now, maxUpdated: defaultMaxUpdated, client: client, injector: scheduling.NewHintingSimulator(predicateChecker)}
}

// Refresh implements loop.Observer interface and will be run at the start
@@ -56,15 +69,14 @@ func (p *provReqProcessor) Refresh() {
klog.Errorf("Failed to get ProvisioningRequests list, err: %v", err)
return
}

p.Process(provReqs)
p.refresh(provReqs)
}

// Process iterates over ProvisioningRequests and apply:
// refresh iterates over ProvisioningRequests and applies:
// -BookingExpired condition to Provisioned ProvisioningRequests whose capacity reservation time has expired.
// -Failed condition to ProvisioningRequests that were not provisioned within defaultExpirationTime.
// TODO(yaroslava): fetch reservation and expiration time from ProvisioningRequest
func (p *provReqProcessor) Process(provReqs []*provreqwrapper.ProvisioningRequest) {
func (p *provReqProcessor) refresh(provReqs []*provreqwrapper.ProvisioningRequest) {
expiredProvReq := []*provreqwrapper.ProvisioningRequest{}
failedProvReq := []*provreqwrapper.ProvisioningRequest{}
for _, provReq := range provReqs {
@@ -108,5 +120,50 @@ func (p *provReqProcessor) Process(provReqs []*provreqwrapper.ProvisioningReques
}
}

// Cleanup cleans up internal state.
// CleanUp cleans up internal state.
func (p *provReqProcessor) CleanUp() {}

// Process implements PodListProcessor.Process() and injects fake pods into the cluster snapshot for Provisioned ProvReqs,
// reserving their capacity so that ScaleDown does not reclaim it.
func (p *provReqProcessor) Process(context *context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
err := p.bookCapacity(context)
if err != nil {
klog.Warningf("Failed to book capacity for ProvisioningRequests: %s", err)
}
return unschedulablePods, nil
}

// bookCapacity schedules fake pods for every ProvisioningRequest that should have capacity reserved
// in the cluster.
func (p *provReqProcessor) bookCapacity(ctx *context.AutoscalingContext) error {
provReqs, err := p.client.ProvisioningRequests()
if err != nil {
return fmt.Errorf("couldn't fetch ProvisioningRequests in the cluster: %v", err)
}
podsToCreate := []*apiv1.Pod{}
for _, provReq := range provReqs {
if !conditions.ShouldCapacityBeBooked(provReq) {
continue
}
pods, err := provreq_pods.PodsForProvisioningRequest(provReq)
if err != nil {
// ClusterAutoscaler was able to create these pods before, so we shouldn't see an error here.
// If there is one, mark the ProvisioningRequest as Failed, because we won't be able to book capacity
// for it anyway.
conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
if _, err := p.client.UpdateProvisioningRequest(provReq.ProvisioningRequest); err != nil {
klog.Errorf("failed to add Failed condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, err)
}
continue
}
podsToCreate = append(podsToCreate, pods...)
}
if len(podsToCreate) == 0 {
return nil
}
// Schedule the pods to reserve capacity for the provisioning requests.
if _, _, err = p.injector.TrySchedulePods(ctx.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil {
return err
}
return nil
}
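The small injector interface at the top of this file is the seam that makes bookCapacity testable: production code passes scheduling.NewHintingSimulator(predicateChecker), but anything with the same TrySchedulePods shape can stand in. A hedged sketch of such a stand-in (the recordingInjector name is hypothetical and not part of the PR; the PR's own tests below use an almost identical fakeInjector):

```go
// recordingInjector satisfies the injector interface but only records the
// pods it was asked to place, instead of mutating the snapshot.
type recordingInjector struct {
	requested []*apiv1.Pod
}

func (r *recordingInjector) TrySchedulePods(
	clusterSnapshot clustersnapshot.ClusterSnapshot,
	pods []*apiv1.Pod,
	isNodeAcceptable func(*framework.NodeInfo) bool,
	breakOnFailure bool,
) ([]scheduling.Status, int, error) {
	r.requested = append(r.requested, pods...)
	return nil, 0, nil // report "nothing scheduled" without error
}
```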
88 changes: 85 additions & 3 deletions cluster-autoscaler/processors/provreq/processor_test.go
@@ -17,19 +17,26 @@
package provreq

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/scheduler/framework"

"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/config"
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
)

func TestProcess(t *testing.T) {
func TestRefresh(t *testing.T) {
now := time.Now()
dayAgo := now.Add(-1 * 24 * time.Hour)
weekAgo := now.Add(-1 * defaultExpirationTime).Add(-1 * 5 * time.Minute)
@@ -146,8 +153,8 @@ func TestProcess(t *testing.T) {
additionalPr := provreqclient.ProvisioningRequestWrapperForTesting("namespace", "additional")
additionalPr.CreationTimestamp = metav1.NewTime(weekAgo)
additionalPr.Spec.ProvisioningClassName = v1beta1.ProvisioningClassCheckCapacity
processor := provReqProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr)}
processor.Process([]*provreqwrapper.ProvisioningRequest{pr, additionalPr})
processor := provReqProcessor{func() time.Time { return now }, 1, provreqclient.NewFakeProvisioningRequestClient(nil, t, pr, additionalPr), nil}
processor.refresh([]*provreqwrapper.ProvisioningRequest{pr, additionalPr})
assert.ElementsMatch(t, test.wantConditions, pr.Status.Conditions)
if len(test.conditions) == len(test.wantConditions) {
assert.ElementsMatch(t, []metav1.Condition{
@@ -164,3 +171,78 @@ func TestProcess(t *testing.T) {
}
}
}

type fakeInjector struct {
pods []*apiv1.Pod
}

func (f *fakeInjector) TrySchedulePods(clusterSnapshot clustersnapshot.ClusterSnapshot, pods []*apiv1.Pod, isNodeAcceptable func(*framework.NodeInfo) bool, breakOnFailure bool) ([]scheduling.Status, int, error) {
f.pods = pods
return nil, 0, nil
}

func TestBookCapacity(t *testing.T) {
testCases := []struct {
name string
conditions []string
provReq *provreqwrapper.ProvisioningRequest
capacityIsBooked bool
}{
{
name: "ProvReq is new, check-capacity class",
provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassCheckCapacity),
capacityIsBooked: false,
},
{
name: "ProvReq is Failed, best-effort-atomic class",
conditions: []string{v1beta1.Failed},
provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassBestEffortAtomicScaleUp),
capacityIsBooked: false,
},
{
name: "ProvReq is Provisioned, unknown class",
conditions: []string{v1beta1.Provisioned},
provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), "unknown"),
capacityIsBooked: false,
},
{
name: "ProvReq is Provisioned, capacity should be booked, check-capacity class",
conditions: []string{v1beta1.Provisioned},
provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassCheckCapacity),
capacityIsBooked: true,
},
{
name: "ProvReq is Provisioned, capacity should be booked, best-effort-atomic class",
conditions: []string{v1beta1.Provisioned},
provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassBestEffortAtomicScaleUp),
capacityIsBooked: true,
},
{
name: "ProvReq has BookingExpired, capacity should not be booked, best-effort-atomic class",
conditions: []string{v1beta1.Provisioned, v1beta1.BookingExpired},
provReq: provreqwrapper.BuildTestProvisioningRequest("ns", "pr", "2", "100m", "", 10, false, time.Now(), v1beta1.ProvisioningClassBestEffortAtomicScaleUp),
capacityIsBooked: false,
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
test := test
injector := &fakeInjector{pods: []*apiv1.Pod{}}
for _, condition := range test.conditions {
conditions.AddOrUpdateCondition(test.provReq, condition, metav1.ConditionTrue, "", "", metav1.Now())
}

processor := &provReqProcessor{
now: func() time.Time { return time.Now() },
client: provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, test.provReq),
maxUpdated: 20,
injector: injector,
}
ctx, _ := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, nil, nil, nil, nil, nil)
processor.bookCapacity(&ctx)
if (test.capacityIsBooked && len(injector.pods) == 0) || (!test.capacityIsBooked && len(injector.pods) > 0) {
t.Fail()
}
})
}
}
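The table above pins down when capacity is booked: only for a known provisioning class (check-capacity or best-effort-atomic) whose request is Provisioned and neither Failed nor BookingExpired. A hedged reconstruction of that predicate (the real logic lives in conditions.ShouldCapacityBeBooked; this helper only mirrors what the test cases assert):

```go
// shouldBook mirrors the booked/not-booked expectations in the table
// above. Illustrative only; not the PR's implementation.
func shouldBook(class string, conds []metav1.Condition) bool {
	if class != v1beta1.ProvisioningClassCheckCapacity &&
		class != v1beta1.ProvisioningClassBestEffortAtomicScaleUp {
		return false // unknown classes are never booked
	}
	provisioned, failed, expired := false, false, false
	for _, c := range conds {
		if c.Status != metav1.ConditionTrue {
			continue
		}
		switch c.Type {
		case v1beta1.Provisioned:
			provisioned = true
		case v1beta1.Failed:
			failed = true
		case v1beta1.BookingExpired:
			expired = true
		}
	}
	return provisioned && !failed && !expired
}
```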
cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go
@@ -21,19 +21,14 @@ import (

appsv1 "k8s.io/api/apps/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/context"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
ca_errors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
"k8s.io/klog/v2"

ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
@@ -96,7 +91,6 @@ func (o *provReqOrchestrator) ScaleUp(

o.context.ClusterSnapshot.Fork()
defer o.context.ClusterSnapshot.Revert()
o.bookCapacity()

// unschedulablePods should all belong to a single ProvisioningClass, so only one provClass should attempt a ScaleUp.
for _, provClass := range o.provisioningClasses {
@@ -115,35 +109,3 @@ func (o *provReqOrchestrator) ScaleUpToNodeGroupMinSize(
) (*status.ScaleUpStatus, ca_errors.AutoscalerError) {
return nil, nil
}

func (o *provReqOrchestrator) bookCapacity() error {
provReqs, err := o.client.ProvisioningRequests()
if err != nil {
return fmt.Errorf("couldn't fetch ProvisioningRequests in the cluster: %v", err)
}
podsToCreate := []*apiv1.Pod{}
for _, provReq := range provReqs {
if conditions.ShouldCapacityBeBooked(provReq) {
pods, err := provreq_pods.PodsForProvisioningRequest(provReq)
if err != nil {
// ClusterAutoscaler was able to create pods before, so we shouldn't have error here.
// If there is an error, mark PR as invalid, because we won't be able to book capacity
// for it anyway.
conditions.AddOrUpdateCondition(provReq, v1beta1.Failed, metav1.ConditionTrue, conditions.FailedToBookCapacityReason, fmt.Sprintf("Couldn't create pods, err: %v", err), metav1.Now())
if _, err := o.client.UpdateProvisioningRequest(provReq.ProvisioningRequest); err != nil {
klog.Errorf("failed to add Accepted condition to ProvReq %s/%s, err: %v", provReq.Namespace, provReq.Name, err)
}
continue
}
podsToCreate = append(podsToCreate, pods...)
}
}
if len(podsToCreate) == 0 {
return nil
}
// scheduling the pods to reserve capacity for provisioning request with BookCapacity condition
if _, _, err = o.injector.TrySchedulePods(o.context.ClusterSnapshot, podsToCreate, scheduling.ScheduleAnywhere, false); err != nil {
klog.Warningf("Error during capacity booking: %v", err)
}
return nil
}
cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go
@@ -188,10 +188,10 @@ func TestScaleUp(t *testing.T) {
scaleUpResult: status.ScaleUpNotNeeded,
},
{
name: "capacity in the cluster is booked",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq, bookedCapacityProvReq},
name: "capacity is there, check-capacity class",
provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq},
provReqToScaleUp: newCheckCapacityMemProvReq,
scaleUpResult: status.ScaleUpNoOptionsAvailable,
scaleUpResult: status.ScaleUpSuccessful,
},
{
name: "unsupported ProvisioningRequest is ignored",
@@ -211,12 +211,6 @@
provReqToScaleUp: atomicScaleUpProvReq,
scaleUpResult: status.ScaleUpNotNeeded,
},
{
name: "some capacity is pre-booked, large atomic scale-up request doesn't fit",
provReqs: []*provreqwrapper.ProvisioningRequest{bookedCapacityProvReq, largeAtomicScaleUpProvReq},
provReqToScaleUp: largeAtomicScaleUpProvReq,
scaleUpResult: status.ScaleUpNoOptionsAvailable,
},
{
name: "capacity is there, large atomic scale-up request doesn't require scale-up",
provReqs: []*provreqwrapper.ProvisioningRequest{largeAtomicScaleUpProvReq},