Skip to content

Commit

Permalink
feat: add write back to application informer (argoproj#15987)
Browse files Browse the repository at this point in the history
Signed-off-by: Soumya Ghosh Dastidar <[email protected]>
Signed-off-by: gdsoumya <[email protected]>
Signed-off-by: jmilic1 <[email protected]>
  • Loading branch information
gdsoumya authored and jmilic1 committed Nov 13, 2023
1 parent 9e95e16 commit 9c98c62
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 23 deletions.
34 changes: 27 additions & 7 deletions controller/appcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1331,8 +1331,7 @@ func (ctrl *ApplicationController) setOperationState(app *appv1.Application, sta
}

kube.RetryUntilSucceed(context.Background(), updateOperationStateTimeout, "Update application operation state", logutils.NewLogrusLogger(logutils.NewWithCurrentConfig()), func() error {
appClient := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.Namespace)
_, err = appClient.Patch(context.Background(), app.Name, types.MergePatchType, patchJSON, metav1.PatchOptions{})
_, err := ctrl.PatchAppWithWriteBack(context.Background(), app.Name, app.Namespace, types.MergePatchType, patchJSON, metav1.PatchOptions{})
if err != nil {
// Stop retrying updating deleted application
if apierr.IsNotFound(err) {
Expand Down Expand Up @@ -1370,6 +1369,27 @@ func (ctrl *ApplicationController) setOperationState(app *appv1.Application, sta
}
}

// writeBackToInformer writes a just recently updated App back into the informer cache.
// This prevents the situation where the controller operates on a stale app and repeats work
func (ctrl *ApplicationController) writeBackToInformer(app *appv1.Application) {
logCtx := log.WithFields(log.Fields{"application": app.Name, "appNamespace": app.Namespace, "project": app.Spec.Project, "informer-writeBack": true})
err := ctrl.appInformer.GetStore().Update(app)
if err != nil {
logCtx.Errorf("failed to update informer store: %v", err)
return
}
}

// PatchAppWithWriteBack patches an application and writes it back to the informer cache
func (ctrl *ApplicationController) PatchAppWithWriteBack(ctx context.Context, name, ns string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *appv1.Application, err error) {
patchedApp, err := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(ns).Patch(ctx, name, pt, data, opts, subresources...)
if err != nil {
return patchedApp, err
}
ctrl.writeBackToInformer(patchedApp)
return patchedApp, err
}

func (ctrl *ApplicationController) processAppRefreshQueueItem() (processNext bool) {
patchMs := time.Duration(0) // time spent in doing patch/update calls
setOpMs := time.Duration(0) // time spent in doing Operation patch calls in autosync
Expand Down Expand Up @@ -1643,8 +1663,7 @@ func (ctrl *ApplicationController) normalizeApplication(orig, app *appv1.Applica
if err != nil {
logCtx.Errorf("error constructing app spec patch: %v", err)
} else if modified {
appClient := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.Namespace)
_, err = appClient.Patch(context.Background(), app.Name, types.MergePatchType, patch, metav1.PatchOptions{})
_, err := ctrl.PatchAppWithWriteBack(context.Background(), app.Name, app.Namespace, types.MergePatchType, patch, metav1.PatchOptions{})
if err != nil {
logCtx.Errorf("Error persisting normalized application spec: %v", err)
} else {
Expand Down Expand Up @@ -1688,8 +1707,7 @@ func (ctrl *ApplicationController) persistAppStatus(orig *appv1.Application, new
defer func() {
patchMs = time.Since(start)
}()
appClient := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(orig.Namespace)
_, err = appClient.Patch(context.Background(), orig.Name, types.MergePatchType, patch, metav1.PatchOptions{})
_, err = ctrl.PatchAppWithWriteBack(context.Background(), orig.Name, orig.Namespace, types.MergePatchType, patch, metav1.PatchOptions{})
if err != nil {
logCtx.Warnf("Error updating application: %v", err)
} else {
Expand Down Expand Up @@ -1799,7 +1817,7 @@ func (ctrl *ApplicationController) autoSync(app *appv1.Application, syncStatus *

appIf := ctrl.applicationClientset.ArgoprojV1alpha1().Applications(app.Namespace)
start := time.Now()
_, err := argo.SetAppOperation(appIf, app.Name, &op)
updatedApp, err := argo.SetAppOperation(appIf, app.Name, &op)
setOpTime := time.Since(start)
if err != nil {
if goerrors.Is(err, argo.ErrAnotherOperationInProgress) {
Expand All @@ -1811,6 +1829,8 @@ func (ctrl *ApplicationController) autoSync(app *appv1.Application, syncStatus *

logCtx.Errorf("Failed to initiate auto-sync to %s: %v", desiredCommitSHA, err)
return &appv1.ApplicationCondition{Type: appv1.ApplicationConditionSyncError, Message: err.Error()}, setOpTime
} else {
ctrl.writeBackToInformer(updatedApp)
}
message := fmt.Sprintf("Initiated automated sync to '%s'", desiredCommitSHA)
ctrl.auditLogger.LogAppEvent(app, argo.EventInfo{Reason: argo.EventReasonOperationStarted, Type: v1.EventTypeNormal}, message, "")
Expand Down
32 changes: 16 additions & 16 deletions controller/appcontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ func TestFinalizeAppDeletion(t *testing.T) {
})
fakeAppCs.AddReactor("patch", "*", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) {
patched = true
return true, nil, nil
return true, &v1alpha1.Application{}, nil
})
_, err := ctrl.finalizeApplicationDeletion(app, func(project string) ([]*v1alpha1.Cluster, error) {
return []*v1alpha1.Cluster{}, nil
Expand Down Expand Up @@ -675,7 +675,7 @@ func TestFinalizeAppDeletion(t *testing.T) {
})
fakeAppCs.AddReactor("patch", "*", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) {
patched = true
return true, nil, nil
return true, &v1alpha1.Application{}, nil
})
objs, err := ctrl.finalizeApplicationDeletion(app, func(project string) ([]*v1alpha1.Cluster, error) {
return []*v1alpha1.Cluster{}, nil
Expand Down Expand Up @@ -709,7 +709,7 @@ func TestFinalizeAppDeletion(t *testing.T) {
})
fakeAppCs.AddReactor("patch", "*", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) {
patched = true
return true, nil, nil
return true, &v1alpha1.Application{}, nil
})
_, err := ctrl.finalizeApplicationDeletion(app, func(project string) ([]*v1alpha1.Cluster, error) {
return []*v1alpha1.Cluster{}, nil
Expand Down Expand Up @@ -804,7 +804,7 @@ func TestNormalizeApplication(t *testing.T) {
normalized = true
}
}
return true, nil, nil
return true, &v1alpha1.Application{}, nil
})
ctrl.processAppRefreshQueueItem()
assert.True(t, normalized)
Expand All @@ -826,7 +826,7 @@ func TestNormalizeApplication(t *testing.T) {
normalized = true
}
}
return true, nil, nil
return true, &v1alpha1.Application{}, nil
})
ctrl.processAppRefreshQueueItem()
assert.False(t, normalized)
Expand Down Expand Up @@ -923,7 +923,7 @@ func TestSetOperationStateOnDeletedApp(t *testing.T) {
patched := false
fakeAppCs.AddReactor("patch", "*", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) {
patched = true
return true, nil, apierr.NewNotFound(schema.GroupResource{}, "my-app")
return true, &v1alpha1.Application{}, apierr.NewNotFound(schema.GroupResource{}, "my-app")
})
ctrl.setOperationState(newFakeApp(), &v1alpha1.OperationState{Phase: synccommon.OperationSucceeded})
assert.True(t, patched)
Expand Down Expand Up @@ -955,9 +955,9 @@ func TestSetOperationStateLogRetries(t *testing.T) {
fakeAppCs.AddReactor("patch", "*", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) {
if !patched {
patched = true
return true, nil, errors.New("fake error")
return true, &v1alpha1.Application{}, errors.New("fake error")
}
return true, nil, nil
return true, &v1alpha1.Application{}, nil
})
ctrl.setOperationState(newFakeApp(), &v1alpha1.OperationState{Phase: synccommon.OperationSucceeded})
assert.True(t, patched)
Expand Down Expand Up @@ -1273,7 +1273,7 @@ func TestUpdateReconciledAt(t *testing.T) {
if patchAction, ok := action.(kubetesting.PatchAction); ok {
assert.NoError(t, json.Unmarshal(patchAction.GetPatch(), &receivedPatch))
}
return true, nil, nil
return true, &v1alpha1.Application{}, nil
})

t.Run("UpdatedOnFullReconciliation", func(t *testing.T) {
Expand Down Expand Up @@ -1347,7 +1347,7 @@ func TestFinalizeProjectDeletion_HasApplications(t *testing.T) {
patched := false
fakeAppCs.PrependReactor("patch", "*", func(action kubetesting.Action) (handled bool, ret runtime.Object, err error) {
patched = true
return true, nil, nil
return true, &v1alpha1.Application{}, nil
})

err := ctrl.finalizeProjectDeletion(proj)
Expand All @@ -1365,7 +1365,7 @@ func TestFinalizeProjectDeletion_DoesNotHaveApplications(t *testing.T) {
if patchAction, ok := action.(kubetesting.PatchAction); ok {
assert.NoError(t, json.Unmarshal(patchAction.GetPatch(), &receivedPatch))
}
return true, nil, nil
return true, &v1alpha1.AppProject{}, nil
})

err := ctrl.finalizeProjectDeletion(proj)
Expand All @@ -1390,7 +1390,7 @@ func TestProcessRequestedAppOperation_FailedNoRetries(t *testing.T) {
if patchAction, ok := action.(kubetesting.PatchAction); ok {
assert.NoError(t, json.Unmarshal(patchAction.GetPatch(), &receivedPatch))
}
return true, nil, nil
return true, &v1alpha1.Application{}, nil
})

ctrl.processRequestedAppOperation(app)
Expand Down Expand Up @@ -1418,7 +1418,7 @@ func TestProcessRequestedAppOperation_InvalidDestination(t *testing.T) {
if patchAction, ok := action.(kubetesting.PatchAction); ok {
assert.NoError(t, json.Unmarshal(patchAction.GetPatch(), &receivedPatch))
}
return true, nil, nil
return true, &v1alpha1.Application{}, nil
})
}()

Expand All @@ -1444,7 +1444,7 @@ func TestProcessRequestedAppOperation_FailedHasRetries(t *testing.T) {
if patchAction, ok := action.(kubetesting.PatchAction); ok {
assert.NoError(t, json.Unmarshal(patchAction.GetPatch(), &receivedPatch))
}
return true, nil, nil
return true, &v1alpha1.Application{}, nil
})

ctrl.processRequestedAppOperation(app)
Expand Down Expand Up @@ -1487,7 +1487,7 @@ func TestProcessRequestedAppOperation_RunningPreviouslyFailed(t *testing.T) {
if patchAction, ok := action.(kubetesting.PatchAction); ok {
assert.NoError(t, json.Unmarshal(patchAction.GetPatch(), &receivedPatch))
}
return true, nil, nil
return true, &v1alpha1.Application{}, nil
})

ctrl.processRequestedAppOperation(app)
Expand Down Expand Up @@ -1520,7 +1520,7 @@ func TestProcessRequestedAppOperation_HasRetriesTerminated(t *testing.T) {
if patchAction, ok := action.(kubetesting.PatchAction); ok {
assert.NoError(t, json.Unmarshal(patchAction.GetPatch(), &receivedPatch))
}
return true, nil, nil
return true, &v1alpha1.Application{}, nil
})

ctrl.processRequestedAppOperation(app)
Expand Down

0 comments on commit 9c98c62

Please sign in to comment.