Skip to content

Commit

Permalink
Merge pull request #295 from enxebre/fix-1796595-4.2
Browse files Browse the repository at this point in the history
Bug 1796595: [release-4.2] Mitigate aws eventual consistency issues
  • Loading branch information
openshift-merge-robot authored Feb 20, 2020
2 parents 14901e6 + 4f7fbd0 commit 75ed797
Show file tree
Hide file tree
Showing 50 changed files with 11,489 additions and 286 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ require (
k8s.io/client-go v11.0.1-0.20190409021438-1a26190bd76a+incompatible
k8s.io/klog v0.3.1
k8s.io/kube-openapi v0.0.0-20190709113604-33be087ad058 // indirect
k8s.io/utils v0.0.0-20190506122338-8fab8cb257d5
sigs.k8s.io/controller-runtime v0.0.0-20190520212815-96b67f231945
sigs.k8s.io/controller-tools v0.2.0
)
145 changes: 79 additions & 66 deletions pkg/actuators/machine/actuator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,25 @@ import (
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/golang/glog"

clusterv1 "github.com/openshift/cluster-api/pkg/apis/cluster/v1alpha1"
machinev1 "github.com/openshift/cluster-api/pkg/apis/machine/v1beta1"
clustererror "github.com/openshift/cluster-api/pkg/controller/error"
machinecontroller "github.com/openshift/cluster-api/pkg/controller/machine"
mapierrors "github.com/openshift/cluster-api/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apimachineryerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
errorutil "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/klog"

clusterv1 "github.com/openshift/cluster-api/pkg/apis/cluster/v1alpha1"
machinev1 "github.com/openshift/cluster-api/pkg/apis/machine/v1beta1"
clustererror "github.com/openshift/cluster-api/pkg/controller/error"
apierrors "github.com/openshift/cluster-api/pkg/errors"
providerconfigv1 "sigs.k8s.io/cluster-api-provider-aws/pkg/apis/awsproviderconfig/v1beta1"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/ec2"

awsclient "sigs.k8s.io/cluster-api-provider-aws/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client"

machinecontroller "github.com/openshift/cluster-api/pkg/controller/machine"
)

const (
Expand Down Expand Up @@ -99,12 +95,12 @@ const (

// Set corresponding event based on error. It also returns the original error
// for convenience, so callers can do "return handleMachineError(...)".
func (a *Actuator) handleMachineError(machine *machinev1.Machine, err *apierrors.MachineError, eventAction string) error {
func (a *Actuator) handleMachineError(machine *machinev1.Machine, err error, eventAction string) error {
if eventAction != noEventAction {
a.eventRecorder.Eventf(machine, corev1.EventTypeWarning, "Failed"+eventAction, "%v", err.Reason)
a.eventRecorder.Eventf(machine, corev1.EventTypeWarning, "Failed"+eventAction, "%v", err)
}

glog.Errorf("%s: Machine error: %v", machine.Name, err.Message)
glog.Errorf("%s: Machine error: %v", machine.Name, err)
return err
}

Expand Down Expand Up @@ -134,7 +130,6 @@ func (a *Actuator) Create(context context.Context, cluster *clusterv1.Cluster, m
return a.updateStatus(updatedMachine, instance)
}

// updateProviderID adds providerID in the machine spec
func (a *Actuator) setMachineCloudProviderSpecifics(machine *machinev1.Machine, instance *ec2.Instance) (*machinev1.Machine, error) {
if instance == nil {
return machine, nil
Expand Down Expand Up @@ -267,7 +262,7 @@ func (a *Actuator) updateMachineProviderConditions(machine *machinev1.Machine, c
func (a *Actuator) CreateMachine(cluster *clusterv1.Cluster, machine *machinev1.Machine) (*ec2.Instance, error) {
machineProviderConfig, err := providerConfigFromMachine(machine, a.codec)
if err != nil {
return nil, a.handleMachineError(machine, apierrors.InvalidMachineConfiguration("error decoding MachineProviderConfig: %v", err), createEventAction)
return nil, a.handleMachineError(machine, mapierrors.InvalidMachineConfiguration("error decoding MachineProviderConfig: %v", err), createEventAction)
}

credentialsSecretName := ""
Expand All @@ -276,8 +271,7 @@ func (a *Actuator) CreateMachine(cluster *clusterv1.Cluster, machine *machinev1.
}
awsClient, err := a.awsClientBuilder(a.client, credentialsSecretName, machine.Namespace, machineProviderConfig.Placement.Region)
if err != nil {
glog.Errorf("%s: unable to obtain AWS client: %v", machine.Name, err)
return nil, a.handleMachineError(machine, apierrors.CreateMachine("error creating aws services: %v", err), createEventAction)
return nil, a.handleMachineError(machine, err, createEventAction)
}

// We explicitly do NOT want to remove stopped masters.
Expand All @@ -300,34 +294,45 @@ func (a *Actuator) CreateMachine(cluster *clusterv1.Cluster, machine *machinev1.
}
}

userData := []byte{}
if machineProviderConfig.UserDataSecret != nil {
var userDataSecret corev1.Secret
err := a.client.Get(context.Background(), client.ObjectKey{Namespace: machine.Namespace, Name: machineProviderConfig.UserDataSecret.Name}, &userDataSecret)
if err != nil {
return nil, a.handleMachineError(machine, apierrors.CreateMachine("error getting user data secret %s: %v", machineProviderConfig.UserDataSecret.Name, err), createEventAction)
}
if data, exists := userDataSecret.Data[userDataSecretKey]; exists {
userData = data
} else {
glog.Warningf("%s: Secret %v/%v does not have %q field set. Thus, no user data applied when creating an instance.", machine.Name, machine.Namespace, machineProviderConfig.UserDataSecret.Name, userDataSecretKey)
}
userData, err := a.getUserData(machine, machineProviderConfig)
if err != nil {
return nil, err
}

instance, err := launchInstance(machine, machineProviderConfig, userData, awsClient)
if err != nil {
return nil, a.handleMachineError(machine, apierrors.CreateMachine("error launching instance: %v", err), createEventAction)
return nil, a.handleMachineError(machine, err, createEventAction)
}

err = a.updateLoadBalancers(awsClient, machineProviderConfig, instance, machine.Name)
if err != nil {
return nil, a.handleMachineError(machine, apierrors.CreateMachine("error updating load balancers: %v", err), createEventAction)
return nil, a.handleMachineError(machine, err, createEventAction)
}

a.eventRecorder.Eventf(machine, corev1.EventTypeNormal, "Created", "Created Machine %v", machine.Name)
return instance, nil
}

// getUserData returns the user data to supply when launching the machine's
// instance, read from the secret named by machineProviderConfig.UserDataSecret
// in the machine's namespace.
//
// When the provider config references no user data secret, an empty non-nil
// slice is returned with a nil error. A referenced secret that cannot be
// found, or that has no userDataSecretKey entry, is reported as an invalid
// machine configuration; any other lookup failure is reported as a
// create-machine error. Every error is passed through handleMachineError with
// createEventAction before being returned.
func (a *Actuator) getUserData(machine *machinev1.Machine, machineProviderConfig *providerconfigv1.AWSMachineProviderConfig) ([]byte, error) {
	if machineProviderConfig.UserDataSecret == nil {
		return []byte{}, nil
	}

	secretName := machineProviderConfig.UserDataSecret.Name
	secretKey := client.ObjectKey{Namespace: machine.Namespace, Name: secretName}

	var userDataSecret corev1.Secret
	if err := a.client.Get(context.Background(), secretKey, &userDataSecret); err != nil {
		// A missing secret is distinguished from other lookup failures and
		// classified as a configuration problem rather than a create failure.
		if apimachineryerrors.IsNotFound(err) {
			return nil, a.handleMachineError(machine, mapierrors.InvalidMachineConfiguration("user data secret %s/%s not found: %v", machine.Namespace, secretName, err), createEventAction)
		}
		return nil, a.handleMachineError(machine, mapierrors.CreateMachine("error getting user data secret %s: %v", secretName, err), createEventAction)
	}

	userData, exists := userDataSecret.Data[userDataSecretKey]
	if !exists {
		// No machine-name prefix here: handleMachineError already prefixes the
		// log line with machine.Name.
		return nil, a.handleMachineError(machine, mapierrors.InvalidMachineConfiguration("user data secret %s/%s does not have %q field set", machine.Namespace, secretName, userDataSecretKey), createEventAction)
	}
	return userData, nil
}

// Delete deletes a machine and updates its finalizer
func (a *Actuator) Delete(context context.Context, cluster *clusterv1.Cluster, machine *machinev1.Machine) error {
glog.Infof("%s: deleting machine", machine.Name)
Expand All @@ -352,7 +357,7 @@ func (gl *glogLogger) Logf(format string, v ...interface{}) {
func (a *Actuator) DeleteMachine(cluster *clusterv1.Cluster, machine *machinev1.Machine) error {
machineProviderConfig, err := providerConfigFromMachine(machine, a.codec)
if err != nil {
return a.handleMachineError(machine, apierrors.InvalidMachineConfiguration("error decoding MachineProviderConfig: %v", err), deleteEventAction)
return a.handleMachineError(machine, mapierrors.InvalidMachineConfiguration("error decoding MachineProviderConfig: %v", err), deleteEventAction)
}

region := machineProviderConfig.Placement.Region
Expand All @@ -362,9 +367,7 @@ func (a *Actuator) DeleteMachine(cluster *clusterv1.Cluster, machine *machinev1.
}
client, err := a.awsClientBuilder(a.client, credentialsSecretName, machine.Namespace, region)
if err != nil {
errMsg := fmt.Errorf("%s: error getting EC2 client: %v", machine.Name, err)
glog.Error(errMsg)
return errMsg
return a.handleMachineError(machine, err, deleteEventAction)
}

// Get all instances not terminated.
Expand All @@ -382,7 +385,7 @@ func (a *Actuator) DeleteMachine(cluster *clusterv1.Cluster, machine *machinev1.

terminatingInstances, err := terminateInstances(client, existingInstances)
if err != nil {
return a.handleMachineError(machine, apierrors.DeleteMachine(err.Error()), noEventAction)
return a.handleMachineError(machine, mapierrors.DeleteMachine(err.Error()), noEventAction)
}

if len(terminatingInstances) == 1 {
Expand All @@ -406,7 +409,7 @@ func (a *Actuator) Update(context context.Context, cluster *clusterv1.Cluster, m

machineProviderConfig, err := providerConfigFromMachine(machine, a.codec)
if err != nil {
return a.handleMachineError(machine, apierrors.InvalidMachineConfiguration("error decoding MachineProviderConfig: %v", err), updateEventAction)
return a.handleMachineError(machine, mapierrors.InvalidMachineConfiguration("error decoding MachineProviderConfig: %v", err), updateEventAction)
}

region := machineProviderConfig.Placement.Region
Expand All @@ -417,12 +420,10 @@ func (a *Actuator) Update(context context.Context, cluster *clusterv1.Cluster, m
}
client, err := a.awsClientBuilder(a.client, credentialsSecretName, machine.Namespace, region)
if err != nil {
errMsg := fmt.Errorf("%s: error getting EC2 client: %v", machine.Name, err)
glog.Error(errMsg)
return errMsg
return a.handleMachineError(machine, err, updateEventAction)
}
// Get all instances not terminated.
existingInstances, err := getExistingInstances(machine, client)
existingInstances, err := a.getMachineInstances(cluster, machine)
if err != nil {
glog.Errorf("%s: error getting existing instances: %v", machine.Name, err)
return err
Expand All @@ -433,9 +434,14 @@ func (a *Actuator) Update(context context.Context, cluster *clusterv1.Cluster, m
// Parent controller should prevent this from ever happening by calling Exists and then Create,
// but instance could be deleted between the two calls.
if existingLen == 0 {
if machine.Spec.ProviderID != nil && *machine.Spec.ProviderID != "" && (machine.Status.LastUpdated == nil || machine.Status.LastUpdated.Add(requeueAfterSeconds*time.Second).After(time.Now())) {
glog.Infof("%s: Possible eventual-consistency discrepancy; returning an error to requeue", machine.Name)
return &clustererror.RequeueAfterError{RequeueAfter: requeueAfterSeconds * time.Second}
}

glog.Warningf("%s: attempted to update machine but no instances found", machine.Name)

a.handleMachineError(machine, apierrors.UpdateMachine("no instance found, reason unknown"), updateEventAction)
a.handleMachineError(machine, mapierrors.UpdateMachine("no instance found, reason unknown"), updateEventAction)

// Update status to clear out machine details.
if err := a.updateStatus(machine, nil); err != nil {
Expand All @@ -457,7 +463,7 @@ func (a *Actuator) Update(context context.Context, cluster *clusterv1.Cluster, m

err = a.updateLoadBalancers(client, machineProviderConfig, newestInstance, machine.Name)
if err != nil {
a.handleMachineError(machine, apierrors.CreateMachine("Error updating load balancers: %v", err), updateEventAction)
a.handleMachineError(machine, mapierrors.CreateMachine("Error updating load balancers: %v", err), updateEventAction)
return err
}
} else {
Expand All @@ -484,24 +490,11 @@ func (a *Actuator) Update(context context.Context, cluster *clusterv1.Cluster, m
return a.updateStatus(machine, newestInstance)
}

// Exists determines if the given machine currently exists. For AWS we query for instances in
// running state, with a matching name tag, to determine a match.
// Exists determines if the given machine currently exists.
// A machine which is not terminated is considered as existing.
func (a *Actuator) Exists(context context.Context, cluster *clusterv1.Cluster, machine *machinev1.Machine) (bool, error) {
glog.Infof("%s: Checking if machine exists", machine.Name)

instances, err := a.getMachineInstances(cluster, machine)
if err != nil {
glog.Errorf("%s: Error getting running instances: %v", machine.Name, err)
return false, err
}
if len(instances) == 0 {
glog.Infof("%s: Instance does not exist", machine.Name)
return false, nil
}

// If more than one result was returned, it will be handled in Update.
glog.Infof("%s: Instance exists as %q", machine.Name, *instances[0].InstanceId)
return true, nil
instance, err := a.Describe(cluster, machine)
return instance != nil, err
}

// Describe provides information about machine's instance(s)
Expand All @@ -510,10 +503,15 @@ func (a *Actuator) Describe(cluster *clusterv1.Cluster, machine *machinev1.Machi

instances, err := a.getMachineInstances(cluster, machine)
if err != nil {
glog.Errorf("%s: Error getting running instances: %v", machine.Name, err)
glog.Errorf("%s: Error getting existing instances: %v", machine.Name, err)
return nil, err
}
if len(instances) == 0 {
if machine.Spec.ProviderID != nil && *machine.Spec.ProviderID != "" && (machine.Status.LastUpdated == nil || machine.Status.LastUpdated.Add(requeueAfterSeconds*time.Second).After(time.Now())) {
glog.Infof("%s: Possible eventual-consistency discrepancy; returning an error to requeue", machine.Name)
return nil, &clustererror.RequeueAfterError{RequeueAfter: requeueAfterSeconds * time.Second}
}

glog.Infof("%s: Instance does not exist", machine.Name)
return nil, nil
}
Expand All @@ -535,9 +533,24 @@ func (a *Actuator) getMachineInstances(cluster *clusterv1.Cluster, machine *mach
}
client, err := a.awsClientBuilder(a.client, credentialsSecretName, machine.Namespace, region)
if err != nil {
errMsg := fmt.Sprintf("%s: Error getting EC2 client: %v", machine.Name, err)
glog.Errorf(errMsg)
return nil, fmt.Errorf(errMsg)
glog.Errorf("%s: Error getting EC2 client: %v", machine.Name, err)
return nil, err
}

status := &providerconfigv1.AWSMachineProviderStatus{}
err = a.codec.DecodeProviderStatus(machine.Status.ProviderStatus, status)

// If the status was decoded successfully, and there is a non-empty instance
// ID, search using that, otherwise fallback to filtering based on tags.
if err == nil && status.InstanceID != nil && *status.InstanceID != "" {
i, err := getExistingInstanceByID(*status.InstanceID, client)
if err != nil {
glog.Warningf("%s: Failed to find existing instance by id %s: %v",
machine.Name, *status.InstanceID, err)
} else {
glog.Infof("%s: Found instance by id: %s", machine.Name, *status.InstanceID)
return []*ec2.Instance{i}, nil
}
}

return getExistingInstances(machine, client)
Expand Down
Loading

0 comments on commit 75ed797

Please sign in to comment.