From 7428be0e9e11816decec19856abea3036d90f3f4 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Thu, 26 May 2022 12:57:59 +0200 Subject: [PATCH] Fix failure for None task summaries Alternative fix to #23908 - with added typing. --- airflow/models/abstractoperator.py | 1 + airflow/www/utils.py | 3 ++- airflow/www/views.py | 24 +++++++++++++++++++----- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/airflow/models/abstractoperator.py b/airflow/models/abstractoperator.py index 6187c37182950..89f128aa3e475 100644 --- a/airflow/models/abstractoperator.py +++ b/airflow/models/abstractoperator.py @@ -101,6 +101,7 @@ class AbstractOperator(LoggingMixin, DAGNode): owner: str task_id: str + is_mapped: ClassVar[bool] HIDE_ATTRS_FROM_UI: ClassVar[FrozenSet[str]] = frozenset( ( diff --git a/airflow/www/utils.py b/airflow/www/utils.py index 2bdc2939bfd61..3a1ae0053ec96 100644 --- a/airflow/www/utils.py +++ b/airflow/www/utils.py @@ -40,6 +40,7 @@ from airflow import models from airflow.models import errors +from airflow.models.abstractoperator import AbstractOperator from airflow.models.dagrun import DagRun from airflow.models.taskinstance import TaskInstance from airflow.utils import timezone @@ -128,7 +129,7 @@ def get_mapped_summary(parent_instance, task_instances): } -def get_task_summary(dag_run: DagRun, task, session: Session) -> Optional[Dict[str, Any]]: +def get_task_summary(dag_run: DagRun, task: AbstractOperator, session: Session) -> Optional[Dict[str, Any]]: task_instance = ( session.query(TaskInstance) .filter( diff --git a/airflow/www/views.py b/airflow/www/views.py index 4c3d33a6b1e05..0f87d5c77b38a 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -32,7 +32,7 @@ from functools import wraps from json import JSONDecodeError from operator import itemgetter -from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union +from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Union from urllib.parse import parse_qsl, unquote, urlencode, urlparse import lazy_object_proxy @@ -126,6 +126,7 @@ from airflow.utils.session import NEW_SESSION, create_session, provide_session from airflow.utils.state import State, TaskInstanceState from airflow.utils.strings import to_boolean +from airflow.utils.task_group import TaskGroup from airflow.utils.timezone import td_format, utcnow from airflow.version import version from airflow.www import auth, utils as wwwutils @@ -250,15 +251,26 @@ def _safe_parse_datetime(v): abort(400, f"Invalid datetime: {v!r}") -def task_group_to_grid(task_item_or_group, dag, dag_runs, session): +def task_group_to_grid( + task_item_or_group: Union[AbstractOperator, TaskGroup], + dag: DAG, + dag_runs: Iterable[DagRun], + session: Session, +) -> Dict[str, Any]: """ Create a nested dict representation of this TaskGroup and its children used to construct the Graph. """ if isinstance(task_item_or_group, AbstractOperator): + task_instances: List[Dict[str, Any]] = [ + ts + for ts in filter( + None, (wwwutils.get_task_summary(dr, task_item_or_group, session) for dr in dag_runs) + ) + ] return { 'id': task_item_or_group.task_id, - 'instances': [wwwutils.get_task_summary(dr, task_item_or_group, session) for dr in dag_runs], + 'instances': task_instances, 'label': task_item_or_group.label, 'extra_links': task_item_or_group.extra_links, 'is_mapped': task_item_or_group.is_mapped, @@ -267,9 +279,11 @@ def task_group_to_grid(task_item_or_group, dag, dag_runs, session): # Task Group task_group = task_item_or_group - children = [task_group_to_grid(child, dag, dag_runs, session) for child in task_group.topological_sort()] + children: List[Dict[str, Any]] = [ + task_group_to_grid(child, dag, dag_runs, session) for child in task_group.topological_sort() + ] - def get_summary(dag_run, children): + def get_summary(dag_run: DagRun, children: List[Dict[str, Any]]): child_instances = [child['instances'] for child in children if 'instances' in child] child_instances = [item for sublist in child_instances for item in sublist]