Skip to content

Commit

Permalink
avoid interacting with nil mutatedWfs (flyteorg#477)
Browse files Browse the repository at this point in the history
* avoid interacting with nil mutatedWfs

Signed-off-by: Babis Kiosidis <[email protected]>

* only strip crd fields from mutated wfs

Signed-off-by: Babis Kiosidis <[email protected]>

* test failure in executor

Signed-off-by: Babis Kiosidis <[email protected]>

Signed-off-by: Babis Kiosidis <[email protected]>
  • Loading branch information
ckiosidis authored Aug 25, 2022
1 parent 5b28a98 commit 031a005
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
4 changes: 3 additions & 1 deletion flytepropeller/pkg/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,9 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error {
if wfClosureCrdFields != nil {
// strip data populated from WorkflowClosureReference
w.SubWorkflows, w.Tasks, w.WorkflowSpec = nil, nil, nil
mutatedWf.SubWorkflows, mutatedWf.Tasks, mutatedWf.WorkflowSpec = nil, nil, nil
if mutatedWf != nil {
mutatedWf.SubWorkflows, mutatedWf.Tasks, mutatedWf.WorkflowSpec = nil, nil, nil
}
}

if err != nil {
Expand Down
21 changes: 21 additions & 0 deletions flytepropeller/pkg/controller/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,4 +894,25 @@ func TestPropellerHandler_OffloadedWorkflowClosure(t *testing.T) {
err := p.Handle(ctx, namespace, name)
assert.Error(t, err)
})

t.Run("TryMutate failure is handled", func(t *testing.T) {
scope := promutils.NewTestScope()

protoStore := &storagemocks.ComposedProtobufStore{}
protoStore.OnReadProtobufMatch(mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("foo"))
exec.HandleCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow) error {
return fmt.Errorf("foo")
}
dataStore := storage.NewCompositeDataStore(storage.URLPathConstructor{}, protoStore)
p := NewPropellerHandler(ctx, cfg, dataStore, s, exec, scope)

err := p.Handle(ctx, namespace, name)
assert.Error(t, err, "foo")

r, err := s.Get(ctx, namespace, name)
assert.NoError(t, err)
assert.Nil(t, r.WorkflowSpec)
assert.Nil(t, r.SubWorkflows)
assert.Nil(t, r.Tasks)
})
}

0 comments on commit 031a005

Please sign in to comment.