Skip to content

Commit

Permalink
feat: writeback rollout updates to informer to prevent stale data (#726)
Browse files Browse the repository at this point in the history
  • Loading branch information
jessesuen authored Sep 22, 2020
1 parent fa3ddf0 commit a96bbdb
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 4 deletions.
2 changes: 2 additions & 0 deletions rollout/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type rolloutContext struct {
log *log.Entry
// rollout is the rollout being reconciled
rollout *v1alpha1.Rollout
// newRollout is the rollout after reconciliation. used to write back to informer
newRollout *v1alpha1.Rollout
// newRS is the "new" ReplicaSet. Also referred to as current, or desired.
// newRS will be nil when the pod template spec changes.
newRS *appsv1.ReplicaSet
Expand Down
26 changes: 25 additions & 1 deletion rollout/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -120,6 +121,7 @@ type reconcilerBase struct {

replicaSetLister appslisters.ReplicaSetLister
replicaSetSynced cache.InformerSynced
rolloutsInformer cache.SharedIndexInformer
rolloutsLister listers.RolloutLister
rolloutsSynced cache.InformerSynced
rolloutsIndexer cache.Indexer
Expand Down Expand Up @@ -171,6 +173,7 @@ func NewController(cfg ControllerConfig) *Controller {
defaultTrafficSplitVersion: cfg.DefaultTrafficSplitVersion,
replicaSetLister: cfg.ReplicaSetInformer.Lister(),
replicaSetSynced: cfg.ReplicaSetInformer.Informer().HasSynced,
rolloutsInformer: cfg.RolloutsInformer.Informer(),
rolloutsIndexer: cfg.RolloutsInformer.Informer().GetIndexer(),
rolloutsLister: cfg.RolloutsInformer.Lister(),
rolloutsSynced: cfg.RolloutsInformer.Informer().HasSynced,
Expand Down Expand Up @@ -406,7 +409,28 @@ func (c *Controller) syncHandler(key string) error {
if err != nil {
return err
}
return roCtx.reconcile()
err = roCtx.reconcile()
if roCtx.newRollout != nil {
c.writeBackToInformer(roCtx.newRollout)
}
return err
}

// writeBackToInformer writes a just recently updated Rollout back into the informer cache.
// This prevents the situation where the controller operates on a stale rollout and repeats work
func (c *Controller) writeBackToInformer(ro *v1alpha1.Rollout) {
logCtx := logutil.WithRollout(ro)
obj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(ro)
if err != nil {
logCtx.Errorf("failed to convert rollout to unstructured: %v", err)
return
}
un := unstructured.Unstructured{Object: obj}
err = c.rolloutsInformer.GetStore().Update(&un)
if err != nil {
logCtx.Errorf("failed to update informer store: %v", err)
return
}
}

func (c *Controller) newRolloutContext(rollout *v1alpha1.Rollout) (*rolloutContext, error) {
Expand Down
31 changes: 31 additions & 0 deletions rollout/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
extensionsv1beta1 "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -1474,3 +1475,33 @@ func TestRolloutStrategyNotSet(t *testing.T) {
patchedRolout := f.getPatchedRollout(patchIndex)
assert.Contains(t, patchedRolout, `Rollout has missing field '.spec.strategy.canary or .spec.strategy.blueGreen'`)
}

// TestWriteBackToInformer verifies that after a rollout reconciles, the new version of the rollout
// is written back to the informer
func TestWriteBackToInformer(t *testing.T) {
f := newFixture(t)
defer f.Close()

r1 := newCanaryRollout("foo", 10, nil, nil, int32Ptr(0), intstr.FromInt(1), intstr.FromInt(0))
r1.Status.StableRS = ""
rs1 := newReplicaSetWithStatus(r1, 10, 10)

f.rolloutLister = append(f.rolloutLister, r1)
f.objects = append(f.objects, r1)

f.kubeobjects = append(f.kubeobjects, rs1)
f.replicaSetLister = append(f.replicaSetLister, rs1)

f.expectPatchRolloutAction(r1)

c, i, k8sI := f.newController(noResyncPeriodFunc)
roKey := getKey(r1, t)
f.runController(roKey, true, false, c, i, k8sI)

// Verify the informer was updated with the new unstructured object after reconciliation
obj, _, _ := c.rolloutsIndexer.GetByKey(roKey)
un := obj.(*unstructured.Unstructured)
stableRS, _, _ := unstructured.NestedString(un.Object, "status", "stableRS")
assert.NotEmpty(t, stableRS)
assert.Equal(t, rs1.Labels[v1alpha1.DefaultRolloutUniqueLabelKey], stableRS)
}
6 changes: 3 additions & 3 deletions rollout/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,13 +607,13 @@ func (c *rolloutContext) persistRolloutStatus(newStatus *v1alpha1.RolloutStatus)
c.requeueStuckRollout(*newStatus)
return nil
}
c.log.Debugf("Rollout Patch: %s", patch)
_, err = c.argoprojclientset.ArgoprojV1alpha1().Rollouts(c.rollout.Namespace).Patch(c.rollout.Name, patchtypes.MergePatchType, patch)
newRollout, err := c.argoprojclientset.ArgoprojV1alpha1().Rollouts(c.rollout.Namespace).Patch(c.rollout.Name, patchtypes.MergePatchType, patch)
if err != nil {
c.log.Warningf("Error updating application: %v", err)
return err
}
c.log.Info("Patch status successfully")
c.log.Infof("Patched: %s", patch)
c.newRollout = newRollout
return nil
}

Expand Down

0 comments on commit a96bbdb

Please sign in to comment.