race fix
jmdeal committed Apr 23, 2024
1 parent 780cd73 commit d348441
Showing 6 changed files with 63 additions and 56 deletions.
4 changes: 3 additions & 1 deletion pkg/controllers/disruption/consolidation_test.go
@@ -3750,7 +3750,7 @@ var _ = Describe("Consolidation", func() {
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
ExpectExists(ctx, env.Client, nodes[0])
})
It("should not consolidate if the action becomes invalid during the node TTL wait", func() {
FIt("should not consolidate if the action becomes invalid during the node TTL wait", func() {
pod := test.Pod(test.PodOptions{ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
v1beta1.DoNotDisruptAnnotationKey: "true",
@@ -3788,6 +3788,8 @@ var _ = Describe("Consolidation", func() {
Eventually(finished.Load, 10*time.Second).Should(BeTrue())
wg.Wait()

ExpectReconcileSucceeded(ctx, queue, types.NamespacedName{})

// nothing should be removed since the node is no longer empty
Expect(ExpectNodeClaims(ctx, env.Client)).To(HaveLen(1))
Expect(ExpectNodes(ctx, env.Client)).To(HaveLen(1))
29 changes: 9 additions & 20 deletions pkg/controllers/disruption/emptynodeconsolidation.go
@@ -21,6 +21,7 @@ import (
"errors"
"fmt"

Check failure on line 22 in pkg/controllers/disruption/emptynodeconsolidation.go (GitHub Actions: presubmit 1.23.x through 1.29.x, Analyze Go): "fmt" imported and not used

"github.com/samber/lo"
"knative.dev/pkg/logging"

"sigs.k8s.io/karpenter/pkg/controllers/provisioning/scheduling"
@@ -88,31 +89,19 @@ func (c *EmptyNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
return Command{}, scheduling.Results{}, errors.New("interrupted")
case <-c.clock.After(consolidationTTL):
}
validationCandidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.recorder, c.clock, c.cloudProvider, c.ShouldDisrupt, c.queue)

v := NewValidation(c.clock, c.cluster, c.kubeClient, c.provisioner, c.cloudProvider, c.recorder, c.queue)
validatedCandidates, err := v.ValidateCandidates(ctx, cmd.candidates...)
if err != nil {
logging.FromContext(ctx).Errorf("computing validation candidates %s", err)
return Command{}, scheduling.Results{}, err
}
// Get the current representation of the proposed candidates from before the validation timeout
// We do this so that we can re-validate that the candidates that were computed before we made the decision are the same
candidatesToDelete := mapCandidates(cmd.candidates, validationCandidates)

postValidationMapping, err := BuildDisruptionBudgets(ctx, c.cluster, c.clock, c.kubeClient, c.recorder)
if err != nil {
return Command{}, scheduling.Results{}, fmt.Errorf("building disruption budgets, %w", err)
if len(validatedCandidates) == 0 || lo.ContainsBy(validatedCandidates, func(c *Candidate) bool {
return len(c.reschedulablePods) != 0
}) {
logging.FromContext(ctx).Debugf("abandoning empty node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
return Command{}, scheduling.Results{}, nil
}

// The deletion of empty NodeClaims is easy to validate, we just ensure that:
// 1. All the candidatesToDelete are still empty
// 2. The node isn't a target of a recent scheduling simulation
// 3. the number of candidates for a given nodepool can no longer be disrupted as it would violate the budget
for _, n := range candidatesToDelete {
if len(n.reschedulablePods) != 0 || c.cluster.IsNodeNominated(n.ProviderID()) || postValidationMapping[n.nodePool.Name] == 0 {
logging.FromContext(ctx).Debugf("abandoning empty node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
return Command{}, scheduling.Results{}, nil
}
postValidationMapping[n.nodePool.Name]--
}
return cmd, scheduling.Results{}, nil
}

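The empty-node path above now delegates revalidation to Validation.ValidateCandidates and abandons the command if any validated candidate picked up reschedulable pods during the TTL wait. Below is a minimal, self-contained sketch of that churn check using github.com/samber/lo; the candidate struct is a hypothetical stand-in, not karpenter's Candidate type.

package main

import (
	"fmt"

	"github.com/samber/lo"
)

// candidate is a hypothetical stand-in for the disruption package's Candidate type.
type candidate struct {
	name              string
	reschedulablePods []string
}

func main() {
	validated := []*candidate{
		{name: "node-a"},
		{name: "node-b", reschedulablePods: []string{"web-7d9f"}},
	}
	// Mirrors the check above: abandon the command if validation returned nothing or if
	// any validated candidate is no longer empty (it picked up reschedulable pods).
	abandon := len(validated) == 0 || lo.ContainsBy(validated, func(c *candidate) bool {
		return len(c.reschedulablePods) != 0
	})
	fmt.Println(abandon) // true, because node-b gained a pod during the TTL wait
}
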
4 changes: 2 additions & 2 deletions pkg/controllers/disruption/multinodeconsolidation.go
@@ -91,8 +91,8 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
return cmd, scheduling.Results{}, nil
}

v := NewValidation(consolidationTTL, m.clock, m.cluster, m.kubeClient, m.provisioner, m.cloudProvider, m.recorder, m.queue)
isValid, err := v.IsValid(ctx, cmd)
v := NewValidation(m.clock, m.cluster, m.kubeClient, m.provisioner, m.cloudProvider, m.recorder, m.queue)
isValid, err := v.IsValid(ctx, cmd, MultiNodeConsolidationTimeoutDuration)
if err != nil {
return Command{}, scheduling.Results{}, fmt.Errorf("validating, %w", err)
}
4 changes: 2 additions & 2 deletions pkg/controllers/disruption/singlenodeconsolidation.go
@@ -50,7 +50,7 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
consolidationTypeLabel: s.ConsolidationType(),
}).Set(float64(len(candidates)))

v := NewValidation(consolidationTTL, s.clock, s.cluster, s.kubeClient, s.provisioner, s.cloudProvider, s.recorder, s.queue)
v := NewValidation(s.clock, s.cluster, s.kubeClient, s.provisioner, s.cloudProvider, s.recorder, s.queue)

// Set a timeout
timeout := s.clock.Now().Add(SingleNodeConsolidationTimeoutDuration)
@@ -78,7 +78,7 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
if cmd.Action() == NoOpAction {
continue
}
isValid, err := v.IsValid(ctx, cmd)
isValid, err := v.IsValid(ctx, cmd, SingleNodeConsolidationTimeoutDuration)
if err != nil {
return Command{}, scheduling.Results{}, fmt.Errorf("validating consolidation, %w", err)
}
74 changes: 44 additions & 30 deletions pkg/controllers/disruption/validation.go
@@ -39,7 +39,6 @@ import (
// of the commands passed to IsValid were constructed based off of the same consolidation state. This allows it to
// skip the validation TTL for all but the first command.
type Validation struct {
validationPeriod time.Duration
start time.Time
clock clock.Clock
cluster *state.Cluster
@@ -51,10 +50,9 @@ type Validation struct {
queue *orchestration.Queue
}

func NewValidation(validationPeriod time.Duration, clk clock.Clock, cluster *state.Cluster, kubeClient client.Client, provisioner *provisioning.Provisioner,
func NewValidation(clk clock.Clock, cluster *state.Cluster, kubeClient client.Client, provisioner *provisioning.Provisioner,
cp cloudprovider.CloudProvider, recorder events.Recorder, queue *orchestration.Queue) *Validation {
return &Validation{
validationPeriod: validationPeriod,
clock: clk,
cluster: cluster,
kubeClient: kubeClient,
@@ -65,52 +63,68 @@
}
}

//nolint:gocyclo
func (v *Validation) IsValid(ctx context.Context, cmd Command) (bool, error) {
func (v *Validation) IsValid(ctx context.Context, cmd Command, validationPeriod time.Duration) (bool, error) {
var err error
v.once.Do(func() {
v.start = v.clock.Now()
})

waitDuration := v.validationPeriod - v.clock.Since(v.start)
waitDuration := validationPeriod - v.clock.Since(v.start)
if waitDuration > 0 {
select {
case <-ctx.Done():
return false, errors.New("context canceled")
case <-v.clock.After(waitDuration):
}
}
// Get the current representation of the proposed candidates from before the validation timeout
// We do this so that we can re-validate that the candidates that were computed before we made the decision are the same
// We perform filtering here to ensure that none of the proposed candidates have blocking PDBs or do-not-evict/do-not-disrupt pods scheduled to them
validationCandidates, err := GetCandidates(ctx, v.cluster, v.kubeClient, v.recorder, v.clock, v.cloudProvider, v.ShouldDisrupt, v.queue)
validatedCandidates, err := v.ValidateCandidates(ctx, cmd.candidates...)
if len(validatedCandidates) == 0 || err != nil {
return false, err
}
isValid, err := v.ValidateCommand(ctx, cmd, validatedCandidates)
if err != nil {
return false, fmt.Errorf("constructing validation candidates, %w", err)
return false, fmt.Errorf("validating command, %w", err)
}
validationCandidates = mapCandidates(cmd.candidates, validationCandidates)
// If we filtered out any candidates, return false as some NodeClaims in the consolidation decision have changed.
if len(validationCandidates) != len(cmd.candidates) {
return false, nil
// Revalidate candidates after validating the command. This mitigates the chance of a race condition outlined in
// the following GitHub issue: https://github.com/kubernetes-sigs/karpenter/issues/1167.
validatedCandidates, err = v.ValidateCandidates(ctx, validatedCandidates...)
if len(validatedCandidates) == 0 || err != nil {
return false, err
}
// Rebuild the disruption budget mapping to see if any budgets have changed since validation.
postValidationMapping, err := BuildDisruptionBudgets(ctx, v.cluster, v.clock, v.kubeClient, v.recorder)
return isValid, nil
}

// validateCandidates gets the current representation of the provided candidates and ensures that they are all still valid.
// For a candidate to still be valid, the following conditions must be met:
// a. It must pass the global candidate filtering logic (no blocking PDBs, no do-not-disrupt annotation, etc)
// b. It must not have any pods nominated for it
// c. It must still be disruptible without violating node disruption budgets
// If these conditions are met for all candidates, validateCandidates returns a slice with the updated representations.
// Otherwise, validateCandidates returns an empty slice.
func (v *Validation) ValidateCandidates(ctx context.Context, candidates ...*Candidate) ([]*Candidate, error) {
validatedCandidates, err := GetCandidates(ctx, v.cluster, v.kubeClient, v.recorder, v.clock, v.cloudProvider, v.ShouldDisrupt, v.queue)
if err != nil {
return false, fmt.Errorf("building disruption budgets, %w", err)
}
// 1. a candidate we are about to delete is a target of a currently pending pod, wait for that to settle
// before continuing consolidation
// 2. the number of candidates for a given nodepool can no longer be disrupted as it would violate the budget
for _, n := range validationCandidates {
if v.cluster.IsNodeNominated(n.ProviderID()) || postValidationMapping[n.nodePool.Name] == 0 {
return false, nil
}
postValidationMapping[n.nodePool.Name]--
return nil, fmt.Errorf("constructing validation candidates, %w", err)
}
validatedCandidates = mapCandidates(candidates, validatedCandidates)
// If we filtered out any candidates, return nil as some NodeClaims in the consolidation decision have changed.
if len(validatedCandidates) != len(candidates) {
return nil, nil
}
isValid, err := v.ValidateCommand(ctx, cmd, validationCandidates)
budgetMapping, err := BuildDisruptionBudgets(ctx, v.cluster, v.clock, v.kubeClient, v.recorder)
if err != nil {
return false, fmt.Errorf("validating command, %w", err)
return nil, fmt.Errorf("building disruption budgets, %w", err)
}
return isValid, nil
// Return nil if any candidate meets either of the following conditions:
// a. A pod was nominated to the candidate
// b. Disrupting the candidate would violate node disruption budgets
for _, vc := range validatedCandidates {
if v.cluster.IsNodeNominated(vc.ProviderID()) || budgetMapping[vc.nodePool.Name] == 0 {
return nil, nil
}
budgetMapping[vc.nodePool.Name]--
}
return validatedCandidates, nil
}

// ShouldDisrupt is a predicate used to filter candidates
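The refactored IsValid above runs in three phases: validate the candidates, validate the command, then validate the candidates again so that churn occurring during command validation still invalidates the decision (the race described in the GitHub issue referenced above, #1167). The following is a simplified, self-contained model of that ordering; the types and injected callbacks are hypothetical stand-ins, not karpenter's actual API.

package main

import (
	"context"
	"fmt"
)

// Candidate and Command are hypothetical stand-ins for the disruption package's types.
type Candidate struct{ Name string }
type Command struct{ Candidates []*Candidate }

// validator injects the two validation steps so only their ordering is modeled here.
type validator struct {
	validateCandidates func(context.Context, ...*Candidate) ([]*Candidate, error)
	validateCommand    func(context.Context, Command, []*Candidate) (bool, error)
}

// IsValid models the flow above: validate candidates, validate the command, then
// validate the candidates again so churn during command validation is still caught.
func (v validator) IsValid(ctx context.Context, cmd Command) (bool, error) {
	vcs, err := v.validateCandidates(ctx, cmd.Candidates...)
	if err != nil || len(vcs) == 0 {
		return false, err
	}
	isValid, err := v.validateCommand(ctx, cmd, vcs)
	if err != nil {
		return false, fmt.Errorf("validating command, %w", err)
	}
	if vcs, err = v.validateCandidates(ctx, vcs...); err != nil || len(vcs) == 0 {
		return false, err
	}
	return isValid, nil
}

func main() {
	v := validator{
		validateCandidates: func(_ context.Context, cs ...*Candidate) ([]*Candidate, error) { return cs, nil },
		validateCommand:    func(context.Context, Command, []*Candidate) (bool, error) { return true, nil },
	}
	ok, _ := v.IsValid(context.Background(), Command{Candidates: []*Candidate{{Name: "node-a"}}})
	fmt.Println(ok) // true only when both candidate checks and the command check pass
}
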
4 changes: 3 additions & 1 deletion pkg/utils/pod/scheduling.go
@@ -53,10 +53,12 @@ func IsReschedulable(pod *v1.Pod) bool {
// - Is an active pod (isn't terminal or actively terminating)
// - Doesn't tolerate the "karpenter.sh/disruption=disrupting" taint
// - Isn't a mirror pod (https://kubernetes.io/docs/tasks/configure-pod-container/static-pod/)
// - Doesn't have the "karpenter.sh/do-not-disrupt=true" annotation
func IsEvictable(pod *v1.Pod) bool {
return IsActive(pod) &&
!ToleratesDisruptionNoScheduleTaint(pod) &&
!IsOwnedByNode(pod)
!IsOwnedByNode(pod) &&
IsDisruptable(pod)
}

// IsWaitingEviction checks if this is a pod that we are waiting to be removed from the node by ensuring that the pod:
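Per the updated comment above, IsEvictable now also requires that the pod is disruptable, i.e. that it does not carry the karpenter.sh/do-not-disrupt=true annotation. A small usage sketch under that assumption follows; the import path is inferred from the module and file paths shown in this diff, and the literal annotation key stands in for v1beta1.DoNotDisruptAnnotationKey.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	podutils "sigs.k8s.io/karpenter/pkg/utils/pod"
)

func main() {
	p := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "blocked",
			Namespace: "default",
			// Annotation key as described in the IsEvictable comment above.
			Annotations: map[string]string{"karpenter.sh/do-not-disrupt": "true"},
		},
		Status: v1.PodStatus{Phase: v1.PodRunning},
	}
	// With this change, the do-not-disrupt annotation also excludes the pod from eviction.
	fmt.Println(podutils.IsEvictable(p)) // expected: false
}
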
