diff --git a/airflow/utils/dot_renderer.py b/airflow/utils/dot_renderer.py index bd65300e409bd..f36075eac4853 100644 --- a/airflow/utils/dot_renderer.py +++ b/airflow/utils/dot_renderer.py @@ -17,7 +17,7 @@ # specific language governing permissions and limitations # under the License. """Renderer DAG (tasks and dependencies) to the graphviz object.""" -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union import graphviz @@ -25,6 +25,7 @@ from airflow.models import TaskInstance from airflow.models.baseoperator import BaseOperator from airflow.models.dag import DAG +from airflow.models.mappedoperator import MappedOperator from airflow.models.taskmixin import DependencyMixin from airflow.serialization.serialized_objects import DagDependency from airflow.utils.state import State @@ -49,7 +50,9 @@ def _refine_color(color: str): def _draw_task( - task: BaseOperator, parent_graph: graphviz.Digraph, states_by_task_id: Optional[Dict[Any, Any]] + task: Union[MappedOperator, BaseOperator], + parent_graph: graphviz.Digraph, + states_by_task_id: Optional[Dict[Any, Any]], ) -> None: """Draw a single task on the given parent_graph""" if states_by_task_id: @@ -114,7 +117,7 @@ def _draw_nodes( node: DependencyMixin, parent_graph: graphviz.Digraph, states_by_task_id: Optional[Dict[str, str]] ) -> None: """Draw the node and its children on the given parent_graph recursively.""" - if isinstance(node, BaseOperator): + if isinstance(node, BaseOperator) or isinstance(node, MappedOperator): _draw_task(node, parent_graph, states_by_task_id) else: if not isinstance(node, TaskGroup): diff --git a/tests/utils/test_dot_renderer.py b/tests/utils/test_dot_renderer.py index 5d376e1e08147..5c5b47e7a5252 100644 --- a/tests/utils/test_dot_renderer.py +++ b/tests/utils/test_dot_renderer.py @@ -107,6 +107,19 @@ def test_should_render_dag_with_task_instances(self, session, dag_maker): 'third [color=black fillcolor=lime label=third shape=rectangle style="filled,rounded"]' in source ) + def test_should_render_dag_with_mapped_operator(self, session, dag_maker): + with dag_maker(dag_id="DAG_ID", session=session) as dag: + BashOperator.partial(task_id="first").expand(bash_command=["echo hello", "echo world"]) + + dot = dot_renderer.render_dag(dag) + source = dot.source + # Should render DAG title + assert "label=DAG_ID" in source + assert ( + 'first [color="#000000" fillcolor="#f0ede4" label=first shape=rectangle style="filled,rounded"]' + in source + ) + def test_should_render_dag_orientation(self, session, dag_maker): orientation = "TB" with dag_maker(dag_id="DAG_ID", orientation=orientation, session=session) as dag: