From a9772cf287111a63eac8c2deb1190f7054d7580f Mon Sep 17 00:00:00 2001 From: Daniel Imberman Date: Mon, 20 Sep 2021 16:41:26 -0500 Subject: [PATCH] Add a Docker Taskflow decorator (#15330) Add the ability to run @task.docker on a python function and turn it into a DockerOperator that can run that python function remotely. ``` @task.docker( image="quay.io/bitnami/python:3.8.8", force_pull=True, docker_url="unix://var/run/docker.sock", network_mode="bridge", api_version='auto', ) def f(): import random return [random.random() for i in range(10000000)] ``` One notable aspect of this architecture is that we had to build it to make as few assumptions about user setups as possible. We could not share a volume between the worker and the container as this would break if the user runs the airflow worker on a docker container. We could not assume that users would have any specialized system libraries on their images (this implementation only requires python 3 and bash). To work with these requirements, we use base64 encoding to store a jinja generated python file and inputs (which are generated using the same functions used by the PythonVirtualEnvOperator). Once the container starts, it uses these environment variables to deserialize the strings, run the function, and store the result in a file located at /tmp/script.out. Once the function completes, we create a sleep loop until the DockerOperator retrieves the result via docker's get_archive API. This result can then be deserialized using pickle and sent to Airflow's XCom library in the same fashion as a python or python_virtualenv result. Co-authored-by: Ash Berlin-Taylor Co-authored-by: Ash Berlin-Taylor --- airflow/decorators/__init__.py | 137 ++------- airflow/decorators/base.py | 3 +- airflow/decorators/python.py | 40 ++- airflow/decorators/python_virtualenv.py | 45 +-- ...ial_taskflow_api_etl_docker_virtualenv.py} | 2 +- airflow/provider.yaml.schema.json | 12 + airflow/provider_info.schema.json | 12 + .../providers/docker/decorators/__init__.py | 17 ++ airflow/providers/docker/decorators/docker.py | 289 ++++++++++++++++++ airflow/providers/docker/operators/docker.py | 52 +++- airflow/providers/docker/provider.yaml | 4 + airflow/providers_manager.py | 92 +++++- airflow/utils/python_virtualenv_script.jinja2 | 17 +- .../howto/create-custom-decorator.rst | 120 ++++++++ docs/apache-airflow/howto/index.rst | 1 + docs/apache-airflow/tutorial_taskflow_api.rst | 40 ++- tests/providers/docker/decorators/__init__.py | 16 + .../docker/decorators/test_docker.py | 121 ++++++++ 18 files changed, 845 insertions(+), 175 deletions(-) rename airflow/example_dags/{tutorial_taskflow_api_etl_virtualenv.py => tutorial_taskflow_api_etl_docker_virtualenv.py} (97%) create mode 100644 airflow/providers/docker/decorators/__init__.py create mode 100644 airflow/providers/docker/decorators/docker.py create mode 100644 docs/apache-airflow/howto/create-custom-decorator.rst create mode 100644 tests/providers/docker/decorators/__init__.py create mode 100644 tests/providers/docker/decorators/test_docker.py diff --git a/airflow/decorators/__init__.py b/airflow/decorators/__init__.py index 1250f321de291..ef490dfcbf917 100644 --- a/airflow/decorators/__init__.py +++ b/airflow/decorators/__init__.py @@ -15,129 +15,36 @@ # specific language governing permissions and limitations # under the License. 
-from typing import Callable, Dict, Iterable, List, Optional, Union +from typing import TYPE_CHECKING -from airflow.decorators.python import python_task -from airflow.decorators.python_virtualenv import _virtualenv_task +from airflow.decorators.python import PythonDecoratorMixin, python_task # noqa +from airflow.decorators.python_virtualenv import PythonVirtualenvDecoratorMixin from airflow.decorators.task_group import task_group # noqa from airflow.models.dag import dag # noqa +from airflow.providers_manager import ProvidersManager -class _TaskDecorator: - def __call__( - self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs - ): - """ - Python operator decorator. Wraps a function into an Airflow operator. - Accepts kwargs for operator kwarg. This decorator can be reused in a single DAG. +class _TaskDecorator(PythonDecoratorMixin, PythonVirtualenvDecoratorMixin): + def __getattr__(self, name): + if name.startswith("__"): + raise AttributeError(f'{type(self).__name__} has no attribute {name!r}') + decorators = ProvidersManager().taskflow_decorators + if name not in decorators: + raise AttributeError(f"task decorator {name!r} not found") + return decorators[name] - :param python_callable: Function to decorate - :type python_callable: Optional[Callable] - :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. List/Tuples will unroll to xcom values - with index as key. Dict will unroll to xcom values with keys as XCom keys. - Defaults to False. - :type multiple_outputs: bool - """ - return self.python(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs) - @staticmethod - def python(python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs): - """ - Python operator decorator. Wraps a function into an Airflow operator. - Accepts kwargs for operator kwarg. This decorator can be reused in a single DAG. +# [START mixin_for_autocomplete] +if TYPE_CHECKING: + try: + from airflow.providers.docker.decorators.docker import DockerDecoratorMixin - :param python_callable: Function to decorate - :type python_callable: Optional[Callable] - :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. List/Tuples will unroll to xcom values - with index as key. Dict will unroll to xcom values with keys as XCom keys. - Defaults to False. - :type multiple_outputs: bool - """ - return python_task(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs) - - @staticmethod - def virtualenv( - python_callable: Optional[Callable] = None, - multiple_outputs: Optional[bool] = None, - requirements: Optional[Iterable[str]] = None, - python_version: Optional[Union[str, int, float]] = None, - use_dill: bool = False, - system_site_packages: bool = True, - string_args: Optional[Iterable[str]] = None, - templates_dict: Optional[Dict] = None, - templates_exts: Optional[List[str]] = None, - **kwargs, - ): - """ - Allows one to run a function in a virtualenv that is - created and destroyed automatically (with certain caveats). - - The function must be defined using def, and not be - part of a class. All imports must happen inside the function - and no variables outside of the scope may be referenced. A global scope - variable named virtualenv_string_args will be available (populated by - string_args). In addition, one can pass stuff through op_args and op_kwargs, and one - can use a return value. 
- Note that if your virtualenv runs in a different Python major version than Airflow, - you cannot use return values, op_args, op_kwargs, or use any macros that are being provided to - Airflow through plugins. You can use string_args though. - - .. seealso:: - For more information on how to use this operator, take a look at the guide: - :ref:`howto/operator:PythonVirtualenvOperator` - - :param python_callable: A python function with no references to outside variables, - defined with def, which will be run in a virtualenv - :type python_callable: function - :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. List/Tuples will unroll to xcom values - with index as key. Dict will unroll to xcom values with keys as XCom keys. - Defaults to False. - :type multiple_outputs: bool - :param requirements: A list of requirements as specified in a pip install command - :type requirements: list[str] - :param python_version: The Python version to run the virtualenv with. Note that - both 2 and 2.7 are acceptable forms. - :type python_version: Optional[Union[str, int, float]] - :param use_dill: Whether to use dill to serialize - the args and result (pickle is default). This allow more complex types - but requires you to include dill in your requirements. - :type use_dill: bool - :param system_site_packages: Whether to include - system_site_packages in your virtualenv. - See virtualenv documentation for more information. - :type system_site_packages: bool - :param op_args: A list of positional arguments to pass to python_callable. - :type op_args: list - :param op_kwargs: A dict of keyword arguments to pass to python_callable. - :type op_kwargs: dict - :param string_args: Strings that are present in the global var virtualenv_string_args, - available to python_callable at runtime as a list[str]. Note that args are split - by newline. 
- :type string_args: list[str] - :param templates_dict: a dictionary where the values are templates that - will get templated by the Airflow engine sometime between - ``__init__`` and ``execute`` takes place and are made available - in your callable's context after the template has been applied - :type templates_dict: dict of str - :param templates_exts: a list of file extensions to resolve while - processing templated fields, for examples ``['.sql', '.hql']`` - :type templates_exts: list[str] - """ - return _virtualenv_task( - python_callable=python_callable, - multiple_outputs=multiple_outputs, - requirements=requirements, - python_version=python_version, - use_dill=use_dill, - system_site_packages=system_site_packages, - string_args=string_args, - templates_dict=templates_dict, - templates_exts=templates_exts, - **kwargs, - ) + class _DockerTask(_TaskDecorator, DockerDecoratorMixin): + pass + _TaskDecorator = _DockerTask + except ImportError: + pass +# [END mixin_for_autocomplete] task = _TaskDecorator() diff --git a/airflow/decorators/base.py b/airflow/decorators/base.py index b13396484c305..229a114fc9cfb 100644 --- a/airflow/decorators/base.py +++ b/airflow/decorators/base.py @@ -132,8 +132,7 @@ def __init__( def execute(self, context: Dict): return_value = super().execute(context) - self._handle_output(return_value=return_value, context=context, xcom_push=self.xcom_push) - return return_value + return self._handle_output(return_value=return_value, context=context, xcom_push=self.xcom_push) def _handle_output(self, return_value: Any, context: Dict, xcom_push: Callable): """ diff --git a/airflow/decorators/python.py b/airflow/decorators/python.py index a7251cb241c00..7dc6c1bff088b 100644 --- a/airflow/decorators/python.py +++ b/airflow/decorators/python.py @@ -62,11 +62,49 @@ def __init__( T = TypeVar("T", bound=Callable) +class PythonDecoratorMixin: + """ + Helper class for inheritance. This class is only used for the __init__.pyi so that IDEs + will autocomplete docker decorator functions + + :meta private: + """ + + def __call__( + self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs + ): + return self.python(python_callable, multiple_outputs, **kwargs) + + def python( + self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs + ): + """ + Python operator decorator. Wraps a function into an Airflow operator. + + Accepts kwargs for operator kwarg. Can be reused in a single DAG. + + :param python_callable: Function to decorate + :type python_callable: Optional[Callable] + :param multiple_outputs: if set, function return value will be + unrolled to multiple XCom values. List/Tuples will unroll to xcom values + with index as key. Dict will unroll to xcom values with keys as XCom keys. + Defaults to False. + :type multiple_outputs: bool + """ + return task_decorator_factory( + python_callable=python_callable, + multiple_outputs=multiple_outputs, + decorated_operator_class=_PythonDecoratedOperator, + **kwargs, + ) + + def python_task( python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs ): """ - Python operator decorator. Wraps a function into an Airflow operator. + Wraps a function into an Airflow operator. + Accepts kwargs for operator kwarg. Can be reused in a single DAG. 
:param python_callable: Function to decorate diff --git a/airflow/decorators/python_virtualenv.py b/airflow/decorators/python_virtualenv.py index 61e5d1f5dc93e..8024e5a99ca5a 100644 --- a/airflow/decorators/python_virtualenv.py +++ b/airflow/decorators/python_virtualenv.py @@ -70,24 +70,33 @@ def get_python_source(self): T = TypeVar("T", bound=Callable) -def _virtualenv_task( - python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs -): +class PythonVirtualenvDecoratorMixin: """ - Python operator decorator. Wraps a function into an Airflow operator. - Accepts kwargs for operator kwarg. Can be reused in a single DAG. + Helper class for inheritance. This class is only used for the __init__.pyi so that IDEs + will autocomplete docker decorator functions - :param python_callable: Function to decorate - :type python_callable: Optional[Callable] - :param multiple_outputs: if set, function return value will be - unrolled to multiple XCom values. List/Tuples will unroll to xcom values - with index as key. Dict will unroll to xcom values with keys as XCom keys. - Defaults to False. - :type multiple_outputs: bool + :meta private: """ - return task_decorator_factory( - python_callable=python_callable, - multiple_outputs=multiple_outputs, - decorated_operator_class=_PythonVirtualenvDecoratedOperator, - **kwargs, - ) + + def virtualenv( + self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs + ): + """ + Wraps a python function into an Airflow operator to run via a Python virtual environment. + + Accepts kwargs for operator kwarg. Can be reused in a single DAG. + + :param python_callable: Function to decorate + :type python_callable: Optional[Callable] + :param multiple_outputs: if set, function return value will be + unrolled to multiple XCom values. List/Tuples will unroll to xcom values + with index as key. Dict will unroll to xcom values with keys as XCom keys. + Defaults to False. + :type multiple_outputs: bool + """ + return task_decorator_factory( + python_callable=python_callable, + multiple_outputs=multiple_outputs, + decorated_operator_class=_PythonVirtualenvDecoratedOperator, + **kwargs, + ) diff --git a/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py b/airflow/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py similarity index 97% rename from airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py rename to airflow/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py index 3a7a3244b1778..c89ea9b358ec2 100644 --- a/airflow/example_dags/tutorial_taskflow_api_etl_virtualenv.py +++ b/airflow/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py @@ -62,7 +62,7 @@ def extract(): # [END extract_virtualenv] # [START transform_docker] - @task.virtualenv(multiple_outputs=True) + @task.docker(image='python:3.9-slim-buster', multiple_outputs=True) def transform(order_data_dict: dict): """ #### Transform task diff --git a/airflow/provider.yaml.schema.json b/airflow/provider.yaml.schema.json index f72338dbc7f24..6d650a2e96e4f 100644 --- a/airflow/provider.yaml.schema.json +++ b/airflow/provider.yaml.schema.json @@ -233,6 +233,18 @@ "type": "object", "description": "Additional extras that the provider should have" }, + "task-decorators": { + "type": "array", + "description": "Decorators to use with the TaskFlow API. 
Can be accessed by users via '@task.<name>'",
+      "items": {
+        "name": {
+          "type": "string"
+        },
+        "path": {
+          "type": "string"
+        }
+      }
+    },
     "secrets-backends": {
       "type": "array",
       "description": "Secrets Backend class names",
diff --git a/airflow/provider_info.schema.json b/airflow/provider_info.schema.json
index 7a369a5777334..e36fe609ca91c 100644
--- a/airflow/provider_info.schema.json
+++ b/airflow/provider_info.schema.json
@@ -70,6 +70,18 @@
         "items": {
           "type": "string"
         }
+      },
+      "task-decorators": {
+        "type": "array",
+        "description": "Apply custom decorators to the TaskFlow API. Can be accessed by users via '@task.<name>'",
+        "items": {
+          "name": {
+            "type": "string"
+          },
+          "path": {
+            "type": "string"
+          }
+        }
       }
     },
     "required": [
diff --git a/airflow/providers/docker/decorators/__init__.py b/airflow/providers/docker/decorators/__init__.py
new file mode 100644
index 0000000000000..217e5db960782
--- /dev/null
+++ b/airflow/providers/docker/decorators/__init__.py
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/docker/decorators/docker.py b/airflow/providers/docker/decorators/docker.py
new file mode 100644
index 0000000000000..2e02d2ae06efe
--- /dev/null
+++ b/airflow/providers/docker/decorators/docker.py
@@ -0,0 +1,289 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import base64
+import inspect
+import os
+import pickle
+from tempfile import TemporaryDirectory
+from textwrap import dedent
+from typing import Callable, Dict, Iterable, List, Optional, TypeVar, Union
+
+import dill
+
+from airflow.decorators.base import DecoratedOperator, task_decorator_factory
+from airflow.providers.docker.operators.docker import DockerOperator
+from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script
+
+
+def _generate_decode_command(env_var, file):
+    # We don't need `f.close()` as the interpreter is about to exit anyway
+    return (
+        f'python -c "import base64, os;'
+        rf'x = base64.b64decode(os.environ[\"{env_var}\"]);'
+        rf'f = open(\"{file}\", \"wb\"); f.write(x);"'
+    )
+
+
+def _b64_encode_file(filename):
+    with open(filename, "rb") as file_to_encode:
+        return base64.b64encode(file_to_encode.read())
+
+
+class _DockerDecoratedOperator(DecoratedOperator, DockerOperator):
+    """
+    Wraps a Python callable and captures args/kwargs when called for execution.
+
+    :param python_callable: A reference to an object that is callable
+    :type python_callable: python callable
+    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
+        in your function (templated)
+    :type op_kwargs: dict
+    :param op_args: a list of positional arguments that will get unpacked when
+        calling your callable (templated)
+    :type op_args: list
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as XCom keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+
+    template_fields = ('op_args', 'op_kwargs')
+
+    # since we won't mutate the arguments, we should just do the shallow copy
+    # there are some cases we can't deepcopy the objects (e.g. protobuf).
+    shallow_copy_attrs = ('python_callable',)
+
+    def __init__(
+        self,
+        use_dill=False,
+        **kwargs,
+    ) -> None:
+        command = "dummy command"
+        self.pickling_library = dill if use_dill else pickle
+        super().__init__(
+            command=command, retrieve_output=True, retrieve_output_path="/tmp/script.out", **kwargs
+        )
+
+    def execute(self, context: Dict):
+        with TemporaryDirectory(prefix='venv') as tmp_dir:
+            input_filename = os.path.join(tmp_dir, 'script.in')
+            script_filename = os.path.join(tmp_dir, 'script.py')
+
+            with open(input_filename, 'wb') as file:
+                if self.op_args or self.op_kwargs:
+                    self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, file)
+            py_source = self._get_python_source()
+            write_python_script(
+                jinja_context=dict(
+                    op_args=self.op_args,
+                    op_kwargs=self.op_kwargs,
+                    pickling_library=self.pickling_library.__name__,
+                    python_callable=self.python_callable.__name__,
+                    python_callable_source=py_source,
+                    string_args_global=False,
+                ),
+                filename=script_filename,
+            )
+
+            # Pass the python script to be executed, and the input args, via environment variables. 
This is
+            # more than slightly hacky, but it means it can work when Airflow itself is in the same Docker
+            # engine where this task is going to run (unlike say trying to mount a file in)
+            self.environment["__PYTHON_SCRIPT"] = _b64_encode_file(script_filename)
+            if self.op_args or self.op_kwargs:
+                self.environment["__PYTHON_INPUT"] = _b64_encode_file(input_filename)
+            else:
+                self.environment["__PYTHON_INPUT"] = ""
+
+            self.command = (
+                f"""bash -cx '{_generate_decode_command("__PYTHON_SCRIPT", "/tmp/script.py")} &&"""
+                f'{_generate_decode_command("__PYTHON_INPUT", "/tmp/script.in")} &&'
+                f'python /tmp/script.py /tmp/script.in /tmp/script.out\''
+            )
+            return super().execute(context)
+
+    def _get_python_source(self):
+        raw_source = inspect.getsource(self.python_callable)
+        res = dedent(raw_source)
+        res = remove_task_decorator(res, "@task.docker")
+        return res
+
+
+T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
+
+
+def docker_task(
+    python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
+):
+    """
+    Python operator decorator. Wraps a function into an Airflow operator.
+    Also accepts any argument that DockerOperator accepts, via ``kwargs``. Can be reused in a single DAG.
+
+    :param python_callable: Function to decorate
+    :type python_callable: Optional[Callable]
+    :param multiple_outputs: if set, function return value will be
+        unrolled to multiple XCom values. List/Tuples will unroll to xcom values
+        with index as key. Dict will unroll to xcom values with keys as XCom keys.
+        Defaults to False.
+    :type multiple_outputs: bool
+    """
+    return task_decorator_factory(
+        python_callable=python_callable,
+        multiple_outputs=multiple_outputs,
+        decorated_operator_class=_DockerDecoratedOperator,
+        **kwargs,
+    )
+
+
+# [START decoratormixin]
+class DockerDecoratorMixin:
+    """
+    Helper class for inheritance. This class is only used during type checking or auto-completion
+
+    :meta private:
+    """
+
+    def docker(
+        self,
+        multiple_outputs: Optional[bool] = None,
+        use_dill: bool = False,
+        image: str = "",
+        api_version: Optional[str] = None,
+        container_name: Optional[str] = None,
+        cpus: float = 1.0,
+        docker_url: str = 'unix://var/run/docker.sock',
+        environment: Optional[Dict] = None,
+        private_environment: Optional[Dict] = None,
+        force_pull: bool = False,
+        mem_limit: Optional[Union[float, str]] = None,
+        host_tmp_dir: Optional[str] = None,
+        network_mode: Optional[str] = None,
+        tls_ca_cert: Optional[str] = None,
+        tls_client_cert: Optional[str] = None,
+        tls_client_key: Optional[str] = None,
+        tls_hostname: Optional[Union[str, bool]] = None,
+        tls_ssl_version: Optional[str] = None,
+        tmp_dir: str = '/tmp/airflow',
+        user: Optional[Union[str, int]] = None,
+        mounts: Optional[List[str]] = None,
+        working_dir: Optional[str] = None,
+        xcom_all: bool = False,
+        docker_conn_id: Optional[str] = None,
+        dns: Optional[List[str]] = None,
+        dns_search: Optional[List[str]] = None,
+        auto_remove: bool = False,
+        shm_size: Optional[int] = None,
+        tty: bool = False,
+        privileged: bool = False,
+        cap_add: Optional[Iterable[str]] = None,
+        extra_hosts: Optional[Dict[str, str]] = None,
+        **kwargs,
+    ):
+        """
+        :param python_callable: A python function with no references to outside variables,
+            defined with def, which will be run in a Docker container
+        :type python_callable: function
+        :param multiple_outputs: if set, function return value will be
+            unrolled to multiple XCom values. List/Tuples will unroll to xcom values
+            with index as key. 
Dict will unroll to xcom values with keys as XCom keys. + Defaults to False. + :type multiple_outputs: bool + :param use_dill: Whether to use dill or pickle for serialization + :type use_dill: bool + :param image: Docker image from which to create the container. + If image tag is omitted, "latest" will be used. + :type image: str + :param api_version: Remote API version. Set to ``auto`` to automatically + detect the server's version. + :type api_version: str + :param container_name: Name of the container. Optional (templated) + :type container_name: str or None + :param cpus: Number of CPUs to assign to the container. + This value gets multiplied with 1024. See + https://docs.docker.com/engine/reference/run/#cpu-share-constraint + :type cpus: float + :param docker_url: URL of the host running the docker daemon. + Default is unix://var/run/docker.sock + :type docker_url: str + :param environment: Environment variables to set in the container. (templated) + :type environment: dict + :param private_environment: Private environment variables to set in the container. + These are not templated, and hidden from the website. + :type private_environment: dict + :param force_pull: Pull the docker image on every run. Default is False. + :type force_pull: bool + :param mem_limit: Maximum amount of memory the container can use. + Either a float value, which represents the limit in bytes, + or a string like ``128m`` or ``1g``. + :type mem_limit: float or str + :param host_tmp_dir: Specify the location of the temporary directory on the host which will + be mapped to tmp_dir. If not provided defaults to using the standard system temp directory. + :type host_tmp_dir: str + :param network_mode: Network mode for the container. + :type network_mode: str + :param tls_ca_cert: Path to a PEM-encoded certificate authority + to secure the docker connection. + :type tls_ca_cert: str + :param tls_client_cert: Path to the PEM-encoded certificate + used to authenticate docker client. + :type tls_client_cert: str + :param tls_client_key: Path to the PEM-encoded key used to authenticate docker client. + :type tls_client_key: str + :param tls_hostname: Hostname to match against + the docker server certificate or False to disable the check. + :type tls_hostname: str or bool + :param tls_ssl_version: Version of SSL to use when communicating with docker daemon. + :type tls_ssl_version: str + :param tmp_dir: Mount point inside the container to + a temporary directory created on the host by the operator. + The path is also made available via the environment variable + ``AIRFLOW_TMP_DIR`` inside the container. + :type tmp_dir: str + :param user: Default user inside the docker container. + :type user: int or str + :param mounts: List of mounts to mount into the container, e.g. + ``['/host/path:/container/path', '/host/path2:/container/path2:ro']``. + :type mounts: list + :param working_dir: Working directory to + set on the container (equivalent to the -w switch the docker client) + :type working_dir: str + :param xcom_all: Push all the stdout or just the last line. + The default is False (last line). + :type xcom_all: bool + :param docker_conn_id: ID of the Airflow connection to use + :type docker_conn_id: str + :param dns: Docker custom DNS servers + :type dns: list[str] + :param dns_search: Docker custom DNS search domain + :type dns_search: list[str] + :param auto_remove: Auto-removal of the container on daemon side when the + container's process exits. + The default is False. 
+        :type auto_remove: bool
+        :param shm_size: Size of ``/dev/shm`` in bytes. The size must be
+            greater than 0. If omitted, the system default is used.
+        :type shm_size: int
+        :param tty: Allocate pseudo-TTY to the container.
+            This needs to be set to see the logs of the Docker container.
+        :type tty: bool
+        :param privileged: Give extended privileges to this container.
+        :type privileged: bool
+        :param cap_add: Include container capabilities
+        :type cap_add: list[str]
+        """
+        ...
+    # [END decoratormixin]
diff --git a/airflow/providers/docker/operators/docker.py b/airflow/providers/docker/operators/docker.py
index 76062304f9f43..4f2afff6a4b84 100644
--- a/airflow/providers/docker/operators/docker.py
+++ b/airflow/providers/docker/operators/docker.py
@@ -17,6 +17,9 @@
 # under the License.
 """Implements Docker operator"""
 import ast
+import io
+import pickle
+import tarfile
 from tempfile import TemporaryDirectory
 from typing import Dict, Iterable, List, Optional, Union
 
@@ -141,6 +144,12 @@ class DockerOperator(BaseOperator):
     :type privileged: bool
     :param cap_add: Include container capabilities
     :type cap_add: list[str]
+    :param retrieve_output: Whether the container should retrieve the result file written by the task
+        before shutting down. Useful for cases where users want a pickle-serialized
+        output that is not posted to logs
+    :type retrieve_output: bool
+    :param retrieve_output_path: path of the output file that will be retrieved and passed to XCom
+    :type retrieve_output_path: Optional[str]
     """
 
     template_fields = ('command', 'environment', 'container_name')
@@ -185,9 +194,10 @@ def __init__(
         privileged: bool = False,
         cap_add: Optional[Iterable[str]] = None,
         extra_hosts: Optional[Dict[str, str]] = None,
+        retrieve_output: bool = False,
+        retrieve_output_path: Optional[str] = None,
         **kwargs,
     ) -> None:
-
         super().__init__(**kwargs)
         self.api_version = api_version
         self.auto_remove = auto_remove
@@ -227,6 +237,8 @@ def __init__(
 
         self.cli = None
         self.container = None
+        self.retrieve_output = retrieve_output
+        self.retrieve_output_path = retrieve_output_path
 
     def get_hook(self) -> DockerHook:
         """
@@ -298,6 +310,7 @@ def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> Optio
             line = ''
             res_lines = []
+            return_value = None
             for line in lines:
                 if hasattr(line, 'decode'):
                     # Note that lines returned can also be byte sequences so we have to handle decode here
@@ -305,17 +318,48 @@
                 line = line.strip()
                 res_lines.append(line)
                 self.log.info(line)
-
             result = self.cli.wait(self.container['Id'])
             if result['StatusCode'] != 0:
                 res_lines = "\n".join(res_lines)
                 raise AirflowException('docker container failed: ' + repr(result) + f"lines {res_lines}")
-            if self.do_xcom_push:
-                return res_lines if self.xcom_all else line
+            if self.retrieve_output and not return_value:
+                return_value = self._attempt_to_retrieve_result()
+            ret = None
+            if self.retrieve_output:
+                ret = return_value
+            elif self.do_xcom_push:
+                ret = self._get_return_value_from_logs(res_lines, line)
+            return ret
         finally:
             if self.auto_remove:
                 self.cli.remove_container(self.container['Id'])
 
+    def _attempt_to_retrieve_result(self):
+        """
+        Attempts to pull the result of the function from the expected file using docker's
+        get_archive function. 
If the file is not yet ready, returns None.
+        :return: the deserialized result of the function, or None if it is not yet ready
+        """
+
+        def copy_from_docker(container_id, src):
+            archived_result, stat = self.cli.get_archive(container_id, src)
+            # no need to write it to a file since we intend to deserialize it right away
+            file_standin = io.BytesIO(b"".join(archived_result))
+            tar = tarfile.open(fileobj=file_standin)
+            file = tar.extractfile(stat['name'])
+            lib = getattr(self, 'pickling_library', pickle)
+            return lib.loads(file.read())
+
+        try:
+            return_value = copy_from_docker(self.container['Id'], self.retrieve_output_path)
+            return return_value
+        except APIError:
+            return None
+
+    def _get_return_value_from_logs(self, res_lines, line):
+        return res_lines if self.xcom_all else line
+
     def execute(self, context) -> Optional[str]:
         self.cli = self._get_cli()
         if not self.cli:
diff --git a/airflow/providers/docker/provider.yaml b/airflow/providers/docker/provider.yaml
index 2a86541a8feeb..9b50b581538c6 100644
--- a/airflow/providers/docker/provider.yaml
+++ b/airflow/providers/docker/provider.yaml
@@ -63,3 +63,7 @@ hook-class-names:  # deprecated - to be removed after providers add dependency o
 connection-types:
   - hook-class-name: airflow.providers.docker.hooks.docker.DockerHook
     connection-type: docker
+
+task-decorators:
+  - class-name: airflow.providers.docker.decorators.docker.docker_task
+    name: docker
diff --git a/airflow/providers_manager.py b/airflow/providers_manager.py
index 9fb44f44084c7..02a79e7f8025a 100644
--- a/airflow/providers_manager.py
+++ b/airflow/providers_manager.py
@@ -17,6 +17,7 @@
 # under the License.
 """Manages all providers."""
 import fnmatch
+import functools
 import importlib
 import json
 import logging
@@ -62,7 +63,10 @@ class LazyDictWithCache(MutableMapping):
     at first use - and returns and caches the result.
     """
 
+    __slots__ = ['_resolved', '_raw_dict']
+
     def __init__(self, *args, **kw):
+        self._resolved = set()
         self._raw_dict = dict(*args, **kw)
 
     def __setitem__(self, key, value):
@@ -70,15 +74,21 @@ def __setitem__(self, key, value):
 
     def __getitem__(self, key):
         value = self._raw_dict.__getitem__(key)
-        if callable(value):
-            # exchange callable with result of calling it
-            value = value(key)
+        if key not in self._resolved and callable(value):
+            # exchange callable with result of calling it -- but only once! allow resolver to return a
+            # callable itself
+            value = value()
+            self._resolved.add(key)
             if value:
                 self._raw_dict.__setitem__(key, value)
         return value
 
     def __delitem__(self, key):
         self._raw_dict.__delitem__(key)
+        try:
+            self._resolved.remove(key)
+        except KeyError:
+            pass
 
     def __iter__(self):
         return iter(self._raw_dict)
@@ -86,6 +96,9 @@ def __iter__(self):
     def __len__(self):
         return len(self._raw_dict)
 
+    def __contains__(self, key):
+        return key in self._raw_dict
+
 
 def _create_provider_info_schema_validator():
     """Creates JSON schema validator from the provider_info.schema.json"""
@@ -105,6 +118,20 @@ def _create_customized_form_field_behaviours_schema_validator():
     return validator
 
 
+def _check_builtin_provider_prefix(provider_package: str, class_name: str) -> bool:
+    if provider_package.startswith("apache-airflow"):
+        provider_path = provider_package[len("apache-") :].replace("-", ".")
+        if not class_name.startswith(provider_path):
+            log.warning(
+                "Coherence check failed when importing '%s' from '%s' package. It should start with '%s'",
+                class_name,
+                provider_package,
+                provider_path,
+            )
+            return False
+    return True
+
+
 def _sanity_check(provider_package: str, class_name: str) -> bool:
     """
     Performs coherence check on provider classes. 
@@ -117,16 +144,8 @@ def _sanity_check(provider_package: str, class_name: str) -> bool:
 
     :return True if the class is OK, False otherwise.
     """
-    if provider_package.startswith("apache-airflow"):
-        provider_path = provider_package[len("apache-") :].replace("-", ".")
-        if not class_name.startswith(provider_path):
-            log.warning(
-                "Coherence check failed when importing '%s' from '%s' package. It should start with '%s'",
-                class_name,
-                provider_package,
-                provider_path,
-            )
-            return False
+    if not _check_builtin_provider_prefix(provider_package, class_name):
+        return False
     try:
         import_string(class_name)
     except Exception as e:
@@ -227,6 +246,10 @@ def __init__(self):
         self._initialized_cache: Dict[str, bool] = {}
         # Keeps dict of providers keyed by module name
         self._provider_dict: Dict[str, ProviderInfo] = {}
+        # Keeps dict of hooks keyed by connection type
+        self._hooks_dict: Dict[str, HookInfo] = {}
+
+        self._taskflow_decorators: Dict[str, Callable] = LazyDictWithCache()
         # keeps mapping between connection_types and hook class, package they come from
         self._hook_provider_dict: Dict[str, HookClassProvider] = {}
         # Keeps dict of hooks keyed by connection type. They are lazy evaluated at access time
@@ -274,6 +297,12 @@ def initialize_providers_hooks(self):
         self._discover_hooks()
         self._hook_provider_dict = OrderedDict(sorted(self._hook_provider_dict.items()))
 
+    @provider_info_cache("taskflow_decorators")
+    def initialize_providers_taskflow_decorator(self):
+        """Lazy initialization of providers taskflow decorators."""
+        self.initialize_providers_list()
+        self._discover_taskflow_decorators()
+
     @provider_info_cache("extra_links")
     def initialize_providers_extra_links(self):
         """Lazy initialization of providers extra links."""
@@ -423,7 +452,9 @@ def _discover_hooks_from_connection_types(
                     hook_class_name=hook_class_name, package_name=package_name
                 )
                 # Defer importing hook to access time by setting import hook method as dict value
-                self._hooks_lazy_dict[connection_type] = self._import_hook
+                self._hooks_lazy_dict[connection_type] = functools.partial(
+                    self._import_hook, connection_type
+                )
                 provider_uses_connection_types = True
         return provider_uses_connection_types
 
@@ -523,6 +554,34 @@ def _import_info_from_all_hooks(self):
         self._connection_form_widgets = OrderedDict(sorted(self._connection_form_widgets.items()))
         self._field_behaviours = OrderedDict(sorted(self._field_behaviours.items()))
 
+    def _discover_taskflow_decorators(self) -> None:
+        for name, info in self._provider_dict.items():
+            for taskflow_decorator in info.provider_info.get("task-decorators", []):
+                self._add_taskflow_decorator(
+                    taskflow_decorator["name"], taskflow_decorator["class-name"], name
+                )
+
+    def _add_taskflow_decorator(self, name, decorator_class_name: str, provider_package: str) -> None:
+        if not _check_builtin_provider_prefix(provider_package, decorator_class_name):
+            return
+
+        if name in self._taskflow_decorators:
+            try:
+                existing = self._taskflow_decorators[name]
+                other_name = f'{existing.__module__}.{existing.__name__}'
+            except Exception:
+                # If there is a problem importing, then get the value from the functools.partial
+                other_name = self._taskflow_decorators._raw_dict[name].args[0]
+
+            log.warning(
+                "The taskflow decorator '%s' has already been registered (by %s).",
+                name,
+                other_name,
+            )
+            return
+
+        self._taskflow_decorators[name] = functools.partial(import_string, decorator_class_name)
+
     @staticmethod
     def _get_attr(obj: Any, attr_name: str):
         """Retrieves attributes of an object, or warns if not found"""
@@ -733,6 +792,11 @@ def hooks(self) 
-> Dict[str, Optional[HookInfo]]:
         # When we return hooks here it will only be used to retrieve hook information
         return self._hooks_lazy_dict  # type: ignore
 
+    @property
+    def taskflow_decorators(self) -> Dict[str, Callable]:
+        self.initialize_providers_taskflow_decorator()
+        return self._taskflow_decorators
+
     @property
     def extra_links_class_names(self) -> List[str]:
         """Returns set of extra link class names."""
diff --git a/airflow/utils/python_virtualenv_script.jinja2 b/airflow/utils/python_virtualenv_script.jinja2
index b871f3fc56cfe..94a425b4976d5 100644
--- a/airflow/utils/python_virtualenv_script.jinja2
+++ b/airflow/utils/python_virtualenv_script.jinja2
@@ -15,24 +15,23 @@
     KIND, either express or implied. See the License for the
     specific language governing permissions and limitations
     under the License.
-#}
+-#}
 
 import {{ pickling_library }}
 import sys
 
-# Check whether Airflow is available in the environment.
-# If it is, we'll want to ensure that we integrate any macros that are being provided
-# by plugins prior to unpickling the task context.
-if sys.version_info.major >= 3 and sys.version_info.minor >= 6:
+{# Check whether Airflow is available in the environment.
+ # If it is, we'll want to ensure that we integrate any macros that are being provided
+ # by plugins prior to unpickling the task context. #}
+if sys.version_info >= (3,6):
     try:
         from airflow.plugins_manager import integrate_macros_plugins
         integrate_macros_plugins()
     except ImportError:
-        # Airflow is not available in this environment, therefore we won't
-        # be able to integrate any plugin macros.
+        {# Airflow is not available in this environment, therefore we won't
+         # be able to integrate any plugin macros. #}
        pass
 
-# Read args
 {% if op_args or op_kwargs %}
 with open(sys.argv[1], "rb") as file:
     arg_dict = {{ pickling_library }}.load(file)
@@ -40,9 +39,11 @@ with open(sys.argv[1], "rb") as file:
     arg_dict = {"args": [], "kwargs": {}}
 {% endif %}
 
+{% if string_args_global | default(true) -%}
 # Read string args
 with open(sys.argv[3], "r") as file:
     virtualenv_string_args = list(map(lambda x: x.strip(), list(file)))
+{% endif %}
 
 # Script
 {{ python_callable_source }}
diff --git a/docs/apache-airflow/howto/create-custom-decorator.rst b/docs/apache-airflow/howto/create-custom-decorator.rst
new file mode 100644
index 0000000000000..b6a9a7b8bbef0
--- /dev/null
+++ b/docs/apache-airflow/howto/create-custom-decorator.rst
@@ -0,0 +1,120 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+Creating Custom ``@task`` Decorators
+====================================
+
+As of Airflow 2.2 it is possible to add custom decorators to the TaskFlow interface from within a provider
+package and have those decorators appear natively as part of the ``@task.____`` design.
+
+For example, 
let's say you were trying to create an easier mechanism to run python functions as "foo"
+tasks. The steps to create and register ``@task.foo`` are:
+
+1. Create a ``FooDecoratedOperator``
+
+   In this case, we are assuming that you have an existing ``FooOperator`` that takes a python function as an
+   argument. By creating a ``FooDecoratedOperator`` that inherits from ``FooOperator`` and
+   ``airflow.decorators.base.DecoratedOperator``, Airflow will supply much of the needed functionality required
+   to treat your new class as a taskflow native class.
+
+2. Create a ``foo_task`` function
+
+   Once you have your decorated class, create a function that takes arguments ``python_callable``, ``multiple_outputs``,
+   and ``kwargs``. This function will use the ``airflow.decorators.base.task_decorator_factory`` function to convert
+   the new ``FooDecoratedOperator`` into a TaskFlow function decorator!
+
+   .. code-block:: python
+
+      def foo_task(
+          python_callable: Optional[Callable] = None,
+          multiple_outputs: Optional[bool] = None,
+          **kwargs
+      ):
+          return task_decorator_factory(
+              python_callable=python_callable,
+              multiple_outputs=multiple_outputs,
+              decorated_operator_class=FooDecoratedOperator,
+              **kwargs,
+          )
+
+3. Register your new decorator in get_provider_info of your provider
+
+   Finally, add a ``task-decorators`` key-value entry to the dict returned from the provider entrypoint. This should be
+   a list with each item containing ``name`` and ``class-name`` keys. When Airflow starts, the
+   ``ProvidersManager`` class will automatically import this value and ``task.foo`` will work as a new decorator!
+
+   .. code-block:: python
+
+      def get_provider_info():
+          return {
+              "package-name": "foo-provider-airflow",
+              "name": "Foo",
+              "task-decorators": [
+                  {
+                      "name": "foo",
+                      # "Import path" and function name of the `foo_task`
+                      "class-name": "name.of.python.package.foo_task",
+                  }
+              ],
+              # ...
+          }
+
+   Please note that the ``name`` must be a valid python identifier.
+
+(Optional) Adding IDE auto-completion support
+=============================================
+
+.. note::
+
+    This section mostly applies to the apache-airflow managed providers. We have not decided if we will
+    allow third-party providers to register auto-completion in this way.
+
+For better or worse, Python IDEs cannot auto-complete dynamically
+generated methods (see `JetBrains' write-up on the subject `_).
+
+To get around this, we had to settle for the best solution available. IDEs will only pick up typing
+information from stub files, but we wanted to avoid any situation where a user would update their provider and the auto-complete
+would be out of sync with the provider's actual parameters.
+
+To hack around this problem, we found that you could extend the ``_TaskDecorator`` class in the ``airflow/decorators/__init__.py`` inside an ``if TYPE_CHECKING`` block
+and the correct auto-complete will show up in the IDE.
+
+The first step is to create a ``Mixin`` class for your decorator.
+
+Mixin classes are Python classes designed to be combined with other classes through multiple inheritance.
+Because they do not depend on any other class, they can be safely mixed into an existing class hierarchy.
+
+In the DockerDecorator we created a Mixin class that looks like this:
+
+.. exampleinclude:: ../../../airflow/providers/docker/decorators/docker.py
+    :language: python
+    :start-after: [START decoratormixin]
+    :end-before: [END decoratormixin]
+
+Notice that the function does not actually need to return anything, as we only use this class for type checking.
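+
+For the hypothetical ``FooOperator`` from the steps above, a minimal mixin might look like the
+following sketch (all names here are illustrative and mirror the Docker mixin, not an existing API):
+
+.. code-block:: python
+
+    class FooDecoratorMixin:
+        """
+        Helper class for inheritance. Only used for type checking and IDE auto-completion.
+
+        :meta private:
+        """
+
+        def foo(
+            self,
+            python_callable: Optional[Callable] = None,
+            multiple_outputs: Optional[bool] = None,
+            **kwargs,
+        ):
+            """
+            Wraps a function into a FooOperator.
+
+            :param python_callable: Function to decorate
+            :param multiple_outputs: if set, unroll the return value into multiple XCom values
+            """
+            # The body is never executed; the class exists purely for auto-completion.
+            ...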
+
+Sadly, you will have to duplicate the args, defaults, and types from your real FooOperator in order for
+them to show up in auto-completion prompts.
+
+Once you have your Mixin class ready, go to ``airflow/decorators/__init__.py`` and add a section similar to this:
+
+.. exampleinclude:: ../../../airflow/decorators/__init__.py
+    :language: python
+    :start-after: [START mixin_for_autocomplete]
+    :end-before: [END mixin_for_autocomplete]
+
+The ``if TYPE_CHECKING`` guard means that this code will only be used for type checking (such as mypy) or
+generating IDE auto-completion. Catching the ``ImportError`` is important, as the provider that supplies
+the mixin may not be installed in every environment.
+
+Once the change is merged and the next Airflow (minor or patch) release comes out, users will be able to see your
+decorator in IDE auto-complete. This auto-complete will change based on the version of the provider that the user
+has installed.
+
+Please note that this step is not required to create a working decorator but does create a better experience for
+users of the provider.
diff --git a/docs/apache-airflow/howto/index.rst b/docs/apache-airflow/howto/index.rst
index efd5c481b5be0..7227421f25475 100644
--- a/docs/apache-airflow/howto/index.rst
+++ b/docs/apache-airflow/howto/index.rst
@@ -36,6 +36,7 @@ configuring an Airflow environment.
     customize-state-colors-ui
     customize-dag-ui-page-instance-name
     custom-operator
+    create-custom-decorator
     connection
     variable
     run-behind-proxy
diff --git a/docs/apache-airflow/tutorial_taskflow_api.rst b/docs/apache-airflow/tutorial_taskflow_api.rst
index ed5742d7fb6ed..3c8992e24fe94 100644
--- a/docs/apache-airflow/tutorial_taskflow_api.rst
+++ b/docs/apache-airflow/tutorial_taskflow_api.rst
@@ -160,25 +160,41 @@ the dependencies as shown below.
     :start-after: [START main_flow]
     :end-before: [END main_flow]
 
-Using the TaskFlow API with Virtual Environments
+Using the TaskFlow API with Docker or Virtual Environments
 ----------------------------------------------------------
 
-As of Airflow 2.0.3, you will have the ability to use the TaskFlow API with a
-virtual environment. This added functionality will allow a much more
-comprehensive range of use-cases for the TaskFlow API, as you will not be limited to the
+If you have tasks that require complex or conflicting requirements, then you will have the ability to use the
+TaskFlow API with either a Docker container (since version 2.2.0) or a Python virtual environment (since 2.0.2).
+This added functionality will allow a much more
+comprehensive range of use-cases for the TaskFlow API, as you will not be limited to the
 packages and system libraries of the Airflow worker.
 
-To run your Airflow task in a virtual environment, switch your ``@task`` decorator to a ``@task.virtualenv``
-decorator. The ``@task.virtualenv`` decorator will allow you to create a new virtualenv with custom libraries
-and even a different python version to run your function.
+To use a docker image with the TaskFlow API, change the decorator to ``@task.docker``
+and add any needed arguments to correctly run the task. Please note that the docker
+image must have a working Python installed and take a bash command as the ``command`` argument.
+
+Below is an example of using the ``@task.docker`` decorator to run a python task.
+
+.. 
exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py
+    :language: python
+    :dedent: 4
+    :start-after: [START transform_docker]
+    :end-before: [END transform_docker]
+
+It is worth noting that the python source code (extracted from the decorated function) and any
+callable args are sent to the container via (encoded and pickled) environment variables, so the
+length of these is not boundless (the exact limit depends on system settings).
+
+If you don't want to run your image on a Docker environment, and instead want to create a separate virtual
+environment on the same machine, you can use the ``@task.virtualenv`` decorator instead. The ``@task.virtualenv``
+decorator will allow you to create a new virtualenv with custom libraries and even a different
+Python version to run your function.
+
+.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl_docker_virtualenv.py
     :language: python
     :dedent: 4
     :start-after: [START extract_virtualenv]
     :end-before: [END extract_virtualenv]
 
-This option should allow for far greater flexibility for users who wish to keep their workflows more simple
+These two options should allow for far greater flexibility for users who wish to keep their workflows simpler
 and pythonic.
 
 Multiple outputs inference
@@ -187,9 +203,9 @@ Tasks can also infer multiple outputs by using dict python typing.
 
 .. code-block:: python
 
-      @task
-      def identity_dict(x: int, y: int) -> Dict[str, int]:
-          return {"x": x, "y": y}
+    @task
+    def identity_dict(x: int, y: int) -> Dict[str, int]:
+        return {"x": x, "y": y}
 
 By using the typing ``Dict`` for the function return type, the ``multiple_outputs`` parameter
 is automatically set to true.
diff --git a/tests/providers/docker/decorators/__init__.py b/tests/providers/docker/decorators/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/providers/docker/decorators/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/docker/decorators/test_docker.py b/tests/providers/docker/decorators/test_docker.py
new file mode 100644
index 0000000000000..5f10fdf4c2d65
--- /dev/null
+++ b/tests/providers/docker/decorators/test_docker.py
@@ -0,0 +1,121 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.decorators import task +from airflow.models.dag import DAG +from airflow.utils import timezone +from airflow.utils.state import State +from airflow.utils.types import DagRunType + +DEFAULT_DATE = timezone.datetime(2021, 9, 1) + + +class TestDockerDecorator: + def test_basic_docker_operator(self, dag_maker): + @task.docker( + image="quay.io/bitnami/python:3.9", + network_mode="bridge", + api_version="auto", + ) + def f(): + import random + + return [random.random() for _ in range(100)] + + with dag_maker(): + ret = f() + + dr = dag_maker.create_dagrun( + run_id=DagRunType.MANUAL.value, + start_date=timezone.utcnow(), + execution_date=DEFAULT_DATE, + state=State.RUNNING, + ) + ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) # pylint: disable=no-member + ti = dr.get_task_instances()[0] + assert len(ti.xcom_pull()) == 100 + + def test_basic_docker_operator_with_param(self, dag_maker): + @task.docker( + image="quay.io/bitnami/python:3.9", + network_mode="bridge", + api_version="auto", + ) + def f(num_results): + import random + + return [random.random() for _ in range(num_results)] + + with dag_maker(): + ret = f(50) + + dr = dag_maker.create_dagrun( + run_id=DagRunType.MANUAL.value, + start_date=timezone.utcnow(), + state=State.RUNNING, + ) + ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) # pylint: disable=no-member + ti = dr.get_task_instances()[0] + result = ti.xcom_pull() + assert isinstance(result, list) + assert len(result) == 50 + + def test_basic_docker_operator_multiple_output(self, dag_maker): + @task.docker( + image="quay.io/bitnami/python:3.9", + network_mode="bridge", + api_version="auto", + multiple_outputs=True, + ) + def return_dict(number: int): + return {"number": number + 1, "43": 43} + + test_number = 10 + with dag_maker(): + ret = return_dict(test_number) + + dr = dag_maker.create_dagrun( + run_id=DagRunType.MANUAL, + start_date=timezone.utcnow(), + state=State.RUNNING, + ) + + ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) # pylint: disable=maybe-no-member + + ti = dr.get_task_instances()[0] + assert ti.xcom_pull(key="number") == test_number + 1 + assert ti.xcom_pull(key="43") == 43 + assert ti.xcom_pull() == {"number": test_number + 1, "43": 43} + + def test_call_decorated_multiple_times(self): + """Test calling decorated function 21 times in a DAG""" + + @task.docker( + image="quay.io/bitnami/python:3.9", + network_mode="bridge", + api_version="auto", + ) + def do_run(): + return 4 + + with DAG("test", start_date=DEFAULT_DATE) as dag: + do_run() + for _ in range(20): + do_run() + + assert len(dag.task_ids) == 21 + assert dag.task_ids[-1] == 'do_run__20'
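+
+    def test_python_source_strips_decorator(self):
+        # Illustrative sketch, not part of the original patch: the operator's
+        # ``_get_python_source()`` helper should hand the container the function
+        # body with the ``@task.docker`` decorator line already removed, since the
+        # decorator cannot be resolved inside the container. Assumes the same
+        # decorator plumbing exercised by the tests above.
+        @task.docker(
+            image="quay.io/bitnami/python:3.9",
+            network_mode="bridge",
+            api_version="auto",
+        )
+        def do_run():
+            return 4
+
+        with DAG("test_source", start_date=DEFAULT_DATE):
+            ret = do_run()
+
+        source = ret.operator._get_python_source()
+        assert "@task.docker" not in source
+        assert "def do_run():" in source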