From ad7f474eec11c49b46badb17f78bf1bcfa47ff51 Mon Sep 17 00:00:00 2001 From: Hussein Awala Date: Thu, 6 Jul 2023 07:50:06 +0200 Subject: [PATCH] Fix `operator_extra_links` property serialization in mapped tasks (#31904) * reproduce the problem in a unit test * Fix operator_extra_links serialization * replace fget by __get__ (cherry picked from commit 3318212482c6e11ac5c2e2828f7e467bca5b7245) --- airflow/serialization/serialized_objects.py | 6 ++- tests/serialization/test_dag_serialization.py | 40 ++++++++++++++++++- 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 0f319a1afb363..04c427309c765 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -287,7 +287,7 @@ class BaseSerialization: _datetime_types = (datetime.datetime,) # Object types that are always excluded in serialization. - _excluded_types = (logging.Logger, Connection, type) + _excluded_types = (logging.Logger, Connection, type, property) _json_schema: Validator | None = None @@ -822,7 +822,9 @@ def _serialize_node(cls, op: BaseOperator | MappedOperator, include_deps: bool) if op.operator_extra_links: serialize_op["_operator_extra_links"] = cls._serialize_operator_extra_links( - op.operator_extra_links + op.operator_extra_links.__get__(op) + if isinstance(op.operator_extra_links, property) + else op.operator_extra_links ) if include_deps: diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index e656a4f9d5d95..6a48c3ff2b2bf 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -68,7 +68,7 @@ from airflow.utils.task_group import TaskGroup from airflow.utils.xcom import XCOM_RETURN_KEY from tests.test_utils.config import conf_vars -from tests.test_utils.mock_operators import CustomOperator, GoogleLink, MockOperator +from tests.test_utils.mock_operators import AirflowLink2, CustomOperator, GoogleLink, MockOperator from tests.test_utils.timetables import CustomSerializationTimetable, cron_timetable, delta_timetable repo_root = Path(airflow.__file__).parent.parent @@ -2481,3 +2481,41 @@ def tg(a: str) -> None: serde_tg = serde_dag.task_group.children["tg"] assert isinstance(serde_tg, MappedTaskGroup) assert serde_tg._expand_input == DictOfListsExpandInput({"a": [".", ".."]}) + + +def test_mapped_task_with_operator_extra_links_property(): + class _DummyOperator(BaseOperator): + def __init__(self, inputs, **kwargs): + super().__init__(**kwargs) + self.inputs = inputs + + @property + def operator_extra_links(self): + return (AirflowLink2(),) + + with DAG("test-dag", start_date=datetime(2020, 1, 1)) as dag: + _DummyOperator.partial(task_id="task").expand(inputs=[1, 2, 3]) + serialized_dag = SerializedBaseOperator.serialize(dag) + assert serialized_dag["tasks"][0] == { + "task_id": "task", + "expand_input": { + "type": "dict-of-lists", + "value": {"__type": "dict", "__var": {"inputs": [1, 2, 3]}}, + }, + "partial_kwargs": {}, + "_disallow_kwargs_override": False, + "_expand_input_attr": "expand_input", + "downstream_task_ids": [], + "_operator_extra_links": [{"tests.test_utils.mock_operators.AirflowLink2": {}}], + "ui_color": "#fff", + "ui_fgcolor": "#000", + "template_ext": [], + "template_fields": [], + "template_fields_renderers": {}, + "_task_type": "_DummyOperator", + "_task_module": "tests.serialization.test_dag_serialization", + "_is_empty": False, + "_is_mapped": True, + } + deserialized_dag = SerializedDAG.deserialize_dag(serialized_dag) + assert deserialized_dag.task_dict["task"].operator_extra_links == [AirflowLink2()]