Skip to content

Commit

Permalink
Centralize requests, replace "updates" with one "patch"
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Demichev committed Jan 22, 2020
1 parent c22ade5 commit e6a5716
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 107 deletions.
183 changes: 89 additions & 94 deletions pkg/actuators/machine/actuator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ import (
machinecontroller "github.com/openshift/machine-api-operator/pkg/controller/machine"
mapierrors "github.com/openshift/machine-api-operator/pkg/controller/machine"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apimachineryerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
errorutil "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -105,140 +103,121 @@ func (a *Actuator) handleMachineError(machine *machinev1.Machine, err error, eve
// Create runs a new EC2 instance
func (a *Actuator) Create(context context.Context, machine *machinev1.Machine) error {
glog.Infof("%s: creating machine", machine.Name)

machineToBePatched := client.MergeFrom(machine.DeepCopy())

instance, err := a.CreateMachine(machine)
if err != nil {
glog.Errorf("%s: error creating machine: %v", machine.Name, err)
updateConditionError := a.updateMachineProviderConditions(machine, providerconfigv1.MachineCreation, MachineCreationFailed, err.Error())
updateConditionError := a.setMachineProviderConditions(machine, providerconfigv1.MachineCreation, MachineCreationFailed, err.Error())
if updateConditionError != nil {
glog.Errorf("%s: error updating machine conditions: %v", machine.Name, updateConditionError)
}
patchErr := a.patchMachine(context, machine, machineToBePatched)
if patchErr != nil {
return fmt.Errorf("%s: failed to patch machine status: %v", machine.Name, err)
}
return err
}
updatedMachine, err := a.updateProviderID(machine, instance)

err = a.setProviderID(machine, instance)
if err != nil {
return fmt.Errorf("%s: failed to update machine object with providerID: %v", machine.Name, err)
}
machine = updatedMachine

updatedMachine, err = a.setMachineCloudProviderSpecifics(machine, instance)
err = a.setMachineCloudProviderSpecifics(machine, instance)
if err != nil {
return fmt.Errorf("%s: failed to set machine cloud provider specifics: %v", machine.Name, err)
}

return a.updateStatus(updatedMachine, instance)
err = a.setStatus(machine, instance)
if err != nil {
return fmt.Errorf("%s: failed to set machine status: %v", machine.Name, err)
}

err = a.patchMachine(context, machine, machineToBePatched)
if err != nil {
return fmt.Errorf("%s: failed to patch machine: %v", machine.Name, err)
}

return nil
}

func (a *Actuator) setMachineCloudProviderSpecifics(machine *machinev1.Machine, instance *ec2.Instance) (*machinev1.Machine, error) {
func (a *Actuator) setMachineCloudProviderSpecifics(machine *machinev1.Machine, instance *ec2.Instance) error {
if instance == nil {
return machine, nil
return nil
}
machineCopy := machine.DeepCopy()

if machineCopy.Labels == nil {
machineCopy.Labels = make(map[string]string)
if machine.Labels == nil {
machine.Labels = make(map[string]string)
}

if machineCopy.Annotations == nil {
machineCopy.Annotations = make(map[string]string)
if machine.Annotations == nil {
machine.Annotations = make(map[string]string)
}

// Reaching to machine provider config since the region is not directly
// providing by *ec2.Instance object
machineProviderConfig, err := providerConfigFromMachine(machine, a.codec)
if err != nil {
return nil, fmt.Errorf("error decoding MachineProviderConfig: %v", err)
return fmt.Errorf("error decoding MachineProviderConfig: %v", err)
}

machineCopy.Labels[machinecontroller.MachineRegionLabelName] = machineProviderConfig.Placement.Region
machine.Labels[machinecontroller.MachineRegionLabelName] = machineProviderConfig.Placement.Region

if instance.Placement != nil {
machineCopy.Labels[machinecontroller.MachineAZLabelName] = aws.StringValue(instance.Placement.AvailabilityZone)
machine.Labels[machinecontroller.MachineAZLabelName] = aws.StringValue(instance.Placement.AvailabilityZone)
}

if instance.InstanceType != nil {
machineCopy.Labels[machinecontroller.MachineInstanceTypeLabelName] = aws.StringValue(instance.InstanceType)
machine.Labels[machinecontroller.MachineInstanceTypeLabelName] = aws.StringValue(instance.InstanceType)
}

if instance.State != nil && instance.State.Name != nil {
machineCopy.Annotations[machinecontroller.MachineInstanceStateAnnotationName] = aws.StringValue(instance.State.Name)
}

if err := a.client.Update(context.Background(), machineCopy); err != nil {
return nil, fmt.Errorf("%s: error updating machine spec: %v", machine.Name, err)
machine.Annotations[machinecontroller.MachineInstanceStateAnnotationName] = aws.StringValue(instance.State.Name)
}

return machineCopy, nil
return nil
}

// updateProviderID adds providerID in the machine spec
func (a *Actuator) updateProviderID(machine *machinev1.Machine, instance *ec2.Instance) (*machinev1.Machine, error) {
// setProviderID adds providerID in the machine spec
func (a *Actuator) setProviderID(machine *machinev1.Machine, instance *ec2.Instance) error {
existingProviderID := machine.Spec.ProviderID
machineCopy := machine.DeepCopy()
if instance != nil {
availabilityZone := ""
if instance.Placement != nil {
availabilityZone = aws.StringValue(instance.Placement.AvailabilityZone)
}
providerID := fmt.Sprintf("aws:///%s/%s", availabilityZone, aws.StringValue(instance.InstanceId))
if instance == nil {
return nil
}
availabilityZone := ""
if instance.Placement != nil {
availabilityZone = aws.StringValue(instance.Placement.AvailabilityZone)
}
providerID := fmt.Sprintf("aws:///%s/%s", availabilityZone, aws.StringValue(instance.InstanceId))

if existingProviderID != nil && *existingProviderID == providerID {
glog.Infof("%s: ProviderID already set in the machine Spec with value:%s", machine.Name, *existingProviderID)
return machine, nil
}
machineCopy.Spec.ProviderID = &providerID
if err := a.client.Update(context.Background(), machineCopy); err != nil {
return nil, fmt.Errorf("%s: error updating machine spec ProviderID: %v", machine.Name, err)
}
glog.Infof("%s: ProviderID updated at machine spec: %s", machine.Name, providerID)
} else {
machineCopy.Spec.ProviderID = nil
if err := a.client.Update(context.Background(), machineCopy); err != nil {
return nil, fmt.Errorf("%s: error updating ProviderID in machine spec: %v", machine.Name, err)
}
glog.Infof("%s: No instance found so clearing ProviderID field in the machine spec", machine.Name)
if existingProviderID != nil && *existingProviderID == providerID {
glog.Infof("%s: ProviderID already set in the machine Spec with value:%s", machine.Name, *existingProviderID)
return nil
}
return machineCopy, nil
machine.Spec.ProviderID = &providerID
glog.Infof("%s: ProviderID set at machine spec: %s", machine.Name, providerID)
return nil
}

func (a *Actuator) updateMachineStatus(machine *machinev1.Machine, awsStatus *providerconfigv1.AWSMachineProviderStatus, networkAddresses []corev1.NodeAddress) error {
func (a *Actuator) setMachineStatus(machine *machinev1.Machine, awsStatus *providerconfigv1.AWSMachineProviderStatus, networkAddresses []corev1.NodeAddress) error {
awsStatusRaw, err := a.codec.EncodeProviderStatus(awsStatus)
if err != nil {
glog.Errorf("%s: error encoding AWS provider status: %v", machine.Name, err)
return err
}

machineCopy := machine.DeepCopy()
machineCopy.Status.ProviderStatus = awsStatusRaw
machine.Status.ProviderStatus = awsStatusRaw
if networkAddresses != nil {
machineCopy.Status.Addresses = networkAddresses
}

oldAWSStatus := &providerconfigv1.AWSMachineProviderStatus{}
if err := a.codec.DecodeProviderStatus(machine.Status.ProviderStatus, oldAWSStatus); err != nil {
glog.Errorf("%s: error updating machine status: %v", machine.Name, err)
return err
}

// TODO(vikasc): Revisit to compare complete machine status objects
if !equality.Semantic.DeepEqual(awsStatus, oldAWSStatus) || !equality.Semantic.DeepEqual(machine.Status.Addresses, machineCopy.Status.Addresses) {
glog.Infof("%s: machine status has changed, updating", machine.Name)
time := metav1.Now()
machineCopy.Status.LastUpdated = &time

if err := a.client.Status().Update(context.Background(), machineCopy); err != nil {
glog.Errorf("%s: error updating machine status: %v", machine.Name, err)
return err
}
} else {
glog.Infof("%s: status unchanged", machine.Name)
machine.Status.Addresses = networkAddresses
}

return nil
}

// updateMachineProviderConditions updates conditions set within machine provider status.
func (a *Actuator) updateMachineProviderConditions(machine *machinev1.Machine, conditionType providerconfigv1.AWSMachineProviderConditionType, reason string, msg string) error {

func (a *Actuator) setMachineProviderConditions(machine *machinev1.Machine, conditionType providerconfigv1.AWSMachineProviderConditionType, reason string, msg string) error {
glog.Infof("%s: updating machine conditions", machine.Name)

awsStatus := &providerconfigv1.AWSMachineProviderStatus{}
Expand All @@ -249,7 +228,7 @@ func (a *Actuator) updateMachineProviderConditions(machine *machinev1.Machine, c

awsStatus.Conditions = setAWSMachineProviderCondition(awsStatus.Conditions, conditionType, corev1.ConditionTrue, reason, msg, updateConditionIfReasonOrMessageChange)

if err := a.updateMachineStatus(machine, awsStatus, nil); err != nil {
if err := a.setMachineStatus(machine, awsStatus, nil); err != nil {
return err
}

Expand Down Expand Up @@ -402,6 +381,8 @@ func (a *Actuator) DeleteMachine(machine *machinev1.Machine) error {
func (a *Actuator) Update(context context.Context, machine *machinev1.Machine) error {
glog.Infof("%s: updating machine", machine.Name)

machineToBePatched := client.MergeFrom(machine.DeepCopy())

machineProviderConfig, err := providerConfigFromMachine(machine, a.codec)
if err != nil {
return a.handleMachineError(machine, mapierrors.InvalidMachineConfiguration("error decoding MachineProviderConfig: %v", err), updateEventAction)
Expand Down Expand Up @@ -439,7 +420,7 @@ func (a *Actuator) Update(context context.Context, machine *machinev1.Machine) e
a.handleMachineError(machine, mapierrors.UpdateMachine("no instance found, reason unknown"), updateEventAction)

// Update status to clear out machine details.
if err := a.updateStatus(machine, nil); err != nil {
if err := a.setStatus(machine, nil); err != nil {
return err
}
// This is an unrecoverable error condition. We should delay to
Expand Down Expand Up @@ -469,20 +450,28 @@ func (a *Actuator) Update(context context.Context, machine *machinev1.Machine) e

a.eventRecorder.Eventf(machine, corev1.EventTypeNormal, "Updated", "Updated machine %v", machine.Name)

modMachine, err := a.setMachineCloudProviderSpecifics(machine, newestInstance)
err = a.setMachineCloudProviderSpecifics(machine, newestInstance)
if err != nil {
return fmt.Errorf("%s: failed to set machine cloud provider specifics: %v", machine.Name, err)
}
machine = modMachine

updatedMachine, err := a.updateProviderID(machine, newestInstance)
err = a.setProviderID(machine, newestInstance)
if err != nil {
return fmt.Errorf("%s: failed to update machine object with providerID: %v", machine.Name, err)
}
machine = updatedMachine

// We do not support making changes to pre-existing instances, just update status.
return a.updateStatus(machine, newestInstance)
err = a.setStatus(machine, newestInstance)
if err != nil {
return fmt.Errorf("%s: failed to set machine status: %v", machine.Name, err)
}

err = a.patchMachine(context, machine, machineToBePatched)
if err != nil {
return fmt.Errorf("%s: failed to patch machine: %v", machine.Name, err)
}

return nil
}

// Exists determines if the given machine currently exists.
Expand Down Expand Up @@ -590,9 +579,8 @@ func (a *Actuator) updateLoadBalancers(client awsclient.Client, providerConfig *
return nil
}

// updateStatus calculates the new machine status, checks if anything has changed, and updates if so.
func (a *Actuator) updateStatus(machine *machinev1.Machine, instance *ec2.Instance) error {

// setStatus calculates the new machine status, checks if anything has changed, and updates if so.
func (a *Actuator) setStatus(machine *machinev1.Machine, instance *ec2.Instance) error {
glog.Infof("%s: Updating status", machine.Name)

// Starting with a fresh status as we assume full control of it here.
Expand Down Expand Up @@ -621,18 +609,10 @@ func (a *Actuator) updateStatus(machine *machinev1.Machine, instance *ec2.Instan

networkAddresses = append(networkAddresses, addresses...)
}

glog.Infof("%s: finished calculating AWS status", machine.Name)

awsStatus.Conditions = setAWSMachineProviderCondition(awsStatus.Conditions, providerconfigv1.MachineCreation, corev1.ConditionTrue, MachineCreationSucceeded, "machine successfully created", updateConditionIfReasonOrMessageChange)
// TODO(jchaloup): do we really need to update tis?
// origInstanceID := awsStatus.InstanceID
// if !StringPtrsEqual(origInstanceID, awsStatus.InstanceID) {
// mLog.Debug("AWS instance ID changed, clearing LastELBSync to trigger adding to ELBs")
// awsStatus.LastELBSync = nil
// }

if err := a.updateMachineStatus(machine, awsStatus, networkAddresses); err != nil {
if err := a.setMachineStatus(machine, awsStatus, networkAddresses); err != nil {
return err
}

Expand All @@ -654,3 +634,18 @@ func getClusterID(machine *machinev1.Machine) (string, bool) {
}
return clusterID, ok
}

func (a *Actuator) patchMachine(ctx context.Context, machine *machinev1.Machine, machineToBePatched client.Patch) error {
// Patch machine
if err := a.client.Patch(ctx, machine, machineToBePatched); err != nil {
klog.Errorf("Failed to update machine %q: %v", machine.GetName(), err)
return err
}

//Patch status
if err := a.client.Status().Patch(ctx, machine, machineToBePatched); err != nil {
klog.Errorf("Failed to update machine %q: %v", machine.GetName(), err)
return err
}
return nil
}
14 changes: 1 addition & 13 deletions pkg/actuators/machine/actuator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
apiv1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
providerconfigv1 "sigs.k8s.io/cluster-api-provider-aws/pkg/apis/awsproviderconfig/v1beta1"
Expand Down Expand Up @@ -213,19 +212,8 @@ func TestActuator(t *testing.T) {
}

getMachineStatus := func(objectClient client.Client, machine *machinev1.Machine) (*providerconfigv1.AWSMachineProviderStatus, error) {
// Get updated machine object from the cluster client
key := types.NamespacedName{
Namespace: machine.Namespace,
Name: machine.Name,
}
updatedMachine := machinev1.Machine{}
err := objectClient.Get(context.Background(), client.ObjectKey(key), &updatedMachine)
if err != nil {
return nil, fmt.Errorf("unable to retrieve machine: %v", err)
}

machineStatus := &providerconfigv1.AWSMachineProviderStatus{}
if err := codec.DecodeProviderStatus(updatedMachine.Status.ProviderStatus, machineStatus); err != nil {
if err := codec.DecodeProviderStatus(machine.Status.ProviderStatus, machineStatus); err != nil {
return nil, fmt.Errorf("error decoding machine provider status: %v", err)
}
return machineStatus, nil
Expand Down

0 comments on commit e6a5716

Please sign in to comment.