Fix node state in the Pending phase before cycling #87

Merged · 4 commits · Aug 26, 2024

Changes from 2 commits
17 changes: 11 additions & 6 deletions pkg/cloudprovider/aws/aws.go
@@ -114,9 +114,10 @@ func (p *provider) GetNodeGroups(names []string) (cloudprovider.NodeGroups, erro
}

// InstancesExist returns a list of the instances that exist
func (p *provider) InstancesExist(providerIDs []string) (validProviderIDs []string, err error) {
instanceIDSet := map[string]string{}
instanceIDs := []string{}
func (p *provider) InstancesExist(providerIDs []string) (map[string]interface{}, error) {
validProviderIDs := make(map[string]interface{})
instanceIDSet := make(map[string]string)
instanceIDs := make([]string, 0)

for _, providerID := range providerIDs {
instanceID, err := providerIDToInstanceID(providerID)
@@ -140,8 +141,12 @@ func (p *provider) InstancesExist(providerIDs []string) (validProviderIDs []stri

for _, reservation := range output.Reservations {
for _, instance := range reservation.Instances {
if *instance.State.Name == ec2.InstanceStateNameTerminated {
continue
}

if providerID, ok := instanceIDSet[aws.StringValue(instance.InstanceId)]; ok {
validProviderIDs = append(validProviderIDs, providerID)
validProviderIDs[providerID] = nil
}
}
}
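
Note: the call sites are updated elsewhere in this PR; purely as an illustration of the new return type, a caller-side sketch (provider, nodeProviderIDs and kubeNodes are stand-in names, not taken from this diff) might look like:

existingProviderIDs, err := provider.InstancesExist(nodeProviderIDs)
if err != nil {
    return errors.Wrap(err, "failed to check instances that exist from cloud provider")
}

// The map acts as a set: existence checks become O(1) lookups instead of
// slice scans, and terminated instances are already filtered out above.
for providerID := range kubeNodes {
    if _, exists := existingProviderIDs[providerID]; !exists {
        delete(kubeNodes, providerID)
    }
}
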
@@ -190,7 +195,7 @@ func (a *autoscalingGroups) ReadyInstances() map[string]cloudprovider.Instance {
instances := make(map[string]cloudprovider.Instance)
for _, group := range a.groups {
for _, i := range group.Instances {
if aws.StringValue(i.LifecycleState) != "InService" {
if aws.StringValue(i.LifecycleState) != autoscaling.LifecycleStateInService {
continue
}
providerID, err := instanceIDToProviderID(aws.StringValue(i.InstanceId), *i.AvailabilityZone)
@@ -214,7 +219,7 @@ func (a *autoscalingGroups) NotReadyInstances() map[string]cloudprovider.Instanc
instances := make(map[string]cloudprovider.Instance)
for _, group := range a.groups {
for _, i := range group.Instances {
if aws.StringValue(i.LifecycleState) != "InService" {
if aws.StringValue(i.LifecycleState) != autoscaling.LifecycleStateInService {
providerID, err := instanceIDToProviderID(aws.StringValue(i.InstanceId), aws.StringValue(i.AvailabilityZone))
if err != nil {
a.logger.Info("[NotReadyInstances] skip instance which failed instanceID to providerID conversion: %v", aws.StringValue(i.InstanceId))
29 changes: 28 additions & 1 deletion pkg/cloudprovider/aws/fake/fake.go
@@ -55,6 +55,7 @@ func generateAutoscalingInstance(instance *Instance) *autoscaling.Instance {
autoscalingInstance := &autoscaling.Instance{
InstanceId: aws.String(instance.InstanceID),
AvailabilityZone: aws.String(defaultAvailabilityZone),
LifecycleState: aws.String(autoscaling.LifecycleStateInService),
}

return autoscalingInstance
@@ -76,7 +77,13 @@ func (m *Autoscaling) DescribeAutoScalingGroups(input *autoscaling.DescribeAutoS
continue
}

if _, exists := asgNameLookup[instance.AutoscalingGroupName]; !exists {
if instance.State != ec2.InstanceStateNameRunning {
continue
}

// Skip the instance unless its ASG name matches one of the names from the
// input. If the input is empty then match all ASGs.
if _, exists := asgNameLookup[instance.AutoscalingGroupName]; !exists && len(asgNameLookup) > 0 {
continue
}

@@ -121,6 +128,16 @@ func (m *Autoscaling) AttachInstances(input *autoscaling.AttachInstancesInput) (
return &autoscaling.AttachInstancesOutput{}, nil
}

func (m *Autoscaling) DetachInstances(input *autoscaling.DetachInstancesInput) (*autoscaling.DetachInstancesOutput, error) {
for _, instanceId := range input.InstanceIds {
if instance, exists := m.Instances[*instanceId]; exists {
instance.AutoscalingGroupName = ""
}
}

return &autoscaling.DetachInstancesOutput{}, nil
}

// *************** EC2 *************** //

func (m *Ec2) DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error) {
@@ -150,3 +167,13 @@ func (m *Ec2) DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.Describ
},
}, nil
}

func (m *Ec2) TerminateInstances(input *ec2.TerminateInstancesInput) (*ec2.TerminateInstancesOutput, error) {
for _, instanceId := range input.InstanceIds {
if instance, exists := m.Instances[*instanceId]; exists {
instance.State = ec2.InstanceStateNameTerminated
}
}

return &ec2.TerminateInstancesOutput{}, nil
}
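
Note: a hypothetical test sketch of how these fakes compose (the fixture wiring for fakeEc2 and awsProvider is assumed, and require/assert are from testify; none of this is part of the diff). Terminating an instance through the fake EC2 API flips its stored state to terminated, so the provider backed by the fake should then drop it from InstancesExist, matching the new terminated-state check in aws.go:

// Terminate the instance via the fake EC2 client.
_, err := fakeEc2.TerminateInstances(&ec2.TerminateInstancesInput{
    InstanceIds: []*string{aws.String("i-0123456789abcdef0")},
})
require.NoError(t, err)

// The provider should no longer report the instance as existing.
existing, err := awsProvider.InstancesExist([]string{
    "aws:///us-east-1a/i-0123456789abcdef0",
})
require.NoError(t, err)

_, stillExists := existing["aws:///us-east-1a/i-0123456789abcdef0"]
assert.False(t, stillExists)
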
2 changes: 1 addition & 1 deletion pkg/cloudprovider/interface.go
@@ -3,7 +3,7 @@ package cloudprovider
// CloudProvider provides an interface to interact with a cloud provider, e.g. AWS, GCP etc.
type CloudProvider interface {
Name() string
InstancesExist([]string) ([]string, error)
InstancesExist([]string) (map[string]interface{}, error)
GetNodeGroups([]string) (NodeGroups, error)
TerminateInstance(string) error
}
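
Note: purely illustrative and not part of this PR — a minimal stub satisfying the updated interface, e.g. in a test package that imports cloudprovider, showing the map-as-set contract of the new InstancesExist signature:

type stubProvider struct {
    existing map[string]interface{}
}

func (s *stubProvider) Name() string { return "stub" }

// InstancesExist returns the subset of the given provider IDs this stub
// considers to exist, keyed by provider ID.
func (s *stubProvider) InstancesExist(providerIDs []string) (map[string]interface{}, error) {
    valid := make(map[string]interface{})
    for _, providerID := range providerIDs {
        if _, ok := s.existing[providerID]; ok {
            valid[providerID] = nil
        }
    }
    return valid, nil
}

func (s *stubProvider) GetNodeGroups(names []string) (cloudprovider.NodeGroups, error) {
    return nil, nil
}

func (s *stubProvider) TerminateInstance(providerID string) error { return nil }
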
62 changes: 27 additions & 35 deletions pkg/controller/cyclenoderequest/transitioner/node.go
@@ -3,17 +3,16 @@ package transitioner
import (
"fmt"

"github.com/pkg/errors"

corev1 "k8s.io/api/core/v1"

v1 "github.com/atlassian-labs/cyclops/pkg/apis/atlassian/v1"
"github.com/atlassian-labs/cyclops/pkg/cloudprovider"
)

// listReadyNodes lists nodes that are "ready". By default lists nodes that have also not been touched by Cyclops.
// A label is used to determine whether nodes have been touched by this CycleNodeRequest.
func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (map[string]corev1.Node, error) {
// listNodes lists nodes matching the node selector. By default lists nodes that have also
// not been touched by Cyclops. A label is used to determine whether nodes have been touched
// by this CycleNodeRequest.
func (t *CycleNodeRequestTransitioner) listNodes(includeInProgress bool) (map[string]corev1.Node, error) {
nodes := make(map[string]corev1.Node)

// Get the nodes
@@ -36,13 +35,32 @@ func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (m
}
}

// Only add "Ready" nodes
nodes[node.Spec.ProviderID] = node
}

return nodes, nil
}

// listReadyNodes lists nodes that are "ready". By default lists nodes that have also not been touched by Cyclops.
// A label is used to determine whether nodes have been touched by this CycleNodeRequest.
func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (map[string]corev1.Node, error) {
nodes, err := t.listNodes(includeInProgress)
if err != nil {
return nil, err
}

for providerID, node := range nodes {
nodeReady := false

for _, cond := range node.Status.Conditions {
if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue {
nodes[node.Spec.ProviderID] = node
break
nodeReady = true
}
}

if !nodeReady {
delete(nodes, providerID)
}
}

return nodes, nil
@@ -146,7 +164,7 @@ func (t *CycleNodeRequestTransitioner) addNamedNodesToTerminate(kubeNodes map[st
// specified in the CNR. These are two separate sets and the contents of one does not affect the
// contents of the other.
func (t *CycleNodeRequestTransitioner) findAllNodesForCycle() (kubeNodes map[string]corev1.Node, cloudProviderInstances map[string]cloudprovider.Instance, err error) {
kubeNodes, err = t.listReadyNodes(true)
kubeNodes, err = t.listNodes(true)
if err != nil {
return kubeNodes, cloudProviderInstances, err
}
@@ -155,32 +173,6 @@ func (t *CycleNodeRequestTransitioner) findAllNodesForCycle() (kubeNodes map[str
return kubeNodes, cloudProviderInstances, fmt.Errorf("no nodes matched selector")
}

// Only retain nodes which still exist inside cloud provider
var nodeProviderIDs []string

for _, node := range kubeNodes {
nodeProviderIDs = append(nodeProviderIDs, node.Spec.ProviderID)
}

existingProviderIDs, err := t.rm.CloudProvider.InstancesExist(nodeProviderIDs)
if err != nil {
return kubeNodes, cloudProviderInstances, errors.Wrap(err, "failed to check instances that exist from cloud provider")
}

existingKubeNodes := make(map[string]corev1.Node)

for _, validProviderID := range existingProviderIDs {
if node, found := kubeNodes[validProviderID]; found {
existingKubeNodes[node.Spec.ProviderID] = node
}
}

kubeNodes = existingKubeNodes

if len(kubeNodes) == 0 {
return kubeNodes, cloudProviderInstances, fmt.Errorf("no existing nodes in cloud provider matched selector")
}

nodeGroupNames := t.cycleNodeRequest.GetNodeGroupNames()

// Describe the node group for the request
14 changes: 7 additions & 7 deletions pkg/controller/cyclenoderequest/transitioner/test_helpers.go
@@ -12,13 +12,13 @@ type Option func(t *Transitioner)

func WithCloudProviderInstances(nodes []*mock.Node) Option {
return func(t *Transitioner) {
t.cloudProviderInstances = append(t.cloudProviderInstances, nodes...)
t.CloudProviderInstances = append(t.CloudProviderInstances, nodes...)
}
}

func WithKubeNodes(nodes []*mock.Node) Option {
return func(t *Transitioner) {
t.kubeNodes = append(t.kubeNodes, nodes...)
t.KubeNodes = append(t.KubeNodes, nodes...)
}
}

@@ -28,23 +28,23 @@ type Transitioner struct {
*CycleNodeRequestTransitioner
*mock.Client

cloudProviderInstances []*mock.Node
kubeNodes []*mock.Node
CloudProviderInstances []*mock.Node
KubeNodes []*mock.Node
}

func NewFakeTransitioner(cnr *v1.CycleNodeRequest, opts ...Option) *Transitioner {
t := &Transitioner{
// By default there are no nodes and each test will
// override these as needed
cloudProviderInstances: make([]*mock.Node, 0),
kubeNodes: make([]*mock.Node, 0),
CloudProviderInstances: make([]*mock.Node, 0),
KubeNodes: make([]*mock.Node, 0),
}

for _, opt := range opts {
opt(t)
}

t.Client = mock.NewClient(t.kubeNodes, t.cloudProviderInstances, cnr)
t.Client = mock.NewClient(t.KubeNodes, t.CloudProviderInstances, cnr)

rm := &controller.ResourceManager{
Client: t.K8sClient,
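
Note: a hypothetical usage sketch showing why the fields are now exported (the cnr fixture and the node slices are assumed, and assert is from testify): tests can reach into the fake transitioner after construction to inspect the shared fixtures, or mutate them between reconcile passes.

fakeTransitioner := NewFakeTransitioner(cnr,
    WithKubeNodes(kubeNodes),
    WithCloudProviderInstances(cloudInstances),
)

// The exported fields can be referenced directly in assertions, or mutated
// mid-test to simulate node state changing between runs.
assert.Len(t, fakeTransitioner.KubeNodes, len(kubeNodes))
assert.Len(t, fakeTransitioner.CloudProviderInstances, len(cloudInstances))
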
104 changes: 46 additions & 58 deletions pkg/controller/cyclenoderequest/transitioner/transitions.go
@@ -61,21 +61,13 @@ func (t *CycleNodeRequestTransitioner) transitionUndefined() (reconcile.Result,
// 2. describes the node group and checks that the number of instances in the node group matches the number we
// are planning on terminating
func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, error) {
Collaborator:

General note: I'm worried we're making the pending transition a bit too thick with logic. It would be nice if, longer term, we can start to add more states to deal with fixing up a CNR. We could transition from pending -> the state where we are fixing node related problems -> back to pending.

We could also just work on collapsing the total vertical space used by the function and see how we feel afterwards. To me, the function is a bit longer than acceptable at this point. I wouldn't fix this up in this particular PR though.

Member Author:

I agree with you, we should do it but not in this PR. I did toy with doing it that way but I found it would be a bigger change than I wanted.

Collaborator:

I agree that change should happen in its own refactoring PR.

// Start the equilibrium wait timer, if this times out as the set of nodes in kube and
// the cloud provider is not considered valid, then transition to the Healing phase as
// cycling should not proceed.
timedOut, err := t.equilibriumWaitTimedOut()
if err != nil {
// Start the equilibrium wait timer, if this times out then the set of nodes in kube and
// the cloud provider is not considered valid. Transition to the Healing phase as cycling
// should not proceed.
if err := t.errorIfEquilibriumTimeoutReached(); err != nil {
return t.transitionToHealing(err)
}

if timedOut {
return t.transitionToHealing(fmt.Errorf(
"node count mismatch, number of kubernetes nodes does not match number of cloud provider instances after %v",
nodeEquilibriumWaitLimit,
))
}
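
Note: errorIfEquilibriumTimeoutReached is defined outside this diff; a plausible sketch, assuming it simply folds the previous timedOut/error handling into a single returned error so the caller above stays flat:

func (t *CycleNodeRequestTransitioner) errorIfEquilibriumTimeoutReached() error {
    timedOut, err := t.equilibriumWaitTimedOut()
    if err != nil {
        return err
    }

    if timedOut {
        return fmt.Errorf(
            "node count mismatch, number of kubernetes nodes does not match number of cloud provider instances after %v",
            nodeEquilibriumWaitLimit,
        )
    }

    return nil
}
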

// Fetch the node names for the cycleNodeRequest, using the label selector provided
t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Selecting nodes with label selector")

@@ -95,50 +87,47 @@ func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, er
// The nodes in the cloud provider can either not exist or be detached from one of the nodegroups
// and this will be determined when dealt with. This is an XOR condition on the two initial sets
// of nodes.
nodesNotInCloudProvider, nodesNotInKube := findProblemNodes(kubeNodes, nodeGroupInstances)

// Do some sanity checking before we start filtering things
// Check the instance count of the node group matches the number of nodes found in Kubernetes
if len(nodesNotInCloudProvider) > 0 || len(nodesNotInKube) > 0 {
var offendingNodesInfo string

if len(nodesNotInCloudProvider) > 0 {
providerIDs := make([]string, 0)

for providerID := range nodesNotInCloudProvider {
providerIDs = append(providerIDs,
fmt.Sprintf("id %q", providerID),
)
}

offendingNodesInfo += "nodes not in node group: "
offendingNodesInfo += strings.Join(providerIDs, ",")
nodesNotInCloudProviderNodegroup, instancesNotInKube := findProblemNodes(kubeNodes, nodeGroupInstances)

// If the node state isn't correct then go through and attempt to fix it. The steps in this block
// attempt to fix the node state and then requeue the Pending phase to re-check. It is quite
// possible that the node state changes during these steps and cannot be fixed immediately;
// hopefully after a few runs the state can be fixed.
if len(nodesNotInCloudProviderNodegroup) > 0 || len(instancesNotInKube) > 0 {
t.logProblemNodes(nodesNotInCloudProviderNodegroup, instancesNotInKube)

// Try to fix the case where one or more kube nodes match the node selector for the
// nodegroup but their instances are not attached to the nodegroup in the cloud provider,
// by re-attaching them.
if err := t.reattachAnyDetachedInstances(nodesNotInCloudProviderNodegroup); err != nil {
return t.transitionToHealing(err)
}

if len(nodesNotInKube) > 0 {
if offendingNodesInfo != "" {
offendingNodesInfo += ";"
}

providerIDs := make([]string, 0)

for providerID, node := range nodesNotInKube {
providerIDs = append(providerIDs,
fmt.Sprintf("id %q in %q", providerID, node.NodeGroupName()),
)
}

offendingNodesInfo += "nodes not inside cluster: "
offendingNodesInfo += strings.Join(providerIDs, ",")
// Try to fix the case where one or more kube node objects have no matching
// running instance in the cloud provider. This could be because of the finalizer that
// was added during a previous failed cycle.
if err := t.deleteAnyOrphanedKubeNodes(nodesNotInCloudProviderNodegroup); err != nil {
return t.transitionToHealing(err)
}

t.rm.LogEvent(t.cycleNodeRequest, "NodeCountMismatch",
"node group: %v, kube: %v. %v",
len(validNodeGroupInstances), len(validKubeNodes), offendingNodesInfo)
// After working through these attempts, requeue to run through the Pending phase from the
// beginning to check the full state of nodes again. If there are any problem nodes we should
// not proceed and keep requeuing until the state is fixed or the timeout has been reached.
return reconcile.Result{Requeue: true, RequeueAfter: requeueDuration}, nil
}

valid, err := t.validateInstanceState(validNodeGroupInstances)
if err != nil {
return t.transitionToHealing(err)
}

if !valid {
t.rm.Logger.Info("instance state not valid, requeuing")
return reconcile.Result{Requeue: true, RequeueAfter: requeueDuration}, nil
}

t.rm.Logger.Info("instance state valid, proceeding")

// make a list of the nodes to terminate
if len(t.cycleNodeRequest.Spec.NodeNames) > 0 {
// If specific node names are provided, check they actually exist in the node group
@@ -281,6 +270,14 @@ func (t *CycleNodeRequestTransitioner) transitionInitialised() (reconcile.Result
return t.transitionToHealing(err)
}

t.rm.Logger.Info("Adding annotation to node", "node", node.Name)

// Add the nodegroup annotation to the node before detaching it
if err := t.rm.AddNodegroupAnnotationToNode(node.Name, node.NodeGroupName); err != nil {
t.rm.LogEvent(t.cycleNodeRequest, "AddAnnotationToNodeError", err.Error())
return t.transitionToHealing(err)
}
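
Note: AddNodegroupAnnotationToNode is added elsewhere in this PR; the sketch below is only an illustration of the idea (the standalone signature, the controller-runtime client usage and the annotation key are assumptions, not taken from this diff). Recording the original nodegroup on the node means a later phase or a subsequent cycle can work out where to re-attach it if this cycle fails after detaching.

// Illustrative only; uses context, corev1 (k8s.io/api/core/v1) and the
// controller-runtime client package.
func addNodegroupAnnotation(ctx context.Context, c client.Client, nodeName, nodeGroup string) error {
    var node corev1.Node
    if err := c.Get(ctx, client.ObjectKey{Name: nodeName}, &node); err != nil {
        return err
    }

    if node.Annotations == nil {
        node.Annotations = map[string]string{}
    }

    // The annotation key is an assumed placeholder.
    node.Annotations["cyclops.atlassian.com/nodegroup"] = nodeGroup

    return c.Update(ctx, &node)
}
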

alreadyDetaching, err := nodeGroups.DetachInstance(node.ProviderID)

if alreadyDetaching {
@@ -345,16 +342,7 @@ func (t *CycleNodeRequestTransitioner) transitionScalingUp() (reconcile.Result,

// Increase the kubeNode count requirement by the number of nodes which are observed to have been removed prematurely
for _, node := range t.cycleNodeRequest.Status.CurrentNodes {
var instanceFound bool = false

for _, kubeNode := range kubeNodes {
if node.Name == kubeNode.Name {
instanceFound = true
break
}
}

if !instanceFound {
if _, instanceFound := kubeNodes[node.ProviderID]; !instanceFound {
nodesToRemove = append(nodesToRemove, node)
numKubeNodesReady++
}