Branching doesn't work as expected #11347

Closed
grayver opened this issue Oct 8, 2020 · 2 comments
Labels
kind:bug This is a clearly a bug

Comments


grayver commented Oct 8, 2020

Apache Airflow version: 1.10.12

Environment:

  • Cloud provider or hardware configuration: Amazon EC2 instance, 4 CPU cores, 8GB RAM
  • OS (e.g. from /etc/os-release): Ubuntu 18.04.5 LTS (Bionic Beaver)
  • Kernel (e.g. uname -a): Linux ip-XX-XX-XX-XX.ec2.internal 5.4.0-1025-aws #25~18.04.1-Ubuntu SMP Fri Sep 11 12:03:04 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux
  • Install tools: Ansible Airflow role (https://github.com/idealista/airflow-role)

What happened:

I combined the example DAGs from the Airflow documentation (Branching section and Trigger Rules section):

branching_dag.py
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.trigger_rule import TriggerRule

dag = DAG(
    dag_id='branching_dag',
    schedule_interval=None,
    start_date=days_ago(1)
)

run_this_first = DummyOperator(task_id='run_this_first', dag=dag)

branching = BranchPythonOperator(
    task_id='branching', dag=dag,
    python_callable=lambda: 'branch_a'
)

branch_a = DummyOperator(task_id='branch_a', dag=dag)
follow_branch_a = BashOperator(task_id='follow_branch_a', dag=dag, bash_command='sleep 30s')

branch_b = DummyOperator(task_id='branch_b', dag=dag)

join = DummyOperator(task_id='join', dag=dag, trigger_rule=TriggerRule.NONE_FAILED_OR_SKIPPED)

run_this_first >> branching
branching >> branch_a >> follow_branch_a >> join
branching >> join
branching >> branch_b >> join

Here is the original image from the Branching section of the documentation:
[image: original branching]
and here is the image from the Trigger Rules section:
[image: original trigger rules]

According to the DAG graph, the join task should run whichever branch is selected. But I'm getting this:
[image: airflow-branching-problem]

So the join task is skipped right after branch_a is selected.

What you expected to happen:

According to the documentation (Branching section):

Paths of the branching task are branch_a, join and branch_b. Since join is a downstream task of branch_a, it will be excluded from the skipped tasks when branch_a is returned by the Python callable.

and from the Trigger Rules section:

The join task will be triggered as soon as branch_false has been skipped (a valid completion state) and follow_branch_a has succeeded. Because skipped tasks will not cascade through none_failed_or_skipped.

But that is not what happens in practice.
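
To make the expectation concrete, here is a minimal sketch in plain Python (no Airflow import) of what the two quoted passages imply for this DAG. The edge map mirrors the `>>` lines in branching_dag.py. The helper names `descendants`, `expected_skips` and `none_failed_or_skipped` are made up for illustration and are not Airflow APIs, and the trigger-rule check is my reading of the 1.10 documentation, not Airflow's actual implementation.

```python
# Plain-Python model of the DAG above; nothing here calls Airflow.
# Edges mirror the `>>` lines in branching_dag.py.
EDGES = {
    "run_this_first": ["branching"],
    "branching": ["branch_a", "join", "branch_b"],
    "branch_a": ["follow_branch_a"],
    "follow_branch_a": ["join"],
    "branch_b": ["join"],
    "join": [],
}


def descendants(task_id, edges):
    """Return every task reachable downstream of task_id."""
    seen = set()
    stack = list(edges[task_id])
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(edges[current])
    return seen


def expected_skips(branch_task, chosen, edges):
    """Direct downstream tasks of the branch task that should be skipped
    per the Branching docs: all except the chosen task and anything
    downstream of it."""
    keep = {chosen} | descendants(chosen, edges)
    return {t for t in edges[branch_task] if t not in keep}


def none_failed_or_skipped(upstream_states):
    """Hypothetical model of the trigger rule (assumption based on the
    1.10 docs): no upstream failed or upstream_failed, and at least one
    upstream succeeded."""
    no_failure = not any(
        s in ("failed", "upstream_failed") for s in upstream_states
    )
    return no_failure and "success" in upstream_states


# The callable returns 'branch_a', so only branch_b should be skipped.
print(expected_skips("branching", "branch_a", EDGES))  # {'branch_b'}

# Upstream states of join once follow_branch_a has finished.
join_upstream = {
    "branching": "success",
    "follow_branch_a": "success",
    "branch_b": "skipped",
}
print(none_failed_or_skipped(list(join_upstream.values())))  # True
```

Under these assumptions only branch_b is skipped and join runs, which is the opposite of what the screenshot shows.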

grayver added the kind:bug label Oct 8, 2020

kaxil commented Oct 8, 2020

This will be fixed in 1.10.13, PR: #10751


kaxil commented Oct 8, 2020

duplicate of #10725
