
trigger_rule=TriggerRule.ONE_FAILED doesn't work properly with task_groups #30333

Closed
antonio-antuan opened this issue Mar 27, 2023 · 10 comments
Labels
affected_version:2.5, area:core, area:dynamic-task-mapping (AIP-42), kind:bug

Comments

@antonio-antuan

antonio-antuan commented Mar 27, 2023

Apache Airflow version

2.5.2 (checked on 2.5.3 also)

What happened

I'd like to set up the "watcher" pattern inside a task_group, but the watcher task is always marked as "skipped".
Reference to a similar issue: #30334
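
For reference, the "watcher" pattern here is roughly the one from the Airflow documentation: a task with trigger_rule=ONE_FAILED placed downstream of all other tasks, so that a single upstream failure fails the whole run. A minimal sketch (the exception message is only illustrative):

from airflow.decorators import task
from airflow.exceptions import AirflowException
from airflow.utils.trigger_rule import TriggerRule


@task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
def watcher():
    # Fail this task, and hence the DAG run, whenever any upstream task failed.
    raise AirflowException("Failing task because one or more upstream tasks failed.")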

What you think should happen instead

No response

How to reproduce

from datetime import datetime

from airflow import DAG
from airflow.decorators import task_group
from airflow.operators.python import task
from airflow.utils.trigger_rule import TriggerRule


@task(retries=0, trigger_rule=TriggerRule.ONE_FAILED)
def watcher():
    pass


@task
def foo(x):
    pass

@task
def failed(x):
    raise Exception("Failed")


@task_group()
def process_attestation(x):
    [foo(x), failed(x)] >> watcher()


with DAG(
        dag_id='test',
        start_date=datetime.now(),
) as dag:
    # Receives sboms from database, one row per each DAG run.
    # Will be changed in the future with HTTP-endpoint or something like that.
    @task
    def provider():
        return [1, 2, 3]


    process_attestation.expand(x=provider())

Operating System

Arch Linux (kernel 6.2.6)

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==7.3.0
apache-airflow-providers-common-sql==1.3.4
apache-airflow-providers-ftp==3.3.1
apache-airflow-providers-google==8.11.0
apache-airflow-providers-http==4.2.0
apache-airflow-providers-imap==3.1.1
apache-airflow-providers-postgres==5.4.0
apache-airflow-providers-sqlite==3.3.1

Deployment

Docker-Compose

Deployment details

No response

Anything else

The result graph screenshot: [image]

The task instance details screenshot: [image]

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@antonio-antuan added the area:core, kind:bug, and needs-triage labels on Mar 27, 2023
@antonio-antuan
Author

Checked on 2.5.3; the issue is still reproducible.

@hussein-awala removed the needs-triage label on Apr 4, 2023
@hussein-awala self-assigned this on Apr 4, 2023
@hussein-awala removed their assignment on Apr 13, 2023
@pankajastro
Member

It looks like this happens when trying to expand a task_group.

@hussein-awala
Member

@pankajastro yes, this happens only with mapped task groups, but I confirm that there is a bug in this part. Unfortunately, I haven't had time to work on it.
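
For comparison, a hypothetical non-mapped variation of the reproduction DAG above (calling the task group directly instead of expanding it) should not hit this bug, since the problem only shows up with expand():

# Hypothetical variation of the reproduction above: invoke the task group
# directly rather than expanding it; the watcher is then triggered by the
# failing upstream as expected.
with DAG(dag_id='test_non_mapped', start_date=datetime.now()) as dag:
    process_attestation(x=1)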

@dzhigimont
Contributor

@hussein-awala @eladkal I can take it and the related issue #30334

@eladkal
Contributor

eladkal commented Jul 15, 2023

@dzhigimont are you still working on this issue?

@dzhigimont
Contributor

Hi @eladkal, I am still working on it; I have had some pauses in the process.

@uranusjr
Member

uranusjr commented Jul 25, 2023

OK, so I went on a debug adventure, and here’s my conclusion. The analysis in #32701 (comment) is spot on, but the problem is deeper. Currently, we first check whether a task has all its dependencies satisfied, and only expand it if so:

for schedulable in itertools.chain(schedulable_tis, additional_tis):
    old_state = schedulable.state
    if not schedulable.are_dependencies_met(session=session, dep_context=dep_context):
        old_states[schedulable.key] = old_state
        continue
    # If schedulable is not yet expanded, try doing it now. This is
    # called in two places: First and ideally in the mini scheduler at
    # the end of LocalTaskJob, and then as an "expansion of last resort"
    # in the scheduler to ensure that the mapped task is correctly
    # expanded before executed. Also see _revise_map_indexes_if_mapped
    # docstring for additional information.
    new_tis = None
    if schedulable.map_index < 0:
        new_tis = _expand_mapped_task_if_needed(schedulable)
        if new_tis is not None:
            additional_tis.extend(new_tis)
            expansion_happened = True

(Notice that are_dependencies_met must pass before _expand_mapped_task_if_needed can be reached.)

Whether a trigger rule dependency is satisfied, however, is not known before expansion, since we only want to consider "relevant" upstreams. This is why #32802 doesn't work. The only way to work around this is to "fake" a dependency passage to allow expansion to happen. After expansion, the scheduling code re-checks the deps, which works as expected since the tis now have correct map index values.
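
For illustration only (this is neither the actual Airflow code nor a proposed patch), a naive alternative shape of the same idea would be to let expansion run before the trigger-rule check, so that the dep check later operates on tis with real map indexes:

for schedulable in itertools.chain(schedulable_tis, additional_tis):
    old_state = schedulable.state
    if schedulable.map_index < 0:
        # Hypothetical reordering: expansion of last resort happens *before*
        # the dependency check, so TriggerRuleDep later sees tis with
        # correct map index values.
        new_tis = _expand_mapped_task_if_needed(schedulable)
        if new_tis is not None:
            additional_tis.extend(new_tis)
            expansion_happened = True
            # The expanded tis are re-checked on a later scheduling pass.
            continue
    if not schedulable.are_dependencies_met(session=session, dep_context=dep_context):
        old_states[schedulable.key] = old_state
        continue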

I actually don't think #32701 completely fixes the issue. If I understand the code correctly, it will fail once we add nested task mapping (i.e. a mapped task inside a mapped task group), since that would make another dep fail to pass and break the workaround. But I guess that's a problem we can address when (if) we ever get there.

I'll think about whether it's possible to enhance #32701, or how we can best build on it if that would involve too much change.

@uranusjr
Member

On further consideration, I don’t think #32701 should be merged as-is. While marking all upstream tis as relevant fixed ONE_FAILED, it would create a bug in the reverse direction for things such as ALL_FAILED, since in that case, TriggerRuleDep would over-consider upstream tis.

I think TriggerRuleDep needs a much more significant rewrite to implement a proper fix.
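
To make the ALL_FAILED concern concrete, a hedged sketch (the DAG below is hypothetical and not from this issue): if TriggerRuleDep treated every upstream ti across all map indexes as relevant, the cleanup task in the one group instance whose own upstream failed would also count the successes from the other instances, and ALL_FAILED would never be satisfied.

from datetime import datetime

from airflow import DAG
from airflow.decorators import task, task_group
from airflow.utils.trigger_rule import TriggerRule


@task
def maybe_fail(x):
    # Only map index 0 fails; the other mapped instances succeed.
    if x == 0:
        raise Exception("failing only for x == 0")


@task(trigger_rule=TriggerRule.ALL_FAILED)
def cleanup():
    # Expected: runs only in the group instance whose upstream failed.
    # If all upstream tis across map indexes were considered relevant,
    # the successes from x == 1 and x == 2 would block this rule.
    pass


@task_group
def grp(x):
    maybe_fail(x) >> cleanup()


with DAG(dag_id="all_failed_example", start_date=datetime(2023, 1, 1), schedule=None) as dag:
    grp.expand(x=[0, 1, 2])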

@red-crown

Is there any kind of workaround for this issue?

@shahar1
Contributor

shahar1 commented Jun 15, 2024

The DAG in the "How to reproduce" section works as expected when running it on the current main branch (v2.9.2+), so I think it is safe to presume it has been fixed at some point since v2.5.2. However, I cannot put my finger on the exact PR that fixed it (maybe #33903?); feel free to comment if you're certain about that part.
Closing as resolved.

@shahar1 closed this as completed on Jun 15, 2024