Skip to content

Commit

Permalink
Show task status only for running dags or only for the last finished dag
Browse files Browse the repository at this point in the history
  • Loading branch information
avkirilishin committed Feb 6, 2022
1 parent 0f172f4 commit 24063fd
Showing 1 changed file with 28 additions and 8 deletions.
36 changes: 28 additions & 8 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,9 @@ def task_stats(self, session=None):

# Select all task_instances from active dag_runs.
running_task_instance_query_result = session.query(
TaskInstance.dag_id.label('dag_id'), TaskInstance.state.label('state')
TaskInstance.dag_id.label('dag_id'),
TaskInstance.state.label('state'),
sqla.literal(True).label('is_dag_running'),
).join(
running_dag_run_query_result,
and_(
Expand All @@ -946,7 +948,11 @@ def task_stats(self, session=None):
# Select all task_instances from active dag_runs.
# If no dag_run is active, return task instances from most recent dag_run.
last_task_instance_query_result = (
session.query(TaskInstance.dag_id.label('dag_id'), TaskInstance.state.label('state'))
session.query(
TaskInstance.dag_id.label('dag_id'),
TaskInstance.state.label('state'),
sqla.literal(False).label('is_dag_running'),
)
.join(TaskInstance.dag_run)
.join(
last_dag_run,
Expand All @@ -963,14 +969,28 @@ def task_stats(self, session=None):
else:
final_task_instance_query_result = running_task_instance_query_result.subquery('final_ti')

qry = session.query(
final_task_instance_query_result.c.dag_id,
final_task_instance_query_result.c.state,
sqla.func.count(),
).group_by(final_task_instance_query_result.c.dag_id, final_task_instance_query_result.c.state)
qry = (
session.query(
final_task_instance_query_result.c.dag_id,
final_task_instance_query_result.c.state,
final_task_instance_query_result.c.is_dag_running,
sqla.func.count(),
)
.group_by(
final_task_instance_query_result.c.dag_id,
final_task_instance_query_result.c.state,
final_task_instance_query_result.c.is_dag_running,
)
.order_by(final_task_instance_query_result.c.is_dag_running.desc())
)

data = {}
for dag_id, state, count in qry:
has_running_dags = False
for dag_id, state, is_dag_running, count in qry:
if is_dag_running:
has_running_dags = True
elif has_running_dags:
break
if dag_id not in data:
data[dag_id] = {}
data[dag_id][state] = count
Expand Down

0 comments on commit 24063fd

Please sign in to comment.