Skip to content

Commit

Permalink
fix: fix bugs in throttler and syncManager initialization in Workflow…
Browse files Browse the repository at this point in the history
…Controller (#11210)

Signed-off-by: smile-luobin <[email protected]>
  • Loading branch information
smile-luobin authored Jun 26, 2023
1 parent 29d63c5 commit 451d275
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 4 deletions.
2 changes: 1 addition & 1 deletion workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func (wfc *WorkflowController) initManagers(ctx context.Context) error {
labelSelector = labelSelector.Add(*req)
}
listOpts := metav1.ListOptions{LabelSelector: labelSelector.String()}
wfList, err := wfc.wfclientset.ArgoprojV1alpha1().Workflows(wfc.namespace).List(ctx, listOpts)
wfList, err := wfc.wfclientset.ArgoprojV1alpha1().Workflows(wfc.GetManagedNamespace()).List(ctx, listOpts)
if err != nil {
return err
}
Expand Down
114 changes: 111 additions & 3 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,14 @@ func expectWorkflow(ctx context.Context, controller *WorkflowController, name st
test(wf)
}

func expectNamespacedWorkflow(ctx context.Context, controller *WorkflowController, namespace, name string, test func(wf *wfv1.Workflow)) {
wf, err := controller.wfclientset.ArgoprojV1alpha1().Workflows(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
panic(err)
}
test(wf)
}

func getPod(woc *wfOperationCtx, name string) (*apiv1.Pod, error) {
return woc.controller.kubeclientset.CoreV1().Pods(woc.wf.Namespace).Get(context.Background(), name, metav1.GetOptions{})
}
Expand Down Expand Up @@ -794,9 +802,6 @@ func TestParallelismWithInitializeRunningWorkflows(t *testing.T) {
"Parallelism": func(x *WorkflowController) {
x.Config.Parallelism = 1
},
"NamespaceParallelism": func(x *WorkflowController) {
x.Config.NamespaceParallelism = 1
},
} {
t.Run(tt, func(t *testing.T) {
cancel, controller := newController(
Expand Down Expand Up @@ -860,6 +865,109 @@ status:
}
}

func TestNamespaceParallelismWithInitializeRunningWorkflows(t *testing.T) {
for tt, f := range map[string]func(controller *WorkflowController){
"NamespaceParallelism": func(x *WorkflowController) {
x.Config.NamespaceParallelism = 1
},
} {
t.Run(tt, func(t *testing.T) {
cancel, controller := newController(
wfv1.MustUnmarshalWorkflow(`
metadata:
name: my-ns-0-wf-0
namespace: ns-0
creationTimestamp: 2023-06-13T16:39:00Z
spec:
entrypoint: main
templates:
- name: main
container:
image: my-image
`),
wfv1.MustUnmarshalWorkflow(`
metadata:
name: my-ns-1-wf-0
namespace: ns-1
creationTimestamp: 2023-06-13T16:40:00Z
spec:
entrypoint: main
templates:
- name: main
container:
image: my-image
`),
wfv1.MustUnmarshalWorkflow(`
metadata:
name: my-ns-0-wf-1
namespace: ns-0
creationTimestamp: 2023-06-13T16:41:00Z
labels:
workflows.argoproj.io/phase: Running
spec:
entrypoint: main
templates:
- name: main
container:
image: my-image
status:
phase: Running
`),
wfv1.MustUnmarshalWorkflow(`
metadata:
name: my-ns-1-wf-1
namespace: ns-1
creationTimestamp: 2023-06-13T16:42:00Z
labels:
workflows.argoproj.io/phase: Running
spec:
entrypoint: main
templates:
- name: main
container:
image: my-image
status:
phase: Running
`),
f,
)
defer cancel()
ctx := context.Background()

ns0PendingWfTested := false
ns1PendingWfTested := false
for {
assert.True(t, controller.processNextItem(ctx))
if !ns0PendingWfTested {
expectNamespacedWorkflow(ctx, controller, "ns-0", "my-ns-0-wf-0", func(wf *wfv1.Workflow) {
if assert.NotNil(t, wf) {
if wf.Status.Phase != "" {
assert.Equal(t, wfv1.WorkflowPending, wf.Status.Phase)
assert.Equal(t, "Workflow processing has been postponed because too many workflows are already running", wf.Status.Message)
ns0PendingWfTested = true
}
}
})
}
if !ns1PendingWfTested {
expectNamespacedWorkflow(ctx, controller, "ns-1", "my-ns-1-wf-0", func(wf *wfv1.Workflow) {
if assert.NotNil(t, wf) {
if wf.Status.Phase != "" {
assert.Equal(t, wfv1.WorkflowPending, wf.Status.Phase)
assert.Equal(t, "Workflow processing has been postponed because too many workflows are already running", wf.Status.Message)
ns1PendingWfTested = true
}
}
})
}
if ns0PendingWfTested && ns1PendingWfTested {
break
}
}
})
}
}

func TestPodCleanupRetryIsReset(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(`
metadata:
Expand Down

0 comments on commit 451d275

Please sign in to comment.