From d0d1b7f14ad0dee7f74e243e4ab6f5f768e62474 Mon Sep 17 00:00:00 2001 From: Nikki Everett Date: Wed, 28 Feb 2024 10:10:12 -0600 Subject: [PATCH] Add failure node example from flytesnacks #1328 (#4958) * add failure node example from https://github.com/flyteorg/flytesnacks/pull/1328 Signed-off-by: nikki everett * fix formatting and index toc Signed-off-by: nikki everett * copy edits Signed-off-by: nikki everett --------- Signed-off-by: nikki everett --- .../development_lifecycle/failure_node.md | 89 +++++++++++++++++++ .../user_guide/development_lifecycle/index.md | 1 + 2 files changed, 90 insertions(+) create mode 100644 docs/user_guide/development_lifecycle/failure_node.md diff --git a/docs/user_guide/development_lifecycle/failure_node.md b/docs/user_guide/development_lifecycle/failure_node.md new file mode 100644 index 0000000000..61756e4a8a --- /dev/null +++ b/docs/user_guide/development_lifecycle/failure_node.md @@ -0,0 +1,89 @@ +(failure_node)= +# Failure node + +```{eval-rst} + .. tags:: FailureNode, Intermediate +``` + +The failure node feature enables you to designate a specific node to execute in the event of a failure within your workflow. + +For example, a workflow involves creating a cluster at the beginning, followed by the execution of tasks, and concludes with the deletion of the cluster once all tasks are completed. However, if any task within the workflow encounters an error, flyte will abort the entire workflow and won’t delete the cluster. This poses a challenge if you still need to clean up the cluster even in a task failure. + +To address this issue, you can add a failure node into your workflow. This ensures that critical actions, such as deleting the cluster, are executed even in the event of failures occurring throughout the workflow execution: + +```python +from flytekit import WorkflowFailurePolicy, task, workflow + + +@task +def create_cluster(name: str): + print(f"Creating cluster: {name}") + +``` + +Create a task that will fail during execution: + +```python +@task +def t1(a: int, b: str): + print(f"{a} {b}") + raise ValueError("Fail!") + + +@task +def delete_cluster(name: str): + print(f"Deleting cluster {name}") +``` + +Create a task that will be executed if any of the tasks in the workflow fail: + +```python +@task +def clean_up(name: str): + print(f"Cleaning up cluster {name}") + +``` + +Specify the `on_failure` to a cleanup task. This task will be executed if any of the tasks in the workflow fail: + + +:::{note} +The input of `clean_up` should be the exact same as the input of the workflow. +::: + +```python +@workflow(on_failure=clean_up) +def subwf(name: str): + c = create_cluster(name=name) + t = t1(a=1, b="2") + d = delete_cluster(name=name) + c >> t >> d +``` + +By setting the failure policy to `FAIL_AFTER_EXECUTABLE_NODES_COMPLETE` to ensure that the `wf1` is executed even if the subworkflow fails. In this case, both parent and child workflows will fail, resulting in the `clean_up` task being executed twice: + +```python +@workflow(on_failure=clean_up, failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE) +def wf1(name: str = "my_cluster"): + c = create_cluster(name=name) + subwf(name="another_cluster") + t = t1(a=1, b="2") + d = delete_cluster(name=name) + c >> t >> d + + +@workflow +def clean_up_wf(name: str): + return clean_up(name=name) +``` + +You can also set the `on_failure` to a workflow. This workflow will be executed if any of the tasks in the workflow fail: + +```python +@workflow(on_failure=clean_up_wf) +def wf2(name: str = "my_cluster"): + c = create_cluster(name=name) + t = t1(a=1, b="2") + d = delete_cluster(name=name) + c >> t >> d +``` diff --git a/docs/user_guide/development_lifecycle/index.md b/docs/user_guide/development_lifecycle/index.md index 693740c661..8c21abc291 100644 --- a/docs/user_guide/development_lifecycle/index.md +++ b/docs/user_guide/development_lifecycle/index.md @@ -13,6 +13,7 @@ private_images caching cache_serializing decks +failure_node creating_a_new_project running_tasks running_workflows