diff --git a/docs/cycling/README.md b/docs/cycling/README.md index 4f374e9..fa338e8 100644 --- a/docs/cycling/README.md +++ b/docs/cycling/README.md @@ -28,7 +28,7 @@ The CycleNodeRequest CRD handles a request to cycle nodes belonging to a specifi 2. Validate the CycleNodeRequest object's parameters, and if valid, transition the object to **Pending**. -3. In the **Pending** phase, store the nodes that will need to be cycled so we can keep track of them. Describe the node group in the cloud provider and check it to ensure it matches the nodes in Kubernetes. It will wait for a brief period for the nodes to match, in case the cluster has just scaled up or down. Transition the object to **Initialised**. +3. In the **Pending** phase, store the nodes that will need to be cycled so we can keep track of them. Describe the node group in the cloud provider and check it to ensure it matches the nodes in Kubernetes. It will wait for a brief period and proactively clean up any orphaned node objects, re-attach any instances that have been detached from the cloud provider node group, and then wait for the nodes to match in case the cluster has just scaled up or down. Transition the object to **Initialised**. 4. In the **Initialised** phase, detach a number of nodes (governed by the concurrency of the CycleNodeRequest) from the node group. This will trigger the cloud provider to add replacement nodes for each. Transition the object to **ScalingUp**. If there are no more nodes to cycle then transition to **Successful**. diff --git a/pkg/cloudprovider/aws/aws.go b/pkg/cloudprovider/aws/aws.go index 3d48672..7aa6beb 100644 --- a/pkg/cloudprovider/aws/aws.go +++ b/pkg/cloudprovider/aws/aws.go @@ -114,9 +114,10 @@ func (p *provider) GetNodeGroups(names []string) (cloudprovider.NodeGroups, erro } // InstancesExist returns a list of the instances that exist -func (p *provider) InstancesExist(providerIDs []string) (validProviderIDs []string, err error) { - instanceIDSet := map[string]string{} - instanceIDs := []string{} +func (p *provider) InstancesExist(providerIDs []string) (map[string]interface{}, error) { + validProviderIDs := make(map[string]interface{}) + instanceIDSet := make(map[string]string) + instanceIDs := make([]string, 0) for _, providerID := range providerIDs { instanceID, err := providerIDToInstanceID(providerID) @@ -140,8 +141,12 @@ func (p *provider) InstancesExist(providerIDs []string) (validProviderIDs []stri for _, reservation := range output.Reservations { for _, instance := range reservation.Instances { + if *instance.State.Name == ec2.InstanceStateNameTerminated { + continue + } + if providerID, ok := instanceIDSet[aws.StringValue(instance.InstanceId)]; ok { - validProviderIDs = append(validProviderIDs, providerID) + validProviderIDs[providerID] = nil } } } @@ -190,7 +195,7 @@ func (a *autoscalingGroups) ReadyInstances() map[string]cloudprovider.Instance { instances := make(map[string]cloudprovider.Instance) for _, group := range a.groups { for _, i := range group.Instances { - if aws.StringValue(i.LifecycleState) != "InService" { + if aws.StringValue(i.LifecycleState) != autoscaling.LifecycleStateInService { continue } providerID, err := instanceIDToProviderID(aws.StringValue(i.InstanceId), *i.AvailabilityZone) @@ -214,7 +219,7 @@ func (a *autoscalingGroups) NotReadyInstances() map[string]cloudprovider.Instanc instances := make(map[string]cloudprovider.Instance) for _, group := range a.groups { for _, i := range group.Instances { - if aws.StringValue(i.LifecycleState) != "InService" { + if aws.StringValue(i.LifecycleState) != autoscaling.LifecycleStateInService { providerID, err := instanceIDToProviderID(aws.StringValue(i.InstanceId), aws.StringValue(i.AvailabilityZone)) if err != nil { a.logger.Info("[NotReadyInstances] skip instance which failed instanceID to providerID conversion: %v", aws.StringValue(i.InstanceId)) diff --git a/pkg/cloudprovider/aws/fake/fake.go b/pkg/cloudprovider/aws/fake/fake.go index 60435fc..2e5194a 100644 --- a/pkg/cloudprovider/aws/fake/fake.go +++ b/pkg/cloudprovider/aws/fake/fake.go @@ -55,6 +55,7 @@ func generateAutoscalingInstance(instance *Instance) *autoscaling.Instance { autoscalingInstance := &autoscaling.Instance{ InstanceId: aws.String(instance.InstanceID), AvailabilityZone: aws.String(defaultAvailabilityZone), + LifecycleState: aws.String(autoscaling.LifecycleStateInService), } return autoscalingInstance @@ -76,7 +77,13 @@ func (m *Autoscaling) DescribeAutoScalingGroups(input *autoscaling.DescribeAutoS continue } - if _, exists := asgNameLookup[instance.AutoscalingGroupName]; !exists { + if instance.State != ec2.InstanceStateNameRunning { + continue + } + + // Ensure to continue if the ASG name matching one of the ones from the + // input. If the input is empty then match all ASGs + if _, exists := asgNameLookup[instance.AutoscalingGroupName]; !exists && len(asgNameLookup) > 0 { continue } @@ -121,6 +128,16 @@ func (m *Autoscaling) AttachInstances(input *autoscaling.AttachInstancesInput) ( return &autoscaling.AttachInstancesOutput{}, nil } +func (m *Autoscaling) DetachInstances(input *autoscaling.DetachInstancesInput) (*autoscaling.DetachInstancesOutput, error) { + for _, instanceId := range input.InstanceIds { + if instance, exists := m.Instances[*instanceId]; exists { + instance.AutoscalingGroupName = "" + } + } + + return &autoscaling.DetachInstancesOutput{}, nil +} + // *************** EC2 *************** // func (m *Ec2) DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.DescribeInstancesOutput, error) { @@ -150,3 +167,13 @@ func (m *Ec2) DescribeInstances(input *ec2.DescribeInstancesInput) (*ec2.Describ }, }, nil } + +func (m *Ec2) TerminateInstances(input *ec2.TerminateInstancesInput) (*ec2.TerminateInstancesOutput, error) { + for _, instanceId := range input.InstanceIds { + if instance, exists := m.Instances[*instanceId]; exists { + instance.State = ec2.InstanceStateNameTerminated + } + } + + return &ec2.TerminateInstancesOutput{}, nil +} diff --git a/pkg/cloudprovider/interface.go b/pkg/cloudprovider/interface.go index 53bc76d..9a6b778 100644 --- a/pkg/cloudprovider/interface.go +++ b/pkg/cloudprovider/interface.go @@ -3,7 +3,7 @@ package cloudprovider // CloudProvider provides an interface to interact with a cloud provider, e.g. AWS, GCP etc. type CloudProvider interface { Name() string - InstancesExist([]string) ([]string, error) + InstancesExist([]string) (map[string]interface{}, error) GetNodeGroups([]string) (NodeGroups, error) TerminateInstance(string) error } diff --git a/pkg/controller/cyclenoderequest/transitioner/node.go b/pkg/controller/cyclenoderequest/transitioner/node.go index 08ee3d7..c509591 100644 --- a/pkg/controller/cyclenoderequest/transitioner/node.go +++ b/pkg/controller/cyclenoderequest/transitioner/node.go @@ -3,17 +3,16 @@ package transitioner import ( "fmt" - "github.com/pkg/errors" - corev1 "k8s.io/api/core/v1" v1 "github.com/atlassian-labs/cyclops/pkg/apis/atlassian/v1" "github.com/atlassian-labs/cyclops/pkg/cloudprovider" ) -// listReadyNodes lists nodes that are "ready". By default lists nodes that have also not been touched by Cyclops. -// A label is used to determine whether nodes have been touched by this CycleNodeRequest. -func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (map[string]corev1.Node, error) { +// listNodes lists nodes matching the node selector. By default lists nodes that have also +// not been touched by Cyclops. A label is used to determine whether nodes have been touched +// by this CycleNodeRequest. +func (t *CycleNodeRequestTransitioner) listNodes(includeInProgress bool) (map[string]corev1.Node, error) { nodes := make(map[string]corev1.Node) // Get the nodes @@ -36,13 +35,32 @@ func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (m } } - // Only add "Ready" nodes + nodes[node.Spec.ProviderID] = node + } + + return nodes, nil +} + +// listReadyNodes lists nodes that are "ready". By default lists nodes that have also not been touched by Cyclops. +// A label is used to determine whether nodes have been touched by this CycleNodeRequest. +func (t *CycleNodeRequestTransitioner) listReadyNodes(includeInProgress bool) (map[string]corev1.Node, error) { + nodes, err := t.listNodes(includeInProgress) + if err != nil { + return nil, err + } + + for providerID, node := range nodes { + nodeReady := false + for _, cond := range node.Status.Conditions { if cond.Type == corev1.NodeReady && cond.Status == corev1.ConditionTrue { - nodes[node.Spec.ProviderID] = node - break + nodeReady = true } } + + if !nodeReady { + delete(nodes, providerID) + } } return nodes, nil @@ -146,7 +164,7 @@ func (t *CycleNodeRequestTransitioner) addNamedNodesToTerminate(kubeNodes map[st // specified in the CNR. These are two separate sets and the contents of one does not affect the // contents of the other. func (t *CycleNodeRequestTransitioner) findAllNodesForCycle() (kubeNodes map[string]corev1.Node, cloudProviderInstances map[string]cloudprovider.Instance, err error) { - kubeNodes, err = t.listReadyNodes(true) + kubeNodes, err = t.listNodes(true) if err != nil { return kubeNodes, cloudProviderInstances, err } @@ -155,32 +173,6 @@ func (t *CycleNodeRequestTransitioner) findAllNodesForCycle() (kubeNodes map[str return kubeNodes, cloudProviderInstances, fmt.Errorf("no nodes matched selector") } - // Only retain nodes which still exist inside cloud provider - var nodeProviderIDs []string - - for _, node := range kubeNodes { - nodeProviderIDs = append(nodeProviderIDs, node.Spec.ProviderID) - } - - existingProviderIDs, err := t.rm.CloudProvider.InstancesExist(nodeProviderIDs) - if err != nil { - return kubeNodes, cloudProviderInstances, errors.Wrap(err, "failed to check instances that exist from cloud provider") - } - - existingKubeNodes := make(map[string]corev1.Node) - - for _, validProviderID := range existingProviderIDs { - if node, found := kubeNodes[validProviderID]; found { - existingKubeNodes[node.Spec.ProviderID] = node - } - } - - kubeNodes = existingKubeNodes - - if len(kubeNodes) == 0 { - return kubeNodes, cloudProviderInstances, fmt.Errorf("no existing nodes in cloud provider matched selector") - } - nodeGroupNames := t.cycleNodeRequest.GetNodeGroupNames() // Describe the node group for the request diff --git a/pkg/controller/cyclenoderequest/transitioner/test_helpers.go b/pkg/controller/cyclenoderequest/transitioner/test_helpers.go index 660ae16..7e238d0 100644 --- a/pkg/controller/cyclenoderequest/transitioner/test_helpers.go +++ b/pkg/controller/cyclenoderequest/transitioner/test_helpers.go @@ -12,13 +12,13 @@ type Option func(t *Transitioner) func WithCloudProviderInstances(nodes []*mock.Node) Option { return func(t *Transitioner) { - t.cloudProviderInstances = append(t.cloudProviderInstances, nodes...) + t.CloudProviderInstances = append(t.CloudProviderInstances, nodes...) } } func WithKubeNodes(nodes []*mock.Node) Option { return func(t *Transitioner) { - t.kubeNodes = append(t.kubeNodes, nodes...) + t.KubeNodes = append(t.KubeNodes, nodes...) } } @@ -28,23 +28,23 @@ type Transitioner struct { *CycleNodeRequestTransitioner *mock.Client - cloudProviderInstances []*mock.Node - kubeNodes []*mock.Node + CloudProviderInstances []*mock.Node + KubeNodes []*mock.Node } func NewFakeTransitioner(cnr *v1.CycleNodeRequest, opts ...Option) *Transitioner { t := &Transitioner{ // By default there are no nodes and each test will // override these as needed - cloudProviderInstances: make([]*mock.Node, 0), - kubeNodes: make([]*mock.Node, 0), + CloudProviderInstances: make([]*mock.Node, 0), + KubeNodes: make([]*mock.Node, 0), } for _, opt := range opts { opt(t) } - t.Client = mock.NewClient(t.kubeNodes, t.cloudProviderInstances, cnr) + t.Client = mock.NewClient(t.KubeNodes, t.CloudProviderInstances, cnr) rm := &controller.ResourceManager{ Client: t.K8sClient, diff --git a/pkg/controller/cyclenoderequest/transitioner/transitions.go b/pkg/controller/cyclenoderequest/transitioner/transitions.go index b439544..310c7e6 100644 --- a/pkg/controller/cyclenoderequest/transitioner/transitions.go +++ b/pkg/controller/cyclenoderequest/transitioner/transitions.go @@ -61,21 +61,13 @@ func (t *CycleNodeRequestTransitioner) transitionUndefined() (reconcile.Result, // 2. describes the node group and checks that the number of instances in the node group matches the number we // are planning on terminating func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, error) { - // Start the equilibrium wait timer, if this times out as the set of nodes in kube and - // the cloud provider is not considered valid, then transition to the Healing phase as - // cycling should not proceed. - timedOut, err := t.equilibriumWaitTimedOut() - if err != nil { + // Start the equilibrium wait timer, if this times out then the set of nodes in kube and + // the cloud provider is not considered valid. Transition to the Healing phase as cycling + // should not proceed. + if err := t.errorIfEquilibriumTimeoutReached(); err != nil { return t.transitionToHealing(err) } - if timedOut { - return t.transitionToHealing(fmt.Errorf( - "node count mismatch, number of kubernetes nodes does not match number of cloud provider instances after %v", - nodeEquilibriumWaitLimit, - )) - } - // Fetch the node names for the cycleNodeRequest, using the label selector provided t.rm.LogEvent(t.cycleNodeRequest, "SelectingNodes", "Selecting nodes with label selector") @@ -95,50 +87,47 @@ func (t *CycleNodeRequestTransitioner) transitionPending() (reconcile.Result, er // The nodes in the cloud provider can either not exist or be detached from one of the nodegroups // and this will be determined when dealt with. This is an XOR condition on the two initial sets // of nodes. - nodesNotInCloudProvider, nodesNotInKube := findProblemNodes(kubeNodes, nodeGroupInstances) - - // Do some sanity checking before we start filtering things - // Check the instance count of the node group matches the number of nodes found in Kubernetes - if len(nodesNotInCloudProvider) > 0 || len(nodesNotInKube) > 0 { - var offendingNodesInfo string - - if len(nodesNotInCloudProvider) > 0 { - providerIDs := make([]string, 0) - - for providerID := range nodesNotInCloudProvider { - providerIDs = append(providerIDs, - fmt.Sprintf("id %q", providerID), - ) - } - - offendingNodesInfo += "nodes not in node group: " - offendingNodesInfo += strings.Join(providerIDs, ",") + nodesNotInCloudProviderNodegroup, instancesNotInKube := findProblemNodes(kubeNodes, nodeGroupInstances) + + // If the node state isn't correct then go through and attempt to fix it. The steps in this block + // attempt to fix the node state and then requeues the Pending phase to re-check. It is very + // possible that the node state changes during the steps and it cannot be fixed. Hopefully after + // a few runs the state can be fixed. + if len(nodesNotInCloudProviderNodegroup) > 0 || len(instancesNotInKube) > 0 { + t.logProblemNodes(nodesNotInCloudProviderNodegroup, instancesNotInKube) + + // Try to fix the case where there are 1 or more instances matching the node selector for the + // nodegroup in kube but are not attached to the nodegroup in the cloud provider by + // re-attaching them. + if err := t.reattachAnyDetachedInstances(nodesNotInCloudProviderNodegroup); err != nil { + return t.transitionToHealing(err) } - if len(nodesNotInKube) > 0 { - if offendingNodesInfo != "" { - offendingNodesInfo += ";" - } - - providerIDs := make([]string, 0) - - for providerID, node := range nodesNotInKube { - providerIDs = append(providerIDs, - fmt.Sprintf("id %q in %q", providerID, node.NodeGroupName()), - ) - } - - offendingNodesInfo += "nodes not inside cluster: " - offendingNodesInfo += strings.Join(providerIDs, ",") + // Try to fix the case where there are 1 or more kube node objects without any matching + // running instances in the cloud provider. This could be because of the finalizer that + // was added during a previous failed cycle. + if err := t.deleteAnyOrphanedKubeNodes(nodesNotInCloudProviderNodegroup); err != nil { + return t.transitionToHealing(err) } - t.rm.LogEvent(t.cycleNodeRequest, "NodeCountMismatch", - "node group: %v, kube: %v. %v", - len(validNodeGroupInstances), len(validKubeNodes), offendingNodesInfo) + // After working through these attempts, requeue to run through the Pending phase from the + // beginning to check the full state of nodes again. If there are any problem nodes we should + // not proceed and keep requeuing until the state is fixed or the timeout has been reached. + return reconcile.Result{Requeue: true, RequeueAfter: requeueDuration}, nil + } + + valid, err := t.validateInstanceState(validNodeGroupInstances) + if err != nil { + return t.transitionToHealing(err) + } + if !valid { + t.rm.Logger.Info("instance state not valid, requeuing") return reconcile.Result{Requeue: true, RequeueAfter: requeueDuration}, nil } + t.rm.Logger.Info("instance state valid, proceeding") + // make a list of the nodes to terminate if len(t.cycleNodeRequest.Spec.NodeNames) > 0 { // If specific node names are provided, check they actually exist in the node group @@ -281,6 +270,14 @@ func (t *CycleNodeRequestTransitioner) transitionInitialised() (reconcile.Result return t.transitionToHealing(err) } + t.rm.Logger.Info("Adding annotation to node", "node", node.Name) + + // Add the nodegroup annotation to the node before detaching it + if err := t.rm.AddNodegroupAnnotationToNode(node.Name, node.NodeGroupName); err != nil { + t.rm.LogEvent(t.cycleNodeRequest, "AddAnnotationToNodeError", err.Error()) + return t.transitionToHealing(err) + } + alreadyDetaching, err := nodeGroups.DetachInstance(node.ProviderID) if alreadyDetaching { @@ -345,16 +342,7 @@ func (t *CycleNodeRequestTransitioner) transitionScalingUp() (reconcile.Result, // Increase the kubeNode count requirement by the number of nodes which are observed to have been removed prematurely for _, node := range t.cycleNodeRequest.Status.CurrentNodes { - var instanceFound bool = false - - for _, kubeNode := range kubeNodes { - if node.Name == kubeNode.Name { - instanceFound = true - break - } - } - - if !instanceFound { + if _, instanceFound := kubeNodes[node.ProviderID]; !instanceFound { nodesToRemove = append(nodesToRemove, node) numKubeNodesReady++ } diff --git a/pkg/controller/cyclenoderequest/transitioner/transitions_initialized_test.go b/pkg/controller/cyclenoderequest/transitioner/transitions_initialized_test.go new file mode 100644 index 0000000..e39e869 --- /dev/null +++ b/pkg/controller/cyclenoderequest/transitioner/transitions_initialized_test.go @@ -0,0 +1,94 @@ +package transitioner + +import ( + "context" + "testing" + + v1 "github.com/atlassian-labs/cyclops/pkg/apis/atlassian/v1" + "github.com/atlassian-labs/cyclops/pkg/mock" + "github.com/aws/aws-sdk-go/service/autoscaling" + "github.com/stretchr/testify/assert" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// Base case of the Initialized phase. Start cycling by detaching an instance +// and adding the cycling finalizer and annotation to it. +func TestInitializedSimpleCase(t *testing.T) { + nodegroup, err := mock.NewNodegroup("ng-1", 1) + if err != nil { + assert.NoError(t, err) + } + + // CNR straight after being transitioned from Pending + cnr := &v1.CycleNodeRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cnr-1", + Namespace: "kube-system", + }, + Spec: v1.CycleNodeRequestSpec{ + NodeGroupsList: []string{"ng-1"}, + CycleSettings: v1.CycleSettings{ + Concurrency: 1, + Method: v1.CycleNodeRequestMethodDrain, + }, + Selector: metav1.LabelSelector{ + MatchLabels: map[string]string{ + "customer": "kitt", + }, + }, + }, + Status: v1.CycleNodeRequestStatus{ + Phase: v1.CycleNodeRequestInitialised, + NodesToTerminate: []v1.CycleNodeRequestNode{ + { + Name: nodegroup[0].Name, + NodeGroupName: nodegroup[0].Nodegroup, + }, + }, + NodesAvailable: []v1.CycleNodeRequestNode{ + { + Name: nodegroup[0].Name, + NodeGroupName: nodegroup[0].Nodegroup, + }, + }, + }, + } + + fakeTransitioner := NewFakeTransitioner(cnr, + WithKubeNodes(nodegroup), + WithCloudProviderInstances(nodegroup), + ) + + // Populate the provider id because it gets generated in NewFakeTransitioner + cnr.Status.NodesToTerminate[0].ProviderID = fakeTransitioner.KubeNodes[0].ProviderID + cnr.Status.NodesAvailable[0].ProviderID = fakeTransitioner.KubeNodes[0].ProviderID + + // Execute the Initialized phase + _, err = fakeTransitioner.Run() + assert.NoError(t, err) + assert.Equal(t, v1.CycleNodeRequestScalingUp, cnr.Status.Phase) + assert.Len(t, cnr.Status.NodesToTerminate, 1) + assert.Len(t, cnr.Status.NodesAvailable, 0) + + // Ensure that the instance is detached from the ASG + output, err := fakeTransitioner.Autoscaling.DescribeAutoScalingGroups(&autoscaling.DescribeAutoScalingGroupsInput{}) + assert.NoError(t, err) + + // Quirk of the mocking, it constructs the list of ASGs from the list of + // nodes so 0 ASGs here means that the node has been detached + assert.Equal(t, 0, len(output.AutoScalingGroups)) + + // Ensure the node has the cycling finalizer applied to it + node, err := fakeTransitioner.rm.RawClient.CoreV1().Nodes().Get( + context.TODO(), nodegroup[0].Name, metav1.GetOptions{}, + ) + + assert.NoError(t, err) + assert.Len(t, node.Finalizers, 1) + + // Ensure the node has the cycling annotation applied to it + nodegroupName, err := fakeTransitioner.rm.GetNodegroupFromNodeAnnotation(nodegroup[0].Name) + assert.NoError(t, err) + assert.Equal(t, "ng-1", nodegroupName) +} diff --git a/pkg/controller/cyclenoderequest/transitioner/transitions_pending_test.go b/pkg/controller/cyclenoderequest/transitioner/transitions_pending_test.go index 456b511..f44231f 100644 --- a/pkg/controller/cyclenoderequest/transitioner/transitions_pending_test.go +++ b/pkg/controller/cyclenoderequest/transitioner/transitions_pending_test.go @@ -8,6 +8,7 @@ import ( "github.com/atlassian-labs/cyclops/pkg/mock" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/autoscaling" + "github.com/aws/aws-sdk-go/service/ec2" "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -293,10 +294,65 @@ func TestPendingNoKubeNodes(t *testing.T) { assert.Equal(t, v1.CycleNodeRequestHealing, cnr.Status.Phase) } -// Test to ensure that Cyclops will not proceed if there is node detached from -// the nodegroup on the cloud provider. It should try to wait for the issue to -// resolve to transition to the Healing phase if it doesn't. -func TestPendingDetachedCloudProviderNode(t *testing.T) { +// Test to ensure that node objects that exist in the cluster without a matching +// instance in the cloud provider are cleaned up and then cycling can proceed +// as normal. +func TestPendingOrphanedKubeNodes(t *testing.T) { + nodegroup, err := mock.NewNodegroup("ng-1", 2) + if err != nil { + assert.NoError(t, err) + } + + cnr := &v1.CycleNodeRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cnr-1", + Namespace: "kube-system", + }, + Spec: v1.CycleNodeRequestSpec{ + NodeGroupsList: []string{"ng-1"}, + CycleSettings: v1.CycleSettings{ + Concurrency: 1, + Method: v1.CycleNodeRequestMethodDrain, + }, + Selector: metav1.LabelSelector{ + MatchLabels: map[string]string{ + "customer": "kitt", + }, + }, + }, + Status: v1.CycleNodeRequestStatus{ + Phase: v1.CycleNodeRequestPending, + }, + } + + fakeTransitioner := NewFakeTransitioner(cnr, + WithKubeNodes(nodegroup), + WithCloudProviderInstances(nodegroup[:1]), + ) + + // The first run of the transitioner should go through and try to fix the + // inconsistency between kube and the cloud provider and will requeue the + // Pending phase to check again. + _, err = fakeTransitioner.Run() + assert.NoError(t, err) + assert.Equal(t, v1.CycleNodeRequestPending, cnr.Status.Phase) + + // The existing instance in the cloud provider should still be the only one + output, err := fakeTransitioner.Ec2.DescribeInstances(&ec2.DescribeInstancesInput{}) + assert.NoError(t, err) + assert.Equal(t, 1, len(output.Reservations[0].Instances)) + + // After running again, the orphaned kube node is observed to have been + // removed and the CNR was transitioned to the Initialized phase + _, err = fakeTransitioner.Run() + assert.NoError(t, err) + assert.Equal(t, v1.CycleNodeRequestInitialised, cnr.Status.Phase) + assert.Len(t, cnr.Status.NodesToTerminate, 1) +} + +// Test to ensure that an instance detached from one of the CNR named cloud +// provider nodegroups is re-attached before proceeding with cycling. +func TestPendingDetachedCloudProviderInstance(t *testing.T) { nodegroup, err := mock.NewNodegroup("ng-1", 2) if err != nil { assert.NoError(t, err) @@ -332,27 +388,94 @@ func TestPendingDetachedCloudProviderNode(t *testing.T) { WithCloudProviderInstances(nodegroup), ) - // Should requeue while it tries to wait + // Ensure there's only one instance in the ASG + output, err := fakeTransitioner.Autoscaling.DescribeAutoScalingGroups(&autoscaling.DescribeAutoScalingGroupsInput{}) + assert.NoError(t, err) + assert.Equal(t, 1, len(output.AutoScalingGroups)) + assert.Equal(t, 1, len(output.AutoScalingGroups[0].Instances)) + + // The first run of the transitioner should go through and try to fix the + // inconsistency between kube and the cloud provider and will requeue the + // Pending phase to check again. _, err = fakeTransitioner.Run() assert.NoError(t, err) assert.Equal(t, v1.CycleNodeRequestPending, cnr.Status.Phase) - // Simulate waiting for 1s more than the wait limit - cnr.Status.EquilibriumWaitStarted = &metav1.Time{ - Time: time.Now().Add(-nodeEquilibriumWaitLimit - time.Second), + // Ensure both instance are now in the ASG + output, err = fakeTransitioner.Autoscaling.DescribeAutoScalingGroups(&autoscaling.DescribeAutoScalingGroupsInput{}) + assert.NoError(t, err) + assert.Equal(t, 1, len(output.AutoScalingGroups)) + assert.Equal(t, 2, len(output.AutoScalingGroups[0].Instances)) + + // This time should transition to the initialized phase + _, err = fakeTransitioner.Run() + assert.NoError(t, err) + assert.Equal(t, v1.CycleNodeRequestInitialised, cnr.Status.Phase) + assert.Len(t, cnr.Status.NodesToTerminate, 2) +} + +// Test to ensure that a detached instance with a matching kube object that does +// not have the cycling annotation to identify it's original cloud provider +// nodegroup should cause the cycling to fail immediately since this is a case +// that cannot be fixed automatically. +func TestPendingDetachedCloudProviderInstanceNoAnnotation(t *testing.T) { + nodegroup, err := mock.NewNodegroup("ng-1", 2) + if err != nil { + assert.NoError(t, err) } - // This time should transition to the healing phase + // "detach" one instance from the asg and simulate the node not having + // the annotation + nodegroup[0].Nodegroup = "" + nodegroup[0].AnnotationKey = "" + nodegroup[0].AnnotationValue = "" + + cnr := &v1.CycleNodeRequest{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cnr-1", + Namespace: "kube-system", + }, + Spec: v1.CycleNodeRequestSpec{ + NodeGroupsList: []string{"ng-1"}, + CycleSettings: v1.CycleSettings{ + Concurrency: 1, + Method: v1.CycleNodeRequestMethodDrain, + }, + Selector: metav1.LabelSelector{ + MatchLabels: map[string]string{ + "customer": "kitt", + }, + }, + }, + Status: v1.CycleNodeRequestStatus{ + Phase: v1.CycleNodeRequestPending, + }, + } + + fakeTransitioner := NewFakeTransitioner(cnr, + WithKubeNodes(nodegroup), + WithCloudProviderInstances(nodegroup), + ) + + // Ensure there's only one instance in the ASG + output, err := fakeTransitioner.Autoscaling.DescribeAutoScalingGroups(&autoscaling.DescribeAutoScalingGroupsInput{}) + assert.NoError(t, err) + assert.Equal(t, 1, len(output.AutoScalingGroups)) + assert.Equal(t, 1, len(output.AutoScalingGroups[0].Instances)) + + // The first run of the transitioner should go through and try to fix the + // inconsistency between kube and the cloud provider. However, the node + // object for the detached instance does not have the nodegroup annotation + // so it cannot be re-attached. The CNR should fail because this cannot be + // fixed. _, err = fakeTransitioner.Run() assert.Error(t, err) assert.Equal(t, v1.CycleNodeRequestHealing, cnr.Status.Phase) } -// Test to ensure that Cyclops will not proceed if there is node detached from -// the nodegroup on the cloud provider. It should try to wait for the issue to -// resolve and transition to Initialised when it does before reaching the -// timeout period. -func TestPendingReattachedCloudProviderNode(t *testing.T) { +// Test to ensure that if the instance state cannot be fixed during the +// equilibrium timeout period, then cycling is failed. +func TestPendingTimeoutReached(t *testing.T) { nodegroup, err := mock.NewNodegroup("ng-1", 2) if err != nil { assert.NoError(t, err) @@ -388,32 +511,39 @@ func TestPendingReattachedCloudProviderNode(t *testing.T) { WithCloudProviderInstances(nodegroup), ) - // Should requeue while it tries to wait + output, err := fakeTransitioner.Autoscaling.DescribeAutoScalingGroups(&autoscaling.DescribeAutoScalingGroupsInput{}) + assert.NoError(t, err) + assert.Equal(t, 1, len(output.AutoScalingGroups)) + assert.Equal(t, 1, len(output.AutoScalingGroups[0].Instances)) + + // The first run of the transitioner should go through and try to fix the + // inconsistency between kube and the cloud provider and will requeue the + // Pending phase to check again. _, err = fakeTransitioner.Run() assert.NoError(t, err) assert.Equal(t, v1.CycleNodeRequestPending, cnr.Status.Phase) - // Simulate waiting for 1s less than the wait limit - cnr.Status.EquilibriumWaitStarted = &metav1.Time{ - Time: time.Now().Add(-nodeEquilibriumWaitLimit + time.Second), - } - - _, err = fakeTransitioner.Autoscaling.AttachInstances(&autoscaling.AttachInstancesInput{ - AutoScalingGroupName: aws.String("ng-1"), - InstanceIds: aws.StringSlice([]string{nodegroup[0].InstanceID}), + // Keep the node detached + _, err = fakeTransitioner.Autoscaling.DetachInstances(&autoscaling.DetachInstancesInput{ + InstanceIds: aws.StringSlice([]string{nodegroup[0].InstanceID}), }) assert.NoError(t, err) - // "re-attach" the instance to the asg - fakeTransitioner.cloudProviderInstances[0].Nodegroup = "ng-1" + output, err = fakeTransitioner.Autoscaling.DescribeAutoScalingGroups(&autoscaling.DescribeAutoScalingGroupsInput{}) + assert.NoError(t, err) + assert.Equal(t, 1, len(output.AutoScalingGroups)) + assert.Equal(t, 1, len(output.AutoScalingGroups[0].Instances)) - // The CNR should transition to the Initialised phase because the state of - // the nodes is now correct and this happened within the timeout period. + // Simulate waiting for 1s more than the wait limit + cnr.Status.EquilibriumWaitStarted = &metav1.Time{ + Time: time.Now().Add(-nodeEquilibriumWaitLimit - time.Second), + } + + // This time should transition to the healing phase _, err = fakeTransitioner.Run() - assert.NoError(t, err) - assert.Equal(t, v1.CycleNodeRequestInitialised, cnr.Status.Phase) - assert.Len(t, cnr.Status.NodesToTerminate, 2) + assert.Error(t, err) + assert.Equal(t, v1.CycleNodeRequestHealing, cnr.Status.Phase) } // Test to ensure that Cyclops will not proceed if there is node detached from @@ -466,16 +596,6 @@ func TestPendingReattachedCloudProviderNodeTooLate(t *testing.T) { Time: time.Now().Add(-nodeEquilibriumWaitLimit - time.Second), } - _, err = fakeTransitioner.Autoscaling.AttachInstances(&autoscaling.AttachInstancesInput{ - AutoScalingGroupName: aws.String("ng-1"), - InstanceIds: aws.StringSlice([]string{nodegroup[0].InstanceID}), - }) - - assert.NoError(t, err) - - // "re-attach" the instance to the asg - fakeTransitioner.cloudProviderInstances[0].Nodegroup = "ng-1" - // This time should transition to the healing phase even though the state // is correct because the timeout check happens first _, err = fakeTransitioner.Run() diff --git a/pkg/controller/cyclenoderequest/transitioner/util.go b/pkg/controller/cyclenoderequest/transitioner/util.go index ef429bc..5a0ba55 100644 --- a/pkg/controller/cyclenoderequest/transitioner/util.go +++ b/pkg/controller/cyclenoderequest/transitioner/util.go @@ -3,9 +3,12 @@ package transitioner import ( "context" "fmt" + "strings" "time" + "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" @@ -32,6 +35,35 @@ func (t *CycleNodeRequestTransitioner) transitionToFailed(err error) (reconcile. return t.transitionToUnsuccessful(v1.CycleNodeRequestFailed, err) } +// transitionToUnsuccessful transitions the current cycleNodeRequest to healing/failed +func (t *CycleNodeRequestTransitioner) transitionToUnsuccessful(phase v1.CycleNodeRequestPhase, err error) (reconcile.Result, error) { + t.cycleNodeRequest.Status.Phase = phase + // don't try to append message if it's nil + if err != nil { + if t.cycleNodeRequest.Status.Message != "" { + t.cycleNodeRequest.Status.Message += ", " + } + + t.cycleNodeRequest.Status.Message += err.Error() + } + + // handle conflicts before complaining + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + return t.rm.UpdateObject(t.cycleNodeRequest) + }); err != nil { + t.rm.Logger.Error(err, "unable to update cycleNodeRequest") + } + + // Notify that the cycling has transitioned phase + if t.rm.Notifier != nil { + if err := t.rm.Notifier.PhaseTransitioned(t.cycleNodeRequest); err != nil { + t.rm.Logger.Error(err, "Unable to post message to messaging provider", "phase", t.cycleNodeRequest.Status.Phase) + } + } + + return reconcile.Result{}, err +} + // transitionToSuccessful transitions the current cycleNodeRequest to successful func (t *CycleNodeRequestTransitioner) transitionToSuccessful() (reconcile.Result, error) { t.rm.LogEvent(t.cycleNodeRequest, "Successful", "Successfully cycled nodes") @@ -74,12 +106,15 @@ func (t *CycleNodeRequestTransitioner) equilibriumWaitTimedOut() (bool, error) { // If the timer isn't initialised, initialise it and save it to the object if t.cycleNodeRequest.Status.EquilibriumWaitStarted.IsZero() { t.rm.Logger.Info("started equilibrium wait") + currentTime := metav1.Now() t.cycleNodeRequest.Status.EquilibriumWaitStarted = ¤tTime + if err := t.rm.UpdateObject(t.cycleNodeRequest); err != nil { return false, err } } + return time.Now().After(t.cycleNodeRequest.Status.EquilibriumWaitStarted.Time.Add(nodeEquilibriumWaitLimit)), nil } @@ -295,31 +330,202 @@ func findProblemNodes(kubeNodes map[string]corev1.Node, nodeGroupInstances map[s return problemKubeNodes, problemNodegroupInstances } -// transitionToUnsuccessful transitions the current cycleNodeRequest to healing/failed -func (t *CycleNodeRequestTransitioner) transitionToUnsuccessful(phase v1.CycleNodeRequestPhase, err error) (reconcile.Result, error) { - t.cycleNodeRequest.Status.Phase = phase - // don't try to append message if it's nil +// reattachAnyDetachedInstances re-attaches any instances which are detached from the cloud +// provider nodegroups defined in the CNR using the cycling annotation to identify which one. +func (t *CycleNodeRequestTransitioner) reattachAnyDetachedInstances(nodesNotInCloudProviderNodegroup map[string]corev1.Node) error { + var nodeProviderIDs []string + + if len(nodesNotInCloudProviderNodegroup) == 0 { + return nil + } + + for _, node := range nodesNotInCloudProviderNodegroup { + nodeProviderIDs = append(nodeProviderIDs, node.Spec.ProviderID) + } + + existingProviderIDs, err := t.rm.CloudProvider.InstancesExist(nodeProviderIDs) if err != nil { - if t.cycleNodeRequest.Status.Message != "" { - t.cycleNodeRequest.Status.Message += ", " + return errors.Wrap(err, "failed to check instances that exist from cloud provider") + } + + nodeGroups, err := t.rm.CloudProvider.GetNodeGroups(t.cycleNodeRequest.GetNodeGroupNames()) + if err != nil { + return err + } + + for providerID, node := range nodesNotInCloudProviderNodegroup { + _, instanceExists := existingProviderIDs[providerID] + + if !instanceExists { + continue } - t.cycleNodeRequest.Status.Message += err.Error() + // The kube node is now established to be backed by an instance in the cloud provider + // that is detached from it's nodegroup. Use the nodegroup annotation from the kube + // node set as part of a previous cycle to re-attach it. + nodegroupName, err := t.rm.GetNodegroupFromNodeAnnotation(node.Name) + + // If there is an error because the kube node no longer exists then simply skip any + // more action on the node and go to the next one. This case can be fixed in the next + // run of the Pending phase. + if apierrors.IsNotFound(err) { + continue + } + + // Otherwise error out to end the cycle. This includes if the cycling annotation is + // missing since there is no link between the original cloud provider instance and it's + // nodegroup so there's no way to re-attach it. + if err != nil { + return err + } + + // AttachInstance does not error out if the instance does not exist so no need to handle + // it here. Error out on any error that can't be fixed by repeating the attempts to fix + // the instance state. + alreadyAttached, err := nodeGroups.AttachInstance(node.Spec.ProviderID, nodegroupName) + if err != nil && !alreadyAttached { + return err + } } - // handle conflicts before complaining - if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - return t.rm.UpdateObject(t.cycleNodeRequest) - }); err != nil { - t.rm.Logger.Error(err, "unable to update cycleNodeRequest") + return nil +} + +// deleteAnyOrphanedKubeNodes filters through the kube nodes without instances in the cloud provider +// nodegroups which don't have an instance in the cloud provider at all. It removes the cycling +// finalizer and deletes the node. +func (t *CycleNodeRequestTransitioner) deleteAnyOrphanedKubeNodes(nodesNotInCloudProviderNodegroup map[string]corev1.Node) error { + var nodeProviderIDs []string + + if len(nodesNotInCloudProviderNodegroup) == 0 { + return nil } - // Notify that the cycling has transitioned phase - if t.rm.Notifier != nil { - if err := t.rm.Notifier.PhaseTransitioned(t.cycleNodeRequest); err != nil { - t.rm.Logger.Error(err, "Unable to post message to messaging provider", "phase", t.cycleNodeRequest.Status.Phase) + for _, node := range nodesNotInCloudProviderNodegroup { + nodeProviderIDs = append(nodeProviderIDs, node.Spec.ProviderID) + } + + existingProviderIDs, err := t.rm.CloudProvider.InstancesExist(nodeProviderIDs) + if err != nil { + return errors.Wrap(err, "failed to check instances that exist from cloud provider") + } + + // Find all the orphaned kube nodes from the set of nodes without matching instance in the + // cloud provider nodegroup. + for providerID, node := range nodesNotInCloudProviderNodegroup { + _, instanceExists := existingProviderIDs[providerID] + + if instanceExists { + continue + } + + // The kube node is now established to be orphaned. Check the finalizers on the node + // object and ensure that only the Cyclops finalizer exists on it. If another finalizer + // exists then it's from another controller and it should not be deleted. + containsNonCyclingFinalizer, err := t.rm.NodeContainsNonCyclingFinalizer(node.Name) + if err != nil { + return errors.Wrap(err, + fmt.Sprintf("failed to check if node %s contains a non-cycling finalizer", node.Name), + ) + } + + if containsNonCyclingFinalizer { + return fmt.Errorf("can't delete node %s because it contains non-cycling finalizers: %v", + node.Name, + node.Finalizers, + ) + } + + // The cycling finalizer is the only one on the node so remove it. + if err := t.rm.RemoveFinalizerFromNode(node.Name); err != nil { + return err + } + + // Delete the node to ensure it gets removed from kube. It is possible that the finalizer + // was preventing the node object from being deleted and the node is deleted by the time + // this delete call is reached. Don't if the node is already deleted since that is desired + // effect. + if err := t.rm.DeleteNode(node.Name); err != nil { + return err } } - return reconcile.Result{}, err + return nil +} + +// errorIfEquilibriumTimeoutReached reduces the footprint of this check in the +// Pending transition +func (t *CycleNodeRequestTransitioner) errorIfEquilibriumTimeoutReached() error { + timedOut, err := t.equilibriumWaitTimedOut() + if err != nil { + return err + } + + if timedOut { + return fmt.Errorf( + "node count mismatch, number of kubernetes nodes does not match number of cloud provider instances after %v", + nodeEquilibriumWaitLimit, + ) + } + + return nil +} + +// logProblemNodes generates event message describing any issues with the node state prior +// to cycling. +func (t *CycleNodeRequestTransitioner) logProblemNodes(nodesNotInCloudProviderNodegroup map[string]corev1.Node, instancesNotInKube map[string]cloudprovider.Instance) { + var offendingNodesInfo string + + if len(nodesNotInCloudProviderNodegroup) > 0 { + providerIDs := make([]string, 0) + + for providerID := range nodesNotInCloudProviderNodegroup { + providerIDs = append(providerIDs, + fmt.Sprintf("id %q", providerID), + ) + } + + offendingNodesInfo += "nodes not in node group: " + offendingNodesInfo += strings.Join(providerIDs, ",") + } + + if len(instancesNotInKube) > 0 { + if offendingNodesInfo != "" { + offendingNodesInfo += ";" + } + + providerIDs := make([]string, 0) + + for providerID, node := range instancesNotInKube { + providerIDs = append(providerIDs, + fmt.Sprintf("id %q in %q", providerID, node.NodeGroupName()), + ) + } + + offendingNodesInfo += "nodes not inside cluster: " + offendingNodesInfo += strings.Join(providerIDs, ",") + } + + t.rm.LogEvent(t.cycleNodeRequest, "NodeStateInvalid", + "instances missing: %v, kube nodes missing: %v. %v", + len(nodesNotInCloudProviderNodegroup), len(instancesNotInKube), offendingNodesInfo, + ) +} + +// validateInstanceState performs final validation on the nodegroup to ensure +// that all the cloud provider instances are ready in the nodegroup. +func (t *CycleNodeRequestTransitioner) validateInstanceState(validNodeGroupInstances map[string]cloudprovider.Instance) (bool, error) { + nodeGroups, err := t.rm.CloudProvider.GetNodeGroups( + t.cycleNodeRequest.GetNodeGroupNames(), + ) + + if err != nil { + return false, err + } + + if len(nodeGroups.ReadyInstances()) == len(validNodeGroupInstances) { + return true, nil + } + + return false, nil } diff --git a/pkg/controller/node.go b/pkg/controller/node.go index ababb3b..d992b17 100644 --- a/pkg/controller/node.go +++ b/pkg/controller/node.go @@ -2,6 +2,7 @@ package controller import ( "context" + "fmt" "time" "github.com/atlassian-labs/cyclops/pkg/k8s" @@ -15,7 +16,8 @@ import ( ) const ( - nodeFinalizerName = "cyclops.atlassian.com/finalizer" + nodeFinalizerName = "cyclops.atlassian.com/finalizer" + nodegroupAnnotationName = "cyclops.atlassian.com/nodegroup" ) // ListNodes lists nodes from Kubernetes, optionally filtered by a selector. @@ -96,6 +98,36 @@ func (rm *ResourceManager) DrainPods(nodeName string, unhealthyAfter time.Durati return false, k8s.DrainPods(pods, rm.RawClient, unhealthyAfter) } +func (rm *ResourceManager) AddNodegroupAnnotationToNode(nodeName, nodegroupName string) error { + return rm.AddAnnotationToNode(nodeName, nodegroupAnnotationName, nodegroupName) +} + +func (rm *ResourceManager) GetNodegroupFromNodeAnnotation(nodeName string) (string, error) { + // Get the node + node, err := rm.GetNode(nodeName) + if err != nil { + return "", err + } + + nodegroupName, exists := node.Annotations[nodegroupAnnotationName] + + if !exists { + return "", fmt.Errorf("node %s does not contain the %s annotation", + nodeName, + nodegroupAnnotationName, + ) + } + + return nodegroupName, nil +} + +func (rm *ResourceManager) AddAnnotationToNode(nodeName, annotationName, annotationValue string) error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { + // Let the caller account for the node not being found + return k8s.AddAnnotationToNode(nodeName, annotationName, annotationValue, rm.RawClient) + }) +} + func (rm *ResourceManager) AddFinalizerToNode(nodeName string) error { return rm.manageFinalizerOnNode(nodeName, k8s.AddFinalizerToNode) } @@ -104,6 +136,24 @@ func (rm *ResourceManager) RemoveFinalizerFromNode(nodeName string) error { return rm.manageFinalizerOnNode(nodeName, k8s.RemoveFinalizerFromNode) } +func (rm *ResourceManager) NodeContainsNonCyclingFinalizer(nodeName string) (bool, error) { + node, err := rm.GetNode(nodeName) + + // If the node is not found then skip the finalizer check + if err != nil && apierrors.IsNotFound(err) { + rm.Logger.Info("node deleted, skip adding finalizer", "node", nodeName) + return false, nil + } + + for _, finalizer := range node.Finalizers { + if finalizer != nodeFinalizerName { + return true, nil + } + } + + return false, nil +} + func (rm *ResourceManager) manageFinalizerOnNode(nodeName string, fn func(*v1.Node, string, kubernetes.Interface) error) error { return retry.RetryOnConflict(retry.DefaultRetry, func() error { // Get the node diff --git a/pkg/k8s/node.go b/pkg/k8s/node.go index 4fa59b8..83ec2eb 100644 --- a/pkg/k8s/node.go +++ b/pkg/k8s/node.go @@ -61,6 +61,19 @@ func AddLabelToNode(nodeName string, labelName string, labelValue string, client return PatchNode(nodeName, patches, client) } +// AddAnnotationToNode performs a patch operation on a node to add an annotation to the node +func AddAnnotationToNode(nodeName string, annotationName string, annotationValue string, client kubernetes.Interface) error { + patches := []Patch{ + { + Op: "add", + // json patch spec maps "~1" to "/" as an escape sequence, so we do the translation here... + Path: fmt.Sprintf("/metadata/annotations/%s", strings.Replace(annotationName, "/", "~1", -1)), + Value: annotationValue, + }, + } + return PatchNode(nodeName, patches, client) +} + // AddFinalizerToNode updates a node to add a finalizer to it func AddFinalizerToNode(node *v1.Node, finalizerName string, client kubernetes.Interface) error { controllerutil.AddFinalizer(node, finalizerName) diff --git a/pkg/mock/client.go b/pkg/mock/client.go index 0763af1..9505f6c 100644 --- a/pkg/mock/client.go +++ b/pkg/mock/client.go @@ -26,13 +26,16 @@ import ( type Node struct { Name string - LabelKey string - LabelValue string Creation time.Time Tainted bool Nodegroup string InstanceID string + LabelKey string + LabelValue string + AnnotationKey string + AnnotationValue string + NodeReady corev1.ConditionStatus CPU int64 @@ -151,6 +154,9 @@ func buildKubeNode(node *Node) *corev1.Node { Labels: map[string]string{ node.LabelKey: node.LabelValue, }, + Annotations: map[string]string{ + node.AnnotationKey: node.AnnotationValue, + }, CreationTimestamp: metav1.NewTime(node.Creation), }, Spec: corev1.NodeSpec{ diff --git a/pkg/mock/test_helpers.go b/pkg/mock/test_helpers.go index decbd91..5efaf9b 100644 --- a/pkg/mock/test_helpers.go +++ b/pkg/mock/test_helpers.go @@ -22,7 +22,7 @@ func GenerateRandomInstanceId() (string, error) { return "i-" + hexString, nil } -func NewNodegroup(name string, num int) ([]*Node, error) { +func NewNodegroup(nodegroupName string, num int) ([]*Node, error) { nodes := make([]*Node, 0) for i := 0; i < num; i++ { @@ -32,13 +32,15 @@ func NewNodegroup(name string, num int) ([]*Node, error) { } node := &Node{ - Name: fmt.Sprintf("%s-node-%d", name, i), + Name: fmt.Sprintf("%s-node-%d", nodegroupName, i), LabelKey: "customer", LabelValue: "kitt", + AnnotationKey: "cyclops.atlassian.com/nodegroup", + AnnotationValue: nodegroupName, Creation: time.Now(), Tainted: false, NodeReady: corev1.ConditionTrue, - Nodegroup: name, + Nodegroup: nodegroupName, InstanceID: instanceID, CloudProviderState: "running", }