Skip to content

Commit

Permalink
Adds retry-on-conflict during updates (kubernetes-sigs#725)
Browse files Browse the repository at this point in the history
* Adds retry-on-conflict during updates

Signed-off-by: Chuck Ha <[email protected]>

* adds note about status update caveat

Signed-off-by: Chuck Ha <[email protected]>

* clarify errors/comments

Signed-off-by: Chuck Ha <[email protected]>
  • Loading branch information
chuckha authored and detiber committed May 2, 2019
1 parent f8ae30d commit f973598
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 59 deletions.
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/cloud/aws/actuators/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/client-go/util/retry:go_default_library",
"//vendor/k8s.io/klog/klogr:go_default_library",
"//vendor/sigs.k8s.io/cluster-api/pkg/apis/cluster/v1alpha1:go_default_library",
"//vendor/sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset/typed/cluster/v1alpha1:go_default_library",
Expand Down
4 changes: 2 additions & 2 deletions pkg/cloud/aws/actuators/machine/actuator.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,13 +452,13 @@ func (a *Actuator) Exists(ctx context.Context, cluster *clusterv1.Cluster, machi
if machine.Status.NodeRef == nil {
nodeRef, err := a.getNodeReference(scope)
if err != nil {
// non critical error
a.log.Info("Failed to set nodeRef", "error", err)
// non critical error
return true, nil
}

scope.Machine.Status.NodeRef = nodeRef
a.log.Info("Setting machine's nodeRef", "machine-name", scope.Name(), "machine-namespace", scope.Namespace(), "nodeRef", nodeRef.Name)
a.log.V(2).Info("Setting machine's nodeRef", "machine-name", scope.Name(), "machine-namespace", scope.Namespace(), "nodeRef", nodeRef.Name)
}

return true, nil
Expand Down
73 changes: 45 additions & 28 deletions pkg/cloud/aws/actuators/machine_scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/cluster-api-provider-aws/pkg/apis/awsprovider/v1alpha1"
clusterv1 "sigs.k8s.io/cluster-api/pkg/apis/cluster/v1alpha1"
client "sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset/typed/cluster/v1alpha1"
Expand Down Expand Up @@ -109,48 +110,64 @@ func (m *MachineScope) GetMachine() *clusterv1.Machine {
return m.Machine
}

// GetScope returns the Scope that wraps this machine.
func (m *MachineScope) GetScope() *Scope {
	return m.Scope
}

// storeMachineSpec serializes the in-memory machine provider config into the
// given machine's ProviderSpec and persists it through the machine client,
// returning the updated object from the server.
func (m *MachineScope) storeMachineSpec(machine *clusterv1.Machine) (*clusterv1.Machine, error) {
	encoded, encodeErr := v1alpha1.EncodeMachineSpec(m.MachineConfig)
	if encodeErr != nil {
		return nil, encodeErr
	}
	machine.Spec.ProviderSpec.Value = encoded
	return m.MachineClient.Update(machine)
}

// storeMachineStatus copies the scope's machine status onto the given machine,
// attaches the encoded provider status, and persists it via UpdateStatus.
func (m *MachineScope) storeMachineStatus(machine *clusterv1.Machine) (*clusterv1.Machine, error) {
	encoded, encodeErr := v1alpha1.EncodeMachineStatus(m.MachineStatus)
	if encodeErr != nil {
		return nil, encodeErr
	}
	m.Machine.Status.DeepCopyInto(&machine.Status)
	machine.Status.ProviderStatus = encoded
	return m.MachineClient.UpdateStatus(machine)
}

// Close the MachineScope by updating the machine spec and machine status.
// Conflicts on update are retried after refreshing the local resource version.
func (m *MachineScope) Close() {
	if m.MachineClient == nil {
		return
	}

	ext, err := v1alpha1.EncodeMachineSpec(m.MachineConfig)
	if err != nil {
		m.Error(err, "failed to encode machine spec")
		return
	}

	status, err := v1alpha1.EncodeMachineStatus(m.MachineStatus)
	if err != nil {
		m.Error(err, "failed to encode machine status")
		return
	}

	// Sometimes when an object gets updated the local copy is out of date with
	// the copy stored on the server. In the case of cluster-api this will
	// always be because the local copy will have an out-of-date resource
	// version. This is because something else has updated the resource version
	// on the server and thus the local copy is behind.
	// This retry function will update the resource version if the local copy is
	// behind and try again.
	// This retry function will *only* update the resource version. If some
	// other data has changed then there is a problem. Nothing else should be
	// updating the object and this function will (correctly) fail.
	if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		m.V(2).Info("Updating machine", "machine-resource-version", m.Machine.ResourceVersion, "node-ref", m.Machine.Status.NodeRef)
		m.Machine.Spec.ProviderSpec.Value = ext
		m.V(6).Info("Machine status before update", "machine-status", m.Machine.Status)
		latest, err := m.MachineClient.Update(m.Machine)
		if err != nil {
			m.V(3).Info("Machine resource version is out of date")
			// Fetch and update the latest resource version so the next
			// attempt can succeed if the conflict was version-only.
			newestMachine, err2 := m.MachineClient.Get(m.Machine.Name, metav1.GetOptions{})
			if err2 != nil {
				m.Error(err2, "failed to fetch latest Machine")
				return err2
			}
			m.Machine.ResourceVersion = newestMachine.ResourceVersion
			return err
		}
		m.V(5).Info("Latest machine", "machine", latest)
		// The machine may have status (nodeRef) that the latest doesn't yet
		// have, however some timestamps may be rolled back a bit with this copy.
		m.Machine.Status.DeepCopyInto(&latest.Status)
		latest.Status.ProviderStatus = status
		_, err = m.MachineClient.UpdateStatus(latest)
		return err
	}); err != nil {
		m.Error(err, "error retrying on conflict")
		// Do not fall through to the success log on failure.
		return
	}
	m.V(2).Info("Successfully updated machine")
}

// MachineConfigFromProviderSpec tries to decode the JSON-encoded spec, falling back on getting a MachineClass if the value is absent.
Expand Down
60 changes: 34 additions & 26 deletions pkg/cloud/aws/actuators/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/aws/aws-sdk-go/service/elb"
"github.com/go-logr/logr"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
"k8s.io/klog/klogr"
"sigs.k8s.io/cluster-api-provider-aws/pkg/apis/awsprovider/v1alpha1"
clusterv1 "sigs.k8s.io/cluster-api/pkg/apis/cluster/v1alpha1"
Expand Down Expand Up @@ -131,40 +133,46 @@ func (s *Scope) Region() string {
return s.ClusterConfig.Region
}

// storeClusterConfig serializes the scope's provider config into the given
// cluster's ProviderSpec and persists it, returning the server's copy.
func (s *Scope) storeClusterConfig(cluster *clusterv1.Cluster) (*clusterv1.Cluster, error) {
	encoded, encodeErr := v1alpha1.EncodeClusterSpec(s.ClusterConfig)
	if encodeErr != nil {
		return nil, encodeErr
	}
	cluster.Spec.ProviderSpec.Value = encoded
	return s.ClusterClient.Update(cluster)
}

// storeClusterStatus attaches the encoded provider status to the given
// cluster and persists it via UpdateStatus, returning the server's copy.
func (s *Scope) storeClusterStatus(cluster *clusterv1.Cluster) (*clusterv1.Cluster, error) {
	encoded, encodeErr := v1alpha1.EncodeClusterStatus(s.ClusterStatus)
	if encodeErr != nil {
		return nil, encodeErr
	}
	cluster.Status.ProviderStatus = encoded
	return s.ClusterClient.UpdateStatus(cluster)
}

// Close closes the current scope persisting the cluster configuration and
// status. Conflicts on update are retried after refreshing the local
// resource version.
func (s *Scope) Close() {
	if s.ClusterClient == nil {
		return
	}

	ext, err := v1alpha1.EncodeClusterSpec(s.ClusterConfig)
	if err != nil {
		s.Error(err, "failed encoding cluster spec")
		return
	}

	status, err := v1alpha1.EncodeClusterStatus(s.ClusterStatus)
	if err != nil {
		s.Error(err, "failed encoding cluster status")
		return
	}

	// Update the resource version and try again if there is a conflict saving the object.
	if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		s.V(2).Info("Updating cluster", "cluster-resource-version", s.Cluster.ResourceVersion)
		s.Cluster.Spec.ProviderSpec.Value = ext
		s.V(6).Info("Cluster status before update", "cluster-status", s.Cluster.Status)
		latest, err := s.ClusterClient.Update(s.Cluster)
		if err != nil {
			s.V(3).Info("Cluster resource version is out of date")
			// Fetch and update the latest resource version so the next
			// attempt can succeed if the conflict was version-only.
			newestCluster, err2 := s.ClusterClient.Get(s.Cluster.Name, metav1.GetOptions{})
			if err2 != nil {
				s.Error(err2, "failed to fetch latest cluster")
				return err2
			}
			s.Cluster.ResourceVersion = newestCluster.ResourceVersion
			return err
		}
		s.V(5).Info("Latest cluster status", "cluster-status", latest.Status)
		s.Cluster.Status.DeepCopyInto(&latest.Status)
		latest.Status.ProviderStatus = status
		_, err = s.ClusterClient.UpdateStatus(latest)
		return err
	}); err != nil {
		s.Error(err, "failed to retry on conflict")
		// Do not fall through to the success log on failure.
		return
	}
	s.V(2).Info("Successfully updated cluster")
}
2 changes: 2 additions & 0 deletions pkg/cloud/aws/services/ec2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ go_library(
"//pkg/cloud/aws/tags:go_default_library",
"//pkg/record:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws/request:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/ec2:go_default_library",
"//vendor/github.com/go-logr/logr:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
],
)
Expand Down
19 changes: 17 additions & 2 deletions pkg/cloud/aws/services/ec2/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/go-logr/logr"
"github.com/pkg/errors"
"sigs.k8s.io/cluster-api-provider-aws/pkg/apis/awsprovider/v1alpha1"
"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/aws/actuators"
Expand Down Expand Up @@ -435,9 +437,22 @@ func (s *Service) runInstance(role string, i *v1alpha1.Instance) (*v1alpha1.Inst
return nil, errors.Errorf("no instance returned for reservation %v", out.GoString())
}

s.scope.EC2.WaitUntilInstanceRunning(&ec2.DescribeInstancesInput{InstanceIds: []*string{out.Instances[0].InstanceId}})
s.scope.V(2).Info("Waiting for instance to run", "instance-id", *out.Instances[0].InstanceId)
err = s.scope.EC2.WaitUntilInstanceRunningWithContext(
aws.BackgroundContext(),
&ec2.DescribeInstancesInput{InstanceIds: []*string{out.Instances[0].InstanceId}},
request.WithWaiterLogger(&awslog{s.scope.Logger}),
)
return converters.SDKToInstance(out.Instances[0]), err
}

// awslog adapts a logr.Logger so it can be passed to the AWS SDK via
// request.WithWaiterLogger, which expects a type with a variadic Log method.
type awslog struct {
	logr.Logger
}

return s.SDKToInstance(out.Instances[0])
// Log forwards the AWS waiter's variadic log arguments to the embedded
// structured logger under the "aws-logger" name.
func (a *awslog) Log(args ...interface{}) {
	named := a.WithName("aws-logger")
	named.Info("AWS context", args...)
}

// UpdateInstanceSecurityGroups modifies the security groups of the given
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloud/aws/services/ec2/instances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ vuO9LYxDXLVY9F7W4ccyCqe27Cj1xyAvdZxwhITrib8Wg5CMqoRpqTw5V3+TpA==
},
},
}, nil)
m.WaitUntilInstanceRunning(gomock.Any()).
m.WaitUntilInstanceRunningWithContext(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil)
},
check: func(instance *v1alpha1.Instance, err error) {
Expand Down

0 comments on commit f973598

Please sign in to comment.