Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance multiple_outputs inference of dict typing #19608

Merged
merged 6 commits into from
Jan 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,12 @@ repos:
- "4"
files: ^chart/values\.schema\.json$|^chart/values_schema\.schema\.json$
pass_filenames: true
# TODO: Bump to Python 3.7 when support for Python 3.6 is dropped in Airflow 2.3.
- repo: https://github.com/asottile/pyupgrade
rev: v2.31.0
hooks:
- id: pyupgrade
args: ["--py37-plus"]
args: ["--py36-plus"]
exclude: ^airflow/_vendor/
- repo: https://github.com/pre-commit/pygrep-hooks
rev: v1.9.0
Expand Down
23 changes: 15 additions & 8 deletions airflow/decorators/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import inspect
import itertools
import re
import sys
from typing import (
Any,
Callable,
Expand Down Expand Up @@ -108,9 +109,8 @@ class DecoratedOperator(BaseOperator):
:param op_args: a list of positional arguments that will get unpacked when
calling your callable (templated)
:type op_args: list
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
Defaults to False.
:param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
:type multiple_outputs: bool
:param kwargs_to_upstream: For certain operators, we might need to upstream certain arguments
that would otherwise be absorbed by the DecoratedOperator (for example python_callable for the
Expand Down Expand Up @@ -231,7 +231,16 @@ def _validate_function(self, _, f):
@multiple_outputs.default
def _infer_multiple_outputs(self):
return_type = self.function_signature.return_annotation
ttype = getattr(return_type, "__origin__", None)

# If the return type annotation is already the builtins ``dict`` type, use it for the inference.
if return_type == dict:
ttype = return_type
# Checking if Python 3.6, ``__origin__`` attribute does not exist until 3.7; need to use ``__extra__``
# TODO: Remove check when support for Python 3.6 is dropped in Airflow 2.3.
elif sys.version_info < (3, 7):
ttype = getattr(return_type, "__extra__", None)
else:
ttype = getattr(return_type, "__origin__", None)

return return_type is not inspect.Signature.empty and ttype in (dict, Dict)

Expand Down Expand Up @@ -310,10 +319,8 @@ def task_decorator_factory(

:param python_callable: Function to decorate
:type python_callable: Optional[Callable]
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. List/Tuples will unroll to xcom values
with index as key. Dict will unroll to xcom values with keys as XCom keys.
Defaults to False.
:param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
:type multiple_outputs: bool
:param decorated_operator_class: The operator that executes the logic needed to run the python function in
the correct environment
Expand Down
16 changes: 6 additions & 10 deletions airflow/decorators/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ class _PythonDecoratedOperator(DecoratedOperator, PythonOperator):
:param op_args: a list of positional arguments that will get unpacked when
calling your callable (templated)
:type op_args: list
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
Defaults to False.
:param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
:type multiple_outputs: bool
"""

Expand Down Expand Up @@ -85,9 +84,8 @@ def python(

:param python_callable: Function to decorate
:type python_callable: Optional[Callable]
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. List/Tuples will unroll to xcom values
with index as key. Dict will unroll to xcom values with keys as XCom keys.
:param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys.
Defaults to False.
:type multiple_outputs: bool
"""
Expand All @@ -109,10 +107,8 @@ def python_task(

:param python_callable: Function to decorate
:type python_callable: Optional[Callable]
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. List/Tuples will unroll to xcom values
with index as key. Dict will unroll to xcom values with keys as XCom keys.
Defaults to False.
:param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
:type multiple_outputs: bool
"""
return task_decorator_factory(
Expand Down
10 changes: 4 additions & 6 deletions airflow/decorators/python_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@ class _PythonVirtualenvDecoratedOperator(DecoratedOperator, PythonVirtualenvOper
:param op_args: a list of positional arguments that will get unpacked when
calling your callable (templated)
:type op_args: list
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
Defaults to False.
:param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys. Defaults to False.
:type multiple_outputs: bool
"""

Expand Down Expand Up @@ -88,9 +87,8 @@ def virtualenv(

:param python_callable: Function to decorate
:type python_callable: Optional[Callable]
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. List/Tuples will unroll to xcom values
with index as key. Dict will unroll to xcom values with keys as XCom keys.
:param multiple_outputs: If set to True, the decorated function's return value will be unrolled to
multiple XCom values. Dict will unroll to XCom values with its keys as XCom keys.
Defaults to False.
:type multiple_outputs: bool
"""
Expand Down
25 changes: 19 additions & 6 deletions tests/decorators/test_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import sys
import unittest.mock
from collections import namedtuple
from datetime import date, timedelta
from typing import Dict, Tuple
from typing import Tuple

import pytest
from parameterized import parameterized

from airflow.decorators import task as task_decorator
from airflow.exceptions import AirflowException
Expand Down Expand Up @@ -113,13 +115,24 @@ def test_python_operator_python_callable_is_callable(self):
with pytest.raises(TypeError):
task_decorator(not_callable, dag=self.dag)

def test_infer_multiple_outputs_using_typing(self):
@task_decorator
def identity_dict(x: int, y: int) -> Dict[str, int]:
return {"x": x, "y": y}
@parameterized.expand([["dict"], ["dict[str, int]"], ["Dict"], ["Dict[str, int]"]])
def test_infer_multiple_outputs_using_dict_typing(self, test_return_annotation):
if sys.version_info < (3, 9) and test_return_annotation == "dict[str, int]":
self.skipTest("dict[...] not a supported typing prior to Python 3.9")

@task_decorator
def identity_dict(x: int, y: int) -> eval(test_return_annotation):
return {"x": x, "y": y}
Comment on lines +123 to +125
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible/easy to also handle the stringified annotation, e.g.

@task_decorator
def identity_dict(x: int, y: int) -> "dict":
    return {}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I don't see why not. I'll dig in.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like typing.get_type_hints() does the eval automatically and can be useful here (to replace inspect.signature).

Copy link
Contributor Author

@josh-fell josh-fell Jan 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I can tell we eventually get into the same __origin__ vs __extra__ attribute situation for Python 3.6 unfortunately. We can definitely add some tests for the stringified dict typing though.

Also looks like the recent refactor of airflow/decorators/base.py uses inspect.signature a little more extensively too.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Python 3.6

Haven’t we dropped it already?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically it won't be dropped until 2.3 but this PR is slated for 2.2.4.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be viable to do de-stringification if 3.6 is not considered? We can add this in a separate PR that targets only 2.3+ if so.


assert identity_dict(5, 5).operator.multiple_outputs is True

@task_decorator
def identity_dict_stringified(x: int, y: int) -> test_return_annotation:
return {"x": x, "y": y}

assert identity_dict(5, 5).operator.multiple_outputs is True
assert identity_dict_stringified(5, 5).operator.multiple_outputs is True

def test_infer_multiple_outputs_using_other_typing(self):
@task_decorator
def identity_tuple(x: int, y: int) -> Tuple[int, int]:
return x, y
Expand Down