diff --git a/airflow/www/views.py b/airflow/www/views.py index c2b62328c21da..ec02bf0bb810d 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -491,6 +491,31 @@ def get_downstream(task): return result +def get_task_stats_from_query(qry): + """ + Return a dict of the task quantity, grouped by dag id and task status. + + :param qry: The data in the format (, , , ), + ordered by and + """ + data = {} + last_dag_id = None + has_running_dags = False + for dag_id, state, is_dag_running, count in qry: + if last_dag_id != dag_id: + last_dag_id = dag_id + has_running_dags = False + elif not is_dag_running and has_running_dags: + continue + + if is_dag_running: + has_running_dags = True + if dag_id not in data: + data[dag_id] = {} + data[dag_id][state] = count + return data + + ###################################################################################### # Error handlers ###################################################################################### @@ -981,20 +1006,13 @@ def task_stats(self, session=None): final_task_instance_query_result.c.state, final_task_instance_query_result.c.is_dag_running, ) - .order_by(final_task_instance_query_result.c.is_dag_running.desc()) + .order_by( + final_task_instance_query_result.c.dag_id, + final_task_instance_query_result.c.is_dag_running.desc(), + ) ) - data = {} - has_running_dags = False - for dag_id, state, is_dag_running, count in qry: - if is_dag_running: - has_running_dags = True - elif has_running_dags: - break - if dag_id not in data: - data[dag_id] = {} - data[dag_id][state] = count - + data = get_task_stats_from_query(qry) payload = {} for dag_id in filter_dag_ids: payload[dag_id] = [] diff --git a/tests/www/views/test_views.py b/tests/www/views/test_views.py index 35f5910280267..66cf54ffd82e2 100644 --- a/tests/www/views/test_views.py +++ b/tests/www/views/test_views.py @@ -24,7 +24,13 @@ from airflow.configuration import initialize_config from airflow.plugins_manager import AirflowPlugin, EntryPointSource from airflow.www import views -from airflow.www.views import get_key_paths, get_safe_url, get_value_from_path, truncate_task_duration +from airflow.www.views import ( + get_key_paths, + get_safe_url, + get_task_stats_from_query, + get_value_from_path, + truncate_task_duration, +) from tests.test_utils.config import conf_vars from tests.test_utils.mock_plugins import mock_plugin_manager from tests.test_utils.www import check_content_in_response, check_content_not_in_response @@ -339,3 +345,30 @@ def test_dag_edit_privileged_requires_view_has_action_decorators(cls: type): action_funcs = action_funcs - {"action_post"} for action_function in action_funcs: assert_decorator_used(cls, action_function, views.action_has_dag_edit_access) + + +def test_get_task_stats_from_query(): + query_data = [ + ['dag1', 'queued', True, 1], + ['dag1', 'running', True, 2], + ['dag1', 'success', False, 3], + ['dag2', 'running', True, 4], + ['dag2', 'success', True, 5], + ['dag3', 'success', False, 6], + ] + expected_data = { + 'dag1': { + 'queued': 1, + 'running': 2, + }, + 'dag2': { + 'running': 4, + 'success': 5, + }, + 'dag3': { + 'success': 6, + }, + } + + data = get_task_stats_from_query(query_data) + assert data == expected_data