Commit b1fe58a

more err handling changes
jmdeal committed Apr 24, 2024
1 parent 28b44f1 commit b1fe58a
Showing 4 changed files with 39 additions and 42 deletions.
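
All four diffs below route benign validation failures through a ValidationError sentinel and its NewValidationError/IsValidationError helpers, which live elsewhere in validation.go and are not shown in this commit. A minimal sketch of what such helpers plausibly look like, assuming a thin wrapper that the standard errors package can unwrap:

	package disruption

	import "errors"

	// ValidationError marks a benign validation failure (e.g. pod churn)
	// that callers should treat as "drop the command and retry later",
	// not as an unexpected failure.
	type ValidationError struct {
		error
	}

	func NewValidationError(err error) *ValidationError {
		return &ValidationError{error: err}
	}

	func IsValidationError(err error) bool {
		if err == nil {
			return false
		}
		validationError := &ValidationError{}
		return errors.As(err, &validationError)
	}

Because errors.As walks wrapped chains, a ValidationError would still be detected behind fmt.Errorf("... %w", err), though every call site below checks IsValidationError before wrapping.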
9 changes: 7 additions & 2 deletions pkg/controllers/disruption/emptynodeconsolidation.go
@@ -92,10 +92,15 @@ func (c *EmptyNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
 	v := NewValidation(c.clock, c.cluster, c.kubeClient, c.provisioner, c.cloudProvider, c.recorder, c.queue)
 	validatedCandidates, err := v.ValidateCandidates(ctx, cmd.candidates...)
 	if err != nil {
+		if IsValidationError(err) {
+			logging.FromContext(ctx).Debugf("abandoning empty node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
+			return Command{}, scheduling.Results{}, nil
+		}
 		return Command{}, scheduling.Results{}, err
 	}
-	// In addition to the normal candidate validation checks, also check that the nodes are still empty
-	if len(validatedCandidates) == 0 || lo.ContainsBy(validatedCandidates, func(c *Candidate) bool {
+
+	// TODO (jmdeal@): better encapsulate within validation
+	if lo.ContainsBy(validatedCandidates, func(c *Candidate) bool {
 		return len(c.reschedulablePods) != 0
 	}) {
 		logging.FromContext(ctx).Debugf("abandoning empty node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
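
This hunk drops the explicit len(validatedCandidates) == 0 branch, presumably because ValidateCandidates (whose body is only partially shown in this commit) now reports a shrunken or empty candidate set as a ValidationError itself, which the new IsValidationError branch above already handles. A hedged sketch of that presumed check inside ValidateCandidates:

	// assumed shape, not shown in this commit: candidates that disappeared
	// between command computation and validation surface as a ValidationError
	if len(validatedCandidates) != len(candidates) {
		return nil, NewValidationError(fmt.Errorf("candidates are no longer valid"))
	}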
15 changes: 6 additions & 9 deletions pkg/controllers/disruption/multinodeconsolidation.go
@@ -91,15 +91,12 @@ func (m *MultiNodeConsolidation) ComputeCommand(ctx context.Context, disruptionB
 		return cmd, scheduling.Results{}, nil
 	}
 
-	v := NewValidation(m.clock, m.cluster, m.kubeClient, m.provisioner, m.cloudProvider, m.recorder, m.queue)
-	isValid, err := v.IsValid(ctx, cmd, consolidationTTL)
-	if err != nil {
-		return Command{}, scheduling.Results{}, fmt.Errorf("validating, %w", err)
-	}
-
-	if !isValid {
-		logging.FromContext(ctx).Debugf("abandoning multi-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
-		return Command{}, scheduling.Results{}, nil
+	if err := NewValidation(m.clock, m.cluster, m.kubeClient, m.provisioner, m.cloudProvider, m.recorder, m.queue).IsValid(ctx, cmd, consolidationTTL); err != nil {
+		if IsValidationError(err) {
+			logging.FromContext(ctx).Debugf("abandoning multi-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
+			return Command{}, scheduling.Results{}, nil
+		}
+		return Command{}, scheduling.Results{}, fmt.Errorf("validating consolidation, %w", err)
 	}
 	return cmd, results, nil
 }
11 changes: 5 additions & 6 deletions pkg/controllers/disruption/singlenodeconsolidation.go
@@ -78,14 +78,13 @@ func (s *SingleNodeConsolidation) ComputeCommand(ctx context.Context, disruption
 		if cmd.Action() == NoOpAction {
 			continue
 		}
-		isValid, err := v.IsValid(ctx, cmd, consolidationTTL)
-		if err != nil {
+		if err := v.IsValid(ctx, cmd, consolidationTTL); err != nil {
+			if IsValidationError(err) {
+				logging.FromContext(ctx).Debugf("abandoning single-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
+				return Command{}, scheduling.Results{}, nil
+			}
 			return Command{}, scheduling.Results{}, fmt.Errorf("validating consolidation, %w", err)
 		}
-		if !isValid {
-			logging.FromContext(ctx).Debugf("abandoning single-node consolidation attempt due to pod churn, command is no longer valid, %s", cmd)
-			return Command{}, scheduling.Results{}, nil
-		}
 		return cmd, results, nil
 	}
 	if !constrainedByBudgets {
46 changes: 21 additions & 25 deletions pkg/controllers/disruption/validation.go
@@ -79,7 +79,7 @@ func NewValidation(clk clock.Clock, cluster *state.Cluster, kubeClient client.Cl
 	}
 }
 
-func (v *Validation) IsValid(ctx context.Context, cmd Command, validationPeriod time.Duration) (bool, error) {
+func (v *Validation) IsValid(ctx context.Context, cmd Command, validationPeriod time.Duration) error {
 	var err error
 	v.once.Do(func() {
 		v.start = v.clock.Now()
@@ -89,30 +89,23 @@ func (v *Validation) IsValid(ctx context.Context, cmd Command, validationPeriod
 	if waitDuration > 0 {
 		select {
 		case <-ctx.Done():
-			return false, errors.New("context canceled")
+			return errors.New("context canceled")
 		case <-v.clock.After(waitDuration):
 		}
 	}
 	validatedCandidates, err := v.ValidateCandidates(ctx, cmd.candidates...)
 	if err != nil {
-		if IsValidationError(err) {
-			return false, nil
-		}
-		return false, err
+		return err
 	}
-	isValid, err := v.ValidateCommand(ctx, cmd, validatedCandidates)
-	if err != nil {
-		return false, fmt.Errorf("validating command, %w", err)
+	if err := v.ValidateCommand(ctx, cmd, validatedCandidates); err != nil {
+		return err
 	}
 	// Revalidate candidates after validating the command. This mitigates the chance of a race condition outlined in
 	// the following GitHub issue: https://github.com/kubernetes-sigs/karpenter/issues/1167.
 	if _, err = v.ValidateCandidates(ctx, validatedCandidates...); err != nil {
-		if IsValidationError(err) {
-			return false, nil
-		}
-		return false, err
+		return err
 	}
-	return isValid, nil
+	return nil
 }
 
 // ValidateCandidates gets the current representation of the provided candidates and ensures that they are all still valid.
@@ -141,9 +134,12 @@ func (v *Validation) ValidateCandidates(ctx context.Context, candidates ...*Cand
 	// a. A pod was nominated to the candidate
 	// b. Disrupting the candidate would violate node disruption budgets
 	for _, vc := range validatedCandidates {
-		if v.cluster.IsNodeNominated(vc.ProviderID()) || disruptionBudgetMapping[vc.nodePool.Name] == 0 {
+		if v.cluster.IsNodeNominated(vc.ProviderID()) {
 			return nil, NewValidationError(fmt.Errorf("a candidate was nominated during validation"))
 		}
+		if disruptionBudgetMapping[vc.nodePool.Name] == 0 {
+			return nil, NewValidationError(fmt.Errorf("a candidate can no longer be disrupted without violating budgets"))
+		}
 		disruptionBudgetMapping[vc.nodePool.Name]--
 	}
 	return validatedCandidates, nil
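
The budget check above works as a simple per-NodePool countdown: the mapping starts at each NodePool's allowed number of disruptions, every validated candidate consumes one unit, and the first candidate past the budget now fails validation with its own error message instead of sharing the nomination message. A standalone sketch of the counting pattern (names and values are illustrative, not from the diff):

	package main

	import "fmt"

	func main() {
		// hypothetical: the "default" NodePool may only lose two more nodes
		disruptionBudgetMapping := map[string]int{"default": 2}
		candidates := []string{"default", "default", "default"}
		for i, nodePool := range candidates {
			if disruptionBudgetMapping[nodePool] == 0 {
				// ValidateCandidates returns a ValidationError at this point
				fmt.Printf("candidate %d would violate the %q budget\n", i, nodePool)
				break
			}
			disruptionBudgetMapping[nodePool]-- // each disruption consumes budget
		}
	}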
@@ -159,17 +155,17 @@ func (v *Validation) ShouldDisrupt(_ context.Context, c *Candidate) bool {
 }
 
 // ValidateCommand validates a command for a Method
-func (v *Validation) ValidateCommand(ctx context.Context, cmd Command, candidates []*Candidate) (bool, error) {
+func (v *Validation) ValidateCommand(ctx context.Context, cmd Command, candidates []*Candidate) error {
 	// None of the chosen candidates are valid for execution, so retry
 	if len(candidates) == 0 {
-		return false, nil
+		return NewValidationError(fmt.Errorf("no candidates"))
 	}
 	results, err := SimulateScheduling(ctx, v.kubeClient, v.cluster, v.provisioner, candidates...)
 	if err != nil {
-		return false, fmt.Errorf("simluating scheduling, %w", err)
+		return fmt.Errorf("simulating scheduling, %w", err)
 	}
 	if !results.AllNonPendingPodsScheduled() {
-		return false, nil
+		return NewValidationError(fmt.Errorf("all pending pods could not be scheduled"))
 	}
 
 	// We want to ensure that the re-simulated scheduling using the current cluster state produces the same result.
@@ -181,22 +177,22 @@ func (v *Validation) ValidateCommand(ctx context.Context, cmd Command, candidate
 	if len(results.NewNodeClaims) == 0 {
 		if len(cmd.replacements) == 0 {
 			// scheduling produced zero new NodeClaims and we weren't expecting any, so this is valid.
-			return true, nil
+			return nil
 		}
 		// if it produced no new NodeClaims, but we were expecting one we should re-simulate as there is likely a better
 		// consolidation option now
-		return false, nil
+		return NewValidationError(fmt.Errorf("scheduling simulation produced new results"))
 	}
 
 	// we need more than one replacement node which is never valid currently (all of our node replacement is m->1, never m->n)
 	if len(results.NewNodeClaims) > 1 {
-		return false, nil
+		return NewValidationError(fmt.Errorf("scheduling simulation produced new results"))
 	}
 
 	// we now know that scheduling simulation wants to create one new node
 	if len(cmd.replacements) == 0 {
 		// but we weren't expecting any new NodeClaims, so this is invalid
-		return false, nil
+		return NewValidationError(fmt.Errorf("scheduling simulation produced new results"))
 	}
 
 	// We know that the scheduling simulation wants to create a new node and that the command we are verifying wants
@@ -211,11 +207,11 @@ func (v *Validation) ValidateCommand(ctx context.Context, cmd Command, candidate
 	// now says that we need to launch a 4xlarge. It's still launching the correct number of NodeClaims, but it's just
 	// as expensive or possibly more so we shouldn't validate.
 	if !instanceTypesAreSubset(cmd.replacements[0].InstanceTypeOptions, results.NewNodeClaims[0].InstanceTypeOptions) {
-		return false, nil
+		return NewValidationError(fmt.Errorf("scheduling simulation produced new results"))
 	}
 
 	// Now we know:
 	// - current scheduling simulation says to create a new node with types T = {T_0, T_1, ..., T_n}
 	// - our lifecycle command says to create a node with types {U_0, U_1, ..., U_n} where U is a subset of T
-	return true, nil
+	return nil
 }
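
The subset check above relies on instanceTypesAreSubset, which is defined elsewhere in the package and not shown in this commit. A minimal sketch of the containment test it presumably performs, assuming instance types are compared by name (the InstanceType shape here is a stand-in for the real cloudprovider type):

	// InstanceType is a stand-in; only the name matters for this illustration.
	type InstanceType struct {
		Name string
	}

	// instanceTypesAreSubset reports whether every instance type in lhs also
	// appears in rhs, i.e. the command would only launch capacity that the
	// fresh scheduling simulation still considers acceptable.
	func instanceTypesAreSubset(lhs []*InstanceType, rhs []*InstanceType) bool {
		names := make(map[string]struct{}, len(rhs))
		for _, it := range rhs {
			names[it.Name] = struct{}{}
		}
		for _, it := range lhs {
			if _, ok := names[it.Name]; !ok {
				return false
			}
		}
		return true
	}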
