Skip to content

Commit

Permalink
Merge pull request #5 from kianjones4/throttle-fix
Browse files Browse the repository at this point in the history
Throttle fix
  • Loading branch information
kianjones4 authored Nov 12, 2019
2 parents 611ed51 + 9912cd3 commit c11d6f0
Show file tree
Hide file tree
Showing 7 changed files with 343 additions and 96 deletions.
2 changes: 1 addition & 1 deletion config/default/manager_image_patch.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ spec:
spec:
containers:
# Change the value of image field below to your controller image URL
- image: keikoproj/rolling-upgrade-controller:0.1-dev
- image: keikoproj/rolling-upgrade-controller:0.2-dev
name: manager
58 changes: 34 additions & 24 deletions controllers/rollingupgrade_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,27 @@ import (
"context"
"errors"
"fmt"
"k8s.io/apimachinery/pkg/util/intstr"
"log"
"os"
"os/exec"
"strconv"
"strings"
"sync"
"time"

"k8s.io/apimachinery/pkg/util/intstr"

corev1 "k8s.io/api/core/v1"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
awsclient "github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"

"github.com/go-logr/logr"
"github.com/keikoproj/upgrade-manager/pkg/log"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -75,6 +76,14 @@ const (
ShellBinary = "/bin/sh"
)

// DefaultRetryer is the retry policy used for AWS API requests made by the
// controller. It is wrapped with log.NewRetryLogger and installed on the AWS
// config via request.WithRetryer before the session is created.
//
// The values allow up to 250 retries per request, with throttle delays
// bounded to 5s-20s and other retry delays bounded to 1s-5s.
// NOTE(review): exact back-off behaviour between these bounds is defined by
// aws-sdk-go's client.DefaultRetryer — confirm against the SDK version in use.
var DefaultRetryer = awsclient.DefaultRetryer{
NumMaxRetries: 250,
MinThrottleDelay: time.Second * 5,
MaxThrottleDelay: time.Second * 20,
MinRetryDelay: time.Second * 1,
MaxRetryDelay: time.Second * 5,
}

// RollingUpgradeReconciler reconciles a RollingUpgrade object
type RollingUpgradeReconciler struct {
client.Client
Expand All @@ -89,7 +98,6 @@ type RollingUpgradeReconciler struct {

func runScript(script string, background bool, objName string) (string, error) {
log.Printf("%s: Running script %s", objName, script)

if background {
log.Printf("%s: Running script in background. Logs not available.", objName)
exec.Command(ShellBinary, "-c", script).Run()
Expand Down Expand Up @@ -212,20 +220,23 @@ func (r *RollingUpgradeReconciler) CallKubectlDrain(ctx context.Context, nodeNam
}

// TerminateNode actually terminates the given node.
func (r *RollingUpgradeReconciler) TerminateNode(ruObj *upgrademgrv1alpha1.RollingUpgrade, instanceID string, svc ec2iface.EC2API) error {
func (r *RollingUpgradeReconciler) TerminateNode(ruObj *upgrademgrv1alpha1.RollingUpgrade, instanceID string, svc autoscalingiface.AutoScalingAPI) error {

input := &ec2.TerminateInstancesInput{
InstanceIds: []*string{
aws.String(instanceID),
},
input := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String(instanceID),
ShouldDecrementDesiredCapacity: aws.Bool(false),
}

result, err := svc.TerminateInstances(input)
result, err := svc.TerminateInstanceInAutoScalingGroup(input)
if err != nil {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case "InvalidInstanceID.NotFound":
log.Printf("Instance %s not found. Moving on\n", instanceID)
case autoscaling.ErrCodeScalingActivityInProgressFault:
log.Println(autoscaling.ErrCodeScalingActivityInProgressFault, aerr.Error())
case autoscaling.ErrCodeResourceContentionFault:
log.Println(autoscaling.ErrCodeResourceContentionFault, aerr.Error())
return nil
default:
log.Println(aerr.Error())
Expand Down Expand Up @@ -345,7 +356,7 @@ func loadEnvironmentVariables(ruObj *upgrademgrv1alpha1.RollingUpgrade, nodeInst
return nil
}

func (r *RollingUpgradeReconciler) runRestack(ctx *context.Context, ruObj *upgrademgrv1alpha1.RollingUpgrade, svc ec2iface.EC2API, KubeCtlCall string) (int, error) {
func (r *RollingUpgradeReconciler) runRestack(ctx *context.Context, ruObj *upgrademgrv1alpha1.RollingUpgrade, svc autoscalingiface.AutoScalingAPI, KubeCtlCall string) (int, error) {
// Setting default values for the Strategy in rollup object
r.setDefaultsForRollingUpdateStrategy(ruObj)

Expand Down Expand Up @@ -413,11 +424,15 @@ func (r *RollingUpgradeReconciler) Process(ctx *context.Context, ruObj *upgradem

r.setDefaults(ruObj)

sess, _ := session.NewSession(&aws.Config{
Region: aws.String(ruObj.Spec.Region)},
)
config := aws.NewConfig().WithRegion(ruObj.Spec.Region)
config = config.WithCredentialsChainVerboseErrors(true)
config = request.WithRetryer(config, log.NewRetryLogger(DefaultRetryer))
sess, err := session.NewSession(config)
if err != nil {
log.Fatalf("failed to create asg client, %v", err)
}
svc := autoscaling.New(sess)
err := r.populateAsg(ruObj, svc)
err = r.populateAsg(ruObj, svc)
if err != nil {
return r.finishExecution(StatusError, 0, ctx, ruObj)
}
Expand All @@ -443,13 +458,8 @@ func (r *RollingUpgradeReconciler) Process(ctx *context.Context, ruObj *upgradem
ruObj.Status.TotalNodes = len(asg.Instances)
r.Update(*ctx, ruObj)

ec2Sess, _ := session.NewSession(&aws.Config{
Region: aws.String(ruObj.Spec.Region)},
)
ec2Svc := ec2.New(ec2Sess)

// Run the restack that actually performs the rolling update.
nodesProcessed, err := r.runRestack(ctx, ruObj, ec2Svc, KubeCtlBinary)
nodesProcessed, err := r.runRestack(ctx, ruObj, svc, KubeCtlBinary)
if err != nil {
return r.finishExecution(StatusError, nodesProcessed, ctx, ruObj)
}
Expand Down Expand Up @@ -623,7 +633,7 @@ func (r *RollingUpgradeReconciler) setDefaultsForRollingUpdateStrategy(ruObj *up

// RandomUpdate treats all the AZs as a single unit, picks random nodes for update,
// and rolls out the update based on the input parameters.
func (r *RollingUpgradeReconciler) RandomUpdate(ctx *context.Context, ruObj *upgrademgrv1alpha1.RollingUpgrade, svc ec2iface.EC2API, KubeCtlCall string) (int, error) {
func (r *RollingUpgradeReconciler) RandomUpdate(ctx *context.Context, ruObj *upgrademgrv1alpha1.RollingUpgrade, svc autoscalingiface.AutoScalingAPI, KubeCtlCall string) (int, error) {

value, ok := r.ruObjNameToASG.Load(ruObj.Name)
if !ok {
Expand Down Expand Up @@ -694,7 +704,7 @@ func (r *RollingUpgradeReconciler) RandomUpdate(ctx *context.Context, ruObj *upg
return nodesProcessed, nil
}

func (r *RollingUpgradeReconciler) UpdateInstance(ctx *context.Context, ruObj *upgrademgrv1alpha1.RollingUpgrade, i *autoscaling.Instance, currentLaunchConfigName string, KubeCtlCall string, svc ec2iface.EC2API, drainTimeout int, ch chan error) {
func (r *RollingUpgradeReconciler) UpdateInstance(ctx *context.Context, ruObj *upgrademgrv1alpha1.RollingUpgrade, i *autoscaling.Instance, currentLaunchConfigName string, KubeCtlCall string, svc autoscalingiface.AutoScalingAPI, drainTimeout int, ch chan error) {

targetLaunchConfigName := aws.StringValue(i.LaunchConfigurationName)
targetInstanceID := aws.StringValue(i.InstanceId)
Expand Down
Loading

0 comments on commit c11d6f0

Please sign in to comment.