Skip to content

Commit

Permalink
Allow incremental condition patch
Browse files Browse the repository at this point in the history
  • Loading branch information
fabriziopandini committed Jun 26, 2020
1 parent bf704ba commit c704c98
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 35 deletions.
97 changes: 73 additions & 24 deletions util/conditions/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ type Patch []PatchOperation

// PatchOperation define an operation that changes a single condition.
type PatchOperation struct {
Target *clusterv1.Condition
Base *clusterv1.Condition
After *clusterv1.Condition
Before *clusterv1.Condition
Op PatchOperationType
}

Expand Down Expand Up @@ -57,12 +57,12 @@ func NewPatch(before Getter, after Getter) Patch {
targetCondition := targetConditions[i]
currentCondition := Get(before, targetCondition.Type)
if currentCondition == nil {
patch = append(patch, PatchOperation{Op: AddConditionPatch, Target: &targetCondition})
patch = append(patch, PatchOperation{Op: AddConditionPatch, After: &targetCondition})
continue
}

if !reflect.DeepEqual(&targetCondition, currentCondition) {
patch = append(patch, PatchOperation{Op: ChangeConditionPatch, Target: &targetCondition, Base: currentCondition})
patch = append(patch, PatchOperation{Op: ChangeConditionPatch, After: &targetCondition, Before: currentCondition})
}
}

Expand All @@ -72,67 +72,116 @@ func NewPatch(before Getter, after Getter) Patch {
baseCondition := baseConditions[i]
targetCondition := Get(after, baseCondition.Type)
if targetCondition == nil {
patch = append(patch, PatchOperation{Op: RemoveConditionPatch, Base: &baseCondition})
patch = append(patch, PatchOperation{Op: RemoveConditionPatch, Before: &baseCondition})
}
}
return patch
}

// applyOptions allows to set strategies for patch apply.
type applyOptions struct {
ignoreConflicts []clusterv1.ConditionType
}

func (o *applyOptions) shouldIgnoreConflict(t clusterv1.ConditionType) bool {
for _, i := range o.ignoreConflicts {
if i == t {
return true
}
}
return false
}

// ApplyOption defines an option for applying a condition patch.
type ApplyOption func(*applyOptions)

// IgnoreMergeConflicts allows to specify a list of condition types for which the value defined in the after
// object should take precedence on the latest value in case of conflicts.
func IgnoreMergeConflicts(t ...clusterv1.ConditionType) ApplyOption {
return func(c *applyOptions) {
c.ignoreConflicts = t
}
}

// Apply executes a three-way merge of a list of Patch.
// When merge conflicts are detected (latest deviated from before in an incompatible way), an error is returned.
func (p Patch) Apply(latest Setter) error {
func (p Patch) Apply(latest Setter, options ...ApplyOption) error {
if len(p) == 0 {
return nil
}

applyOpt := &applyOptions{}
for _, o := range options {
o(applyOpt)
}

for _, conditionPatch := range p {
switch conditionPatch.Op {
case AddConditionPatch:
// If the condition is already on latest, check if latest and after agree on the change; if not, this is a conflict.
if sourceCondition := Get(latest, conditionPatch.Target.Type); sourceCondition != nil {
if latestCondition := Get(latest, conditionPatch.After.Type); latestCondition != nil {
// If latest and after agree on the change, then it is a conflict.
if !hasSameState(sourceCondition, conditionPatch.Target) {
return errors.Errorf("error patching conditions: The condition %q on was modified by a different process and this caused a merge/AddCondition conflict", conditionPatch.Target.Type)
if !hasSameState(latestCondition, conditionPatch.After) {
// If conflicts should be ignored for this condition, keep the after value.
if applyOpt.shouldIgnoreConflict(conditionPatch.After.Type) {
Set(latest, conditionPatch.After)
continue
}
return errors.Errorf("error patching conditions: The condition %q on was modified by a different process and this caused a merge/AddCondition conflict", conditionPatch.After.Type)
}
// otherwise, the latest is already as intended.
// NOTE: We are preserving LastTransitionTime from the latest in order to avoid altering the existing value.
continue
}
// If the condition does not exists on the latest, add the new after condition.
Set(latest, conditionPatch.Target)
Set(latest, conditionPatch.After)

case ChangeConditionPatch:
sourceCondition := Get(latest, conditionPatch.Target.Type)
latestCondition := Get(latest, conditionPatch.After.Type)

// If the condition does not exist anymore on the latest, this is a conflict.
if sourceCondition == nil {
return errors.Errorf("error patching conditions: The condition %q on was deleted by a different process and this caused a merge/ChangeCondition conflict", conditionPatch.Target.Type)
if latestCondition == nil {
// If conflicts should be ignored for this condition, keep the after value.
if applyOpt.shouldIgnoreConflict(conditionPatch.After.Type) {
Set(latest, conditionPatch.After)
continue
}
return errors.Errorf("error patching conditions: The condition %q on was deleted by a different process and this caused a merge/ChangeCondition conflict", conditionPatch.After.Type)
}

// If the condition on the latest is different from the base condition, check if
// the after state corresponds to the desired value. If not this is a conflict.
if !reflect.DeepEqual(sourceCondition, conditionPatch.Base) {
if !hasSameState(sourceCondition, conditionPatch.Target) {
return errors.Errorf("error patching conditions: The condition %q on was modified by a different process and this caused a merge/ChangeCondition conflict", conditionPatch.Target.Type)
// the after state corresponds to the desired value. If not this is a conflict (unless we should ignore conflicts for this condition type).
if !reflect.DeepEqual(latestCondition, conditionPatch.Before) {
if !hasSameState(latestCondition, conditionPatch.After) {
// If conflicts should be ignored for this condition, keep the after value.
if applyOpt.shouldIgnoreConflict(conditionPatch.After.Type) {
Set(latest, conditionPatch.After)
continue
}
return errors.Errorf("error patching conditions: The condition %q on was modified by a different process and this caused a merge/ChangeCondition conflict", conditionPatch.After.Type)
}
// Otherwise the latest is already as intended.
// NOTE: We are preserving LastTransitionTime from the latest in order to avoid altering the existing value.
continue
}

// Otherwise apply the new after condition.
Set(latest, conditionPatch.Target)
Set(latest, conditionPatch.After)

case RemoveConditionPatch:
// If the condition is still on the latest, check if it is changed in the meantime;
// if so then this is a conflict.
if sourceCondition := Get(latest, conditionPatch.Base.Type); sourceCondition != nil {
if !hasSameState(sourceCondition, conditionPatch.Base) {
return errors.Errorf("error patching conditions: The condition %q on was modified by a different process and this caused a merge/RemoveCondition conflict", conditionPatch.Base.Type)
// if so then this is a conflict (unless we should ignore conflicts for this condition type).
if sourceCondition := Get(latest, conditionPatch.Before.Type); sourceCondition != nil {
if !hasSameState(sourceCondition, conditionPatch.Before) {
// If conflicts should be ignored for this condition, keep the after value (no condition).
if applyOpt.shouldIgnoreConflict(conditionPatch.Before.Type) {
Delete(latest, conditionPatch.Before.Type)
continue
}
return errors.Errorf("error patching conditions: The condition %q on was modified by a different process and this caused a merge/RemoveCondition conflict", conditionPatch.Before.Type)
}
}
// Otherwise the latest and after agreed on the delete operation, so there's nothing to change.
Delete(latest, conditionPatch.Base.Type)
Delete(latest, conditionPatch.Before.Type)
}
}
return nil
Expand Down
51 changes: 44 additions & 7 deletions util/conditions/patch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ func TestNewPatch(t *testing.T) {
after: getterWithConditions(fooTrue),
want: Patch{
{
Base: nil,
Target: fooTrue,
Before: nil,
After: fooTrue,
Op: AddConditionPatch,
},
},
Expand All @@ -66,8 +66,8 @@ func TestNewPatch(t *testing.T) {
after: getterWithConditions(fooFalse),
want: Patch{
{
Base: fooTrue,
Target: fooFalse,
Before: fooTrue,
After: fooFalse,
Op: ChangeConditionPatch,
},
},
Expand All @@ -78,8 +78,8 @@ func TestNewPatch(t *testing.T) {
after: getterWithConditions(),
want: Patch{
{
Base: fooTrue,
Target: nil,
Before: fooTrue,
After: nil,
Op: RemoveConditionPatch,
},
},
Expand Down Expand Up @@ -107,6 +107,7 @@ func TestApply(t *testing.T) {
before Getter
after Getter
latest Setter
options []ApplyOption
want clusterv1.Conditions
wantErr bool
}{
Expand Down Expand Up @@ -142,6 +143,15 @@ func TestApply(t *testing.T) {
want: nil,
wantErr: true,
},
{
name: "Add: When a condition already exists but with conflicts, it should not error if conflict for the condition type should be ignored",
before: getterWithConditions(),
after: getterWithConditions(fooTrue),
latest: setterWithConditions(fooFalse),
options: []ApplyOption{IgnoreMergeConflicts("foo")},
want: conditionList(fooTrue), // after condition should be kept in case of error
wantErr: false,
},
{
name: "Remove: When a condition was already deleted, it should pass",
before: getterWithConditions(fooTrue),
Expand All @@ -166,6 +176,15 @@ func TestApply(t *testing.T) {
want: nil,
wantErr: true,
},
{
name: "Remove: When a condition already exists but with conflicts, it should not error if conflict for the condition type should be ignored",
before: getterWithConditions(fooTrue),
after: getterWithConditions(),
latest: setterWithConditions(fooFalse),
options: []ApplyOption{IgnoreMergeConflicts("foo")},
want: conditionList(), // after condition should be kept in case of error
wantErr: false,
},
{
name: "Change: When a condition exists without conflicts, it should change",
before: getterWithConditions(fooTrue),
Expand All @@ -190,6 +209,15 @@ func TestApply(t *testing.T) {
want: nil,
wantErr: true,
},
{
name: "Change: When a condition exists with conflicts but there is no agreement on the final state, it should not error if conflict for the condition type should be ignored",
before: getterWithConditions(fooWarning),
after: getterWithConditions(fooFalse),
latest: setterWithConditions(fooTrue),
options: []ApplyOption{IgnoreMergeConflicts("foo")},
want: conditionList(fooFalse), // after condition should be kept in case of error
wantErr: false,
},
{
name: "Change: When a condition was deleted, it should error",
before: getterWithConditions(fooTrue),
Expand All @@ -198,6 +226,15 @@ func TestApply(t *testing.T) {
want: nil,
wantErr: true,
},
{
name: "Change: When a condition was deleted, it should not error if conflict for the condition type should be ignored",
before: getterWithConditions(fooTrue),
after: getterWithConditions(fooFalse),
latest: setterWithConditions(),
options: []ApplyOption{IgnoreMergeConflicts("foo")},
want: conditionList(fooFalse), // after condition should be kept in case of error
wantErr: false,
},
}

for _, tt := range tests {
Expand All @@ -206,7 +243,7 @@ func TestApply(t *testing.T) {

patch := NewPatch(tt.before, tt.after)

err := patch.Apply(tt.latest)
err := patch.Apply(tt.latest, tt.options...)
if tt.wantErr {
g.Expect(err).To(HaveOccurred())
return
Expand Down
19 changes: 18 additions & 1 deletion util/patch/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ limitations under the License.

package patch

import clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"

// Option is some configuration that modifies options for a patch request.
type Option interface {
// ApplyToHelper applies this configuration to the given Helper options.
Expand All @@ -27,13 +29,28 @@ type HelperOptions struct {
// IncludeStatusObservedGeneration sets the status.observedGeneration field
// on the incoming object to match metadata.generation, only if there is a change.
IncludeStatusObservedGeneration bool

// IgnoreConditionsConflicts defines a list of condition types for which the value defined in the object
// passed to patch should take precedence on the latest value in case of conflicts.
IgnoreConditionConflicts []clusterv1.ConditionType
}

// WithStatusObservedGeneration sets the status.observedGeneration field
// on the incoming object to match metadata.generation, only if there is a change.
type WithStatusObservedGeneration struct{}

// ApplyToHelper applies this configuration to the given an List options.
// ApplyToHelper applies this configuration to the given HelperOptions.
func (w WithStatusObservedGeneration) ApplyToHelper(in *HelperOptions) {
in.IncludeStatusObservedGeneration = true
}

// IgnoreConditionsConflicts allows to define condition types for which the value defined in the object
// passed to patch should take precedence on the latest value in case of conflicts.
type IgnoreConditionsConflicts struct {
Conditions []clusterv1.ConditionType
}

// ApplyToHelper applies this configuration to the given HelperOptions.
func (w IgnoreConditionsConflicts) ApplyToHelper(in *HelperOptions) {
in.IgnoreConditionConflicts = w.Conditions
}
7 changes: 4 additions & 3 deletions util/patch/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -114,7 +115,7 @@ func (h *Helper) Patch(ctx context.Context, obj runtime.Object, opts ...Option)
return kerrors.NewAggregate([]error{
h.patch(ctx, obj),
h.patchStatus(ctx, obj),
h.patchStatusConditions(ctx, obj),
h.patchStatusConditions(ctx, obj, options.IgnoreConditionConflicts),
})
}

Expand Down Expand Up @@ -151,7 +152,7 @@ func (h *Helper) patchStatus(ctx context.Context, obj runtime.Object) error {
//
// Condition changes are then applied to the latest version of the object, and if there are
// no unresolvable conflicts, the patch is sent again.
func (h *Helper) patchStatusConditions(ctx context.Context, obj runtime.Object) error {
func (h *Helper) patchStatusConditions(ctx context.Context, obj runtime.Object, ignoreConflicts []clusterv1.ConditionType) error {
// Nothing to do if the object isn't a condition patcher.
if !h.isConditionsSetter {
return nil
Expand Down Expand Up @@ -211,7 +212,7 @@ func (h *Helper) patchStatusConditions(ctx context.Context, obj runtime.Object)
conditionsPatch := client.MergeFromWithOptions(latest.DeepCopyObject(), client.MergeFromWithOptimisticLock{})

// Set the condition patch previously created on the new object.
if err := diff.Apply(latest); err != nil {
if err := diff.Apply(latest, conditions.IgnoreMergeConflicts(ignoreConflicts...)); err != nil {
return false, err
}

Expand Down
42 changes: 42 additions & 0 deletions util/patch/patch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,48 @@ var _ = Describe("Patch Helper", func() {
}, timeout).Should(BeTrue())
})

Specify("should return not an error if there is an unresolvable conflict but conflicts for the condition type should be ignored", func() {
obj := obj.DeepCopy()

By("Creating the object")
Expect(testEnv.Create(ctx, obj)).ToNot(HaveOccurred())
key := client.ObjectKey{Name: obj.Name, Namespace: obj.Namespace}
defer func() {
Expect(testEnv.Delete(ctx, obj)).To(Succeed())
}()
objCopy := obj.DeepCopy()

By("Marking a custom condition to be false")
conditions.MarkFalse(objCopy, clusterv1.ReadyCondition, "reason", clusterv1.ConditionSeverityInfo, "message")
Expect(testEnv.Status().Update(ctx, objCopy)).To(Succeed())

By("Validating that the local object's resource version is behind")
Expect(obj.ResourceVersion).ToNot(Equal(objCopy.ResourceVersion))

By("Creating a new patch helper")
patcher, err := NewHelper(obj, testEnv)
Expect(err).NotTo(HaveOccurred())

By("Marking Ready=True")
conditions.MarkTrue(obj, clusterv1.ReadyCondition)

By("Patching the object")
Expect(patcher.Patch(ctx, obj, IgnoreConditionsConflicts{Conditions: []clusterv1.ConditionType{clusterv1.ReadyCondition}})).To(Succeed())

By("Validating the object has been updated")
Eventually(func() bool {
objAfter := obj.DeepCopy()
if err := testEnv.Get(ctx, key, objAfter); err != nil {
return false
}

readyBefore := conditions.Get(obj, clusterv1.ReadyCondition)
readyAfter := conditions.Get(objAfter, clusterv1.ReadyCondition)

return cmp.Equal(readyBefore, readyAfter)
}, timeout).Should(BeTrue())
})

})
})

Expand Down

0 comments on commit c704c98

Please sign in to comment.