From 7c962819d1d79d1e2ece79ee4d8854dce5158dee Mon Sep 17 00:00:00 2001 From: smile-luobin Date: Tue, 27 Jun 2023 03:34:20 +0800 Subject: [PATCH] fix: fix bugs in throttler and syncManager initialization in WorkflowController (#11210) Signed-off-by: smile-luobin --- workflow/controller/controller.go | 2 +- workflow/controller/controller_test.go | 114 ++++++++++++++++++++++++- 2 files changed, 112 insertions(+), 4 deletions(-) diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 274e5659c6d2..3dc148cfe8ca 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -365,7 +365,7 @@ func (wfc *WorkflowController) initManagers(ctx context.Context) error { labelSelector = labelSelector.Add(*req) } listOpts := metav1.ListOptions{LabelSelector: labelSelector.String()} - wfList, err := wfc.wfclientset.ArgoprojV1alpha1().Workflows(wfc.namespace).List(ctx, listOpts) + wfList, err := wfc.wfclientset.ArgoprojV1alpha1().Workflows(wfc.GetManagedNamespace()).List(ctx, listOpts) if err != nil { return err } diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index ed9bf6012c91..46ef50cd1db1 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -308,6 +308,14 @@ func expectWorkflow(ctx context.Context, controller *WorkflowController, name st test(wf) } +func expectNamespacedWorkflow(ctx context.Context, controller *WorkflowController, namespace, name string, test func(wf *wfv1.Workflow)) { + wf, err := controller.wfclientset.ArgoprojV1alpha1().Workflows(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + panic(err) + } + test(wf) +} + func getPod(woc *wfOperationCtx, name string) (*apiv1.Pod, error) { return woc.controller.kubeclientset.CoreV1().Pods(woc.wf.Namespace).Get(context.Background(), name, metav1.GetOptions{}) } @@ -794,9 +802,6 @@ func TestParallelismWithInitializeRunningWorkflows(t *testing.T) { "Parallelism": func(x *WorkflowController) { x.Config.Parallelism = 1 }, - "NamespaceParallelism": func(x *WorkflowController) { - x.Config.NamespaceParallelism = 1 - }, } { t.Run(tt, func(t *testing.T) { cancel, controller := newController( @@ -860,6 +865,109 @@ status: } } +func TestNamespaceParallelismWithInitializeRunningWorkflows(t *testing.T) { + for tt, f := range map[string]func(controller *WorkflowController){ + "NamespaceParallelism": func(x *WorkflowController) { + x.Config.NamespaceParallelism = 1 + }, + } { + t.Run(tt, func(t *testing.T) { + cancel, controller := newController( + wfv1.MustUnmarshalWorkflow(` +metadata: + name: my-ns-0-wf-0 + namespace: ns-0 + creationTimestamp: 2023-06-13T16:39:00Z +spec: + entrypoint: main + templates: + - name: main + container: + image: my-image +`), + wfv1.MustUnmarshalWorkflow(` +metadata: + name: my-ns-1-wf-0 + namespace: ns-1 + creationTimestamp: 2023-06-13T16:40:00Z +spec: + entrypoint: main + templates: + - name: main + container: + image: my-image +`), + wfv1.MustUnmarshalWorkflow(` +metadata: + name: my-ns-0-wf-1 + namespace: ns-0 + creationTimestamp: 2023-06-13T16:41:00Z + labels: + workflows.argoproj.io/phase: Running +spec: + entrypoint: main + templates: + - name: main + container: + image: my-image +status: + phase: Running +`), + wfv1.MustUnmarshalWorkflow(` +metadata: + name: my-ns-1-wf-1 + namespace: ns-1 + creationTimestamp: 2023-06-13T16:42:00Z + labels: + workflows.argoproj.io/phase: Running +spec: + entrypoint: main + templates: + - name: main + container: + image: my-image +status: + phase: Running +`), + f, + ) + defer cancel() + ctx := context.Background() + + ns0PendingWfTested := false + ns1PendingWfTested := false + for { + assert.True(t, controller.processNextItem(ctx)) + if !ns0PendingWfTested { + expectNamespacedWorkflow(ctx, controller, "ns-0", "my-ns-0-wf-0", func(wf *wfv1.Workflow) { + if assert.NotNil(t, wf) { + if wf.Status.Phase != "" { + assert.Equal(t, wfv1.WorkflowPending, wf.Status.Phase) + assert.Equal(t, "Workflow processing has been postponed because too many workflows are already running", wf.Status.Message) + ns0PendingWfTested = true + } + } + }) + } + if !ns1PendingWfTested { + expectNamespacedWorkflow(ctx, controller, "ns-1", "my-ns-1-wf-0", func(wf *wfv1.Workflow) { + if assert.NotNil(t, wf) { + if wf.Status.Phase != "" { + assert.Equal(t, wfv1.WorkflowPending, wf.Status.Phase) + assert.Equal(t, "Workflow processing has been postponed because too many workflows are already running", wf.Status.Message) + ns1PendingWfTested = true + } + } + }) + } + if ns0PendingWfTested && ns1PendingWfTested { + break + } + } + }) + } +} + func TestPodCleanupRetryIsReset(t *testing.T) { wf := wfv1.MustUnmarshalWorkflow(` metadata: