Skip to content

Commit

Permalink
fix: sync hooks should be deleted after sync phase/wave completion
Browse files Browse the repository at this point in the history
  • Loading branch information
alexmt authored and Alexander Matyushentsev committed Jul 15, 2020
1 parent ee1db09 commit 85f170b
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 24 deletions.
28 changes: 15 additions & 13 deletions pkg/sync/sync_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
log "github.com/sirupsen/logrus"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/errors"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -301,16 +300,7 @@ func (sc *syncContext) Sync() {
sc.setResourceResult(task, "", common.OperationError, fmt.Sprintf("failed to get resource health: %v", err))
} else {
sc.setResourceResult(task, "", operationState, message)

// maybe delete the hook
if task.needsDeleting() {
err := sc.deleteResource(task)
if err != nil && !errors.IsNotFound(err) {
sc.setResourceResult(task, "", common.OperationError, fmt.Sprintf("failed to delete resource: %v", err))
}
}
}

} else {
// this must be calculated on the live object
healthStatus, err := health.GetResourceHealth(task.liveObj, sc.healthOverride)
Expand Down Expand Up @@ -341,6 +331,17 @@ func (sc *syncContext) Sync() {
return
}

// delete all completed hooks which have appropriate delete policy
hooksPendingDeletion := tasks.Filter(func(task *syncTask) bool {
return task.isHook() && task.liveObj != nil && !task.running() && task.deleteOnPhaseCompletion()
})
for _, task := range hooksPendingDeletion {
err := sc.deleteResource(task)
if err != nil && !apierr.IsNotFound(err) {
sc.setResourceResult(task, "", common.OperationError, fmt.Sprintf("failed to delete resource: %v", err))
}
}

// syncFailTasks only run during failure, so separate them from regular tasks
syncFailTasks, tasks := tasks.Split(func(t *syncTask) bool { return t.phase == common.SyncPhaseSyncFail })

Expand Down Expand Up @@ -388,7 +389,7 @@ func (sc *syncContext) Sync() {
}
default:
sc.setRunningPhase(tasks.Filter(func(task *syncTask) bool {
return task.needsDeleting()
return task.deleteOnPhaseCompletion()
}), true)
}
}
Expand Down Expand Up @@ -768,10 +769,11 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
wg.Wait()
}

hooksPendingDeletion := createTasks.Filter(func(t *syncTask) bool { return t.deleteBeforeCreation() })
// delete anything that need deleting
if runState == successful && createTasks.Any(func(t *syncTask) bool { return t.needsDeleting() }) {
if runState == successful && hooksPendingDeletion.Len() > 0 {
var wg sync.WaitGroup
for _, task := range createTasks.Filter(func(t *syncTask) bool { return t.needsDeleting() }) {
for _, task := range hooksPendingDeletion {
wg.Add(1)
go func(t *syncTask) {
defer wg.Done()
Expand Down
101 changes: 101 additions & 0 deletions pkg/sync/sync_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/client-go/rest"
testcore "k8s.io/client-go/testing"

"github.com/argoproj/gitops-engine/pkg/health"
synccommon "github.com/argoproj/gitops-engine/pkg/sync/common"
"github.com/argoproj/gitops-engine/pkg/utils/kube"
"github.com/argoproj/gitops-engine/pkg/utils/kube/kubetest"
Expand Down Expand Up @@ -593,6 +594,106 @@ func TestRunSyncFailHooksFailed(t *testing.T) {
assert.Equal(t, synccommon.ResultCodeSynced, resources[2].Status)
}

type resourceNameHealthOverride map[string]health.HealthStatusCode

func (r resourceNameHealthOverride) GetResourceHealth(obj *unstructured.Unstructured) (*health.HealthStatus, error) {
if status, ok := r[obj.GetName()]; ok {
return &health.HealthStatus{Status: status, Message: "test"}, nil
}
return nil, nil
}

func TestRunSync_HooksNotDeletedIfPhaseNotCompleted(t *testing.T) {
completedHook := newHook(synccommon.HookTypePreSync)
completedHook.SetName("completed-hook")
completedHook.SetNamespace(FakeArgoCDNamespace)
_ = Annotate(completedHook, synccommon.AnnotationKeyHookDeletePolicy, "HookSucceeded")

inProgressHook := newHook(synccommon.HookTypePreSync)
inProgressHook.SetNamespace(FakeArgoCDNamespace)
inProgressHook.SetName("in-progress-hook")
_ = Annotate(inProgressHook, synccommon.AnnotationKeyHookDeletePolicy, "HookSucceeded")

syncCtx := newTestSyncCtx(
WithHealthOverride(resourceNameHealthOverride(map[string]health.HealthStatusCode{
inProgressHook.GetName(): health.HealthStatusProgressing,
})),
WithInitialState(synccommon.OperationRunning, "", []synccommon.ResourceSyncResult{{
ResourceKey: kube.GetResourceKey(completedHook),
HookPhase: synccommon.OperationSucceeded,
SyncPhase: synccommon.SyncPhasePreSync,
}, {
ResourceKey: kube.GetResourceKey(inProgressHook),
HookPhase: synccommon.OperationRunning,
SyncPhase: synccommon.SyncPhasePreSync,
}}))
fakeDynamicClient := fake.NewSimpleDynamicClient(runtime.NewScheme())
syncCtx.dynamicIf = fakeDynamicClient
deletedCount := 0
fakeDynamicClient.PrependReactor("delete", "*", func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
deletedCount += 1
return true, nil, nil
})
syncCtx.resources = groupResources(ReconciliationResult{
Live: []*unstructured.Unstructured{completedHook, inProgressHook},
Target: []*unstructured.Unstructured{nil, nil},
})
syncCtx.hooks = []*unstructured.Unstructured{completedHook, inProgressHook}

syncCtx.kubectl = &kubetest.MockKubectlCmd{
Commands: map[string]kubetest.KubectlOutput{},
}

syncCtx.Sync()

assert.Equal(t, synccommon.OperationRunning, syncCtx.phase)
assert.Equal(t, 0, deletedCount)
}

func TestRunSync_HooksDeletedAfterPhaseCompleted(t *testing.T) {
completedHook1 := newHook(synccommon.HookTypePreSync)
completedHook1.SetName("completed-hook1")
completedHook1.SetNamespace(FakeArgoCDNamespace)
_ = Annotate(completedHook1, synccommon.AnnotationKeyHookDeletePolicy, "HookSucceeded")

completedHook2 := newHook(synccommon.HookTypePreSync)
completedHook2.SetNamespace(FakeArgoCDNamespace)
completedHook2.SetName("completed-hook2")
_ = Annotate(completedHook2, synccommon.AnnotationKeyHookDeletePolicy, "HookSucceeded")

syncCtx := newTestSyncCtx(
WithInitialState(synccommon.OperationRunning, "", []synccommon.ResourceSyncResult{{
ResourceKey: kube.GetResourceKey(completedHook1),
HookPhase: synccommon.OperationSucceeded,
SyncPhase: synccommon.SyncPhasePreSync,
}, {
ResourceKey: kube.GetResourceKey(completedHook2),
HookPhase: synccommon.OperationSucceeded,
SyncPhase: synccommon.SyncPhasePreSync,
}}))
fakeDynamicClient := fake.NewSimpleDynamicClient(runtime.NewScheme())
syncCtx.dynamicIf = fakeDynamicClient
deletedCount := 0
fakeDynamicClient.PrependReactor("delete", "*", func(action testcore.Action) (handled bool, ret runtime.Object, err error) {
deletedCount += 1
return true, nil, nil
})
syncCtx.resources = groupResources(ReconciliationResult{
Live: []*unstructured.Unstructured{completedHook1, completedHook2},
Target: []*unstructured.Unstructured{nil, nil},
})
syncCtx.hooks = []*unstructured.Unstructured{completedHook1, completedHook2}

syncCtx.kubectl = &kubetest.MockKubectlCmd{
Commands: map[string]kubetest.KubectlOutput{},
}

syncCtx.Sync()

assert.Equal(t, synccommon.OperationSucceeded, syncCtx.phase)
assert.Equal(t, 2, deletedCount)
}

func Test_syncContext_liveObj(t *testing.T) {
type fields struct {
compareResult ReconciliationResult
Expand Down
9 changes: 6 additions & 3 deletions pkg/sync/sync_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,11 @@ func (t *syncTask) hasHookDeletePolicy(policy common.HookDeletePolicy) bool {
return false
}

func (t *syncTask) needsDeleting() bool {
return t.liveObj != nil && (t.pending() && t.hasHookDeletePolicy(common.HookDeletePolicyBeforeHookCreation) ||
t.successful() && t.hasHookDeletePolicy(common.HookDeletePolicyHookSucceeded) ||
func (t *syncTask) deleteBeforeCreation() bool {
return t.liveObj != nil && t.pending() && t.hasHookDeletePolicy(common.HookDeletePolicyBeforeHookCreation)
}

func (t *syncTask) deleteOnPhaseCompletion() bool {
return t.liveObj != nil && (t.successful() && t.hasHookDeletePolicy(common.HookDeletePolicyHookSucceeded) ||
t.failed() && t.hasHookDeletePolicy(common.HookDeletePolicyHookFailed))
}
22 changes: 14 additions & 8 deletions pkg/sync/sync_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,22 @@ func Test_syncTask_hasHookDeletePolicy(t *testing.T) {
assert.True(t, (&syncTask{targetObj: Annotate(Annotate(NewPod(), "argocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "HookFailed")}).hasHookDeletePolicy(common.HookDeletePolicyHookFailed))
}

func Test_syncTask_needsDeleting(t *testing.T) {
assert.False(t, (&syncTask{liveObj: NewPod()}).needsDeleting())
func Test_syncTask_deleteOnPhaseCompletion(t *testing.T) {
assert.False(t, (&syncTask{liveObj: NewPod()}).deleteOnPhaseCompletion())
// must be hook
assert.False(t, (&syncTask{liveObj: Annotate(NewPod(), "argocd.argoproj.io/hook-delete-policy", "BeforeHookCreation")}).needsDeleting())
assert.True(t, (&syncTask{operationState: common.OperationSucceeded, liveObj: Annotate(Annotate(NewPod(), "argocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "HookSucceeded")}).deleteOnPhaseCompletion())
assert.True(t, (&syncTask{operationState: common.OperationFailed, liveObj: Annotate(Annotate(NewPod(), "argocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "HookFailed")}).deleteOnPhaseCompletion())
}

func Test_syncTask_deleteBeforeCreation(t *testing.T) {
assert.False(t, (&syncTask{liveObj: NewPod()}).deleteBeforeCreation())
// must be hook
assert.False(t, (&syncTask{liveObj: Annotate(NewPod(), "argocd.argoproj.io/hook-delete-policy", "BeforeHookCreation")}).deleteBeforeCreation())
// no need to delete if no live obj
assert.False(t, (&syncTask{targetObj: Annotate(Annotate(NewPod(), "argoocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "BeforeHookCreation")}).needsDeleting())
assert.True(t, (&syncTask{liveObj: Annotate(Annotate(NewPod(), "argocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "BeforeHookCreation")}).needsDeleting())
assert.True(t, (&syncTask{liveObj: Annotate(Annotate(NewPod(), "argocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "BeforeHookCreation")}).needsDeleting())
assert.True(t, (&syncTask{operationState: common.OperationSucceeded, liveObj: Annotate(Annotate(NewPod(), "argocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "HookSucceeded")}).needsDeleting())
assert.True(t, (&syncTask{operationState: common.OperationFailed, liveObj: Annotate(Annotate(NewPod(), "argocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "HookFailed")}).needsDeleting())
assert.False(t, (&syncTask{targetObj: Annotate(Annotate(NewPod(), "argoocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "BeforeHookCreation")}).deleteBeforeCreation())
assert.True(t, (&syncTask{liveObj: Annotate(Annotate(NewPod(), "argocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "BeforeHookCreation")}).deleteBeforeCreation())
assert.True(t, (&syncTask{liveObj: Annotate(Annotate(NewPod(), "argocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "BeforeHookCreation")}).deleteBeforeCreation())

}

func Test_syncTask_wave(t *testing.T) {
Expand Down

0 comments on commit 85f170b

Please sign in to comment.