Implement workflow execution recovery #290
Signed-off-by: Katrina Rogan <[email protected]>
@@ -92,6 +94,10 @@ func (p NodePhase) String() string {
return "RetryableFailure"
case NodePhaseDynamicRunning:
return "DynamicRunning"
case NodePhaseRecovering:
you have to do op_code_generate
@@ -193,6 +195,42 @@ func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
assert.NoError(t, err)
})

t.Run("happy recover", func(t *testing.T) {
what if recover fails?
updated to call create execution when applicable
} else {
logger.Debugf(ctx, "No outputs found for recovered node [%+v]", nCtx.NodeExecutionMetadata().GetNodeExecutionID())
}
outputFile := v1alpha1.GetOutputsFile(nCtx.NodeStatus().GetOutputDir())
ohh no are we still using this?
@kumare3 what should i be using instead?
@kumare3 this doesn't include output file paths though. should i update it?
pkg/controller/nodes/executor.go
Outdated
@@ -429,14 +554,19 @@ func (c *nodeExecutor) handleQueuedOrRunningNode(ctx context.Context, nCtx *node
np = v1alpha1.NodePhaseSucceeded
finalStatus = executors.NodeStatusSuccess
}
if np == v1alpha1.NodePhaseRecovering && !h.FinalizeRequired() {
if it is recovering we do not even care about the handler or finalization right?
do we really need Recovering?
updated
It's amazing how clean this change is. It really fits well into the premise.
pkg/controller/controller.go
Outdated
// The admin client might not be initialized if EnableAdminLauncher is set to False.
if adminClient == nil {
var err error
adminClient, err = getAdminClient(ctx)
If we now always want an admin client, let's remove the condition and just always initialize it and use it up there if needed and down here all the time...
done
@@ -17,7 +21,8 @@ import (
 )

 type launchPlanHandler struct {
-launchPlan launchplan.Executor
+launchPlan launchplan.Executor
+recoveryClient recovery.Client
I'm a bit confused about why we need the recovery client at this layer... won't that be handled at the Node executor layer?
Isn't the reason that we want to call the recover execution endpoint instead in case the child workflow node has failed, so that we can recover a partially failed child workflow?
That's what I thought.. but looking at the interface and implementation of recovery client... it retrieves Node executions from admin... I think... I obviously could be missing something since I'm half looking at this and half prepping my bags :-D
yes, we retrieve the node execution - which has target metadata of type workflow node, which we can use to fetch the originally-launched child execution
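The lookup described in this reply might be sketched like this. The types below are hypothetical, simplified stand-ins invented for illustration (they are not the real admin API messages); the sketch only shows the idea of reading workflow-node metadata off a fetched node execution to find the child execution that was originally launched.

```go
package main

import (
	"errors"
	"fmt"
)

// workflowNodeMetadata is a hypothetical stand-in for the target metadata
// attached to a node execution that launched a child workflow execution.
type workflowNodeMetadata struct {
	ChildExecutionID string
}

// nodeExecution is a hypothetical, simplified node execution record.
type nodeExecution struct {
	NodeID string
	// WorkflowNode is non-nil only when the node launched a child execution.
	WorkflowNode *workflowNodeMetadata
}

// originalChildExecution returns the ID of the child execution launched by
// the original node execution, or an error if the node was not a workflow node.
func originalChildExecution(ne nodeExecution) (string, error) {
	if ne.WorkflowNode == nil || ne.WorkflowNode.ChildExecutionID == "" {
		return "", errors.New("node execution has no workflow node metadata")
	}
	return ne.WorkflowNode.ChildExecutionID, nil
}

func main() {
	ne := nodeExecution{
		NodeID:       "n0",
		WorkflowNode: &workflowNodeMetadata{ChildExecutionID: "child-exec-1"},
	}
	id, err := originalChildExecution(ne)
	fmt.Println(id, err)
}
```

With the child execution ID in hand, the handler can ask admin to recover that execution rather than launch a new one, which is why the client is needed at this layer in addition to the node executor.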
PTAL @kumare3
TL;DR
Implement workflow execution recovery. For ordinary task nodes, this modifies pre-execute to attempt to recover node executions when the overall workflow execution is run in recovery mode, which avoids needless re-computation of node executions that previously succeeded.
For workflow nodes (that is, those that call out to create an execution from a launch plan), this sets the triggered executions to recover from the original node execution, which created an execution using the same launch plan.
For dynamic nodes (that is, those that contain a subworkflow), the behavior remains the same as for task nodes. If the original dynamic node succeeded, great, we'll recover it. Otherwise, the dynamic workflow runs again.
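The decision for task and dynamic nodes can be sketched roughly as below. This is an illustrative sketch under stated assumptions, not the PR's implementation: `phase`, `priorExecution`, `recoveryLookup`, and this `preExecute` signature are all hypothetical, and the real pre-execute step works against admin and the node context rather than an in-memory map.

```go
package main

import "fmt"

// phase is a hypothetical, simplified node execution phase.
type phase int

const (
	phaseUnknown phase = iota
	phaseSucceeded
	phaseFailed
)

// priorExecution is a hypothetical record of how a node fared in the
// original workflow execution that is being recovered.
type priorExecution struct {
	Phase   phase
	Outputs map[string]string
}

// recoveryLookup is a hypothetical index from node ID to its prior execution.
type recoveryLookup map[string]priorExecution

// preExecute reports whether a node can be skipped by reusing its prior
// outputs. Only nodes that succeeded in the original execution are recovered;
// everything else (failed, missing, or not in recovery mode) runs again.
func preExecute(recoveryMode bool, prior recoveryLookup, nodeID string) (map[string]string, bool) {
	if !recoveryMode {
		return nil, false
	}
	p, ok := prior[nodeID]
	if !ok || p.Phase != phaseSucceeded {
		return nil, false
	}
	return p.Outputs, true
}

func main() {
	prior := recoveryLookup{
		"n1": {Phase: phaseSucceeded, Outputs: map[string]string{"o0": "s3://bucket/n1"}},
		"n2": {Phase: phaseFailed},
	}
	for _, id := range []string{"n1", "n2", "n3"} {
		outputs, recovered := preExecute(true, prior, id)
		fmt.Println(id, recovered, outputs)
	}
}
```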
Type
Are all requirements met?
Complete description
How did you fix the bug, make the feature etc. Link to any design docs etc
Tracking Issue
flyteorg/flyte#1151
Follow-up issue
NA