Skip to content

Commit

Permalink
Including all upstream node deps on BranchNode subnode execution (fly…
Browse files Browse the repository at this point in the history
…teorg#543)

* waiting for upstream nodes on branch subnode evaluation

Signed-off-by: Daniel Rammer <[email protected]>

* removed dead comments

Signed-off-by: Daniel Rammer <[email protected]>

* added unit test

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint issues

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Mar 24, 2023
1 parent e462412 commit b1e5482
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 27 deletions.
82 changes: 82 additions & 0 deletions pkg/controller/executors/mocks/node_lookup.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 15 additions & 1 deletion pkg/controller/executors/node_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,27 @@ import (
type NodeLookup interface {
GetNode(nodeID v1alpha1.NodeID) (v1alpha1.ExecutableNode, bool)
GetNodeExecutionStatus(ctx context.Context, id v1alpha1.NodeID) v1alpha1.ExecutableNodeStatus
// Lookup for upstream edges, find all node ids from which this node can be reached.
ToNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error)
// Lookup for downstream edges, find all node ids that can be reached from the given node id.
FromNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error)
}

// Implements a contextual NodeLookup that can be composed of a disparate NodeGetter and a NodeStatusGetter
type contextualNodeLookup struct {
v1alpha1.NodeGetter
v1alpha1.NodeStatusGetter
DAGStructure
}

// Returns a Contextual NodeLookup using the given NodeGetter and a separate NodeStatusGetter.
// Very useful in Subworkflows where the Subworkflow is the reservoir of the nodes, but the status for these nodes
// maybe stored int he Top-level workflow node itself.
func NewNodeLookup(n v1alpha1.NodeGetter, s v1alpha1.NodeStatusGetter) NodeLookup {
func NewNodeLookup(n v1alpha1.NodeGetter, s v1alpha1.NodeStatusGetter, d DAGStructure) NodeLookup {
return contextualNodeLookup{
NodeGetter: n,
NodeStatusGetter: s,
DAGStructure: d,
}
}

Expand All @@ -45,6 +51,14 @@ func (s staticNodeLookup) GetNodeExecutionStatus(_ context.Context, id v1alpha1.
return s.status[id]
}

func (s staticNodeLookup) ToNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error) {
return nil, nil
}

func (s staticNodeLookup) FromNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error) {
return nil, nil
}

// Returns a new NodeLookup useful in Testing. Not recommended to be used in production
func NewTestNodeLookup(nodes map[v1alpha1.NodeID]v1alpha1.ExecutableNode, status map[v1alpha1.NodeID]v1alpha1.ExecutableNodeStatus) NodeLookup {
return staticNodeLookup{
Expand Down
8 changes: 7 additions & 1 deletion pkg/controller/executors/node_lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@ type nsg struct {
v1alpha1.NodeStatusGetter
}

type dag struct {
DAGStructure
}

func TestNewNodeLookup(t *testing.T) {
n := ng{}
ns := nsg{}
nl := NewNodeLookup(n, ns)
d := dag{}
nl := NewNodeLookup(n, ns, d)
assert.NotNil(t, nl)
typed := nl.(contextualNodeLookup)
assert.Equal(t, n, typed.NodeGetter)
assert.Equal(t, ns, typed.NodeStatusGetter)
assert.Equal(t, d, typed.DAGStructure)
}

func TestNewTestNodeLookup(t *testing.T) {
Expand Down
18 changes: 15 additions & 3 deletions pkg/controller/nodes/branch/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,11 @@ func (b *branchHandler) recurseDownstream(ctx context.Context, nCtx handler.Node
childNodeStatus := nl.GetNodeExecutionStatus(ctx, branchTakenNode.GetID())
childNodeStatus.SetDataDir(nodeStatus.GetDataDir())
childNodeStatus.SetOutputDir(nodeStatus.GetOutputDir())
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), nCtx.NodeID())
upstreamNodeIds, err := nCtx.ContextualNodeLookup().ToNode(branchTakenNode.GetID())
if err != nil {
return handler.UnknownTransition, err
}
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), append(upstreamNodeIds, nCtx.NodeID())...)
execContext, err := b.getExecutionContextForDownstream(nCtx)
if err != nil {
return handler.UnknownTransition, err
Expand Down Expand Up @@ -196,7 +200,11 @@ func (b *branchHandler) Abort(ctx context.Context, nCtx handler.NodeExecutionCon
// TODO we should replace the call to RecursiveNodeHandler with a call to SingleNode Handler. The inputs are also already known ahead of time
// There is no DAGStructure for the branch nodes, the branch taken node is the leaf node. The node itself may be arbitrarily complex, but in that case the node should reference a subworkflow etc
// The parent of the BranchTaken Node is the actual Branch Node and all the data is just forwarded from the Branch to the executed node.
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), nCtx.NodeID())
upstreamNodeIds, err := nCtx.ContextualNodeLookup().ToNode(branchTakenNode.GetID())
if err != nil {
return err
}
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), append(upstreamNodeIds, nCtx.NodeID())...)
execContext, err := b.getExecutionContextForDownstream(nCtx)
if err != nil {
return err
Expand Down Expand Up @@ -236,7 +244,11 @@ func (b *branchHandler) Finalize(ctx context.Context, nCtx handler.NodeExecution
// TODO we should replace the call to RecursiveNodeHandler with a call to SingleNode Handler. The inputs are also already known ahead of time
// There is no DAGStructure for the branch nodes, the branch taken node is the leaf node. The node itself may be arbitrarily complex, but in that case the node should reference a subworkflow etc
// The parent of the BranchTaken Node is the actual Branch Node and all the data is just forwarded from the Branch to the executed node.
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), nCtx.NodeID())
upstreamNodeIds, err := nCtx.ContextualNodeLookup().ToNode(branchTakenNode.GetID())
if err != nil {
return err
}
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), append(upstreamNodeIds, nCtx.NodeID())...)
execContext, err := b.getExecutionContextForDownstream(nCtx)
if err != nil {
return err
Expand Down
45 changes: 30 additions & 15 deletions pkg/controller/nodes/branch/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,24 +158,34 @@ func TestBranchHandler_RecurseDownstream(t *testing.T) {
isErr bool
expectedPhase handler.EPhase
childPhase v1alpha1.NodePhase
nl *execMocks.NodeLookup
upstreamNodeID string
}{
{"upstreamNodeExists", executors.NodeStatusPending, nil,
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseRunning, v1alpha1.NodePhaseQueued, "n2"},
{"childNodeError", executors.NodeStatusUndefined, fmt.Errorf("err"),
&mocks2.ExecutableNodeStatus{}, bn, true, handler.EPhaseUndefined, v1alpha1.NodePhaseFailed, &execMocks.NodeLookup{}},
&mocks2.ExecutableNodeStatus{}, bn, true, handler.EPhaseUndefined, v1alpha1.NodePhaseFailed, ""},
{"childPending", executors.NodeStatusPending, nil,
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseRunning, v1alpha1.NodePhaseQueued, &execMocks.NodeLookup{}},
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseRunning, v1alpha1.NodePhaseQueued, ""},
{"childStillRunning", executors.NodeStatusRunning, nil,
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseRunning, v1alpha1.NodePhaseRunning, &execMocks.NodeLookup{}},
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseRunning, v1alpha1.NodePhaseRunning, ""},
{"childFailure", executors.NodeStatusFailed(expectedError), nil,
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseFailed, v1alpha1.NodePhaseFailed, &execMocks.NodeLookup{}},
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseFailed, v1alpha1.NodePhaseFailed, ""},
{"childComplete", executors.NodeStatusComplete, nil,
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseSuccess, v1alpha1.NodePhaseSucceeded, &execMocks.NodeLookup{}},
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseSuccess, v1alpha1.NodePhaseSucceeded, ""},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
eCtx := &execMocks.ExecutionContext{}
eCtx.OnGetParentInfo().Return(parentInfo{})
nCtx, _ := createNodeContext(v1alpha1.BranchNodeNotYetEvaluated, &childNodeID, n, nil, test.nl, eCtx)

mockNodeLookup := &execMocks.NodeLookup{}
if len(test.upstreamNodeID) > 0 {
mockNodeLookup.OnToNodeMatch(childNodeID).Return([]string{test.upstreamNodeID}, nil)
} else {
mockNodeLookup.OnToNodeMatch(childNodeID).Return(nil, nil)
}

nCtx, _ := createNodeContext(v1alpha1.BranchNodeNotYetEvaluated, &childNodeID, n, nil, mockNodeLookup, eCtx)
newParentInfo, _ := common.CreateParentInfo(parentInfo{}, nCtx.NodeID(), nCtx.CurrentAttempt())
expectedExecContext := executors.NewExecutionContextWithParentInfo(nCtx.ExecutionContext(), newParentInfo)
mockNodeExecutor := &execMocks.Node{}
Expand All @@ -187,23 +197,27 @@ func TestBranchHandler_RecurseDownstream(t *testing.T) {
fList, err1 := d.FromNode("x")
dList, err2 := d.ToNode(childNodeID)
b := assert.NoError(t, err1)
b = b && assert.Equal(t, fList, []v1alpha1.NodeID{})
b = b && assert.Equal(t, []v1alpha1.NodeID{}, fList)
b = b && assert.NoError(t, err2)
b = b && assert.Equal(t, dList, []v1alpha1.NodeID{nodeID})
dListExpected := []v1alpha1.NodeID{nodeID}
if len(test.upstreamNodeID) > 0 {
dListExpected = append([]string{test.upstreamNodeID}, dListExpected...)
}
b = b && assert.Equal(t, dListExpected, dList)
return b
}
return false
}),
mock.MatchedBy(func(lookup executors.NodeLookup) bool { return assert.Equal(t, lookup, test.nl) }),
mock.MatchedBy(func(lookup executors.NodeLookup) bool { return assert.Equal(t, lookup, mockNodeLookup) }),
mock.MatchedBy(func(n v1alpha1.ExecutableNode) bool { return assert.Equal(t, n.GetID(), childNodeID) }),
).Return(test.ns, test.err)

childNodeStatus := &mocks2.ExecutableNodeStatus{}
if test.nl != nil {
if mockNodeLookup != nil {
childNodeStatus.OnGetOutputDir().Return("parent-output-dir")
test.nodeStatus.OnGetDataDir().Return("parent-data-dir")
test.nodeStatus.OnGetOutputDir().Return("parent-output-dir")
test.nl.OnGetNodeExecutionStatus(ctx, childNodeID).Return(childNodeStatus)
mockNodeLookup.OnGetNodeExecutionStatus(ctx, childNodeID).Return(childNodeStatus)
childNodeStatus.On("SetDataDir", storage.DataReference("parent-data-dir")).Once()
childNodeStatus.On("SetOutputDir", storage.DataReference("parent-output-dir")).Once()
}
Expand Down Expand Up @@ -295,17 +309,18 @@ func TestBranchHandler_AbortNode(t *testing.T) {

t.Run("BranchNodeSuccess", func(t *testing.T) {
mockNodeExecutor := &execMocks.Node{}
nl := &execMocks.NodeLookup{}
mockNodeLookup := &execMocks.NodeLookup{}
mockNodeLookup.OnToNodeMatch(mock.Anything).Return(nil, nil)
eCtx := &execMocks.ExecutionContext{}
eCtx.OnGetParentInfo().Return(parentInfo{})
nCtx, s := createNodeContext(v1alpha1.BranchNodeSuccess, &n1, n, nil, nl, eCtx)
nCtx, s := createNodeContext(v1alpha1.BranchNodeSuccess, &n1, n, nil, mockNodeLookup, eCtx)
newParentInfo, _ := common.CreateParentInfo(parentInfo{}, nCtx.NodeID(), nCtx.CurrentAttempt())
expectedExecContext := executors.NewExecutionContextWithParentInfo(nCtx.ExecutionContext(), newParentInfo)
mockNodeExecutor.OnAbortHandlerMatch(mock.Anything,
mock.MatchedBy(func(e executors.ExecutionContext) bool { return assert.Equal(t, e, expectedExecContext) }),
mock.Anything,
mock.Anything, mock.Anything, mock.Anything).Return(nil)
nl.OnGetNode(*s.s.FinalizedNodeID).Return(n, true)
mockNodeLookup.OnGetNode(*s.s.FinalizedNodeID).Return(n, true)
branch := New(mockNodeExecutor, eventConfig, promutils.NewTestScope())
err := branch.Abort(ctx, nCtx, "")
assert.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/nodes/dynamic/dynamic_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
subWorkflow: compiledWf,
subWorkflowClosure: workflowCacheContents.CompiledWorkflow,
execContext: executors.NewExecutionContext(nCtx.ExecutionContext(), compiledWf, compiledWf, newParentInfo, nCtx.ExecutionContext()),
nodeLookup: executors.NewNodeLookup(compiledWf, dynamicNodeStatus),
nodeLookup: executors.NewNodeLookup(compiledWf, dynamicNodeStatus, compiledWf),
dynamicJobSpecURI: string(f.GetLoc()),
}, nil
}
Expand Down Expand Up @@ -216,7 +216,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
subWorkflow: dynamicWf,
subWorkflowClosure: closure,
execContext: executors.NewExecutionContext(nCtx.ExecutionContext(), dynamicWf, dynamicWf, newParentInfo, nCtx.ExecutionContext()),
nodeLookup: executors.NewNodeLookup(dynamicWf, dynamicNodeStatus),
nodeLookup: executors.NewNodeLookup(dynamicWf, dynamicNodeStatus, dynamicWf),
dynamicJobSpecURI: string(f.GetLoc()),
}, nil
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/nodes/subworkflow/subworkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (s *subworkflowHandler) HandleFailingSubWorkflow(ctx context.Context, nCtx
}

status := nCtx.NodeStatus()
nodeLookup := executors.NewNodeLookup(subWorkflow, status)
nodeLookup := executors.NewNodeLookup(subWorkflow, status, subWorkflow)
return s.HandleFailureNodeOfSubWorkflow(ctx, nCtx, subWorkflow, nodeLookup)
}

Expand All @@ -220,7 +220,7 @@ func (s *subworkflowHandler) StartSubWorkflow(ctx context.Context, nCtx handler.
}

status := nCtx.NodeStatus()
nodeLookup := executors.NewNodeLookup(subWorkflow, status)
nodeLookup := executors.NewNodeLookup(subWorkflow, status, subWorkflow)

// assert startStatus.IsComplete() == true
return s.startAndHandleSubWorkflow(ctx, nCtx, subWorkflow, nodeLookup)
Expand All @@ -233,7 +233,7 @@ func (s *subworkflowHandler) CheckSubWorkflowStatus(ctx context.Context, nCtx ha
}

status := nCtx.NodeStatus()
nodeLookup := executors.NewNodeLookup(subWorkflow, status)
nodeLookup := executors.NewNodeLookup(subWorkflow, status, subWorkflow)
return s.handleSubWorkflow(ctx, nCtx, subWorkflow, nodeLookup)
}

Expand All @@ -243,7 +243,7 @@ func (s *subworkflowHandler) HandleAbort(ctx context.Context, nCtx handler.NodeE
return err
}
status := nCtx.NodeStatus()
nodeLookup := executors.NewNodeLookup(subWorkflow, status)
nodeLookup := executors.NewNodeLookup(subWorkflow, status, subWorkflow)
execContext, err := s.getExecutionContextForDownstream(nCtx)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (c *workflowExecutor) handleReadyWorkflow(ctx context.Context, w *v1alpha1.
nodeStatus.SetDataDir(dataDir)
nodeStatus.SetOutputDir(outputDir)
execcontext := executors.NewExecutionContext(w, w, w, nil, executors.InitializeControlFlow())
s, err := c.nodeExecutor.SetInputsForStartNode(ctx, execcontext, w, executors.NewNodeLookup(w, w.GetExecutionStatus()), inputs)
s, err := c.nodeExecutor.SetInputsForStartNode(ctx, execcontext, w, executors.NewNodeLookup(w, w.GetExecutionStatus(), w), inputs)
if err != nil {
return StatusReady, err
}
Expand Down

0 comments on commit b1e5482

Please sign in to comment.