
SubWorkflow Error handling #5513

Open
kdubovikov opened this issue Jun 26, 2024 · 12 comments
Labels: enhancement (New feature or request)

Comments

@kdubovikov

Motivation: Why do you think this is important?

We are creating an orchestration workflow that governs a dynamic number of model sub-workflows developed by different team members. Those sub-workflows can fail unexpectedly, and we need to handle such failures gracefully in the orchestration workflow so that it can still complete successfully: some of the sub-workflows are acceptable to fail, while others represent irrecoverable failures.

Goal: What should the final outcome look like, ideally?

Something like a try/catch block for workflow-level error handling, or an on-failure decorator that receives rich information about the workflow-level error trace.

Describe alternatives you've considered

We have considered explicit error handling in our code, with errors logged to the database, but that seems wrong when using a workflow engine: handling and processing sub-workflow or task-level exceptions is the workflow's responsibility, and it would leak into the application logic if we implemented it this way.

Propose: Link/Inline OR Additional context

No response

Are you sure this issue hasn't been raised already?

  • Yes

Have you read the Code of Conduct?

  • Yes
kdubovikov added the enhancement (New feature or request) and untriaged (This issue has not yet been looked at by the Maintainers) labels on Jun 26, 2024

runllm bot commented Jun 26, 2024

To handle sub-workflow errors gracefully in Flyte, you can use the failure_policy attribute with the @workflow decorator. This allows the parent workflow to continue executing even if some sub-workflows fail. Additionally, Flyte supports defining error handlers at the workflow level, which can be used to perform cleanup or logging tasks upon failure.

Here is an example of how you can define a parent workflow with error handling for sub-workflows:

from flytekit import task, workflow, WorkflowFailurePolicy

@task
def my_error_handler(error: Exception):
    # Handle the error, e.g., log it or clean up resources
    print(f"Error encountered: {error}")

@workflow(on_failure=my_error_handler)
def my_sub_wf() -> str:
    # Sub-workflow logic
    ...

@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def my_parent_wf() -> str:
    n1 = my_sub_wf()
    n2 = my_sub_wf()
    n3 = my_sub_wf()

    return n3

In this example, my_parent_wf will continue running even if any of the sub-workflows (my_sub_wf) fail. The overall status of the execution will still be marked as a failure, but as many nodes as possible will be allowed to execute. The my_error_handler task will be invoked whenever a sub-workflow fails, so the error can be handled appropriately.

Would you like more details on this or other potential solutions?


@kdubovikov
Author

FAIL_AFTER_EXECUTABLE_NODES_COMPLETE

Here, the parent workflow will still fail if any of the sub-workflows fail. What we need is the ability to decide what happens, similar to what you can do with try/except in Python.

@nihar-pixis

One of our use cases:

[Screenshot attached: Screenshot 2024-05-17 at 4.42.20 PM]

Here is one example we've tried, where a failure is simulated in subworkflow_a, so it fails. task_g depends on subworkflow_a, but in some cases the user might still want to execute task_g without subworkflow_a's output (relying only on subworkflow_b's). We couldn't find a way to handle the failure of subworkflow_a from main_workflow. The screenshot and workflow code are attached for reference.

Failure handling from the main/parent workflow would provide flexibility and more control, as one does not always have control over sub-workflows coming from different teams/modules while orchestrating large and complex workflows.

Having error handling at the orchestration (node) level, be it a sub-workflow or a task, gives MLEs/data engineers more control to decide the behaviour when data science or other teams' modules fail in complex orchestrations.

@eapolinario
Contributor

Flyte has support for failure nodes, which let workflows define special nodes to be executed in case the workflow fails. Unfortunately, this doesn't cover your use case: failure nodes don't resume the execution after the failure; they only give you an opportunity to run a task.
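
For reference, a minimal sketch of a failure node (hedged: task and workflow names here are hypothetical, and per the flytekit docs the on_failure handler's inputs are expected to mirror the workflow's inputs, so double-check the exact signature):

from flytekit import task, workflow


@task
def flaky() -> str:
    raise RuntimeError("simulated failure")


@task
def cleanup_on_failure(name: str):
    # Runs only if wf fails. It cannot resume or redirect the execution;
    # it only gets a chance to clean up or notify before the workflow is
    # marked as failed.
    print(f"workflow run for {name} failed, cleaning up")


@workflow(on_failure=cleanup_on_failure)
def wf(name: str = "demo") -> str:
    return flaky()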

eapolinario removed the untriaged (This issue has not yet been looked at by the Maintainers) label on Jun 27, 2024
@kdubovikov
Author

kdubovikov commented Jul 1, 2024

@eapolinario, is there any way this can get implemented? Or are there any suggested workarounds that we can proceed with?
Maybe this feature is relatively easy to add? Could we contribute from our end? I was discussing this with @kumare3 over a call, and he said that we could get some guidance on how to work around this problem or contribute to its implementation.

Without this feature, I guess the only way to have recoverable failures in sub-workflows is to wrap every task in try/catch blocks and essentially implement the error handling on our end?
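
For context, that workaround would look roughly like the sketch below (hypothetical task names): every task swallows its own exceptions and signals failure with a sentinel value, which is exactly the leakage into application logic described in the issue.

from typing import Optional

from flytekit import task, workflow


@task
def flaky_model() -> Optional[str]:
    try:
        # real model logic would go here
        raise RuntimeError("simulated model failure")
    except Exception:
        # error handling leaks into application code: the task swallows
        # the failure and signals it with a sentinel value instead
        return None


@task
def downstream(output_a: Optional[str]) -> str:
    # the downstream task has to know about and interpret the sentinel
    return f"received: {output_a if output_a is not None else 'DEFAULT_VALUE'}"


@workflow
def orchestration_wf() -> str:
    return downstream(output_a=flaky_model())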

@kumare3
Contributor

kumare3 commented Jul 2, 2024

Have you tried @eager?

@kdubovikov
Author

Have you tried @eager?

@kumare3, eager is an experimental feature, if I am not mistaken, so we can't migrate the whole production system to it just for error-handling capabilities.

@kumare3
Contributor

kumare3 commented Jul 2, 2024

@eapolinario what they want is success on failure, that is, the ability to catch the error and still succeed.

@kdubovikov can you write an example of how you want it to work? We can help in the meantime.

Also, @eager is the best way to make things work with arbitrary code.
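
For illustration, a rough sketch of the eager approach (based on flytekit's experimental eager API at the time of writing, so the imports and exception type may have changed; task names are hypothetical):

from flytekit import task
from flytekit.experimental import eager, EagerException


@task
def model_a() -> str:
    raise RuntimeError("simulated failure in model A")


@task
def model_b() -> str:
    return "output from model B"


@task
def combine(output_a: str, output_b: str) -> str:
    return f"received: {output_a}, {output_b}"


@eager
async def main_workflow() -> str:
    try:
        result_a = await model_a()
    except EagerException:
        # catch the sub-node failure and fall back to a default,
        # which is the "success on failure" behaviour being asked for
        result_a = "DEFAULT_VALUE"
    result_b = await model_b()
    return await combine(output_a=result_a, output_b=result_b)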

@kdubovikov
Author

@kumare3 is this example by @nihar-pixis not sufficient in any way? What do you need in addition?

#5513 (comment)

@kumare3
Contributor

kumare3 commented Jul 8, 2024

Code example of the developer experience?

@nihar-pixis

@kumare3 below are some developer-experience examples:

from flytekit import task, workflow
from typing import Optional


@task
def task_model_a() -> str:
    raise Exception("Simulated failure in Task A")

@task
def task_model_b() -> str:
    return "Output from model B"

@workflow
def subworkflow_model_b() -> str:  # model B sub-workflow
    return task_model_b()

@workflow
def subworkflow_model_a() -> str:  # model A sub-workflow
    return task_model_a()

@task
def task_g(output_a: str, output_b: str) -> str:
    return f"Next task received: {output_a}, {output_b}"

@task
def task_e() -> str:
    return "success"


# The main workflow: each usage example below is an alternative sketch of the
# same main_workflow; the syntax is proposed developer experience, not something
# flytekit supports today.

# USAGE EXAMPLE 1: convey to task_g that output_a is optional, so execution can continue
@workflow
def main_workflow() -> str:

    result_a = subworkflow_model_a()
    result_b = subworkflow_model_b()

    # declaring the input as Optional tells task_g that it can continue,
    # receiving None (or another default value) if subworkflow_model_a failed
    final_result: Optional[str] = task_g(output_a=result_a, output_b=result_b)

    return final_result


# USAGE EXAMPLE 2: gives more flexibility to act depending on status/exit code
@workflow
def main_workflow() -> str:

    result_a = subworkflow_model_a()

    if result_a.exit_code != 0:  # check whether the sub-workflow failed

        # assign a default value for result_a
        result_a = "DEFAULT_VALUE"

        # or run another sub-workflow/task to produce result_a
        result_a = task_e()

    result_b = subworkflow_model_b()

    # since result_a now has an acceptable value, task_g can continue
    final_result = task_g(output_a=result_a, output_b=result_b)

    return final_result


# USAGE EXAMPLE 3: define whether a task/sub-workflow failure should mark the main workflow as failed
@workflow
def main_workflow() -> str:

    # type hint to specify that a failure of subworkflow_model_a should not mark
    # main_workflow as failed, since its output is optional
    result_a: Optional[str] = subworkflow_model_a()

    # since result_b is not Optional, its failure can fail main_workflow
    result_b = subworkflow_model_b()

    # since result_a has an acceptable value assigned to it, task_g can continue;
    # since final_result is Optional, it can fall back to a default value (or None)
    # while still marking the workflow as succeeded
    final_result: Optional[str] = task_g(output_a=result_a, output_b=result_b)

    return final_result


if __name__ == "__main__":
    final_result = main_workflow()
    print(final_result)

@kdubovikov
Author

@kumare3 hi. Could we get any feedback on this suggestion?
