Commit 49a70dc
fix: emit pod nomination events and add provisioning node tainted trigger controller (#933)
njtran authored Feb 2, 2024
1 parent bd832a3 commit 49a70dc
Showing 15 changed files with 183 additions and 105 deletions.
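The common thread through the hunks below is a signature change: each disruption method's ComputeCommand now returns the scheduling simulation Results alongside the Command, so the disruption controller can nominate pods onto the existing nodes the simulation placed them on. A rough sketch of the resulting interface shape, reconstructed only from the signatures visible in this diff (the interface definition itself is not among the files shown, so the exact method set is an assumption):

```go
// Sketch reconstructed from the signatures in this diff, not copied from the
// repository. Candidate and Command are the disruption package's own types.
package disruption

import (
	"context"

	"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
)

// Method is the shape that consolidation, drift, and emptiness appear to
// satisfy after this change.
type Method interface {
	// ShouldDisrupt reports whether a candidate is eligible for this method.
	ShouldDisrupt(ctx context.Context, c *Candidate) bool
	// ComputeCommand now also returns the scheduling simulation Results so the
	// controller can publish pod nomination events for the nodes it reused.
	ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]int, candidates ...*Candidate) (Command, scheduling.Results, error)
	// Type names the disruption method for metrics and events.
	Type() string
}
```

The Results value is what executeCommand in pkg/controllers/disruption/controller.go (below) walks to call NominateNodeForPod and publish NominatePodEvent for each pod the simulation kept on an existing node.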
3 changes: 2 additions & 1 deletion pkg/controllers/controllers.go
@@ -60,7 +60,8 @@ func NewControllers(
return []controller.Controller{
p, evictionQueue, disruptionQueue,
disruption.NewController(clock, kubeClient, p, cloudProvider, recorder, cluster, disruptionQueue),
provisioning.NewController(kubeClient, p, recorder),
provisioning.NewPodController(kubeClient, p, recorder),
provisioning.NewNodeController(kubeClient, p, recorder),
nodepoolhash.NewController(kubeClient),
informer.NewDaemonSetController(kubeClient, cluster),
informer.NewNodeController(kubeClient, cluster),
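The provisioning.NewNodeController added here is the "provisioning node tainted trigger controller" from the commit title; its implementation is not among the files shown in this excerpt. Below is a hedged sketch of the pattern the registration implies: reconcile nodes that carry the Karpenter disruption taint and re-trigger the provisioner so pods on disrupted candidates keep getting scheduling simulations (see the executeCommand comment further down). The taint key, the Trigger method, and the Reconcile signature are all assumptions, not facts from this diff.

```go
// Hypothetical sketch only; the real controller lives in
// pkg/controllers/provisioning and is not part of this excerpt.
package sketch

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// triggerable stands in for whatever the Provisioner exposes to re-run its
// scheduling loop; the real API may differ.
type triggerable interface{ Trigger() }

type NodeController struct {
	provisioner triggerable
}

// Reconcile re-triggers provisioning when a node carries the Karpenter
// disruption taint, so pods on tainted candidates keep being simulated and
// nominated until the node is cleaned up.
func (c *NodeController) Reconcile(ctx context.Context, node *v1.Node) (reconcile.Result, error) {
	for _, taint := range node.Spec.Taints {
		if taint.Key == "karpenter.sh/disruption" { // assumed taint key
			c.provisioner.Trigger()
			break
		}
	}
	return reconcile.Result{}, nil
}
```

Triggering the provisioner rather than scheduling directly keeps the batching and simulation logic in one place, which lines up with the executeCommand comment below about the provisioning controller continuing to nominate pods on tainted candidates.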
34 changes: 17 additions & 17 deletions pkg/controllers/disruption/consolidation.go
@@ -115,15 +115,16 @@ func (c *consolidation) sortCandidates(candidates []*Candidate) []*Candidate {
// computeConsolidation computes a consolidation action to take
//
// nolint:gocyclo
func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...*Candidate) (Command, error) {
func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...*Candidate) (Command, pscheduling.Results, error) {
var err error
// Run scheduling simulation to compute consolidation option
results, err := SimulateScheduling(ctx, c.kubeClient, c.cluster, c.provisioner, candidates...)
if err != nil {
// if a candidate node is now deleting, just retry
if errors.Is(err, errCandidateDeleting) {
return Command{}, nil
return Command{}, pscheduling.Results{}, nil
}
return Command{}, err
return Command{}, pscheduling.Results{}, err
}

// if not all of the pods were scheduled, we can't do anything
@@ -132,29 +133,29 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
if len(candidates) == 1 {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, results.NonPendingPodSchedulingErrors())...)
}
return Command{}, nil
return Command{}, pscheduling.Results{}, nil
}

// were we able to schedule all the pods on the inflight candidates?
if len(results.NewNodeClaims) == 0 {
return Command{
candidates: candidates,
}, nil
}, results, nil
}

// we're not going to turn a single node into multiple candidates
if len(results.NewNodeClaims) != 1 {
if len(candidates) == 1 {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, fmt.Sprintf("Can't remove without creating %d candidates", len(results.NewNodeClaims)))...)
}
return Command{}, nil
return Command{}, pscheduling.Results{}, nil
}

// get the current node price based on the offering
// fallback if we can't find the specific zonal pricing data
candidatePrice, err := getCandidatePrices(candidates)
if err != nil {
return Command{}, fmt.Errorf("getting offering price from candidate node, %w", err)
return Command{}, pscheduling.Results{}, fmt.Errorf("getting offering price from candidate node, %w", err)
}

allExistingAreSpot := true
@@ -178,8 +179,7 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
if len(candidates) == 1 {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, "Can't replace with a cheaper node")...)
}
// no instance types remain after filtering by price
return Command{}, nil
return Command{}, pscheduling.Results{}, nil
}

// We are consolidating a node from OD -> [OD,Spot] but have filtered the instance types by cost based on the
@@ -194,23 +194,23 @@ func (c *consolidation) computeConsolidation(ctx context.Context, candidates ...
return Command{
candidates: candidates,
replacements: results.NewNodeClaims,
}, nil
}, results, nil
}

// Compute command to execute spot-to-spot consolidation if:
// 1. The SpotToSpotConsolidation feature flag is set to true.
// 2. For single-node consolidation:
// a. There are at least 15 cheapest instance type replacement options to consolidate.
// b. The current candidate is NOT part of the first 15 cheapest instance types in order to avoid repeated consolidation.
func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, candidates []*Candidate, results *pscheduling.Results,
candidatePrice float64) (Command, error) {
func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, candidates []*Candidate, results pscheduling.Results,
candidatePrice float64) (Command, pscheduling.Results, error) {

// Spot consolidation is turned off.
if !options.FromContext(ctx).FeatureGates.SpotToSpotConsolidation {
if len(candidates) == 1 {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, "SpotToSpotConsolidation is disabled, can't replace a spot node with a spot node")...)
}
return Command{}, nil
return Command{}, pscheduling.Results{}, nil
}

// Since we are sure that the replacement nodeclaim considered for the spot candidates are spot, we will enforce it through the requirements.
@@ -227,7 +227,7 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, "Can't replace spot node with a cheaper spot node")...)
}
// no instance types remain after filtering by price
return Command{}, nil
return Command{}, pscheduling.Results{}, nil
}

// For multi-node consolidation:
@@ -236,7 +236,7 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
return Command{
candidates: candidates,
replacements: results.NewNodeClaims,
}, nil
}, results, nil
}

// For single-node consolidation:
@@ -246,7 +246,7 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
if len(results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions) < MinInstanceTypesForSpotToSpotConsolidation {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, fmt.Sprintf("SpotToSpotConsolidation requires %d cheaper instance type options than the current candidate to consolidate, got %d",
MinInstanceTypesForSpotToSpotConsolidation, len(results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions)))...)
return Command{}, nil
return Command{}, pscheduling.Results{}, nil
}

// Restrict the InstanceTypeOptions for launch to 15 so we don't get into a continual consolidation situation.
@@ -261,7 +261,7 @@ func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, cand
return Command{
candidates: candidates,
replacements: results.NewNodeClaims,
}, nil
}, results, nil
}

// getCandidatePrices returns the sum of the prices of the given candidates
31 changes: 19 additions & 12 deletions pkg/controllers/disruption/consolidation_test.go
@@ -44,6 +44,7 @@ import (
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/cloudprovider/fake"
"sigs.k8s.io/karpenter/pkg/controllers/disruption"
pscheduling "sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/operator/options"
"sigs.k8s.io/karpenter/pkg/scheduling"
@@ -544,9 +545,10 @@ var _ = Describe("Consolidation", func() {

var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
cmd, err := emptyConsolidation.ComputeCommand(ctx, budgets, candidates...)
cmd, results, err := emptyConsolidation.ComputeCommand(ctx, budgets, candidates...)
Expect(err).To(Succeed())
Expect(cmd)
Expect(results).To(Equal(pscheduling.Results{}))
Expect(cmd).To(Equal(disruption.Command{}))
wg.Wait()

Expect(emptyConsolidation.IsConsolidated()).To(BeFalse())
@@ -607,9 +609,10 @@ var _ = Describe("Consolidation", func() {

var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
cmd, err := emptyConsolidation.ComputeCommand(ctx, budgets, candidates...)
cmd, results, err := emptyConsolidation.ComputeCommand(ctx, budgets, candidates...)
Expect(err).To(Succeed())
Expect(cmd)
Expect(results).To(Equal(pscheduling.Results{}))
Expect(cmd).To(Equal(disruption.Command{}))
wg.Wait()

Expect(emptyConsolidation.IsConsolidated()).To(BeFalse())
@@ -633,9 +636,10 @@ var _ = Describe("Consolidation", func() {

var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
cmd, err := multiConsolidation.ComputeCommand(ctx, budgets, candidates...)
cmd, results, err := multiConsolidation.ComputeCommand(ctx, budgets, candidates...)
Expect(err).To(Succeed())
Expect(cmd)
Expect(results).To(Equal(pscheduling.Results{}))
Expect(cmd).To(Equal(disruption.Command{}))
wg.Wait()

Expect(multiConsolidation.IsConsolidated()).To(BeFalse())
@@ -696,9 +700,10 @@ var _ = Describe("Consolidation", func() {

var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
cmd, err := multiConsolidation.ComputeCommand(ctx, budgets, candidates...)
cmd, results, err := multiConsolidation.ComputeCommand(ctx, budgets, candidates...)
Expect(err).To(Succeed())
Expect(cmd)
Expect(results).To(Equal(pscheduling.Results{}))
Expect(cmd).To(Equal(disruption.Command{}))
wg.Wait()

Expect(multiConsolidation.IsConsolidated()).To(BeFalse())
@@ -722,9 +727,10 @@ var _ = Describe("Consolidation", func() {

var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
cmd, err := singleConsolidation.ComputeCommand(ctx, budgets, candidates...)
cmd, results, err := singleConsolidation.ComputeCommand(ctx, budgets, candidates...)
Expect(err).To(Succeed())
Expect(cmd)
Expect(results).To(Equal(pscheduling.Results{}))
Expect(cmd).To(Equal(disruption.Command{}))
wg.Wait()

Expect(singleConsolidation.IsConsolidated()).To(BeFalse())
@@ -785,9 +791,10 @@ var _ = Describe("Consolidation", func() {

var wg sync.WaitGroup
ExpectTriggerVerifyAction(&wg)
cmd, err := singleConsolidation.ComputeCommand(ctx, budgets, candidates...)
cmd, results, err := singleConsolidation.ComputeCommand(ctx, budgets, candidates...)
Expect(err).To(Succeed())
Expect(cmd)
Expect(results).To(Equal(pscheduling.Results{}))
Expect(cmd).To(Equal(disruption.Command{}))
wg.Wait()

Expect(singleConsolidation.IsConsolidated()).To(BeFalse())
25 changes: 22 additions & 3 deletions pkg/controllers/disruption/controller.go
@@ -36,6 +36,7 @@ import (
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/metrics"
@@ -159,7 +160,7 @@ func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, erro
}

// Determine the disruption action
cmd, err := disruption.ComputeCommand(ctx, disruptionBudgetMapping, candidates...)
cmd, schedulingResults, err := disruption.ComputeCommand(ctx, disruptionBudgetMapping, candidates...)
if err != nil {
return false, fmt.Errorf("computing disruption decision, %w", err)
}
@@ -168,7 +169,7 @@ func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, erro
}

// Attempt to disrupt
if err := c.executeCommand(ctx, disruption, cmd); err != nil {
if err := c.executeCommand(ctx, disruption, cmd, schedulingResults); err != nil {
return false, fmt.Errorf("disrupting candidates, %w", err)
}

@@ -179,7 +180,7 @@ func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, erro
// 1. Taint candidate nodes
// 2. Spin up replacement nodes
// 3. Add Command to orchestration.Queue to wait to delete the candidates.
func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command) error {
func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command, schedulingResults scheduling.Results) error {
disruptionActionsPerformedCounter.With(map[string]string{
actionLabel: string(cmd.Action()),
methodLabel: m.Type(),
@@ -206,6 +207,24 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command)
}
}

// Nominate each node for scheduling and emit pod nomination events
// We emit all nominations before we exit the disruption loop as
// we want to ensure that nodes that are nominated are respected in the subsequent
// disruption reconciliation. This is essential in correctly modeling multiple
// disruption commands in parallel.
// This will only nominate nodes for 2 * batchingWindow. Once the candidates are
// tainted with the Karpenter taint, the provisioning controller will continue
// to do scheduling simulations and nominate the pods on the candidate nodes until
// the node is cleaned up.
for _, node := range schedulingResults.ExistingNodes {
if len(node.Pods) > 0 {
c.cluster.NominateNodeForPod(ctx, node.ProviderID())
}
for _, pod := range node.Pods {
c.recorder.Publish(scheduling.NominatePodEvent(pod, node.Node, node.NodeClaim))
}
}

providerIDs := lo.Map(cmd.candidates, func(c *Candidate, _ int) string { return c.ProviderID() })
// We have the new NodeClaims created at the API server so mark the old NodeClaims for deletion
c.cluster.MarkForDeletion(providerIDs...)
17 changes: 7 additions & 10 deletions pkg/controllers/disruption/drift.go
@@ -26,6 +26,7 @@ import (
"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
disruptionevents "sigs.k8s.io/karpenter/pkg/controllers/disruption/events"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/metrics"
@@ -56,7 +57,7 @@ func (d *Drift) ShouldDisrupt(ctx context.Context, c *Candidate) bool {
}

// ComputeCommand generates a disruption command given candidates
func (d *Drift) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]int, candidates ...*Candidate) (Command, error) {
func (d *Drift) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[string]int, candidates ...*Candidate) (Command, scheduling.Results, error) {
sort.Slice(candidates, func(i int, j int) bool {
return candidates[i].NodeClaim.StatusConditions().GetCondition(v1beta1.Drifted).LastTransitionTime.Inner.Time.Before(
candidates[j].NodeClaim.StatusConditions().GetCondition(v1beta1.Drifted).LastTransitionTime.Inner.Time)
@@ -85,7 +86,7 @@ func (d *Drift) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[
if len(empty) > 0 {
return Command{
candidates: empty,
}, nil
}, scheduling.Results{}, nil
}

for _, candidate := range candidates {
@@ -102,24 +103,20 @@ func (d *Drift) ComputeCommand(ctx context.Context, disruptionBudgetMapping map[
if errors.Is(err, errCandidateDeleting) {
continue
}
return Command{}, err
return Command{}, scheduling.Results{}, err
}
// Emit an event that we couldn't reschedule the pods on the node.
if !results.AllNonPendingPodsScheduled() {
d.recorder.Publish(disruptionevents.Blocked(candidate.Node, candidate.NodeClaim, "Scheduling simulation failed to schedule all pods")...)
continue
}
if len(results.NewNodeClaims) == 0 {
return Command{
candidates: []*Candidate{candidate},
}, nil
}

return Command{
candidates: []*Candidate{candidate},
replacements: results.NewNodeClaims,
}, nil
}, results, nil
}
return Command{}, nil
return Command{}, scheduling.Results{}, nil
}

func (d *Drift) Type() string {
5 changes: 3 additions & 2 deletions pkg/controllers/disruption/emptiness.go
@@ -24,6 +24,7 @@ import (
"k8s.io/utils/clock"

disruptionevents "sigs.k8s.io/karpenter/pkg/controllers/disruption/events"
"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
"sigs.k8s.io/karpenter/pkg/events"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
@@ -61,7 +62,7 @@ func (e *Emptiness) ShouldDisrupt(_ context.Context, c *Candidate) bool {
}

// ComputeCommand generates a disruption command given candidates
func (e *Emptiness) ComputeCommand(_ context.Context, disruptionBudgetMapping map[string]int, candidates ...*Candidate) (Command, error) {
func (e *Emptiness) ComputeCommand(_ context.Context, disruptionBudgetMapping map[string]int, candidates ...*Candidate) (Command, scheduling.Results, error) {
// First check how many nodes are empty so that we can emit a metric on how many nodes are eligible
emptyCandidates := lo.Filter(candidates, func(cn *Candidate, _ int) bool {
return cn.NodeClaim.DeletionTimestamp.IsZero() && len(cn.reschedulablePods) == 0
@@ -86,7 +87,7 @@ func (e *Emptiness) ComputeCommand(_ context.Context, disruptionBudgetMapping ma
}
return Command{
candidates: empty,
}, nil
}, scheduling.Results{}, nil
}

func (e *Emptiness) Type() string {