Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AWS API Optimization - Update #124

Merged
merged 3 commits into from
May 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/v1alpha1/instancegroup_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,9 @@ func (c *EKSConfiguration) GetTags() []map[string]string {
}
return c.Tags
}
func (c *EKSConfiguration) SetTags(tags []map[string]string) {
c.Tags = tags
}
func (c *EKSConfiguration) GetSubnets() []string {
if c.Subnets == nil {
return []string{}
Expand Down
21 changes: 21 additions & 0 deletions controllers/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,27 @@ func ContainsEqualFold(slice []string, s string) bool {
return false
}

func StringMapSliceContains(m []map[string]string, contains map[string]string) bool {
for _, obj := range m {
if reflect.DeepEqual(obj, contains) {
return true
}
}
return false
}

func StringSliceEqualFold(x []string, y []string) bool {
if len(x) != len(y) {
return false
}
for _, element := range x {
if !ContainsEqualFold(y, element) {
return false
}
}
return true
}

func SliceEmpty(slice []string) bool {
return len(slice) == 0
}
Expand Down
40 changes: 15 additions & 25 deletions controllers/providers/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,24 +146,29 @@ func (w *AwsWorker) CreateScalingGroup(input *autoscaling.CreateAutoScalingGroup
return nil
}

func (w *AwsWorker) UpdateScalingGroup(input *autoscaling.UpdateAutoScalingGroupInput, upTags []*autoscaling.Tag, rmTags []*autoscaling.Tag) error {
_, err := w.AsgClient.CreateOrUpdateTags(&autoscaling.CreateOrUpdateTagsInput{
Tags: upTags,
})
if err != nil {
return err
func (w *AwsWorker) UpdateScalingGroupTags(add []*autoscaling.Tag, remove []*autoscaling.Tag) error {
if len(add) > 0 {
_, err := w.AsgClient.CreateOrUpdateTags(&autoscaling.CreateOrUpdateTagsInput{
Tags: add,
})
if err != nil {
return err
}
}

if len(rmTags) > 0 {
_, err = w.AsgClient.DeleteTags(&autoscaling.DeleteTagsInput{
Tags: rmTags,
if len(remove) > 0 {
_, err := w.AsgClient.DeleteTags(&autoscaling.DeleteTagsInput{
Tags: remove,
})
if err != nil {
return err
}
}
return nil
}

_, err = w.AsgClient.UpdateAutoScalingGroup(input)
func (w *AwsWorker) UpdateScalingGroup(input *autoscaling.UpdateAutoScalingGroupInput) error {
_, err := w.AsgClient.UpdateAutoScalingGroup(input)
if err != nil {
return err
}
Expand Down Expand Up @@ -516,21 +521,6 @@ func (w *AwsWorker) DescribeAutoscalingLaunchConfigs() ([]*autoscaling.LaunchCon
return launchConfigurations, nil
}

func (w *AwsWorker) GetAutoscalingGroup(name string) (*autoscaling.Group, error) {
var asg *autoscaling.Group
out, err := w.AsgClient.DescribeAutoScalingGroups(&autoscaling.DescribeAutoScalingGroupsInput{})
if err != nil {
return asg, err
}
for _, group := range out.AutoScalingGroups {
n := aws.StringValue(group.AutoScalingGroupName)
if strings.EqualFold(name, n) {
asg = group
}
}
return asg, nil
}

func (w *AwsWorker) EnableMetrics(asgName string, metrics []string) error {
if common.SliceEmpty(metrics) {
return nil
Expand Down
6 changes: 3 additions & 3 deletions controllers/providers/kubernetes/rollingupdate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package kubernetes

import (
awsprovider "github.com/keikoproj/instance-manager/controllers/providers/aws"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
)

Expand All @@ -31,7 +31,7 @@ var (

type RollingUpdateRequest struct {
AwsWorker awsprovider.AwsWorker
Kubernetes kubernetes.Interface
ClusterNodes *corev1.NodeList
ScalingGroupName string
MaxUnavailable int
DesiredCapacity int
Expand Down Expand Up @@ -61,7 +61,7 @@ func ProcessRollingUpgradeStrategy(req *RollingUpdateRequest) (bool, error) {
req.MaxUnavailable = req.DesiredCapacity
}

ok, err := IsMinNodesReady(req.Kubernetes, req.AllInstances, req.MaxUnavailable)
ok, err := IsMinNodesReady(req.ClusterNodes, req.AllInstances, req.MaxUnavailable)
if err != nil {
return false, err
}
Expand Down
14 changes: 2 additions & 12 deletions controllers/providers/kubernetes/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,7 @@ func GetUnstructuredInstanceGroup(instanceGroup *v1alpha1.InstanceGroup) (*unstr
return obj, nil
}

func IsDesiredNodesReady(kube kubernetes.Interface, instanceIds []string, desiredCount int) (bool, error) {
nodes, err := kube.CoreV1().Nodes().List(metav1.ListOptions{})
if err != nil {
return false, err
}

func IsDesiredNodesReady(nodes *corev1.NodeList, instanceIds []string, desiredCount int) (bool, error) {
if len(instanceIds) != desiredCount {
return false, nil
}
Expand All @@ -72,12 +67,7 @@ func IsDesiredNodesReady(kube kubernetes.Interface, instanceIds []string, desire
return false, nil
}

func IsMinNodesReady(kube kubernetes.Interface, instanceIds []string, minCount int) (bool, error) {
nodes, err := kube.CoreV1().Nodes().List(metav1.ListOptions{})
if err != nil {
return false, err
}

func IsMinNodesReady(nodes *corev1.NodeList, instanceIds []string, minCount int) (bool, error) {
// if count of instances in scaling group is not over min, requeue
if len(instanceIds) < minCount {
return false, nil
Expand Down
14 changes: 14 additions & 0 deletions controllers/provisioners/eks/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/keikoproj/instance-manager/api/v1alpha1"
kubeprovider "github.com/keikoproj/instance-manager/controllers/providers/kubernetes"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/autoscaling"
Expand All @@ -32,6 +33,7 @@ import (
type DiscoveredState struct {
Provisioned bool
NodesReady bool
ClusterNodes *corev1.NodeList
OwnedScalingGroups []*autoscaling.Group
ScalingGroup *autoscaling.Group
LaunchConfigurations []*autoscaling.LaunchConfiguration
Expand Down Expand Up @@ -59,6 +61,12 @@ func (ctx *EksInstanceGroupContext) CloudDiscovery() error {
ResourceVersion: instanceGroup.GetResourceVersion(),
}

nodes, err := ctx.KubernetesClient.Kubernetes.CoreV1().Nodes().List(metav1.ListOptions{})
if err != nil {
return errors.Wrap(err, "failed to list cluster nodes")
}
state.SetClusterNodes(nodes)

var roleName, instanceProfileName string
if configuration.HasExistingRole() {
roleName = configuration.GetRoleName()
Expand Down Expand Up @@ -242,3 +250,9 @@ func (d *DiscoveredState) SetNodesReady(condition bool) {
func (d *DiscoveredState) IsNodesReady() bool {
return d.NodesReady
}
func (d *DiscoveredState) SetClusterNodes(nodes *corev1.NodeList) {
d.ClusterNodes = nodes
}
func (d *DiscoveredState) GetClusterNodes() *corev1.NodeList {
return d.ClusterNodes
}
1 change: 1 addition & 0 deletions controllers/provisioners/eks/eks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func MockScalingGroup(name string, t ...*autoscaling.TagDescription) *autoscalin
Tags: t,
MinSize: aws.Int64(3),
MaxSize: aws.Int64(6),
VPCZoneIdentifier: aws.String("subnet-1,subnet-2,subnet-3"),
}
}

Expand Down
20 changes: 10 additions & 10 deletions controllers/provisioners/eks/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,30 +87,25 @@ func (ctx *EksInstanceGroupContext) GetAddedTags(asgName string) []*autoscaling.

func (ctx *EksInstanceGroupContext) GetRemovedTags(asgName string) []*autoscaling.Tag {
var (
existingTags []*autoscaling.Tag
removal []*autoscaling.Tag
state = ctx.GetDiscoveredState()
scalingGroup = state.GetScalingGroup()
addedTags = ctx.GetAddedTags(asgName)
)

// get existing tags
for _, tag := range scalingGroup.Tags {
existingTags = append(existingTags, ctx.AwsWorker.NewTag(aws.StringValue(tag.Key), aws.StringValue(tag.Value), asgName))
}

// find removals against incoming tags
for _, tag := range existingTags {
var match bool
for _, t := range addedTags {
if reflect.DeepEqual(t, tag) {
if aws.StringValue(t.Key) == aws.StringValue(tag.Key) {
match = true
}
}
if !match {
removal = append(removal, tag)
matchedTag := ctx.AwsWorker.NewTag(aws.StringValue(tag.Key), aws.StringValue(tag.Value), asgName)
removal = append(removal, matchedTag)
}
}

return removal
}

Expand Down Expand Up @@ -285,8 +280,13 @@ func (ctx *EksInstanceGroupContext) UpdateNodeReadyCondition() bool {
status = instanceGroup.GetStatus()
scalingGroup = state.GetScalingGroup()
desiredCount = int(aws.Int64Value(scalingGroup.DesiredCapacity))
nodes = state.GetClusterNodes()
)

if scalingGroup == nil {
return false
}

ctx.Log.Info("waiting for node readiness conditions", "instancegroup", instanceGroup.GetName())
if len(scalingGroup.Instances) != desiredCount {
// if instances don't match desired, a scaling activity is in progress
Expand All @@ -301,7 +301,7 @@ func (ctx *EksInstanceGroupContext) UpdateNodeReadyCondition() bool {
instances := strings.Join(instanceIds, ",")

var conditions []v1alpha1.InstanceGroupCondition
ok, err := kubeprovider.IsDesiredNodesReady(ctx.KubernetesClient.Kubernetes, instanceIds, desiredCount)
ok, err := kubeprovider.IsDesiredNodesReady(nodes, instanceIds, desiredCount)
if err != nil {
ctx.Log.Error(err, "could not update node conditions", "instancegroup", instanceGroup.GetName())
return false
Expand Down
101 changes: 83 additions & 18 deletions controllers/provisioners/eks/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package eks
import (
"fmt"
"reflect"
"strings"

"github.com/pkg/errors"

Expand Down Expand Up @@ -63,8 +64,7 @@ func (ctx *EksInstanceGroupContext) Update() error {

// we should try to bootstrap the role before we wait for nodes to be ready
// to avoid getting locked if someone made a manual change to aws-auth
err = ctx.BootstrapNodes()
if err != nil {
if err = ctx.BootstrapNodes(); err != nil {
ctx.Log.Info("failed to bootstrap role, will retry", "error", err, "instancegroup", instanceGroup.GetName())
}

Expand Down Expand Up @@ -93,25 +93,27 @@ func (ctx *EksInstanceGroupContext) UpdateScalingGroup() error {
rmTags = ctx.GetRemovedTags(asgName)
)

err := ctx.AwsWorker.UpdateScalingGroup(&autoscaling.UpdateAutoScalingGroupInput{
AutoScalingGroupName: aws.String(asgName),
LaunchConfigurationName: aws.String(state.GetActiveLaunchConfigurationName()),
MinSize: aws.Int64(spec.GetMinSize()),
MaxSize: aws.Int64(spec.GetMaxSize()),
VPCZoneIdentifier: aws.String(common.ConcatenateList(configuration.GetSubnets(), ",")),
}, tags, rmTags)
if err != nil {
return err
}
ctx.Log.Info("updated scaling group", "instancegroup", instanceGroup.GetName(), "scalinggroup", asgName)
if ctx.ScalingGroupUpdateNeeded() {
err := ctx.AwsWorker.UpdateScalingGroup(&autoscaling.UpdateAutoScalingGroupInput{
AutoScalingGroupName: aws.String(asgName),
LaunchConfigurationName: aws.String(state.GetActiveLaunchConfigurationName()),
MinSize: aws.Int64(spec.GetMinSize()),
MaxSize: aws.Int64(spec.GetMaxSize()),
VPCZoneIdentifier: aws.String(common.ConcatenateList(configuration.GetSubnets(), ",")),
})
if err != nil {
return err
}

scalingGroup, err = ctx.AwsWorker.GetAutoscalingGroup(asgName)
if err != nil {
return err
ctx.Log.Info("updated scaling group", "instancegroup", instanceGroup.GetName(), "scalinggroup", asgName)
}

if scalingGroup != nil {
state.SetScalingGroup(scalingGroup)
if ctx.TagsUpdateNeeded() {
err := ctx.AwsWorker.UpdateScalingGroupTags(tags, rmTags)
if err != nil {
return err
}
ctx.Log.Info("updated scaling group tags", "instancegroup", instanceGroup.GetName(), "scalinggroup", asgName)
}

if err := ctx.UpdateMetricsCollection(asgName); err != nil {
Expand Down Expand Up @@ -141,6 +143,69 @@ func (ctx *EksInstanceGroupContext) RotationNeeded() bool {
return false
}

func (ctx *EksInstanceGroupContext) TagsUpdateNeeded() bool {
var (
instanceGroup = ctx.GetInstanceGroup()
configuration = instanceGroup.GetEKSConfiguration()
state = ctx.GetDiscoveredState()
scalingGroup = state.GetScalingGroup()
asgName = aws.StringValue(scalingGroup.AutoScalingGroupName)
rmTags = ctx.GetRemovedTags(asgName)
)

if len(rmTags) > 0 {
return true
}

existingTags := make([]map[string]string, 0)
for _, tag := range scalingGroup.Tags {
tagSet := map[string]string{
"key": aws.StringValue(tag.Key),
"value": aws.StringValue(tag.Value),
}
existingTags = append(existingTags, tagSet)
}

for _, tag := range configuration.GetTags() {
if !common.StringMapSliceContains(existingTags, tag) {
return true
}
}

return false
}

func (ctx *EksInstanceGroupContext) ScalingGroupUpdateNeeded() bool {
var (
instanceGroup = ctx.GetInstanceGroup()
spec = instanceGroup.GetEKSSpec()
configuration = instanceGroup.GetEKSConfiguration()
state = ctx.GetDiscoveredState()
scalingGroup = state.GetScalingGroup()
zoneIdentifier = aws.StringValue(scalingGroup.VPCZoneIdentifier)
groupSubnets = strings.Split(zoneIdentifier, ",")
specSubnets = configuration.GetSubnets()
)

if state.GetActiveLaunchConfigurationName() != aws.StringValue(scalingGroup.LaunchConfigurationName) {
return true
}

if spec.GetMinSize() != aws.Int64Value(scalingGroup.MinSize) {
return true
}

if spec.GetMaxSize() != aws.Int64Value(scalingGroup.MaxSize) {
return true
}

if !common.StringSliceEqualFold(specSubnets, groupSubnets) {
return true
}

return false
}

func (ctx *EksInstanceGroupContext) LaunchConfigurationDrifted() bool {
var (
state = ctx.GetDiscoveredState()
Expand Down
Loading