-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[2.x engine] Append task run futures only when entering task run engine from flow run context #14439
Conversation
@@ -4226,6 +4226,35 @@ def my_flow(): | |||
assert result == "Failed" | |||
assert count == 2 | |||
|
|||
def test_nested_task_with_retries_on_outer_task(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This unit test looks like what I'd have used to test, so while I didn't test myself if this work then I expect this will resolve teh issue!
@@ -1604,8 +1615,9 @@ async def create_task_run_future( | |||
) | |||
) | |||
|
|||
# Track the task run future in the flow run context | |||
flow_run_context.task_run_futures.append(future) | |||
if not entering_from_task_run: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have a prefect dev environment running yet where I can test this locally, but only thing I'd double check is that the nested task run still gets its status correctly tracked in the UI and everything even when it's not appended to this list. Without knowing much about the prefect internals, I don't have good intuition here... If you say it works, I believe you!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hi @thundercat1 - here's how it looks on this branch
using this code
from prefect import flow, task
failed = False
@task(name="Nested Flaky Task")
def nested_flaky_task():
# This task will fail the first time it is run, but will succeed if called a second time
global failed
if not failed:
failed = True
raise ValueError("Forced task failure")
@task(
name="Top Task",
retries=1,
)
def top_task():
nested_flaky_task()
@flow
def nested_task_flow():
top_task()
if __name__ == "__main__":
nested_task_flow()
which matches my expectations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this lgtm!
closes #14390
This PR addresses an issue where flow runs with nested tasks are incorrectly marked as failed, even when the inner tasks eventually succeed. For an example, please see the MRE provided in the related issue.
The issue occurs because when retries are not added to a nested task, a new task run future is created each time the task run engine is exited and re-entered. This results in multiple task run futures being tracked (including failed ones), causing incorrect flow run terminal state.
To resolve this, the PR ensures that task run futures are only appended when the task run is initiated from within the flow run context. This prevents the creation of unnecessary task run futures, thereby ensuring the flow run status accurately reflects the success of the inner tasks.
Changes
enter_task_run_engine
to include a parameter indicating whether the task run is being entered from a flow run context.create_task_run_future
to append task run futures only when the task run is initiated from within the flow run context.