diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7d451dc27d3be..7aebe585e6d55 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -195,11 +195,12 @@ repos: - "4" files: ^chart/values\.schema\.json$|^chart/values_schema\.schema\.json$ pass_filenames: true + # TODO: Bump to Python 3.7 when support for Python 3.6 is dropped in Airflow 2.3. - repo: https://github.com/asottile/pyupgrade rev: v2.31.0 hooks: - id: pyupgrade - args: ["--py37-plus"] + args: ["--py36-plus"] exclude: ^airflow/_vendor/ - repo: https://github.com/pre-commit/pygrep-hooks rev: v1.9.0 diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py index f098bb161e053..e8216625758af 100644 --- a/airflow/decorators/base.py +++ b/airflow/decorators/base.py @@ -19,6 +19,7 @@ import inspect import itertools import re +import sys from typing import ( Any, Callable, @@ -108,9 +109,8 @@ class DecoratedOperator(BaseOperator): :param op_args: a list of positional arguments that will get unpacked when calling your callable (templated) :type op_args: list - :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys. - Defaults to False. + :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to + multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False. :type multiple_outputs: bool :param kwargs_to_upstream: For certain operators, we might need to upstream certain arguments that would otherwise be absorbed by the DecoratedOperator (for example python_callable for the @@ -231,7 +231,16 @@ def _validate_function(self, _, f): @multiple_outputs.default def _infer_multiple_outputs(self): return_type = self.function_signature.return_annotation - ttype = getattr(return_type, "__origin__", None) + + # If the return type annotation is already the builtins ``dict`` type, use it for the inference. + if return_type == dict: + ttype = return_type + # Checking if Python 3.6, ``__origin__`` attribute does not exist until 3.7; need to use ``__extra__`` + # TODO: Remove check when support for Python 3.6 is dropped in Airflow 2.3. + elif sys.version_info < (3, 7): + ttype = getattr(return_type, "__extra__", None) + else: + ttype = getattr(return_type, "__origin__", None) return return_type is not inspect.Signature.empty and ttype in (dict, Dict) @@ -310,10 +319,8 @@ def task_decorator_factory( :param python_callable: Function to decorate :type python_callable: Optional[Callable] - :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. List/Tuples will unroll to xcom values - with index as key. Dict will unroll to xcom values with keys as XCom keys. - Defaults to False. + :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to + multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False. :type multiple_outputs: bool :param decorated_operator_class: The operator that executes the logic needed to run the python function in the correct environment diff --git a/airflow/decorators/python.py b/airflow/decorators/python.py index e08254888be65..05794ea65dc5b 100644 --- a/airflow/decorators/python.py +++ b/airflow/decorators/python.py @@ -33,9 +33,8 @@ class _PythonDecoratedOperator(DecoratedOperator, PythonOperator): :param op_args: a list of positional arguments that will get unpacked when calling your callable (templated) :type op_args: list - :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys. - Defaults to False. + :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to + multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False. :type multiple_outputs: bool """ @@ -85,9 +84,8 @@ def python( :param python_callable: Function to decorate :type python_callable: Optional[Callable] - :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. List/Tuples will unroll to xcom values - with index as key. Dict will unroll to xcom values with keys as XCom keys. + :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to + multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False. :type multiple_outputs: bool """ @@ -109,10 +107,8 @@ def python_task( :param python_callable: Function to decorate :type python_callable: Optional[Callable] - :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. List/Tuples will unroll to xcom values - with index as key. Dict will unroll to xcom values with keys as XCom keys. - Defaults to False. + :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to + multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False. :type multiple_outputs: bool """ return task_decorator_factory( diff --git a/airflow/decorators/python_virtualenv.py b/airflow/decorators/python_virtualenv.py index 15ecbde8c6c41..b08cad1bed938 100644 --- a/airflow/decorators/python_virtualenv.py +++ b/airflow/decorators/python_virtualenv.py @@ -36,9 +36,8 @@ class _PythonVirtualenvDecoratedOperator(DecoratedOperator, PythonVirtualenvOper :param op_args: a list of positional arguments that will get unpacked when calling your callable (templated) :type op_args: list - :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys. - Defaults to False. + :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to + multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False. :type multiple_outputs: bool """ @@ -88,9 +87,8 @@ def virtualenv( :param python_callable: Function to decorate :type python_callable: Optional[Callable] - :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. List/Tuples will unroll to xcom values - with index as key. Dict will unroll to xcom values with keys as XCom keys. + :param multiple_outputs: If set to True, the decorated function's return value will be unrolled to + multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False. :type multiple_outputs: bool """ diff --git a/tests/decorators/test_python.py b/tests/decorators/test_python.py index 48f6c144ed24a..002993c91a0db 100644 --- a/tests/decorators/test_python.py +++ b/tests/decorators/test_python.py @@ -15,12 +15,14 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import sys import unittest.mock from collections import namedtuple from datetime import date, timedelta -from typing import Dict, Tuple +from typing import Tuple import pytest +from parameterized import parameterized from airflow.decorators import task as task_decorator from airflow.exceptions import AirflowException @@ -113,13 +115,24 @@ def test_python_operator_python_callable_is_callable(self): with pytest.raises(TypeError): task_decorator(not_callable, dag=self.dag) - def test_infer_multiple_outputs_using_typing(self): - @task_decorator - def identity_dict(x: int, y: int) -> Dict[str, int]: - return {"x": x, "y": y} + @parameterized.expand([["dict"], ["dict[str, int]"], ["Dict"], ["Dict[str, int]"]]) + def test_infer_multiple_outputs_using_dict_typing(self, test_return_annotation): + if sys.version_info < (3, 9) and test_return_annotation == "dict[str, int]": + self.skipTest("dict[...] not a supported typing prior to Python 3.9") + + @task_decorator + def identity_dict(x: int, y: int) -> eval(test_return_annotation): + return {"x": x, "y": y} + + assert identity_dict(5, 5).operator.multiple_outputs is True + + @task_decorator + def identity_dict_stringified(x: int, y: int) -> test_return_annotation: + return {"x": x, "y": y} - assert identity_dict(5, 5).operator.multiple_outputs is True + assert identity_dict_stringified(5, 5).operator.multiple_outputs is True + def test_infer_multiple_outputs_using_other_typing(self): @task_decorator def identity_tuple(x: int, y: int) -> Tuple[int, int]: return x, y