From 5e52bd29abd690098ecf0701b8aab4792566eea3 Mon Sep 17 00:00:00 2001 From: Shahar Epstein <60007259+shahar1@users.noreply.github.com> Date: Sun, 24 Nov 2024 03:29:20 +0200 Subject: [PATCH] Avoid grouping task instance stats by try_number for dynamic mapped tasks (#44300) --- airflow/www/views.py | 15 ++++++++-- newsfragments/44300.bugfix.rst | 1 + tests/www/views/test_views_grid.py | 44 ++++++++++++++++++++++++++++++ 3 files changed, 58 insertions(+), 2 deletions(-) create mode 100644 newsfragments/44300.bugfix.rst diff --git a/airflow/www/views.py b/airflow/www/views.py index c7704ce394ee8..363b743c6749c 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -317,7 +317,10 @@ def dag_to_grid(dag: DagModel, dag_runs: Sequence[DagRun], session: Session) -> TaskInstance.task_id, TaskInstance.run_id, TaskInstance.state, - TaskInstance.try_number, + case( + (TaskInstance.map_index == -1, TaskInstance.try_number), + else_=None, + ).label("try_number"), func.min(TaskInstanceNote.content).label("note"), func.count(func.coalesce(TaskInstance.state, sqla.literal("no_status"))).label("state_count"), func.min(TaskInstance.queued_dttm).label("queued_dttm"), @@ -329,7 +332,15 @@ def dag_to_grid(dag: DagModel, dag_runs: Sequence[DagRun], session: Session) -> TaskInstance.dag_id == dag.dag_id, TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]), ) - .group_by(TaskInstance.task_id, TaskInstance.run_id, TaskInstance.state, TaskInstance.try_number) + .group_by( + TaskInstance.task_id, + TaskInstance.run_id, + TaskInstance.state, + case( + (TaskInstance.map_index == -1, TaskInstance.try_number), + else_=None, + ), + ) .order_by(TaskInstance.task_id, TaskInstance.run_id) ) diff --git a/newsfragments/44300.bugfix.rst b/newsfragments/44300.bugfix.rst new file mode 100644 index 0000000000000..ffd4b07b2ab0d --- /dev/null +++ b/newsfragments/44300.bugfix.rst @@ -0,0 +1 @@ +Fix stats of dynamic mapped tasks after automatic retries of failed tasks diff --git a/tests/www/views/test_views_grid.py b/tests/www/views/test_views_grid.py index c7a453ffaeeb0..067ca9325f6c0 100644 --- a/tests/www/views/test_views_grid.py +++ b/tests/www/views/test_views_grid.py @@ -520,3 +520,47 @@ def test_next_run_assets_404(admin_client): resp = admin_client.get("/object/next_run_assets/missingdag", follow_redirects=True) assert resp.status_code == 404, resp.json assert resp.json == {"error": "can't find dag missingdag"} + + +@pytest.mark.usefixtures("_freeze_time_for_dagruns") +def test_dynamic_mapped_task_with_retries(admin_client, dag_with_runs: list[DagRun], session): + """ + Test a DAG with a dynamic mapped task with retries + """ + run1, run2 = dag_with_runs + + for ti in run1.task_instances: + ti.state = TaskInstanceState.SUCCESS + for ti in sorted(run2.task_instances, key=lambda ti: (ti.task_id, ti.map_index)): + if ti.task_id == "task1": + ti.state = TaskInstanceState.SUCCESS + elif ti.task_id == "group.mapped": + if ti.map_index == 0: + ti.state = TaskInstanceState.FAILED + ti.start_date = pendulum.DateTime(2021, 7, 1, 1, 0, 0, tzinfo=pendulum.UTC) + ti.end_date = pendulum.DateTime(2021, 7, 1, 1, 2, 3, tzinfo=pendulum.UTC) + elif ti.map_index == 1: + ti.try_number = 1 + ti.state = TaskInstanceState.SUCCESS + ti.start_date = pendulum.DateTime(2021, 7, 1, 2, 3, 4, tzinfo=pendulum.UTC) + ti.end_date = None + elif ti.map_index == 2: + ti.try_number = 2 + ti.state = TaskInstanceState.FAILED + ti.start_date = pendulum.DateTime(2021, 7, 1, 2, 3, 4, tzinfo=pendulum.UTC) + ti.end_date = None + elif ti.map_index == 3: + ti.try_number = 3 + ti.state = TaskInstanceState.SUCCESS + ti.start_date = pendulum.DateTime(2021, 7, 1, 2, 3, 4, tzinfo=pendulum.UTC) + ti.end_date = None + session.flush() + + resp = admin_client.get(f"/object/grid_data?dag_id={DAG_ID}", follow_redirects=True) + + assert resp.status_code == 200, resp.json + + assert resp.json["groups"]["children"][-1]["children"][-1]["instances"][-1]["mapped_states"] == { + "failed": 2, + "success": 2, + }