fix: address disruption taint race condition #1180

Merged 5 commits on Apr 24, 2024
2 changes: 2 additions & 0 deletions pkg/controllers/disruption/consolidation_test.go
@@ -3788,6 +3788,8 @@ var _ = Describe("Consolidation", func() {
Eventually(finished.Load, 10*time.Second).Should(BeTrue())
wg.Wait()

ExpectReconcileSucceeded(ctx, queue, types.NamespacedName{})

// nothing should be removed since the node is no longer empty
Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1))
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
34 changes: 14 additions & 20 deletions pkg/controllers/disruption/emptynodeconsolidation.go
@@ -19,8 +19,8 @@ package disruption
import (
"context"
"errors"
"fmt"

"github.com/samber/lo"
"knative.dev/pkg/logging"

"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
@@ -88,31 +88,25 @@ func (c *EmptyNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
return Command{}, scheduling.Results{}, errors.New("interrupted")
case <-c.clock.After(consolidationTTL):
}
validationCandidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.recorder, c.clock, c.cloudProvider, c.ShouldDisrupt, c.queue)
if err != nil {
logging.FromContext(ctx).Errorf("computing validation candidates %s", err)
return Command{}, scheduling.Results{}, err
}
// Get the current representation of the proposed candidates from before the validation timeout
// We do this so that we can re-validate that the candidates that were computed before we made the decision are the same
candidatesToDelete := mapCandidates(cmd.candidates, validationCandidates)

postValidationMapping, err := BuildDisruptionBudgets(ctx, c.cluster, c.clock, c.kubeClient, c.recorder)
v := NewValidation(c.clock, c.cluster, c.kubeClient, c.provisioner, c.cloudProvider, c.recorder, c.queue)
validatedCandidates, err := v.ValidateCandidates(ctx, cmd.candidates...)
if err != nil {
return Command{}, scheduling.Results{}, fmt.Errorf("building disruption budgets, %w", err)
}

// The deletion of empty NodeClaims is easy to validate, we just ensure that:
// 1. All the candidatesToDelete are still empty
// 2. The node isn't a target of a recent scheduling simulation
// 3. the number of candidates for a given nodepool can no longer be disrupted as it would violate the budget
for _, n := range candidatesToDelete {
if len(n.reschedulablePods) != 0 || c.cluster.IsNodeNominated(n.ProviderID()) || postValidationMapping[n.nodePool.Name] == 0 {
if IsValidationError(err) {
logging.FromContext(ctx).Debugf("abandoning empty node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
return Command{}, scheduling.Results{}, nil
}
postValidationMapping[n.nodePool.Name]--
return Command{}, scheduling.Results{}, err
}

// TODO (jmdeal@): better encapsulate within validation
if lo.ContainsBy(validatedCandidates, func(c *Candidate) bool {
return len(c.reschedulablePods) != 0
}) {
logging.FromContext(ctx).Debugf("abandoning empty node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
return Command{}, scheduling.Results{}, nil
}

return cmd, scheduling.Results{}, nil
}

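The empty-node path above now distinguishes recoverable validation failures from real errors with IsValidationError instead of a boolean return. The ValidationError helpers themselves land in validation.go further down in this diff; the sketch below is only a standalone illustration (the package, main function, and sample error messages are mine) of why errors.As is used: the classification survives wrapping with %w, so callers can add context and still decide to retry quietly.

package main

import (
    "errors"
    "fmt"
)

// ValidationError and IsValidationError mirror the helpers added in validation.go below.
type ValidationError struct {
    error
}

func NewValidationError(err error) *ValidationError {
    return &ValidationError{error: err}
}

func IsValidationError(err error) bool {
    if err == nil {
        return false
    }
    var validationError *ValidationError
    return errors.As(err, &validationError)
}

func main() {
    base := NewValidationError(fmt.Errorf("a candidate was nominated during validation"))
    // Wrapping with %w keeps the concrete type visible to errors.As, so a caller that
    // adds context ("validating consolidation, ...") can still classify the failure.
    wrapped := fmt.Errorf("validating consolidation, %w", base)

    fmt.Println(IsValidationError(wrapped))                           // true: abandon the attempt and retry later
    fmt.Println(IsValidationError(errors.New("budget build failed"))) // false: surface as a real error
}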
15 changes: 6 additions & 9 deletions pkg/controllers/disruption/multinodeconsolidation.go
@@ -91,15 +91,12 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
return cmd, scheduling.Results{}, nil
}

v := NewValidation(consolidationTTL, m.clock, m.cluster, m.kubeClient, m.provisioner, m.cloudProvider, m.recorder, m.queue)
isValid, err := v.IsValid(ctx, cmd)
if err != nil {
return Command{}, scheduling.Results{}, fmt.Errorf("validating, %w", err)
}

if !isValid {
logging.FromContext(ctx).Debugf("abandoning multi-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
return Command{}, scheduling.Results{}, nil
if err := NewValidation(m.clock, m.cluster, m.kubeClient, m.provisioner, m.cloudProvider, m.recorder, m.queue).IsValid(ctx, cmd, consolidationTTL); err != nil {
if IsValidationError(err) {
logging.FromContext(ctx).Debugf("abandoning multi-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
return Command{}, scheduling.Results{}, nil
}
return Command{}, scheduling.Results{}, fmt.Errorf("validating consolidation, %w", err)
}
return cmd, results, nil
}
13 changes: 6 additions & 7 deletions pkg/controllers/disruption/singlenodeconsolidation.go
@@ -50,7 +50,7 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
consolidationTypeLabel: s.ConsolidationType(),
}).Set(float64(len(candidates)))

v := NewValidation(consolidationTTL, s.clock, s.cluster, s.kubeClient, s.provisioner, s.cloudProvider, s.recorder, s.queue)
v := NewValidation(s.clock, s.cluster, s.kubeClient, s.provisioner, s.cloudProvider, s.recorder, s.queue)

// Set a timeout
timeout := s.clock.Now().Add(SingleNodeConsolidationTimeoutDuration)
@@ -78,14 +78,13 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
if cmd.Action() == NoOpAction {
continue
}
isValid, err := v.IsValid(ctx, cmd)
if err != nil {
if err := v.IsValid(ctx, cmd, consolidationTTL); err != nil {
if IsValidationError(err) {
logging.FromContext(ctx).Debugf("abandoning single-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
return Command{}, scheduling.Results{}, nil
}
return Command{}, scheduling.Results{}, fmt.Errorf("validating consolidation, %w", err)
}
if !isValid {
logging.FromContext(ctx).Debugf("abandoning single-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
return Command{}, scheduling.Results{}, nil
}
return cmd, results, nil
}
if !constrainedByBudgets {
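With the TTL moved out of NewValidation and into IsValid, a single Validation can be reused across every candidate command in the loop above, and its sync.Once-guarded start time means only the first command waits out the full consolidation TTL. The toy validator below is not Karpenter's type; only the IsValid(ctx, cmd, ttl) shape and the once-guarded start time come from this PR. It shows that timing behavior in isolation.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// validator is a stand-in for disruption.Validation, reduced to its timing logic.
type validator struct {
    once  sync.Once
    start time.Time
}

func (v *validator) IsValid(ctx context.Context, cmd string, validationPeriod time.Duration) error {
    v.once.Do(func() { v.start = time.Now() })
    // Wait out only the portion of the validation period that has not already elapsed.
    if wait := validationPeriod - time.Since(v.start); wait > 0 {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(wait):
        }
    }
    fmt.Printf("validated %q after %s\n", cmd, time.Since(v.start).Round(time.Second))
    return nil
}

func main() {
    v := &validator{}
    ctx := context.Background()
    // The first command absorbs the full TTL; later commands built from the same
    // consolidation state validate almost immediately, as the Validation doc comment notes.
    for _, cmd := range []string{"delete node-a", "delete node-b", "delete node-c"} {
        _ = v.IsValid(ctx, cmd, 3*time.Second)
    }
}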
146 changes: 89 additions & 57 deletions pkg/controllers/disruption/validation.go
@@ -35,82 +35,114 @@ import (
"sigs.k8s.io/karpenter/pkg/events"
)

type ValidationError struct {
error
}

func NewValidationError(err error) *ValidationError {
return &ValidationError{error: err}
}

func IsValidationError(err error) bool {
if err == nil {
return false
}
var validationError *ValidationError
return errors.As(err, &validationError)
}

// Validation is used to perform validation on a consolidation command. It makes an assumption that when re-used, all
// of the commands passed to IsValid were constructed based off of the same consolidation state. This allows it to
// skip the validation TTL for all but the first command.
type Validation struct {
validationPeriod time.Duration
start time.Time
clock clock.Clock
cluster *state.Cluster
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
provisioner *provisioning.Provisioner
once sync.Once
recorder events.Recorder
queue *orchestration.Queue
start time.Time
clock clock.Clock
cluster *state.Cluster
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
provisioner *provisioning.Provisioner
once sync.Once
recorder events.Recorder
queue *orchestration.Queue
}

func NewValidation(validationPeriod time.Duration, clk clock.Clock, cluster *state.Cluster, kubeClient client.Client, provisioner *provisioning.Provisioner,
func NewValidation(clk clock.Clock, cluster *state.Cluster, kubeClient client.Client, provisioner *provisioning.Provisioner,
cp cloudprovider.CloudProvider, recorder events.Recorder, queue *orchestration.Queue) *Validation {
return &Validation{
validationPeriod: validationPeriod,
clock: clk,
cluster: cluster,
kubeClient: kubeClient,
provisioner: provisioner,
cloudProvider: cp,
recorder: recorder,
queue: queue,
clock: clk,
cluster: cluster,
kubeClient: kubeClient,
provisioner: provisioner,
cloudProvider: cp,
recorder: recorder,
queue: queue,
}
}

//nolint:gocyclo
func (v *Validation) IsValid(ctx context.Context, cmd Command) (bool, error) {
func (v *Validation) IsValid(ctx context.Context, cmd Command, validationPeriod time.Duration) error {
var err error
v.once.Do(func() {
v.start = v.clock.Now()
})

waitDuration := v.validationPeriod - v.clock.Since(v.start)
waitDuration := validationPeriod - v.clock.Since(v.start)
if waitDuration > 0 {
select {
case <-ctx.Done():
return false, errors.New("context canceled")
return errors.New("context canceled")
case <-v.clock.After(waitDuration):
}
}
// Get the current representation of the proposed candidates from before the validation timeout
// We do this so that we can re-validate that the candidates that were computed before we made the decision are the same
// We perform filtering here to ensure that none of the proposed candidates have blocking PDBs or do-not-evict/do-not-disrupt pods scheduled to them
validationCandidates, err := GetCandidates(ctx, v.cluster, v.kubeClient, v.recorder, v.clock, v.cloudProvider, v.ShouldDisrupt, v.queue)
validatedCandidates, err := v.ValidateCandidates(ctx, cmd.candidates...)
if err != nil {
return false, fmt.Errorf("constructing validation candidates, %w", err)
return err
}
validationCandidates = mapCandidates(cmd.candidates, validationCandidates)
// If we filtered out any candidates, return false as some NodeClaims in the consolidation decision have changed.
if len(validationCandidates) != len(cmd.candidates) {
return false, nil
if err := v.ValidateCommand(ctx, cmd, validatedCandidates); err != nil {
return err
}
// Rebuild the disruption budget mapping to see if any budgets have changed since validation.
postValidationMapping, err := BuildDisruptionBudgets(ctx, v.cluster, v.clock, v.kubeClient, v.recorder)
// Revalidate candidates after validating the command. This mitigates the chance of a race condition outlined in
// the following GitHub issue: https://github.com/kubernetes-sigs/karpenter/issues/1167.
if _, err = v.ValidateCandidates(ctx, validatedCandidates...); err != nil {
return err
}
return nil
}

// ValidateCandidates gets the current representation of the provided candidates and ensures that they are all still valid.
// For a candidate to still be valid, the following conditions must be met:
//
// a. It must pass the global candidate filtering logic (no blocking PDBs, no do-not-disrupt annotation, etc)
// b. It must not have any pods nominated for it
// c. It must still be disruptible without violating node disruption budgets
//
// If these conditions are met for all candidates, ValidateCandidates returns a slice with the updated representations.
func (v *Validation) ValidateCandidates(ctx context.Context, candidates ...*Candidate) ([]*Candidate, error) {
validatedCandidates, err := GetCandidates(ctx, v.cluster, v.kubeClient, v.recorder, v.clock, v.cloudProvider, v.ShouldDisrupt, v.queue)
if err != nil {
return false, fmt.Errorf("building disruption budgets, %w", err)
}
// 1. a candidate we are about to delete is a target of a currently pending pod, wait for that to settle
// before continuing consolidation
// 2. the number of candidates for a given nodepool can no longer be disrupted as it would violate the budget
for _, n := range validationCandidates {
if v.cluster.IsNodeNominated(n.ProviderID()) || postValidationMapping[n.nodePool.Name] == 0 {
return false, nil
}
postValidationMapping[n.nodePool.Name]--
return nil, fmt.Errorf("constructing validation candidates, %w", err)
}
isValid, err := v.ValidateCommand(ctx, cmd, validationCandidates)
validatedCandidates = mapCandidates(candidates, validatedCandidates)
// If we filtered out any candidates, return a validation error, as some NodeClaims in the consolidation decision have changed.
if len(validatedCandidates) != len(candidates) {
return nil, NewValidationError(fmt.Errorf("%d candidates are no longer valid", len(candidates)-len(validatedCandidates)))
}
disruptionBudgetMapping, err := BuildDisruptionBudgets(ctx, v.cluster, v.clock, v.kubeClient, v.recorder)
if err != nil {
return false, fmt.Errorf("validating command, %w", err)
return nil, fmt.Errorf("building disruption budgets, %w", err)
}
// Return a validation error if any candidate meets either of the following conditions:
// a. A pod was nominated to the candidate
// b. Disrupting the candidate would violate node disruption budgets
for _, vc := range validatedCandidates {
if v.cluster.IsNodeNominated(vc.ProviderID()) {
return nil, NewValidationError(fmt.Errorf("a candidate was nominated during validation"))
}
if disruptionBudgetMapping[vc.nodePool.Name] == 0 {
return nil, NewValidationError(fmt.Errorf("a candidate can no longer be disrupted without violating budgets"))
}
disruptionBudgetMapping[vc.nodePool.Name]--
}
return isValid, nil
return validatedCandidates, nil
}

// ShouldDisrupt is a predicate used to filter candidates
@@ -123,17 +155,17 @@ func (v *Validation) ShouldDisrupt(_ context.Context, c *Candidate) bool {
}

// ValidateCommand validates a command for a Method
func (v *Validation) ValidateCommand(ctx context.Context, cmd Command, candidates []*Candidate) (bool, error) {
func (v *Validation) ValidateCommand(ctx context.Context, cmd Command, candidates []*Candidate) error {
// None of the chosen candidate are valid for execution, so retry
if len(candidates) == 0 {
return false, nil
return NewValidationError(fmt.Errorf("no candidates"))
}
results, err := SimulateScheduling(ctx, v.kubeClient, v.cluster, v.provisioner, candidates...)
if err != nil {
return false, fmt.Errorf("simluating scheduling, %w", err)
return fmt.Errorf("simluating scheduling, %w", err)
}
if !results.AllNonPendingPodsScheduled() {
return false, nil
return NewValidationError(fmt.Errorf("all pending pods could not be scheduled"))
}

// We want to ensure that the re-simulated scheduling using the current cluster state produces the same result.
@@ -145,22 +177,22 @@ func (v *Validation) ValidateCommand(ctx context.Context, cmd Command, candidate
if len(results.NewNodeClaims) == 0 {
if len(cmd.replacements) == 0 {
// scheduling produced zero new NodeClaims and we weren't expecting any, so this is valid.
return true, nil
return nil
}
// if it produced no new NodeClaims, but we were expecting one we should re-simulate as there is likely a better
// consolidation option now
return false, nil
return NewValidationError(fmt.Errorf("scheduling simulation produced new results"))
}

// we need more than one replacement node which is never valid currently (all of our node replacement is m->1, never m->n)
if len(results.NewNodeClaims) > 1 {
return false, nil
return NewValidationError(fmt.Errorf("scheduling simulation produced new results"))
}

// we now know that scheduling simulation wants to create one new node
if len(cmd.replacements) == 0 {
// but we weren't expecting any new NodeClaims, so this is invalid
return false, nil
return NewValidationError(fmt.Errorf("scheduling simulation produced new results"))
}

// We know that the scheduling simulation wants to create a new node and that the command we are verifying wants
@@ -175,11 +207,11 @@ func (v *Validation) ValidateCommand(ctx context.Context, cmd Command, candidate
// now says that we need to launch a 4xlarge. It's still launching the correct number of NodeClaims, but it's just
// as expensive or possibly more so we shouldn't validate.
if !instanceTypesAreSubset(cmd.replacements[0].InstanceTypeOptions, results.NewNodeClaims[0].InstanceTypeOptions) {
return false, nil
return NewValidationError(fmt.Errorf("scheduling simulation produced new results"))
}

// Now we know:
// - current scheduling simulation says to create a new node with types T = {T_0, T_1, ..., T_n}
// - our lifecycle command says to create a node with types {U_0, U_1, ..., U_n} where U is a subset of T
return true, nil
return nil
}
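Taken together, the new IsValid is a validate-simulate-revalidate sequence: check the candidates, re-simulate the command, then check the candidates a second time so that pods scheduled while the command was being validated are caught before the node is disrupted (the race in kubernetes-sigs/karpenter#1167). The self-contained sketch below paraphrases that ordering with stand-in types and stub validators; none of it is Karpenter's real API.

package main

import (
    "context"
    "errors"
    "fmt"
)

// Stand-ins for the real Candidate and Command types.
type Candidate struct {
    name              string
    reschedulablePods int
}

type Command struct {
    candidates []*Candidate
}

var errValidation = errors.New("validation failed") // plays the role of ValidationError

// validateCandidates models ValidateCandidates: a candidate with pods is no longer empty.
func validateCandidates(_ context.Context, candidates ...*Candidate) ([]*Candidate, error) {
    for _, c := range candidates {
        if c.reschedulablePods != 0 {
            return nil, fmt.Errorf("%q is no longer empty: %w", c.name, errValidation)
        }
    }
    return candidates, nil
}

// validateCommand models ValidateCommand's scheduling re-simulation. To mimic the race,
// it also lets a pod land on the first candidate while the command is being checked.
func validateCommand(_ context.Context, cmd Command, _ []*Candidate) error {
    cmd.candidates[0].reschedulablePods = 1
    return nil
}

func isValid(ctx context.Context, cmd Command) error {
    validated, err := validateCandidates(ctx, cmd.candidates...)
    if err != nil {
        return err
    }
    if err := validateCommand(ctx, cmd, validated); err != nil {
        return err
    }
    // The second pass is the fix: it catches candidates that changed during the step above.
    _, err = validateCandidates(ctx, validated...)
    return err
}

func main() {
    cmd := Command{candidates: []*Candidate{{name: "node-a"}}}
    if err := isValid(context.Background(), cmd); errors.Is(err, errValidation) {
        fmt.Println("abandoning consolidation attempt:", err)
    }
}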