Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds retry-on-conflict during updates #725

Merged
merged 3 commits into from
Apr 19, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/cloud/aws/actuators/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/client-go/util/retry:go_default_library",
"//vendor/k8s.io/klog/klogr:go_default_library",
"//vendor/sigs.k8s.io/cluster-api/pkg/apis/cluster/v1alpha1:go_default_library",
"//vendor/sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset/typed/cluster/v1alpha1:go_default_library",
Expand Down
4 changes: 2 additions & 2 deletions pkg/cloud/aws/actuators/machine/actuator.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,13 +452,13 @@ func (a *Actuator) Exists(ctx context.Context, cluster *clusterv1.Cluster, machi
if machine.Status.NodeRef == nil {
nodeRef, err := a.getNodeReference(scope)
if err != nil {
// non critical error
a.log.Info("Failed to set nodeRef", "error", err)
// non critical error
return true, nil
}

scope.Machine.Status.NodeRef = nodeRef
a.log.Info("Setting machine's nodeRef", "machine-name", scope.Name(), "machine-namespace", scope.Namespace(), "nodeRef", nodeRef.Name)
a.log.V(2).Info("Setting machine's nodeRef", "machine-name", scope.Name(), "machine-namespace", scope.Namespace(), "nodeRef", nodeRef.Name)
}

return true, nil
Expand Down
73 changes: 45 additions & 28 deletions pkg/cloud/aws/actuators/machine_scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/cluster-api-provider-aws/pkg/apis/awsprovider/v1alpha1"
clusterv1 "sigs.k8s.io/cluster-api/pkg/apis/cluster/v1alpha1"
client "sigs.k8s.io/cluster-api/pkg/client/clientset_generated/clientset/typed/cluster/v1alpha1"
Expand Down Expand Up @@ -109,48 +110,64 @@ func (m *MachineScope) GetMachine() *clusterv1.Machine {
return m.Machine
}

// GetScope() returns the scope that is wrapping the machine.
// GetScope returns the Scope that wraps this machine, i.e. the value held in
// m.Scope.
func (m *MachineScope) GetScope() *Scope {
return m.Scope
}

// storeMachineSpec encodes m.MachineConfig, places the encoded value in
// machine.Spec.ProviderSpec.Value, and submits machine via
// m.MachineClient.Update.
//
// It returns the result of Update. If encoding fails, machine is left
// unmodified and a nil machine is returned together with the encoding error.
func (m *MachineScope) storeMachineSpec(machine *clusterv1.Machine) (*clusterv1.Machine, error) {
	encodedSpec, encodeErr := v1alpha1.EncodeMachineSpec(m.MachineConfig)
	if encodeErr != nil {
		return nil, encodeErr
	}

	machine.Spec.ProviderSpec.Value = encodedSpec

	updated, updateErr := m.MachineClient.Update(machine)
	return updated, updateErr
}

// storeMachineStatus encodes m.MachineStatus, overwrites machine.Status with a
// deep copy of m.Machine.Status, sets the encoded value as
// machine.Status.ProviderStatus, and submits machine via
// m.MachineClient.UpdateStatus.
//
// It returns the result of UpdateStatus. If encoding fails, machine is left
// unmodified and a nil machine is returned together with the encoding error.
func (m *MachineScope) storeMachineStatus(machine *clusterv1.Machine) (*clusterv1.Machine, error) {
	encodedStatus, encodeErr := v1alpha1.EncodeMachineStatus(m.MachineStatus)
	if encodeErr != nil {
		return nil, encodeErr
	}

	// Copy the scope's in-memory status first; the provider status is set
	// afterwards so the copy cannot clobber it.
	m.Machine.Status.DeepCopyInto(&machine.Status)
	machine.Status.ProviderStatus = encodedStatus

	updated, updateErr := m.MachineClient.UpdateStatus(machine)
	return updated, updateErr
}

// Close the MachineScope by updating the machine spec, machine status.
func (m *MachineScope) Close() {
if m.MachineClient == nil {
return
}

latestMachine, err := m.storeMachineSpec(m.Machine)
ext, err := v1alpha1.EncodeMachineSpec(m.MachineConfig)
if err != nil {
m.Error(err, "failed to update machine")
m.Error(err, "failed to encode machine spec")
return
}

_, err = m.storeMachineStatus(latestMachine)
status, err := v1alpha1.EncodeMachineStatus(m.MachineStatus)
if err != nil {
m.Error(err, "failed to store machine provider status")
m.Error(err, "failed to encode machine status")
return
}

// Sometimes when an object gets updated the local copy is out of date with
// the copy stored on the server. In the case of cluster-api this will
// always be because the local copy will have an out-of-date resource
// version. This is because something else has updated the resource version
// on the server and thus the local copy is behind.
// This retry function will update the resource version if the local copy is
// behind and try again.
// This retry function will *only* update the resource version. If some
// other data has changed then there is a problem. Nothing else should be
// updating the object and this function will (correctly) fail.
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
m.V(2).Info("Updating machine", "machine-resource-version", m.Machine.ResourceVersion, "node-ref", m.Machine.Status.NodeRef)
m.Machine.Spec.ProviderSpec.Value = ext
m.V(6).Info("Machine status before update", "machine-status", m.Machine.Status)
latest, err := m.MachineClient.Update(m.Machine)
if err != nil {
m.Error(err, "error updating machine")
newestMachine, err2 := m.MachineClient.Get(m.Machine.Name, metav1.GetOptions{})
if err2 != nil {
m.Error(err2, "failed to fetch latest Machine")
return err2
}
// Error if anything but the machine resource version changes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment seems a bit misleading: for a while I was trying to figure out how the code was testing whether anything other than the resourceVersion had changed.

Copy link
Contributor Author

@chuckha chuckha Apr 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about something like

// Update the resource version and retry the update?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or

// Update only the resource version and try again. If something else in the resource has changed this will error.

m.Machine.ResourceVersion = newestMachine.ResourceVersion
return err
}
m.V(5).Info("Latest machine", "machine", latest)
// The machine may have status (nodeRef) that the latest doesn't yet
// have, however some timestamps may be rolled back a bit with this copy.
m.Machine.Status.DeepCopyInto(&latest.Status)
latest.Status.ProviderStatus = status
_, err = m.MachineClient.UpdateStatus(latest)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there also be a retry around UpdateStatus() here as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UpdateStatus doesn't need its own retry. If UpdateStatus fails, we'll run Update again, pick up the new resource version, and then try UpdateStatus again.

I think the biggest issue we could see is multiple things updating this resource regularly; in that case we may have to consider splitting this Update & UpdateStatus from one operation into two, each with its own retry block.

return err
}); err != nil {
m.Error(err, "error retrying on conflict")
}
m.V(2).Info("Successfully updated machine")
}

// MachineConfigFromProviderSpec tries to decode the JSON-encoded spec, falling back on getting a MachineClass if the value is absent.
Expand Down
60 changes: 34 additions & 26 deletions pkg/cloud/aws/actuators/scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/aws/aws-sdk-go/service/elb"
"github.com/go-logr/logr"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
"k8s.io/klog/klogr"
"sigs.k8s.io/cluster-api-provider-aws/pkg/apis/awsprovider/v1alpha1"
clusterv1 "sigs.k8s.io/cluster-api/pkg/apis/cluster/v1alpha1"
Expand Down Expand Up @@ -131,40 +133,46 @@ func (s *Scope) Region() string {
return s.ClusterConfig.Region
}

// storeClusterConfig encodes s.ClusterConfig, places the encoded value in
// cluster.Spec.ProviderSpec.Value, and submits cluster via
// s.ClusterClient.Update.
//
// It returns the result of Update. If encoding fails, cluster is left
// unmodified and a nil cluster is returned together with the encoding error.
func (s *Scope) storeClusterConfig(cluster *clusterv1.Cluster) (*clusterv1.Cluster, error) {
	encodedSpec, encodeErr := v1alpha1.EncodeClusterSpec(s.ClusterConfig)
	if encodeErr != nil {
		return nil, encodeErr
	}

	cluster.Spec.ProviderSpec.Value = encodedSpec

	updated, updateErr := s.ClusterClient.Update(cluster)
	return updated, updateErr
}

// storeClusterStatus encodes s.ClusterStatus, sets the encoded value as
// cluster.Status.ProviderStatus, and submits cluster via
// s.ClusterClient.UpdateStatus.
//
// It returns the result of UpdateStatus. If encoding fails, cluster is left
// unmodified and a nil cluster is returned together with the encoding error.
func (s *Scope) storeClusterStatus(cluster *clusterv1.Cluster) (*clusterv1.Cluster, error) {
	encodedStatus, encodeErr := v1alpha1.EncodeClusterStatus(s.ClusterStatus)
	if encodeErr != nil {
		return nil, encodeErr
	}

	cluster.Status.ProviderStatus = encodedStatus

	updated, updateErr := s.ClusterClient.UpdateStatus(cluster)
	return updated, updateErr
}

// Close closes the current scope persisting the cluster configuration and status.
func (s *Scope) Close() {
if s.ClusterClient == nil {
return
}

latestCluster, err := s.storeClusterConfig(s.Cluster)
ext, err := v1alpha1.EncodeClusterSpec(s.ClusterConfig)
if err != nil {
s.Error(err, "failed to store provider config")
s.Error(err, "failed encoding cluster spec")
return
}

_, err = s.storeClusterStatus(latestCluster)
status, err := v1alpha1.EncodeClusterStatus(s.ClusterStatus)
if err != nil {
s.Error(err, "failed to store provider status")
s.Error(err, "failed encoding cluster status")
return
}

// Update the resource version and try again if there is a conflict saving the object.
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
chuckha marked this conversation as resolved.
Show resolved Hide resolved
s.V(2).Info("Updating cluster", "cluster-resource-version", s.Cluster.ResourceVersion)
s.Cluster.Spec.ProviderSpec.Value = ext
s.V(6).Info("Cluster status before update", "cluster-status", s.Cluster.Status)
latest, err := s.ClusterClient.Update(s.Cluster)
if err != nil {
s.Error(err, "error updating cluster")
newestCluster, err2 := s.ClusterClient.Get(s.Cluster.Name, metav1.GetOptions{})
if err2 != nil {
s.Error(err2, "failed to fetch latest cluster")
return err2
}
// Error if anything but the cluster resource version changes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, this comment seems a bit misleading.

s.Cluster.ResourceVersion = newestCluster.ResourceVersion
return err
}
s.V(5).Info("Latest cluster status", "cluster-status", latest.Status)
s.Cluster.Status.DeepCopyInto(&latest.Status)
latest.Status.ProviderStatus = status
_, err = s.ClusterClient.UpdateStatus(latest)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a retry around updating the status as well?

return err
}); err != nil {
s.Error(err, "failed to retry on conflict")
}
s.V(2).Info("Successfully updated cluster")
}
2 changes: 2 additions & 0 deletions pkg/cloud/aws/services/ec2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ go_library(
"//pkg/cloud/aws/tags:go_default_library",
"//pkg/record:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws/request:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/ec2:go_default_library",
"//vendor/github.com/go-logr/logr:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
],
)
Expand Down
19 changes: 17 additions & 2 deletions pkg/cloud/aws/services/ec2/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/go-logr/logr"
"github.com/pkg/errors"
"sigs.k8s.io/cluster-api-provider-aws/pkg/apis/awsprovider/v1alpha1"
"sigs.k8s.io/cluster-api-provider-aws/pkg/cloud/aws/actuators"
Expand Down Expand Up @@ -435,9 +437,22 @@ func (s *Service) runInstance(role string, i *v1alpha1.Instance) (*v1alpha1.Inst
return nil, errors.Errorf("no instance returned for reservation %v", out.GoString())
}

s.scope.EC2.WaitUntilInstanceRunning(&ec2.DescribeInstancesInput{InstanceIds: []*string{out.Instances[0].InstanceId}})
s.scope.V(2).Info("Waiting for instance to run", "instance-id", *out.Instances[0].InstanceId)
err = s.scope.EC2.WaitUntilInstanceRunningWithContext(
aws.BackgroundContext(),
&ec2.DescribeInstancesInput{InstanceIds: []*string{out.Instances[0].InstanceId}},
request.WithWaiterLogger(&awslog{s.scope.Logger}),
)
return converters.SDKToInstance(out.Instances[0]), err
}

// awslog is an internal wrapper around a logr.Logger that lets it be handed to
// the AWS SDK as a logger. The embedded Logger supplies the logging methods
// that awslog's own methods build on.
type awslog struct {
logr.Logger
}

return s.SDKToInstance(out.Instances[0])
// Log writes args through the embedded logger, under the name "aws-logger"
// and with the fixed message "AWS context".
//
// args is free-form: the caller decides what it contains, and it is not
// guaranteed to be a list of alternating string keys and values. Spreading it
// into Info's key/value parameters would therefore produce malformed pairs
// (for a single string argument, a key with no value). Instead the whole
// slice is attached as the value of a single "message" key, so the key/value
// list handed to the logger is always well formed.
func (a *awslog) Log(args ...interface{}) {
	a.WithName("aws-logger").Info("AWS context", "message", args)
}

// UpdateInstanceSecurityGroups modifies the security groups of the given
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloud/aws/services/ec2/instances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ vuO9LYxDXLVY9F7W4ccyCqe27Cj1xyAvdZxwhITrib8Wg5CMqoRpqTw5V3+TpA==
},
},
}, nil)
m.WaitUntilInstanceRunning(gomock.Any()).
m.WaitUntilInstanceRunningWithContext(gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil)
},
check: func(instance *v1alpha1.Instance, err error) {
Expand Down