Skip to content

Commit

Permalink
Fix failure for None task summaries
Browse files Browse the repository at this point in the history
Alternative fix to #23908 - with added typing.
  • Loading branch information
potiuk committed May 26, 2022
1 parent 423b905 commit 7428be0
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 6 deletions.
1 change: 1 addition & 0 deletions airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class AbstractOperator(LoggingMixin, DAGNode):

owner: str
task_id: str
is_mapped: ClassVar[bool]

HIDE_ATTRS_FROM_UI: ClassVar[FrozenSet[str]] = frozenset(
(
Expand Down
3 changes: 2 additions & 1 deletion airflow/www/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

from airflow import models
from airflow.models import errors
from airflow.models.abstractoperator import AbstractOperator
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.utils import timezone
Expand Down Expand Up @@ -128,7 +129,7 @@ def get_mapped_summary(parent_instance, task_instances):
}


def get_task_summary(dag_run: DagRun, task, session: Session) -> Optional[Dict[str, Any]]:
def get_task_summary(dag_run: DagRun, task: AbstractOperator, session: Session) -> Optional[Dict[str, Any]]:
task_instance = (
session.query(TaskInstance)
.filter(
Expand Down
24 changes: 19 additions & 5 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from functools import wraps
from json import JSONDecodeError
from operator import itemgetter
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Union
from urllib.parse import parse_qsl, unquote, urlencode, urlparse

import lazy_object_proxy
Expand Down Expand Up @@ -126,6 +126,7 @@
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.state import State, TaskInstanceState
from airflow.utils.strings import to_boolean
from airflow.utils.task_group import TaskGroup
from airflow.utils.timezone import td_format, utcnow
from airflow.version import version
from airflow.www import auth, utils as wwwutils
Expand Down Expand Up @@ -250,15 +251,26 @@ def _safe_parse_datetime(v):
abort(400, f"Invalid datetime: {v!r}")


def task_group_to_grid(task_item_or_group, dag, dag_runs, session):
def task_group_to_grid(
task_item_or_group: Union[AbstractOperator, TaskGroup],
dag: DAG,
dag_runs: Iterable[DagRun],
session: Session,
) -> Dict[str, Any]:
"""
Create a nested dict representation of this TaskGroup and its children used to construct
the Graph.
"""
if isinstance(task_item_or_group, AbstractOperator):
task_instances: List[Dict[str, Any]] = [
ts
for ts in filter(
None, (wwwutils.get_task_summary(dr, task_item_or_group, session) for dr in dag_runs)
)
]
return {
'id': task_item_or_group.task_id,
'instances': [wwwutils.get_task_summary(dr, task_item_or_group, session) for dr in dag_runs],
'instances': task_instances,
'label': task_item_or_group.label,
'extra_links': task_item_or_group.extra_links,
'is_mapped': task_item_or_group.is_mapped,
Expand All @@ -267,9 +279,11 @@ def task_group_to_grid(task_item_or_group, dag, dag_runs, session):
# Task Group
task_group = task_item_or_group

children = [task_group_to_grid(child, dag, dag_runs, session) for child in task_group.topological_sort()]
children: List[Dict[str, Any]] = [
task_group_to_grid(child, dag, dag_runs, session) for child in task_group.topological_sort()
]

def get_summary(dag_run, children):
def get_summary(dag_run: DagRun, children: List[Dict[str, Any]]):
child_instances = [child['instances'] for child in children if 'instances' in child]
child_instances = [item for sublist in child_instances for item in sublist]

Expand Down

0 comments on commit 7428be0

Please sign in to comment.