Skip to content

Commit

Permalink
Brought the logic of getting task statistics into a separate function
Browse files Browse the repository at this point in the history
  • Loading branch information
avkirilishin committed Feb 10, 2022
1 parent 24063fd commit 2b15757
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 13 deletions.
42 changes: 30 additions & 12 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,31 @@ def get_downstream(task):
return result


def get_task_stats_from_query(qry):
    """
    Collect task counts into a nested dict keyed by dag id and then by task state.

    Within a run of consecutive rows for the same dag, once a row flagged as belonging
    to a running dag has been seen, the following rows of that dag that are not flagged
    as running are ignored.

    :param qry: Rows in the format (<dag id>, <task state>, <is dag running>, <task count>),
        ordered by <dag id> and <is dag running>
    :return: A dict in the format {<dag id>: {<task state>: <task count>}}
    """
    stats = {}
    current_dag = None
    seen_running = False
    for row_dag_id, task_state, running, task_count in qry:
        if row_dag_id != current_dag:
            # A different dag id starts a new group, so forget the previous group's flag.
            current_dag = row_dag_id
            seen_running = False
            skip_row = False
        else:
            # Same dag as the previous row: non-running rows lose to earlier running ones.
            skip_row = seen_running and not running

        if not skip_row:
            if running:
                seen_running = True
            stats.setdefault(row_dag_id, {})[task_state] = task_count
    return stats


######################################################################################
# Error handlers
######################################################################################
Expand Down Expand Up @@ -981,20 +1006,13 @@ def task_stats(self, session=None):
final_task_instance_query_result.c.state,
final_task_instance_query_result.c.is_dag_running,
)
.order_by(final_task_instance_query_result.c.is_dag_running.desc())
.order_by(
final_task_instance_query_result.c.dag_id,
final_task_instance_query_result.c.is_dag_running.desc(),
)
)

data = {}
has_running_dags = False
for dag_id, state, is_dag_running, count in qry:
if is_dag_running:
has_running_dags = True
elif has_running_dags:
break
if dag_id not in data:
data[dag_id] = {}
data[dag_id][state] = count

data = get_task_stats_from_query(qry)
payload = {}
for dag_id in filter_dag_ids:
payload[dag_id] = []
Expand Down
35 changes: 34 additions & 1 deletion tests/www/views/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@
from airflow.configuration import initialize_config
from airflow.plugins_manager import AirflowPlugin, EntryPointSource
from airflow.www import views
from airflow.www.views import get_key_paths, get_safe_url, get_value_from_path, truncate_task_duration
from airflow.www.views import (
get_key_paths,
get_safe_url,
get_task_stats_from_query,
get_value_from_path,
truncate_task_duration,
)
from tests.test_utils.config import conf_vars
from tests.test_utils.mock_plugins import mock_plugin_manager
from tests.test_utils.www import check_content_in_response, check_content_not_in_response
Expand Down Expand Up @@ -339,3 +345,30 @@ def test_dag_edit_privileged_requires_view_has_action_decorators(cls: type):
action_funcs = action_funcs - {"action_post"}
for action_function in action_funcs:
assert_decorator_used(cls, action_function, views.action_has_dag_edit_access)


def test_get_task_stats_from_query():
    """
    Check that rows are grouped by dag id and state, and that for a dag with running
    rows the later non-running rows are left out of the result.
    """
    # Rows are (<dag id>, <task state>, <is dag running>, <task count>).
    rows = [
        ['dag1', 'queued', True, 1],
        ['dag1', 'running', True, 2],
        ['dag1', 'success', False, 3],
        ['dag2', 'running', True, 4],
        ['dag2', 'success', True, 5],
        ['dag3', 'success', False, 6],
    ]

    # dag1's non-running 'success' row is expected to be dropped.
    expected = {
        'dag1': {'queued': 1, 'running': 2},
        'dag2': {'running': 4, 'success': 5},
        'dag3': {'success': 6},
    }

    assert get_task_stats_from_query(rows) == expected

0 comments on commit 2b15757

Please sign in to comment.