From 1b60be88db430cda156d295388df240c1aecc456 Mon Sep 17 00:00:00 2001 From: r-richmond Date: Tue, 20 Apr 2021 23:42:25 -0700 Subject: [PATCH] When one_success mark task as failed if no success --- airflow/ti_deps/deps/trigger_rule_dep.py | 6 +++++- tests/models/test_taskinstance.py | 25 +++++++++++++++--------- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py index 766b13324a0fc..3ac3bf0a74ae0 100644 --- a/airflow/ti_deps/deps/trigger_rule_dep.py +++ b/airflow/ti_deps/deps/trigger_rule_dep.py @@ -134,8 +134,12 @@ def _evaluate_trigger_rule( # pylint: disable=too-many-branches if successes or skipped: ti.set_state(State.SKIPPED, session) elif trigger_rule == TR.ONE_SUCCESS: - if upstream_done and not successes: + if upstream_done and done == skipped: + # if upstream is done and all are skipped mark as skipped ti.set_state(State.SKIPPED, session) + elif upstream_done and successes <= 0: + # if upstream is done and there are no successes mark as upstream failed + ti.set_state(State.UPSTREAM_FAILED, session) elif trigger_rule == TR.ONE_FAILED: if upstream_done and not (failed or upstream_failed): ti.set_state(State.SKIPPED, session) diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py index 8c3b1001f7d7d..3184992a585b6 100644 --- a/tests/models/test_taskinstance.py +++ b/tests/models/test_taskinstance.py @@ -905,6 +905,13 @@ def test_depends_on_past(self): ['one_success', 2, 0, 0, 0, 2, True, None, True], ['one_success', 2, 0, 1, 0, 3, True, None, True], ['one_success', 2, 1, 0, 0, 3, True, None, True], + ['one_success', 0, 5, 0, 0, 5, True, State.SKIPPED, False], + ['one_success', 0, 4, 1, 0, 5, True, State.UPSTREAM_FAILED, False], + ['one_success', 0, 3, 1, 1, 5, True, State.UPSTREAM_FAILED, False], + ['one_success', 0, 4, 0, 1, 5, True, State.UPSTREAM_FAILED, False], + ['one_success', 0, 0, 5, 0, 5, True, State.UPSTREAM_FAILED, False], + ['one_success', 0, 0, 4, 1, 5, True, State.UPSTREAM_FAILED, False], + ['one_success', 0, 0, 0, 5, 5, True, State.UPSTREAM_FAILED, False], # # Tests for all_failed # @@ -932,15 +939,15 @@ def test_depends_on_past(self): ) def test_check_task_dependencies( self, - trigger_rule, - successes, - skipped, - failed, - upstream_failed, - done, - flag_upstream_failed, - expect_state, - expect_completed, + trigger_rule: str, + successes: int, + skipped: int, + failed: int, + upstream_failed: int, + done: int, + flag_upstream_failed: bool, + expect_state: State, + expect_completed: bool, ): start_date = timezone.datetime(2016, 2, 1, 0, 0, 0) dag = models.DAG('test-dag', start_date=start_date)