Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Fix case in which downstream nodes may incorrectly fail because of parentNodeID #minor (#288)
Browse files Browse the repository at this point in the history

* Branch canexecute fix

Signed-off-by: Ketan Umare <[email protected]>

* tests

Signed-off-by: Ketan Umare <[email protected]>

* more unit test fixes

Signed-off-by: Ketan Umare <[email protected]>

* fixed tests

Signed-off-by: Ketan Umare <[email protected]>

* more documentation

Signed-off-by: Ketan Umare <[email protected]>
  • Loading branch information
kumare3 authored Jul 15, 2021
1 parent 40b6aaf commit 9d0d964
Show file tree
Hide file tree
Showing 3 changed files with 337 additions and 155 deletions.
29 changes: 23 additions & 6 deletions pkg/controller/nodes/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,10 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseEndNode(t *testing.T) {
// Node not yet started
{
createSingleNodeWf := func(parentPhase v1alpha1.NodePhase, _ int) (v1alpha1.ExecutableWorkflow, v1alpha1.ExecutableNode, v1alpha1.ExecutableNodeStatus) {
sn := &v1alpha1.NodeSpec{
ID: v1alpha1.StartNodeID,
Kind: v1alpha1.NodeKindStart,
}
n := &v1alpha1.NodeSpec{
ID: v1alpha1.EndNodeID,
Kind: v1alpha1.NodeKindEnd,
Expand All @@ -292,7 +296,8 @@ func TestNodeExecutor_RecursiveNodeHandler_RecurseEndNode(t *testing.T) {
WorkflowSpec: &v1alpha1.WorkflowSpec{
ID: "wf",
Nodes: map[v1alpha1.NodeID]*v1alpha1.NodeSpec{
v1alpha1.EndNodeID: n,
v1alpha1.StartNodeID: sn,
v1alpha1.EndNodeID: n,
},
Connections: v1alpha1.Connections{
Upstream: map[v1alpha1.NodeID][]v1alpha1.NodeID{
Expand Down Expand Up @@ -608,6 +613,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) {
mockWf := &mocks.ExecutableWorkflow{}
mockWf.OnStartNode().Return(mockNodeN0)
mockWf.OnGetNode(nodeN2).Return(mockNode, true)
mockWf.OnGetNode(nodeN0).Return(mockNodeN0, true)
mockWf.OnGetNodeExecutionStatusMatch(mock.Anything, nodeN0).Return(mockN0Status)
mockWf.OnGetNodeExecutionStatusMatch(mock.Anything, nodeN2).Return(mockN2Status)
mockWf.OnGetConnections().Return(connections)
Expand Down Expand Up @@ -653,8 +659,8 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) {
mock.MatchedBy(func(ctx context.Context) bool { return true }),
mock.MatchedBy(func(o handler.NodeExecutionContext) bool { return true }),
).Return(handler.UnknownTransition, fmt.Errorf("should not be called"))
h.On("FinalizeRequired").Return(false)
hf.On("GetHandler", v1alpha1.NodeKindTask).Return(h, nil)
h.OnFinalizeRequired().Return(false)
hf.OnGetHandler(v1alpha1.NodeKindTask).Return(h, nil)

mockWf, _ := setupNodePhase(test.parentNodePhase, test.currentNodePhase, test.expectedNodePhase)
startNode := mockWf.StartNode()
Expand Down Expand Up @@ -1095,6 +1101,11 @@ func TestNodeExecutor_RecursiveNodeHandler_UpstreamNotReady(t *testing.T) {
taskID := taskID

createSingleNodeWf := func(parentPhase v1alpha1.NodePhase, maxAttempts int) (v1alpha1.ExecutableWorkflow, v1alpha1.ExecutableNode, v1alpha1.ExecutableNodeStatus) {
startNode := &v1alpha1.NodeSpec{
Kind: v1alpha1.NodeKindStart,
ID: v1alpha1.StartNodeID,
}

n := &v1alpha1.NodeSpec{
ID: defaultNodeID,
TaskRef: &taskID,
Expand All @@ -1103,6 +1114,7 @@ func TestNodeExecutor_RecursiveNodeHandler_UpstreamNotReady(t *testing.T) {
MinAttempts: &maxAttempts,
},
}

ns := &v1alpha1.NodeStatus{}

return &v1alpha1.FlyteWorkflow{
Expand All @@ -1123,7 +1135,8 @@ func TestNodeExecutor_RecursiveNodeHandler_UpstreamNotReady(t *testing.T) {
WorkflowSpec: &v1alpha1.WorkflowSpec{
ID: "wf",
Nodes: map[v1alpha1.NodeID]*v1alpha1.NodeSpec{
defaultNodeID: n,
v1alpha1.StartNodeID: startNode,
defaultNodeID: n,
},
Connections: v1alpha1.Connections{
Upstream: map[v1alpha1.NodeID][]v1alpha1.NodeID{
Expand Down Expand Up @@ -1210,8 +1223,8 @@ func TestNodeExecutor_RecursiveNodeHandler_BranchNode(t *testing.T) {
expectedError bool
}{
{"branchSuccess", v1alpha1.BranchNodeSuccess, v1alpha1.NodePhaseNotYetStarted, true, executors.NodePhaseQueued, false},
{"branchNotYetDone", v1alpha1.BranchNodeNotYetEvaluated, v1alpha1.NodePhaseNotYetStarted, false, executors.NodePhaseUndefined, true},
{"branchError", v1alpha1.BranchNodeError, v1alpha1.NodePhaseNotYetStarted, false, executors.NodePhaseUndefined, true},
{"branchNotYetDone", v1alpha1.BranchNodeNotYetEvaluated, v1alpha1.NodePhaseNotYetStarted, false, executors.NodePhasePending, false},
{"branchError", v1alpha1.BranchNodeError, v1alpha1.NodePhaseNotYetStarted, false, executors.NodePhasePending, false},
}

for _, test := range tests {
Expand All @@ -1228,13 +1241,16 @@ func TestNodeExecutor_RecursiveNodeHandler_BranchNode(t *testing.T) {

hf.OnGetHandlerMatch(v1alpha1.NodeKindTask).Return(h, nil)

now := v1.Time{Time: time.Now()}
parentBranchNodeID := "branchNode"
parentBranchNode := &mocks.ExecutableNode{}
parentBranchNode.OnGetID().Return(parentBranchNodeID)
parentBranchNode.OnGetBranchNode().Return(&mocks.ExecutableBranchNode{})
parentBranchNodeStatus := &mocks.ExecutableNodeStatus{}
parentBranchNodeStatus.OnGetPhase().Return(v1alpha1.NodePhaseRunning)
parentBranchNodeStatus.OnIsDirty().Return(false)
parentBranchNodeStatus.OnGetStartedAt().Return(&now)
parentBranchNodeStatus.OnGetLastUpdatedAt().Return(nil)
bns := &mocks.MutableBranchNodeStatus{}
parentBranchNodeStatus.OnGetBranchStatus().Return(bns)
bns.OnGetPhase().Return(test.parentNodePhase)
Expand Down Expand Up @@ -1271,6 +1287,7 @@ func TestNodeExecutor_RecursiveNodeHandler_BranchNode(t *testing.T) {
branchTakeNodeStatus.OnGetDataDir().Return("data")
branchTakeNodeStatus.OnGetParentNodeID().Return(&parentBranchNodeID)
branchTakeNodeStatus.OnGetParentTaskID().Return(nil)
branchTakeNodeStatus.OnGetStartedAt().Return(&now)

if test.phaseUpdateExpected {
var ee *core.ExecutionError
Expand Down
77 changes: 44 additions & 33 deletions pkg/controller/nodes/predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ func (p PredicatePhase) String() string {
return "undefined"
}

// CanExecute informs the caller whether the given node can begin execution. This depends primarily on
// all nodes upstream of the given node having succeeded and their results being available.
func CanExecute(ctx context.Context, dag executors.DAGStructure, nl executors.NodeLookup, node v1alpha1.BaseNode) (
PredicatePhase, error) {

Expand All @@ -47,36 +49,47 @@ func CanExecute(ctx context.Context, dag executors.DAGStructure, nl executors.No
return PredicatePhaseReady, nil
}

nodeStatus := nl.GetNodeExecutionStatus(ctx, nodeID)
parentNodeID := nodeStatus.GetParentNodeID()
upstreamNodes, err := dag.ToNode(nodeID)
if err != nil {
return PredicatePhaseUndefined, errors.Errorf(errors.BadSpecificationError, nodeID, "Unable to find upstream nodes for Node")
}

skipped := false
for _, upstreamNodeID := range upstreamNodes {
upstreamNode, ok := nl.GetNode(upstreamNodeID)
if !ok {
return PredicatePhaseUndefined, errors.Errorf(errors.BadSpecificationError, nodeID, "Upstream node [%v] of node [%v] not defined", upstreamNodeID, nodeID)
}

upstreamNodeStatus := nl.GetNodeExecutionStatus(ctx, upstreamNodeID)

if upstreamNodeStatus.IsDirty() {
return PredicatePhaseNotReady, nil
}

if parentNodeID != nil && *parentNodeID == upstreamNodeID {
upstreamNode, ok := nl.GetNode(upstreamNodeID)
if !ok {
return PredicatePhaseUndefined, errors.Errorf(errors.BadSpecificationError, nodeID, "Upstream node [%v] of node [%v] not defined", upstreamNodeID, nodeID)
// Branch nodes are special. When the upstreamNode is a branch node, it represents one of two cases:
// Case 1: the upstreamNode is the parent branch node (the one that houses the condition) and the current node
// is one of the branches of that condition.
// For example, given the branch (if condition then take-branch1 else take-branch2),
// condition is the branch node, take-branch1/2 are its two child nodes, and this check can be triggered
// for either one of them.
// Case 2: the upstreamNode is some branch node and the current node is simply a node that consumes the output
// of the branch.
// Thus:
// In case 1, we can proceed to execute one of the branches as soon as the branch evaluation completes, even though
// the branch node itself will not yet be successful (as it contains the branches).
// In case 2, we continue the evaluation in the rest of CanExecute and can only proceed once the entire
// branch node is complete.
if upstreamNode.GetBranchNode() != nil && upstreamNodeStatus.GetBranchStatus() != nil {
if upstreamNodeStatus.GetBranchStatus().GetPhase() != v1alpha1.BranchNodeSuccess {
return PredicatePhaseNotReady, nil
}

// Deprecated: This if block will be removed in a future version. It's harmless (will be no-op) for newly
// compiled Workflows as sub-branch-nodes won't have an execution or code dependency on branch nodes.
// This only happens if current node is the child node of a branch node
if upstreamNode.GetBranchNode() == nil || upstreamNodeStatus.GetBranchStatus().GetPhase() != v1alpha1.BranchNodeSuccess {
logger.Debugf(ctx, "Branch sub node is expected to have parent branch node in succeeded state")
return PredicatePhaseUndefined, errors.Errorf(errors.IllegalStateError, nodeID, "Upstream node [%v] is set as parent, but is not a branch node of [%v] or in illegal state.", upstreamNodeID, nodeID)
// The branch evaluation succeeded, so we are free to go ahead and execute the child nodes of the branch node,
// but not any of the nodes that merely depend on it
nodeStatus := nl.GetNodeExecutionStatus(ctx, nodeID)
if nodeStatus.GetParentNodeID() != nil && *nodeStatus.GetParentNodeID() == upstreamNodeID {
continue
}

continue
}

if upstreamNodeStatus.GetPhase() == v1alpha1.NodePhaseSkipped ||
Expand All @@ -103,8 +116,6 @@ func GetParentNodeMaxEndTime(ctx context.Context, dag executors.DAGStructure, nl
return zeroTime, nil
}

nodeStatus := nl.GetNodeExecutionStatus(ctx, node.GetID())
parentNodeID := nodeStatus.GetParentNodeID()
upstreamNodes, err := dag.ToNode(nodeID)
if err != nil {
return zeroTime, errors.Errorf(errors.BadSpecificationError, nodeID, "Unable to find upstream nodes for Node")
Expand All @@ -113,24 +124,24 @@ func GetParentNodeMaxEndTime(ctx context.Context, dag executors.DAGStructure, nl
var latest v1.Time
for _, upstreamNodeID := range upstreamNodes {
upstreamNodeStatus := nl.GetNodeExecutionStatus(ctx, upstreamNodeID)
if parentNodeID != nil && *parentNodeID == upstreamNodeID {
upstreamNode, ok := nl.GetNode(upstreamNodeID)
if !ok {
return zeroTime, errors.Errorf(errors.BadSpecificationError, nodeID, "Upstream node [%v] of node [%v] not defined", upstreamNodeID, nodeID)
}

// Deprecated: This if block will be removed in a future version. It's harmless (will be no-op) for newly
// compiled Workflows as sub-branch-nodes won't have an execution or code dependency on branch nodes.
// This only happens if current node is the child node of a branch node
if upstreamNode.GetBranchNode() == nil || upstreamNodeStatus.GetBranchStatus().GetPhase() != v1alpha1.BranchNodeSuccess {
logger.Debugf(ctx, "Branch sub node is expected to have parent branch node in succeeded state")
return zeroTime, errors.Errorf(errors.IllegalStateError, nodeID, "Upstream node [%v] is set as parent, but is not a branch node of [%v] or in illegal state.", upstreamNodeID, nodeID)
}

continue
upstreamNode, ok := nl.GetNode(upstreamNodeID)
if !ok {
return zeroTime, errors.Errorf(errors.BadSpecificationError, nodeID, "Upstream node [%v] of node [%v] not defined", upstreamNodeID, nodeID)
}

if stoppedAt := upstreamNodeStatus.GetStoppedAt(); stoppedAt != nil && stoppedAt.Unix() > latest.Unix() {
// Special handling for branch node. The status for branch does not get updated to success, as it is the parent node
if upstreamNode.GetBranchNode() != nil && upstreamNodeStatus.GetBranchStatus() != nil {
if upstreamNodeStatus.GetBranchStatus().GetPhase() != v1alpha1.BranchNodeSuccess {
return zeroTime, nil
}
branchUpdatedAt := upstreamNodeStatus.GetStartedAt()
if upstreamNodeStatus.GetLastUpdatedAt() != nil {
branchUpdatedAt = upstreamNodeStatus.GetLastUpdatedAt()
}
if branchUpdatedAt != nil && branchUpdatedAt.Unix() > latest.Unix() {
latest = *branchUpdatedAt
}
} else if stoppedAt := upstreamNodeStatus.GetStoppedAt(); stoppedAt != nil && stoppedAt.Unix() > latest.Unix() {
latest = *upstreamNodeStatus.GetStoppedAt()
}
}
Expand Down
Loading

0 comments on commit 9d0d964

Please sign in to comment.