Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Including all upstream node deps on BranchNode subnode execution #543

Merged
merged 6 commits into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions pkg/controller/executors/mocks/node_lookup.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 15 additions & 1 deletion pkg/controller/executors/node_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,27 @@ import (
type NodeLookup interface {
GetNode(nodeID v1alpha1.NodeID) (v1alpha1.ExecutableNode, bool)
GetNodeExecutionStatus(ctx context.Context, id v1alpha1.NodeID) v1alpha1.ExecutableNodeStatus
// Lookup for upstream edges, find all node ids from which this node can be reached.
ToNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error)
// Lookup for downstream edges, find all node ids that can be reached from the given node id.
FromNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error)
}

// Implements a contextual NodeLookup that can be composed of a disparate NodeGetter and a NodeStatusGetter
type contextualNodeLookup struct {
v1alpha1.NodeGetter
v1alpha1.NodeStatusGetter
DAGStructure
}

// Returns a Contextual NodeLookup using the given NodeGetter and a separate NodeStatusGetter.
// Very useful in Subworkflows where the Subworkflow is the reservoir of the nodes, but the status for these nodes
// maybe stored int he Top-level workflow node itself.
func NewNodeLookup(n v1alpha1.NodeGetter, s v1alpha1.NodeStatusGetter) NodeLookup {
func NewNodeLookup(n v1alpha1.NodeGetter, s v1alpha1.NodeStatusGetter, d DAGStructure) NodeLookup {
return contextualNodeLookup{
NodeGetter: n,
NodeStatusGetter: s,
DAGStructure: d,
}
}

Expand All @@ -45,6 +51,14 @@ func (s staticNodeLookup) GetNodeExecutionStatus(_ context.Context, id v1alpha1.
return s.status[id]
}

func (s staticNodeLookup) ToNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error) {
return nil, nil
}

func (s staticNodeLookup) FromNode(id v1alpha1.NodeID) ([]v1alpha1.NodeID, error) {
return nil, nil
}

// Returns a new NodeLookup useful in Testing. Not recommended to be used in production
func NewTestNodeLookup(nodes map[v1alpha1.NodeID]v1alpha1.ExecutableNode, status map[v1alpha1.NodeID]v1alpha1.ExecutableNodeStatus) NodeLookup {
return staticNodeLookup{
Expand Down
8 changes: 7 additions & 1 deletion pkg/controller/executors/node_lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@ type nsg struct {
v1alpha1.NodeStatusGetter
}

type dag struct {
DAGStructure
}

func TestNewNodeLookup(t *testing.T) {
n := ng{}
ns := nsg{}
nl := NewNodeLookup(n, ns)
d := dag{}
nl := NewNodeLookup(n, ns, d)
assert.NotNil(t, nl)
typed := nl.(contextualNodeLookup)
assert.Equal(t, n, typed.NodeGetter)
assert.Equal(t, ns, typed.NodeStatusGetter)
assert.Equal(t, d, typed.DAGStructure)
}

func TestNewTestNodeLookup(t *testing.T) {
Expand Down
18 changes: 15 additions & 3 deletions pkg/controller/nodes/branch/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,11 @@ func (b *branchHandler) recurseDownstream(ctx context.Context, nCtx handler.Node
childNodeStatus := nl.GetNodeExecutionStatus(ctx, branchTakenNode.GetID())
childNodeStatus.SetDataDir(nodeStatus.GetDataDir())
childNodeStatus.SetOutputDir(nodeStatus.GetOutputDir())
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), nCtx.NodeID())
upstreamNodeIds, err := nCtx.ContextualNodeLookup().ToNode(branchTakenNode.GetID())
if err != nil {
return handler.UnknownTransition, err
}
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), append(upstreamNodeIds, nCtx.NodeID())...)
execContext, err := b.getExecutionContextForDownstream(nCtx)
if err != nil {
return handler.UnknownTransition, err
Expand Down Expand Up @@ -196,7 +200,11 @@ func (b *branchHandler) Abort(ctx context.Context, nCtx handler.NodeExecutionCon
// TODO we should replace the call to RecursiveNodeHandler with a call to SingleNode Handler. The inputs are also already known ahead of time
// There is no DAGStructure for the branch nodes, the branch taken node is the leaf node. The node itself may be arbitrarily complex, but in that case the node should reference a subworkflow etc
// The parent of the BranchTaken Node is the actual Branch Node and all the data is just forwarded from the Branch to the executed node.
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), nCtx.NodeID())
upstreamNodeIds, err := nCtx.ContextualNodeLookup().ToNode(branchTakenNode.GetID())
if err != nil {
return err
}
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), append(upstreamNodeIds, nCtx.NodeID())...)
execContext, err := b.getExecutionContextForDownstream(nCtx)
if err != nil {
return err
Expand Down Expand Up @@ -236,7 +244,11 @@ func (b *branchHandler) Finalize(ctx context.Context, nCtx handler.NodeExecution
// TODO we should replace the call to RecursiveNodeHandler with a call to SingleNode Handler. The inputs are also already known ahead of time
// There is no DAGStructure for the branch nodes, the branch taken node is the leaf node. The node itself may be arbitrarily complex, but in that case the node should reference a subworkflow etc
// The parent of the BranchTaken Node is the actual Branch Node and all the data is just forwarded from the Branch to the executed node.
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), nCtx.NodeID())
upstreamNodeIds, err := nCtx.ContextualNodeLookup().ToNode(branchTakenNode.GetID())
if err != nil {
return err
}
dag := executors.NewLeafNodeDAGStructure(branchTakenNode.GetID(), append(upstreamNodeIds, nCtx.NodeID())...)
execContext, err := b.getExecutionContextForDownstream(nCtx)
if err != nil {
return err
Expand Down
45 changes: 30 additions & 15 deletions pkg/controller/nodes/branch/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,24 +158,34 @@ func TestBranchHandler_RecurseDownstream(t *testing.T) {
isErr bool
expectedPhase handler.EPhase
childPhase v1alpha1.NodePhase
nl *execMocks.NodeLookup
upstreamNodeID string
}{
{"upstreamNodeExists", executors.NodeStatusPending, nil,
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseRunning, v1alpha1.NodePhaseQueued, "n2"},
{"childNodeError", executors.NodeStatusUndefined, fmt.Errorf("err"),
&mocks2.ExecutableNodeStatus{}, bn, true, handler.EPhaseUndefined, v1alpha1.NodePhaseFailed, &execMocks.NodeLookup{}},
&mocks2.ExecutableNodeStatus{}, bn, true, handler.EPhaseUndefined, v1alpha1.NodePhaseFailed, ""},
{"childPending", executors.NodeStatusPending, nil,
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseRunning, v1alpha1.NodePhaseQueued, &execMocks.NodeLookup{}},
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseRunning, v1alpha1.NodePhaseQueued, ""},
{"childStillRunning", executors.NodeStatusRunning, nil,
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseRunning, v1alpha1.NodePhaseRunning, &execMocks.NodeLookup{}},
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseRunning, v1alpha1.NodePhaseRunning, ""},
{"childFailure", executors.NodeStatusFailed(expectedError), nil,
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseFailed, v1alpha1.NodePhaseFailed, &execMocks.NodeLookup{}},
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseFailed, v1alpha1.NodePhaseFailed, ""},
{"childComplete", executors.NodeStatusComplete, nil,
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseSuccess, v1alpha1.NodePhaseSucceeded, &execMocks.NodeLookup{}},
&mocks2.ExecutableNodeStatus{}, bn, false, handler.EPhaseSuccess, v1alpha1.NodePhaseSucceeded, ""},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
eCtx := &execMocks.ExecutionContext{}
eCtx.OnGetParentInfo().Return(parentInfo{})
nCtx, _ := createNodeContext(v1alpha1.BranchNodeNotYetEvaluated, &childNodeID, n, nil, test.nl, eCtx)

mockNodeLookup := &execMocks.NodeLookup{}
if len(test.upstreamNodeID) > 0 {
mockNodeLookup.OnToNodeMatch(childNodeID).Return([]string{test.upstreamNodeID}, nil)
} else {
mockNodeLookup.OnToNodeMatch(childNodeID).Return(nil, nil)
}

nCtx, _ := createNodeContext(v1alpha1.BranchNodeNotYetEvaluated, &childNodeID, n, nil, mockNodeLookup, eCtx)
newParentInfo, _ := common.CreateParentInfo(parentInfo{}, nCtx.NodeID(), nCtx.CurrentAttempt())
expectedExecContext := executors.NewExecutionContextWithParentInfo(nCtx.ExecutionContext(), newParentInfo)
mockNodeExecutor := &execMocks.Node{}
Expand All @@ -187,23 +197,27 @@ func TestBranchHandler_RecurseDownstream(t *testing.T) {
fList, err1 := d.FromNode("x")
dList, err2 := d.ToNode(childNodeID)
b := assert.NoError(t, err1)
b = b && assert.Equal(t, fList, []v1alpha1.NodeID{})
b = b && assert.Equal(t, []v1alpha1.NodeID{}, fList)
b = b && assert.NoError(t, err2)
b = b && assert.Equal(t, dList, []v1alpha1.NodeID{nodeID})
dListExpected := []v1alpha1.NodeID{nodeID}
if len(test.upstreamNodeID) > 0 {
dListExpected = append([]string{test.upstreamNodeID}, dListExpected...)
}
b = b && assert.Equal(t, dListExpected, dList)
return b
}
return false
}),
mock.MatchedBy(func(lookup executors.NodeLookup) bool { return assert.Equal(t, lookup, test.nl) }),
mock.MatchedBy(func(lookup executors.NodeLookup) bool { return assert.Equal(t, lookup, mockNodeLookup) }),
mock.MatchedBy(func(n v1alpha1.ExecutableNode) bool { return assert.Equal(t, n.GetID(), childNodeID) }),
).Return(test.ns, test.err)

childNodeStatus := &mocks2.ExecutableNodeStatus{}
if test.nl != nil {
if mockNodeLookup != nil {
childNodeStatus.OnGetOutputDir().Return("parent-output-dir")
test.nodeStatus.OnGetDataDir().Return("parent-data-dir")
test.nodeStatus.OnGetOutputDir().Return("parent-output-dir")
test.nl.OnGetNodeExecutionStatus(ctx, childNodeID).Return(childNodeStatus)
mockNodeLookup.OnGetNodeExecutionStatus(ctx, childNodeID).Return(childNodeStatus)
childNodeStatus.On("SetDataDir", storage.DataReference("parent-data-dir")).Once()
childNodeStatus.On("SetOutputDir", storage.DataReference("parent-output-dir")).Once()
}
Expand Down Expand Up @@ -295,17 +309,18 @@ func TestBranchHandler_AbortNode(t *testing.T) {

t.Run("BranchNodeSuccess", func(t *testing.T) {
mockNodeExecutor := &execMocks.Node{}
nl := &execMocks.NodeLookup{}
mockNodeLookup := &execMocks.NodeLookup{}
mockNodeLookup.OnToNodeMatch(mock.Anything).Return(nil, nil)
eCtx := &execMocks.ExecutionContext{}
eCtx.OnGetParentInfo().Return(parentInfo{})
nCtx, s := createNodeContext(v1alpha1.BranchNodeSuccess, &n1, n, nil, nl, eCtx)
nCtx, s := createNodeContext(v1alpha1.BranchNodeSuccess, &n1, n, nil, mockNodeLookup, eCtx)
newParentInfo, _ := common.CreateParentInfo(parentInfo{}, nCtx.NodeID(), nCtx.CurrentAttempt())
expectedExecContext := executors.NewExecutionContextWithParentInfo(nCtx.ExecutionContext(), newParentInfo)
mockNodeExecutor.OnAbortHandlerMatch(mock.Anything,
mock.MatchedBy(func(e executors.ExecutionContext) bool { return assert.Equal(t, e, expectedExecContext) }),
mock.Anything,
mock.Anything, mock.Anything, mock.Anything).Return(nil)
nl.OnGetNode(*s.s.FinalizedNodeID).Return(n, true)
mockNodeLookup.OnGetNode(*s.s.FinalizedNodeID).Return(n, true)
branch := New(mockNodeExecutor, eventConfig, promutils.NewTestScope())
err := branch.Abort(ctx, nCtx, "")
assert.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/nodes/dynamic/dynamic_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
subWorkflow: compiledWf,
subWorkflowClosure: workflowCacheContents.CompiledWorkflow,
execContext: executors.NewExecutionContext(nCtx.ExecutionContext(), compiledWf, compiledWf, newParentInfo, nCtx.ExecutionContext()),
nodeLookup: executors.NewNodeLookup(compiledWf, dynamicNodeStatus),
nodeLookup: executors.NewNodeLookup(compiledWf, dynamicNodeStatus, compiledWf),
dynamicJobSpecURI: string(f.GetLoc()),
}, nil
}
Expand Down Expand Up @@ -216,7 +216,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
subWorkflow: dynamicWf,
subWorkflowClosure: closure,
execContext: executors.NewExecutionContext(nCtx.ExecutionContext(), dynamicWf, dynamicWf, newParentInfo, nCtx.ExecutionContext()),
nodeLookup: executors.NewNodeLookup(dynamicWf, dynamicNodeStatus),
nodeLookup: executors.NewNodeLookup(dynamicWf, dynamicNodeStatus, dynamicWf),
dynamicJobSpecURI: string(f.GetLoc()),
}, nil
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/nodes/subworkflow/subworkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (s *subworkflowHandler) HandleFailingSubWorkflow(ctx context.Context, nCtx
}

status := nCtx.NodeStatus()
nodeLookup := executors.NewNodeLookup(subWorkflow, status)
nodeLookup := executors.NewNodeLookup(subWorkflow, status, subWorkflow)
return s.HandleFailureNodeOfSubWorkflow(ctx, nCtx, subWorkflow, nodeLookup)
}

Expand All @@ -220,7 +220,7 @@ func (s *subworkflowHandler) StartSubWorkflow(ctx context.Context, nCtx handler.
}

status := nCtx.NodeStatus()
nodeLookup := executors.NewNodeLookup(subWorkflow, status)
nodeLookup := executors.NewNodeLookup(subWorkflow, status, subWorkflow)

// assert startStatus.IsComplete() == true
return s.startAndHandleSubWorkflow(ctx, nCtx, subWorkflow, nodeLookup)
Expand All @@ -233,7 +233,7 @@ func (s *subworkflowHandler) CheckSubWorkflowStatus(ctx context.Context, nCtx ha
}

status := nCtx.NodeStatus()
nodeLookup := executors.NewNodeLookup(subWorkflow, status)
nodeLookup := executors.NewNodeLookup(subWorkflow, status, subWorkflow)
return s.handleSubWorkflow(ctx, nCtx, subWorkflow, nodeLookup)
}

Expand All @@ -243,7 +243,7 @@ func (s *subworkflowHandler) HandleAbort(ctx context.Context, nCtx handler.NodeE
return err
}
status := nCtx.NodeStatus()
nodeLookup := executors.NewNodeLookup(subWorkflow, status)
nodeLookup := executors.NewNodeLookup(subWorkflow, status, subWorkflow)
execContext, err := s.getExecutionContextForDownstream(nCtx)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/workflow/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (c *workflowExecutor) handleReadyWorkflow(ctx context.Context, w *v1alpha1.
nodeStatus.SetDataDir(dataDir)
nodeStatus.SetOutputDir(outputDir)
execcontext := executors.NewExecutionContext(w, w, w, nil, executors.InitializeControlFlow())
s, err := c.nodeExecutor.SetInputsForStartNode(ctx, execcontext, w, executors.NewNodeLookup(w, w.GetExecutionStatus()), inputs)
s, err := c.nodeExecutor.SetInputsForStartNode(ctx, execcontext, w, executors.NewNodeLookup(w, w.GetExecutionStatus(), w), inputs)
if err != nil {
return StatusReady, err
}
Expand Down