Commit

Merge pull request #86 from atlassian-labs/vportella/light-pending-phase-refactor

Light Pending phase refactor to help with future work to deal with problem nodes
vincentportella authored Aug 20, 2024
2 parents 9e73b88 + ccc6ae5 commit 65b577a
Showing 6 changed files with 284 additions and 86 deletions.
10 changes: 10 additions & 0 deletions pkg/cloudprovider/aws/fake/fake.go
@@ -111,6 +111,16 @@ func (m *Autoscaling) DescribeAutoScalingGroups(input *autoscaling.DescribeAutoS
}, nil
}

func (m *Autoscaling) AttachInstances(input *autoscaling.AttachInstancesInput) (*autoscaling.AttachInstancesOutput, error) {
for _, instanceId := range input.InstanceIds {
if instance, exists := m.Instances[*instanceId]; exists {
instance.AutoscalingGroupName = *input.AutoScalingGroupName
}
}

return &autoscaling.AttachInstancesOutput{}, nil
}

// *************** EC2 *************** //

func (m *Ec2) DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error) {
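The new fake method makes the instance re-attach path (exercised by the Healing transition further down) unit-testable. A minimal test sketch, assuming a hypothetical Instance type and literal construction of the fake; only the Instances map and its AutoscalingGroupName field are confirmed by the hunk above:

package fake

import (
	"testing"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/autoscaling"
)

// Sketch only: the Instance type name and how the fake is constructed are
// assumptions, not part of this diff.
func TestAttachInstancesRecordsNodegroup(t *testing.T) {
	m := &Autoscaling{
		Instances: map[string]*Instance{
			"i-0123456789abcdef0": {AutoscalingGroupName: ""}, // detached instance
		},
	}

	if _, err := m.AttachInstances(&autoscaling.AttachInstancesInput{
		AutoScalingGroupName: aws.String("system"),
		InstanceIds:          []*string{aws.String("i-0123456789abcdef0")},
	}); err != nil {
		t.Fatal(err)
	}

	// The fake records the attachment, so a test can assert the re-attach happened.
	if got := m.Instances["i-0123456789abcdef0"].AutoscalingGroupName; got != "system" {
		t.Fatalf("expected attachment to %q, got %q", "system", got)
	}
}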
58 changes: 58 additions & 0 deletions pkg/controller/cyclenoderequest/transitioner/node.go
@@ -3,6 +3,8 @@ package transitioner
import (
"fmt"

"github.com/pkg/errors"

corev1 "k8s.io/api/core/v1"

v1 "github.com/atlassian-labs/cyclops/pkg/apis/atlassian/v1"
@@ -140,6 +142,62 @@ func (t *CycleNodeRequestTransitioner) addNamedNodesToTerminate(kubeNodes map[st
return nil
}

// Find all the nodes in kube and the cloud provider that match the node selector and nodegroups
// specified in the CNR. These are two separate sets and the contents of one do not affect the
// contents of the other.
func (t *CycleNodeRequestTransitioner) findAllNodesForCycle() (kubeNodes map[string]corev1.Node, cloudProviderInstances map[string]cloudprovider.Instance, err error) {
kubeNodes, err = t.listReadyNodes(true)
if err != nil {
return kubeNodes, cloudProviderInstances, err
}

if len(kubeNodes) == 0 {
return kubeNodes, cloudProviderInstances, fmt.Errorf("no nodes matched selector")
}

// Only retain nodes which still exist inside cloud provider
var nodeProviderIDs []string

for _, node := range kubeNodes {
nodeProviderIDs = append(nodeProviderIDs, node.Spec.ProviderID)
}

existingProviderIDs, err := t.rm.CloudProvider.InstancesExist(nodeProviderIDs)
if err != nil {
return kubeNodes, cloudProviderInstances, errors.Wrap(err, "failed to check instances that exist from cloud provider")
}

existingKubeNodes := make(map[string]corev1.Node)

for _, validProviderID := range existingProviderIDs {
if node, found := kubeNodes[validProviderID]; found {
existingKubeNodes[node.Spec.ProviderID] = node
}
}

kubeNodes = existingKubeNodes

if len(kubeNodes) == 0 {
return kubeNodes, cloudProviderInstances, fmt.Errorf("no existing nodes in cloud provider matched selector")
}

nodeGroupNames := t.cycleNodeRequest.GetNodeGroupNames()

// Describe the node group for the request
t.rm.LogEvent(t.cycleNodeRequest, "FetchingNodeGroup", "Fetching node group: %v", nodeGroupNames)

if len(nodeGroupNames) == 0 {
return kubeNodes, cloudProviderInstances, fmt.Errorf("must have at least one nodegroup name defined")
}

nodeGroups, err := t.rm.CloudProvider.GetNodeGroups(nodeGroupNames)
if err != nil {
return kubeNodes, cloudProviderInstances, err
}

return kubeNodes, nodeGroups.Instances(), nil
}

// newCycleNodeRequestNode converts a corev1.Node to a v1.CycleNodeRequestNode. This is done multiple
// times across the code, so this function standardises the process.
func newCycleNodeRequestNode(kubeNode *corev1.Node, nodeGroupName string) v1.CycleNodeRequestNode {
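Both maps returned by findAllNodesForCycle are keyed by provider ID (the lookups above rely on this), which reduces the retain-only-existing filter and the later set operations to plain map lookups. A self-contained illustration of that filtering step, with hypothetical provider IDs and string values standing in for node objects:

package main

import "fmt"

func main() {
	// Ready kube nodes keyed by provider ID, as the lookups above assume
	// listReadyNodes returns them.
	kubeNodes := map[string]string{
		"aws:///us-west-2a/i-aaa": "node-1",
		"aws:///us-west-2a/i-bbb": "node-2", // already terminated in the cloud provider
	}

	// Provider IDs the cloud provider reports as still existing.
	existingProviderIDs := []string{"aws:///us-west-2a/i-aaa"}

	// Retain only the kube nodes with a live backing instance.
	existingKubeNodes := make(map[string]string)
	for _, providerID := range existingProviderIDs {
		if name, ok := kubeNodes[providerID]; ok {
			existingKubeNodes[providerID] = name
		}
	}

	fmt.Println(existingKubeNodes) // map[aws:///us-west-2a/i-aaa:node-1]
}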
112 changes: 41 additions & 71 deletions pkg/controller/cyclenoderequest/transitioner/transitions.go
@@ -6,7 +6,6 @@ import (
"strings"
"time"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

@@ -62,72 +61,51 @@ func (t *CycleNodeRequestTransitioner) transitionUndefined() (reconcile.Result,
// 2. describes the node group and checks that the number of instances in the node group matches the number we
// are planning on terminating
func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, error) {
// Fetch the node names for the cycleNodeRequest, using the label selector provided
t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Selecting nodes with label selector")

kubeNodes, err := t.listReadyNodes(true)
// Start the equilibrium wait timer, if this times out as the set of nodes in kube and
// the cloud provider is not considered valid, then transition to the Healing phase as
// cycling should not proceed.
timedOut, err := t.equilibriumWaitTimedOut()
if err != nil {
return t.transitionToHealing(err)
}

if len(kubeNodes) == 0 {
return t.transitionToHealing(fmt.Errorf("no nodes matched selector"))
}

// Only retain nodes which still exist inside cloud provider
var nodeProviderIDs []string

for _, node := range kubeNodes {
nodeProviderIDs = append(nodeProviderIDs, node.Spec.ProviderID)
}

existingProviderIDs, err := t.rm.CloudProvider.InstancesExist(nodeProviderIDs)
if err != nil {
return t.transitionToHealing(errors.Wrap(err, "failed to check instances that exist from cloud provider"))
}

existingKubeNodes := make(map[string]corev1.Node)

for _, validProviderID := range existingProviderIDs {
if node, found := kubeNodes[validProviderID]; found {
existingKubeNodes[node.Spec.ProviderID] = node
}
}

kubeNodes = existingKubeNodes

if len(kubeNodes) == 0 {
return t.transitionToHealing(fmt.Errorf("no existing nodes in cloud provider matched selector"))
if timedOut {
return t.transitionToHealing(fmt.Errorf(
"node count mismatch, number of kubernetes nodes does not match number of cloud provider instances after %v",
nodeEquilibriumWaitLimit,
))
}

nodeGroupNames := t.cycleNodeRequest.GetNodeGroupNames()

// Describe the node group for the request
t.rm.LogEvent(t.cycleNodeRequest, "FetchingNodeGroup", "Fetching node group: %v", nodeGroupNames)

if len(nodeGroupNames) == 0 {
return t.transitionToHealing(fmt.Errorf("must have at least one nodegroup name defined"))
}
// Fetch the node names for the cycleNodeRequest, using the label selector provided
t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Selecting nodes with label selector")

nodeGroups, err := t.rm.CloudProvider.GetNodeGroups(nodeGroupNames)
// Find all the nodes in kube and the cloud provider nodegroups selected by the CNR. These
// should be all the nodes in each, regardless of whether they exist in both.
kubeNodes, nodeGroupInstances, err := t.findAllNodesForCycle()
if err != nil {
return t.transitionToHealing(err)
}

// get instances inside cloud provider node groups
nodeGroupInstances := nodeGroups.Instances()
// Find all the nodes that exist in both kube and the cloud provider nodegroups. This is
// the valid set of nodes and can be worked on. This is an AND condition on the two initial
// sets of nodes.
validKubeNodes, validNodeGroupInstances := findValidNodes(kubeNodes, nodeGroupInstances)

// Find all the nodes that exist in either kube or the cloud provider nodegroups, but not both.
// The nodes in the cloud provider can either not exist or be detached from one of the nodegroups
// and this will be determined when dealt with. This is an XOR condition on the two initial sets
// of nodes.
nodesNotInCloudProvider, nodesNotInKube := findProblemNodes(kubeNodes, nodeGroupInstances)

// Do some sanity checking before we start filtering things
// Check the instance count of the node group matches the number of nodes found in Kubernetes
if len(kubeNodes) != len(nodeGroupInstances) {
if len(nodesNotInCloudProvider) > 0 || len(nodesNotInKube) > 0 {
var offendingNodesInfo string

nodesNotInCPNodeGroup, nodesNotInKube := findOffendingNodes(kubeNodes, nodeGroupInstances)

if len(nodesNotInCPNodeGroup) > 0 {
if len(nodesNotInCloudProvider) > 0 {
providerIDs := make([]string, 0)

for providerID := range nodesNotInCPNodeGroup {
for providerID := range nodesNotInCloudProvider {
providerIDs = append(providerIDs,
fmt.Sprintf("id %q", providerID),
)
@@ -156,22 +134,7 @@ func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, er

t.rm.LogEvent(t.cycleNodeRequest, "NodeCountMismatch",
"node group: %v, kube: %v. %v",
len(nodeGroupInstances), len(kubeNodes), offendingNodesInfo)

// If it doesn't, then retry for a while in case something just scaled the node group
timedOut, err := t.equilibriumWaitTimedOut()
if err != nil {
return t.transitionToHealing(err)
}

if timedOut {
err := fmt.Errorf(
"node count mismatch, number of kubernetes nodes does not match number of cloud provider instances after %v",
nodeEquilibriumWaitLimit,
)

return t.transitionToHealing(err)
}
len(validNodeGroupInstances), len(validKubeNodes), offendingNodesInfo)

return reconcile.Result{Requeue: true, RequeueAfter: requeueDuration}, nil
}
@@ -180,18 +143,18 @@ func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, er
if len(t.cycleNodeRequest.Spec.NodeNames) > 0 {
// If specific node names are provided, check they actually exist in the node group
t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Adding named nodes to NodesToTerminate")
err := t.addNamedNodesToTerminate(kubeNodes, nodeGroupInstances)
err := t.addNamedNodesToTerminate(validKubeNodes, validNodeGroupInstances)
if err != nil {
return t.transitionToHealing(err)
}
} else {
// Otherwise just add all the nodes in the node group
t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Adding all node group nodes to NodesToTerminate")

for _, kubeNode := range kubeNodes {
for _, kubeNode := range validKubeNodes {
// Check to ensure the kubeNode object maps to an existing node in the ASG
// If this isn't the case, this is a phantom node. Fail the cnr to be safe.
nodeGroupName, ok := nodeGroupInstances[kubeNode.Spec.ProviderID]
nodeGroupName, ok := validNodeGroupInstances[kubeNode.Spec.ProviderID]
if !ok {
return t.transitionToHealing(fmt.Errorf("kubeNode %s not found in the list of instances in the ASG", kubeNode.Name))
}
Expand All @@ -209,7 +172,7 @@ func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, er
}

if len(t.cycleNodeRequest.Spec.HealthChecks) > 0 {
if err = t.performInitialHealthChecks(kubeNodes); err != nil {
if err = t.performInitialHealthChecks(validKubeNodes); err != nil {
return t.transitionToHealing(err)
}
}
@@ -595,9 +558,16 @@ func (t *CycleNodeRequestTransitioner) transitionHealing() (reconcile.Result, er

// un-cordon after attach as well
t.rm.LogEvent(t.cycleNodeRequest, "UncordoningNodes", "Uncordoning nodes in node group: %v", node.Name)
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {

err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
return k8s.UncordonNode(node.Name, t.rm.RawClient)
}); err != nil {
})

if apierrors.IsNotFound(err) {
continue
}

if err != nil {
return t.transitionToFailed(err)
}
}
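The findValidNodes and findProblemNodes helpers introduced by this refactor are not visible in the loaded hunks. A plausible sketch of their set semantics (intersection for the valid set, symmetric difference for the problem sets), keyed by provider ID, with signatures inferred from the call sites above; the bodies and the cloudprovider import path are assumptions, not the committed implementation:

package transitioner

import (
	corev1 "k8s.io/api/core/v1"

	"github.com/atlassian-labs/cyclops/pkg/cloudprovider"
)

// findValidNodes returns the nodes present in both kube and the cloud provider
// nodegroups: the AND of the two sets, keyed by provider ID.
func findValidNodes(kubeNodes map[string]corev1.Node, instances map[string]cloudprovider.Instance) (map[string]corev1.Node, map[string]cloudprovider.Instance) {
	validKubeNodes := make(map[string]corev1.Node)
	validInstances := make(map[string]cloudprovider.Instance)

	for providerID, node := range kubeNodes {
		if instance, ok := instances[providerID]; ok {
			validKubeNodes[providerID] = node
			validInstances[providerID] = instance
		}
	}

	return validKubeNodes, validInstances
}

// findProblemNodes returns the nodes present in exactly one of the two sets:
// kube nodes with no backing instance, and instances with no kube node (XOR).
func findProblemNodes(kubeNodes map[string]corev1.Node, instances map[string]cloudprovider.Instance) (map[string]corev1.Node, map[string]cloudprovider.Instance) {
	nodesNotInCloudProvider := make(map[string]corev1.Node)
	nodesNotInKube := make(map[string]cloudprovider.Instance)

	for providerID, node := range kubeNodes {
		if _, ok := instances[providerID]; !ok {
			nodesNotInCloudProvider[providerID] = node
		}
	}

	for providerID, instance := range instances {
		if _, ok := kubeNodes[providerID]; !ok {
			nodesNotInKube[providerID] = instance
		}
	}

	return nodesNotInCloudProvider, nodesNotInKube
}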
(Diffs for the remaining three changed files did not load and are not shown here.)
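equilibriumWaitTimedOut, now called at the top of transitionPending, is also not shown in this diff. A rough sketch of the behaviour implied by its call site, assuming a hypothetical EquilibriumWaitStarted timestamp on the CNR status; treat every name beyond the method itself as a guess:

// equilibriumWaitTimedOut reports whether the kube and cloud provider node sets
// have been out of equilibrium for longer than nodeEquilibriumWaitLimit.
// Sketch only: the status field is assumed, and persisting the status update
// back to the API server is elided.
func (t *CycleNodeRequestTransitioner) equilibriumWaitTimedOut() (bool, error) {
	// Record when the wait began on the first pass through Pending.
	if t.cycleNodeRequest.Status.EquilibriumWaitStarted.IsZero() {
		t.cycleNodeRequest.Status.EquilibriumWaitStarted = metav1.Now()
		return false, nil
	}

	return time.Since(t.cycleNodeRequest.Status.EquilibriumWaitStarted.Time) > nodeEquilibriumWaitLimit, nil
}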
