Downstreaming to a task_group creates a dependency to its returned task, not its first one #40196
Comments
I guess this is a side effect of some of the Python metaprogramming we use for dependencies. As far as I understand, a task group should not return anything (or at least it's not specified), but I might be wrong about it. @uranusjr, I guess you would be the best person to comment on it?
Hello @potiuk, thank you for your answer. Indeed, the part of the official Airflow documentation about task groups doesn't specify whether it's possible to return a value from a task group. However, the unofficial documentation from Astronomer (which, as far as I understand, is close to Airflow development) states that it's possible:
Best regards,
Sounds like a good candidate to fix and document better. PRs are welcome, but it would be great to hear from those who implemented it what the intention is here :D.
Glad I found this issue, as I was experiencing the same with the following:
I also had trouble finding this behavior explained in the documentation, and I have definitely made heavy use of
Ahhh. I see exactly what happened. I looked a bit at the history and did a little investigation, and here is what happened (@uranusjr, I would love for you to confirm my understanding and tell me whether you agree with my assessment of what should be done here):
However, that does not work well for this case:
Precisely because of the case you describe: the previous task would be upstream of the last task in a group. (And @uranusjr correctly mentioned in #19903 (comment) that it could be done by returning
Then the return value of task-group-decorated functions was improved by @uranusjr in #20671, which allowed a task-group-decorated function to return nothing, which is equivalent to returning the task group itself. That works for both sides of the dependencies: >> tg adds a downstream dependency to the first task in the group, whereas tg >> adds an upstream dependency from the last task in the group. It was supposed to be documented in #20671, which was created as a follow-up task, but it was closed as "completed" by #26028. In fact, that PR only adds some examples and does not really describe this behaviour in the docs (nor the differences depending on what the decorated function returns). I honestly find it hard to justify the behaviour when
So my proposal is that we should issue a deprecation warning when
Then it should of course be documented (but I would rather see it documented as deprecated behaviour, if others agree with me). @uranusjr (and also @eladkal, who already fixed our example DAG that exhibited this confusing behaviour in #21240), what do you think?
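To make the two sides of this rule concrete, here is a toy model in plain Python. It is not Airflow code: Task, Group, bind and group_call_result are hypothetical names, the group is assumed to be a simple two-task chain, and end is an illustrative downstream task. It encodes only the rule described above: a group-decorated function that returns nothing acts like the group (>> tg reaches the first task, tg >> leaves from the last task), while whatever is returned, a single task or a tuple of tasks, is what dependencies bind to.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Task:
    """Hypothetical stand-in for an operator; upstream edges are stored as task ids."""

    task_id: str
    upstream: set[str] = field(default_factory=set)


@dataclass
class Group:
    """Hypothetical stand-in for a task group whose tasks form a simple chain."""

    tasks: list[Task]


def _heads(target) -> list[Task]:
    """Tasks that `x >> target` attaches to: a group's first task, else the target(s)."""
    if isinstance(target, Group):
        return target.tasks[:1]
    return list(target) if isinstance(target, tuple) else [target]


def _tails(target) -> list[Task]:
    """Tasks that `target >> y` starts from: a group's last task, else the target(s)."""
    if isinstance(target, Group):
        return target.tasks[-1:]
    return list(target) if isinstance(target, tuple) else [target]


def bind(upstream, downstream) -> None:
    """Toy equivalent of `upstream >> downstream`."""
    for up in _tails(upstream):
        for down in _heads(downstream):
            down.upstream.add(up.task_id)


def group_call_result(group: Group, retval):
    """Toy rule: returning nothing behaves like returning the group itself."""
    return group if retval is None else retval


def build(returned: str) -> dict[str, Task]:
    """Wire start >> my_task_group() >> end for one kind of return value."""
    start, t1, t2, end = (
        Task(name) for name in ("start", "task_1_of_group", "task_2_of_group", "end")
    )
    bind(t1, t2)  # inside the group: task_1_of_group >> task_2_of_group
    retval = {"nothing": None, "last task": t2, "both tasks": (t1, t2)}[returned]
    tg = group_call_result(Group([t1, t2]), retval)
    bind(start, tg)
    bind(tg, end)
    return {t.task_id: t for t in (start, t1, t2, end)}


for case in ("nothing", "last task", "both tasks"):
    graph = build(case)
    print(case, {name: sorted(t.upstream) for name, t in graph.items()})
```

With "last task", task_1_of_group has no upstream at all, which is the behaviour reported in this issue; "both tasks" restores start >> task_1_of_group at the price of a redundant start >> task_2_of_group edge.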
Hi, thank you all for your comments and investigations! So @potiuk, you're saying that
Best regards,
No. What I am saying is that a task_group should not return anything (i.e. None), and IMHO returning a task should be deprecated and raise a warning (@uranusjr, I would love to hear what you think and whether I understood the whole problem properly). We already have code, implemented in #20671 by @uranusjr, that acts as if the function returned the
Returning a single task from
Of course if you return
Does it make sense?
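The deprecation proposed above could look roughly like the following sketch. It is hypothetical: resolve_group_return and TaskGroupSketch are illustrative names, not Airflow APIs, and the sketch warns on every non-None return value, which is one possible reading of "task_group should not return anything".

```python
import warnings


class TaskGroupSketch:
    """Hypothetical placeholder for the group object a decorator would create."""


def resolve_group_return(group, retval):
    """Hypothetical handling of a task-group function's return value.

    Returning nothing keeps the current "behave like the group" semantics.
    Any other value is still forwarded, so existing DAGs keep working, but a
    DeprecationWarning tells the author that dependencies will bind to it.
    """
    if retval is None:
        return group
    warnings.warn(
        "Returning a value from a task-group function makes dependencies set "
        "on the group bind to that value, so `upstream >> group` no longer "
        "reaches the group's first task; return nothing to depend on the group.",
        DeprecationWarning,
        stacklevel=2,
    )
    return retval
```

Forwarding the returned value unchanged keeps the current wiring while the warning is in place, so the change would only affect what authors see, not the DAG structure.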
Hi, yes, it makes sense, thank you @potiuk! But what if I need to get the result of the final task of a task group?

from __future__ import annotations

from airflow.decorators import dag, task, task_group
from airflow.utils.trigger_rule import TriggerRule
from pendulum import datetime


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def branching_to_first_task_of_group_with_return_value() -> None:
    """Branch to a TaskFlow task group and get its result."""
    choice_a_result = choice_a()
    choice_b_result = choice_b()
    choose_a_or_b() >> [choice_a_result, choice_b_result]
    get_result(choice_a_result, choice_b_result)


@task.branch
def choose_a_or_b() -> str:
    return "choice_a"
    # return "choice_b"


@task_group()
def choice_a() -> int:
    value = get_a_value()
    return operation_on_value(value)
    # Should I use `return (value, operation_on_value(value))` ?


@task
def get_a_value() -> int:
    return 1


@task
def operation_on_value(value: int) -> int:
    return value * 6


@task
def choice_b() -> int:
    return 2


@task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
def get_result(result_a: int | None, result_b: int | None) -> int:
    # The skipped branch contributes None, so pick whichever result exists.
    return result_a or result_b


branching_to_first_task_of_group_with_return_value()

Best regards,
I don't think this has anything to do with dynamic task mapping specifically. This is just the design decision Airflow made when task groups were introduced. I'm not sure we can change it either; depending on the last task is a pretty intuitive choice.
Hi @uranusjr, thank you for your comment! Indeed, as far as I understand, it's not about dynamic task mapping (expand and the like) but about task groups. In my opinion, the current behavior is intuitive when setting dependencies downstream from the task group, but not when setting them to the task group (e.g., with branching like here).
Best regards,
This is because you return the task from the group function. When you do that, dependencies to the group are connected to the task you return. If you want to connect to the group itself instead, simply don't return the task:

@task_group()
def choice_a() -> None:
    value = get_a_value()
    operation_on_value(value)

However, I do observe another bug. When you do this, we don't connect the task to the group's downstream: cc @bbovenzi, does the server need to provide extra information to make this happen?
Hi @uranusjr, thank you again for your comment! In that case, how am I supposed to access the result of a task inside a task group from outside that task group? E.g., in the following example, allow
Best regards,
Hi, another case I came across is the following:
Here I cannot ensure that my file preparation is complete before the execution starts. If I daisy-chained the file_list through the last task of the task_group and returned that, I could no longer have upstream dependencies. Not returning the file_list from the task_group is not a good option either, because in my case the task_group is a function from a common package that is used in multiple nested task_groups, and it's hard to find the right ID to access it through xcom_pull. Thanks @potiuk for the idea of returning multiple tasks; that's more convenient than my previous workaround. I am not sure about the implications, but for me something like a
+1
Experienced this issue too... I had 3 branching tasks that merged into 1 "merging" task, and I needed to add additional logic to one of the branching tasks, so I decided to turn it into a @task_group, assuming I could return a value like before, but struggled. I ended up using something like this:

from airflow.decorators import dag, task, task_group
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime
import random


@dag(
    dag_id='branched_dag_with_task_group',
    schedule=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
    is_paused_upon_creation=False,
)
def branched_dag_with_task_group():
    @task
    def start():
        print("Starting the DAG")
        return ["path1", "path2", "path3"]

    @task.branch
    def branch(paths):
        return random.choice(paths)

    @task
    def path1():
        print("Executing Path 1")
        return "Result from Path 1"

    @task
    def path2():
        print("Executing Path 2")
        return "Result from Path 2"

    @task_group
    def path3():
        @task
        def path3_task1():
            print("Executing Path 3 - Task 1")
            return random.choice([1, 2])

        # Skip this task when path3_task1 returned 1.
        @task.skip_if(condition=lambda context: context['ti'].xcom_pull(task_ids='path3.path3_task1') == 1)
        @task
        def path3_task2(t1):
            print(f"Executing Path 3 - Task 2, received: {t1}")
            return "Result from Path 3"

        @task(trigger_rule=TriggerRule.NONE_FAILED)
        def path3_task3(t1, t2):
            print(f"{t1=}")
            print(f"{t2=}")
            return 42

        t1 = path3_task1()
        t2 = path3_task2(t1)
        t3 = path3_task3(t1, t2)
        t1 >> t2 >> t3
        # Return every task so that `upstream >> path3()` reaches path3_task1
        # and `path3() >> downstream` waits for path3_task3.
        return t1, t2, t3

    @task(trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
    def merge(path1_result, path2_result, path3_result):
        print(f"{path1_result=}")
        print(f"{path2_result=}")
        print(f"{path3_result=}")

    start_result = start()
    branched_paths = branch(start_result)
    path1_result = path1()
    path2_result = path2()
    path3_result = path3()
    # path3() returns the tuple (t1, t2, t3); only the last task's output goes to merge.
    merged_result = merge(path1_result, path2_result, path3_result[-1])
    start_result >> branched_paths
    branched_paths >> path1_result >> merged_result
    branched_paths >> path2_result >> merged_result
    branched_paths >> path3_result >> merged_result


branched_dag_with_task_group()
Apache Airflow version
2.9.2
What happened?
Hello Airflow team,
I found something I consider to be a bug, or at least an unexpected behavior.
When I try to set a task_group my_task_group as the downstream of a task start_task:
- if my_task_group doesn't return anything, the downstream is set to the first task of my_task_group (expected behavior);
- if my_task_group returns something, the downstream is set to the task of my_task_group that returns this value (unexpected behavior).
Examples:
Without return value
In this case, start is linked to task_1_of_group (expected behavior).
With return value
In this case, start is linked to task_2_of_group (because task_2_of_group is returned by the task group).
Workaround
A workaround I found is to use an EmptyOperator as an entrypoint.
The procedure is to set it as downstream of the task before the group, then to pass it to the group as a parameter and set it as the upstream task of the first task in the group.
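Why this workaround restores the expected ordering can be checked with a few lines of plain Python. This is a toy, not Airflow code: the upstream sets below are written by hand from the description above (start >> entrypoint, entrypoint >> task_1_of_group inside the group, task_1_of_group >> task_2_of_group), the node name entrypoint stands for the EmptyOperator, and ancestors() is a hypothetical helper.

```python
from __future__ import annotations


def ancestors(upstream: dict[str, set[str]], node: str) -> set[str]:
    """Every node that must finish before `node`, following upstream edges transitively."""
    seen: set[str] = set()
    stack = list(upstream.get(node, ()))
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(upstream.get(current, ()))
    return seen


# "With return value": start >> my_task_group() binds start to task_2_of_group only.
with_return_value = {
    "task_1_of_group": set(),
    "task_2_of_group": {"task_1_of_group", "start"},
}

# Workaround: start >> entrypoint, and inside the group entrypoint >> task_1_of_group.
with_entrypoint = {
    "entrypoint": {"start"},
    "task_1_of_group": {"entrypoint"},
    "task_2_of_group": {"task_1_of_group"},
}

for label, graph in (("with return value", with_return_value), ("with entrypoint", with_entrypoint)):
    print(label, "start runs before task_1_of_group:", "start" in ancestors(graph, "task_1_of_group"))
```

In the first graph nothing forces start to finish before task_1_of_group; in the second, start is an ancestor of every task in the group, whatever the group function returns.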
Thank you for your work, Airflow is awesome!
Best regards,
Nathan Rousseau, A.K.A le-chartreux
What you think should happen instead?
The behavior of setting a task_group as a downstream should not change depending on whether this task_group returns something or not. As a user, I expect it to always be set to the first task of the task_group.
.How to reproduce
Copy and paste the code from the 'What happened?' part of this issue (especially the example with a return value, since it's the one with the unexpected behavior).
Operating System
Red Hat Enterprise Linux 8.9 (Ootpa)
Versions of Apache Airflow Providers
apache-airflow-providers-common-sql==1.13.0
apache-airflow-providers-fab==1.1.0
apache-airflow-providers-ftp==3.9.0
apache-airflow-providers-http==4.11.0
apache-airflow-providers-imap==3.6.0
apache-airflow-providers-smtp==1.7.0
apache-airflow-providers-sqlite==3.8.0
Deployment
Virtualenv installation
Deployment details
I just used the standard pip install inside a venv.