From f973598ee7f290f8e1d736bd6e2d449d1d451a8e Mon Sep 17 00:00:00 2001 From: Chuck Ha Date: Fri, 19 Apr 2019 13:31:55 -0400 Subject: [PATCH] Adds retry-on-conflict during updates (#725) * Adds retry-on-conflict during updates Signed-off-by: Chuck Ha * adds note about status update caveat Signed-off-by: Chuck Ha * clarify errors/comments Signed-off-by: Chuck Ha --- Gopkg.lock | 1 + pkg/cloud/aws/actuators/BUILD.bazel | 1 + pkg/cloud/aws/actuators/machine/actuator.go | 4 +- pkg/cloud/aws/actuators/machine_scope.go | 73 ++++++++++++-------- pkg/cloud/aws/actuators/scope.go | 60 +++++++++------- pkg/cloud/aws/services/ec2/BUILD.bazel | 2 + pkg/cloud/aws/services/ec2/instances.go | 19 ++++- pkg/cloud/aws/services/ec2/instances_test.go | 2 +- 8 files changed, 103 insertions(+), 59 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 97aabe27f5..2104d5a5dc 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1300,6 +1300,7 @@ "k8s.io/client-go/tools/clientcmd", "k8s.io/client-go/tools/clientcmd/api", "k8s.io/client-go/tools/record", + "k8s.io/client-go/util/retry", "k8s.io/cluster-bootstrap/token/api", "k8s.io/cluster-bootstrap/token/util", "k8s.io/code-generator/cmd/deepcopy-gen", diff --git a/pkg/cloud/aws/actuators/BUILD.bazel b/pkg/cloud/aws/actuators/BUILD.bazel index dab67a7b56..655603ca01 100644 --- a/pkg/cloud/aws/actuators/BUILD.bazel +++ b/pkg/cloud/aws/actuators/BUILD.bazel @@ -22,6 +22,7 @@ go_library( "//vendor/github.com/pkg/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/client-go/util/retry:go_default_library", "//vendor/k8s.io/klog/klogr:go_default_library", "//vendor/sigs.k8s.io/cluster-api/pkg/apis/cluster/v1alpha1:go_default_library", "//vendor/sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset/typed/cluster/v1alpha1:go_default_library", diff --git a/pkg/cloud/aws/actuators/machine/actuator.go b/pkg/cloud/aws/actuators/machine/actuator.go index 8d48fc37fa..884a0a18be 100644 --- a/pkg/cloud/aws/actuators/machine/actuator.go +++ b/pkg/cloud/aws/actuators/machine/actuator.go @@ -452,13 +452,13 @@ func (a *Actuator) Exists(ctx context.Context, cluster *clusterv1.Cluster, machi if machine.Status.NodeRef == nil { nodeRef, err := a.getNodeReference(scope) if err != nil { - // non critical error a.log.Info("Failed to set nodeRef", "error", err) + // non critical error return true, nil } scope.Machine.Status.NodeRef = nodeRef - a.log.Info("Setting machine's nodeRef", "machine-name", scope.Name(), "machine-namespace", scope.Namespace(), "nodeRef", nodeRef.Name) + a.log.V(2).Info("Setting machine's nodeRef", "machine-name", scope.Name(), "machine-namespace", scope.Namespace(), "nodeRef", nodeRef.Name) } return true, nil diff --git a/pkg/cloud/aws/actuators/machine_scope.go b/pkg/cloud/aws/actuators/machine_scope.go index 7ea9f908f6..d0a51a87ce 100644 --- a/pkg/cloud/aws/actuators/machine_scope.go +++ b/pkg/cloud/aws/actuators/machine_scope.go @@ -23,6 +23,7 @@ import ( "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/util/retry" "sigs.k8s.io/cluster-api-provider-aws/pkg/apis/awsprovider/v1alpha1" clusterv1 "sigs.k8s.io/cluster-api/pkg/apis/cluster/v1alpha1" client "sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset/typed/cluster/v1alpha1" @@ -109,48 +110,64 @@ func (m *MachineScope) GetMachine() *clusterv1.Machine { return m.Machine } -// GetScope() returns the scope that is wrapping the machine. +// GetScope returns the scope that is wrapping the machine. func (m *MachineScope) GetScope() *Scope { return m.Scope } -func (m *MachineScope) storeMachineSpec(machine *clusterv1.Machine) (*clusterv1.Machine, error) { - ext, err := v1alpha1.EncodeMachineSpec(m.MachineConfig) - if err != nil { - return nil, err - } - - machine.Spec.ProviderSpec.Value = ext - return m.MachineClient.Update(machine) -} - -func (m *MachineScope) storeMachineStatus(machine *clusterv1.Machine) (*clusterv1.Machine, error) { - ext, err := v1alpha1.EncodeMachineStatus(m.MachineStatus) - if err != nil { - return nil, err - } - - m.Machine.Status.DeepCopyInto(&machine.Status) - machine.Status.ProviderStatus = ext - return m.MachineClient.UpdateStatus(machine) -} - // Close the MachineScope by updating the machine spec, machine status. func (m *MachineScope) Close() { if m.MachineClient == nil { return } - - latestMachine, err := m.storeMachineSpec(m.Machine) + ext, err := v1alpha1.EncodeMachineSpec(m.MachineConfig) if err != nil { - m.Error(err, "failed to update machine") + m.Error(err, "failed to encode machine spec") return } - - _, err = m.storeMachineStatus(latestMachine) + status, err := v1alpha1.EncodeMachineStatus(m.MachineStatus) if err != nil { - m.Error(err, "failed to store machine provider status") + m.Error(err, "failed to encode machine status") + return + } + + // Sometimes when an object gets updated the local copy is out of date with + // the copy stored on the server. In the case of cluster-api this will + // always be because the local copy will have an out-of-date resource + // version. This is because something else has updated the resource version + // on the server and thus the local copy is behind. + // This retry function will update the resource version if the local copy is + // behind and try again. + // This retry function will *only* update the resource version. If some + // other data has changed then there is a problem. Nothing else should be + // updating the object and this function will (correctly) fail. + if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + m.V(2).Info("Updating machine", "machine-resource-version", m.Machine.ResourceVersion, "node-ref", m.Machine.Status.NodeRef) + m.Machine.Spec.ProviderSpec.Value = ext + m.V(6).Info("Machine status before update", "machine-status", m.Machine.Status) + latest, err := m.MachineClient.Update(m.Machine) + if err != nil { + m.V(3).Info("Machine resource version is out of date") + // Fetch and update the latest resource version + newestMachine, err2 := m.MachineClient.Get(m.Machine.Name, metav1.GetOptions{}) + if err2 != nil { + m.Error(err2, "failed to fetch latest Machine") + return err2 + } + m.Machine.ResourceVersion = newestMachine.ResourceVersion + return err + } + m.V(5).Info("Latest machine", "machine", latest) + // The machine may have status (nodeRef) that the latest doesn't yet + // have, however some timestamps may be rolled back a bit with this copy. + m.Machine.Status.DeepCopyInto(&latest.Status) + latest.Status.ProviderStatus = status + _, err = m.MachineClient.UpdateStatus(latest) + return err + }); err != nil { + m.Error(err, "error retrying on conflict") } + m.V(2).Info("Successfully updated machine") } // MachineConfigFromProviderSpec tries to decode the JSON-encoded spec, falling back on getting a MachineClass if the value is absent. diff --git a/pkg/cloud/aws/actuators/scope.go b/pkg/cloud/aws/actuators/scope.go index 6c06c0f140..e17f0ccf03 100644 --- a/pkg/cloud/aws/actuators/scope.go +++ b/pkg/cloud/aws/actuators/scope.go @@ -23,6 +23,8 @@ import ( "github.com/aws/aws-sdk-go/service/elb" "github.com/go-logr/logr" "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/util/retry" "k8s.io/klog/klogr" "sigs.k8s.io/cluster-api-provider-aws/pkg/apis/awsprovider/v1alpha1" clusterv1 "sigs.k8s.io/cluster-api/pkg/apis/cluster/v1alpha1" @@ -131,40 +133,46 @@ func (s *Scope) Region() string { return s.ClusterConfig.Region } -func (s *Scope) storeClusterConfig(cluster *clusterv1.Cluster) (*clusterv1.Cluster, error) { - ext, err := v1alpha1.EncodeClusterSpec(s.ClusterConfig) - if err != nil { - return nil, err - } - - cluster.Spec.ProviderSpec.Value = ext - return s.ClusterClient.Update(cluster) -} - -func (s *Scope) storeClusterStatus(cluster *clusterv1.Cluster) (*clusterv1.Cluster, error) { - ext, err := v1alpha1.EncodeClusterStatus(s.ClusterStatus) - if err != nil { - return nil, err - } - - cluster.Status.ProviderStatus = ext - return s.ClusterClient.UpdateStatus(cluster) -} - // Close closes the current scope persisting the cluster configuration and status. func (s *Scope) Close() { if s.ClusterClient == nil { return } - - latestCluster, err := s.storeClusterConfig(s.Cluster) + ext, err := v1alpha1.EncodeClusterSpec(s.ClusterConfig) if err != nil { - s.Error(err, "failed to store provider config") + s.Error(err, "failed encoding cluster spec") return } - - _, err = s.storeClusterStatus(latestCluster) + status, err := v1alpha1.EncodeClusterStatus(s.ClusterStatus) if err != nil { - s.Error(err, "failed to store provider status") + s.Error(err, "failed encoding cluster status") + return + } + + // Update the resource version and try again if there is a conflict saving the object. + if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + s.V(2).Info("Updating cluster", "cluster-resource-version", s.Cluster.ResourceVersion) + s.Cluster.Spec.ProviderSpec.Value = ext + s.V(6).Info("Cluster status before update", "cluster-status", s.Cluster.Status) + latest, err := s.ClusterClient.Update(s.Cluster) + if err != nil { + s.V(3).Info("Cluster resource version is out of date") + // Fetch and update the latest resource version + newestCluster, err2 := s.ClusterClient.Get(s.Cluster.Name, metav1.GetOptions{}) + if err2 != nil { + s.Error(err2, "failed to fetch latest cluster") + return err2 + } + s.Cluster.ResourceVersion = newestCluster.ResourceVersion + return err + } + s.V(5).Info("Latest cluster status", "cluster-status", latest.Status) + s.Cluster.Status.DeepCopyInto(&latest.Status) + latest.Status.ProviderStatus = status + _, err = s.ClusterClient.UpdateStatus(latest) + return err + }); err != nil { + s.Error(err, "failed to retry on conflict") } + s.V(2).Info("Successfully updated cluster") } diff --git a/pkg/cloud/aws/services/ec2/BUILD.bazel b/pkg/cloud/aws/services/ec2/BUILD.bazel index 577351b6a2..57d46c2d4a 100644 --- a/pkg/cloud/aws/services/ec2/BUILD.bazel +++ b/pkg/cloud/aws/services/ec2/BUILD.bazel @@ -33,7 +33,9 @@ go_library( "//pkg/cloud/aws/tags:go_default_library", "//pkg/record:go_default_library", "//vendor/github.com/aws/aws-sdk-go/aws:go_default_library", + "//vendor/github.com/aws/aws-sdk-go/aws/request:go_default_library", "//vendor/github.com/aws/aws-sdk-go/service/ec2:go_default_library", + "//vendor/github.com/go-logr/logr:go_default_library", "//vendor/github.com/pkg/errors:go_default_library", ], ) diff --git a/pkg/cloud/aws/services/ec2/instances.go b/pkg/cloud/aws/services/ec2/instances.go index cb40f13896..12f4fed127 100644 --- a/pkg/cloud/aws/services/ec2/instances.go +++ b/pkg/cloud/aws/services/ec2/instances.go @@ -23,7 +23,9 @@ import ( "strings" "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/service/ec2" + "github.com/go-logr/logr" "github.com/pkg/errors" "sigs.k8s.io/cluster-api-provider-aws/pkg/apis/awsprovider/v1alpha1" "sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/aws/actuators" @@ -435,9 +437,22 @@ func (s *Service) runInstance(role string, i *v1alpha1.Instance) (*v1alpha1.Inst return nil, errors.Errorf("no instance returned for reservation %v", out.GoString()) } - s.scope.EC2.WaitUntilInstanceRunning(&ec2.DescribeInstancesInput{InstanceIds: []*string{out.Instances[0].InstanceId}}) + s.scope.V(2).Info("Waiting for instance to run", "instance-id", *out.Instances[0].InstanceId) + err = s.scope.EC2.WaitUntilInstanceRunningWithContext( + aws.BackgroundContext(), + &ec2.DescribeInstancesInput{InstanceIds: []*string{out.Instances[0].InstanceId}}, + request.WithWaiterLogger(&awslog{s.scope.Logger}), + ) + return converters.SDKToInstance(out.Instances[0]), err +} + +// An internal type to satisfy aws' log interface. +type awslog struct { + logr.Logger +} - return s.SDKToInstance(out.Instances[0]) +func (a *awslog) Log(args ...interface{}) { + a.WithName("aws-logger").Info("AWS context", args...) } // UpdateInstanceSecurityGroups modifies the security groups of the given diff --git a/pkg/cloud/aws/services/ec2/instances_test.go b/pkg/cloud/aws/services/ec2/instances_test.go index 94d3a654fc..8a41ad77ab 100644 --- a/pkg/cloud/aws/services/ec2/instances_test.go +++ b/pkg/cloud/aws/services/ec2/instances_test.go @@ -381,7 +381,7 @@ vuO9LYxDXLVY9F7W4ccyCqe27Cj1xyAvdZxwhITrib8Wg5CMqoRpqTw5V3+TpA== }, }, }, nil) - m.WaitUntilInstanceRunning(gomock.Any()). + m.WaitUntilInstanceRunningWithContext(gomock.Any(), gomock.Any(), gomock.Any()). Return(nil) }, check: func(instance *v1alpha1.Instance, err error) {