Add a Docker Taskflow decorator (#15330)
Add the ability to apply @task.docker to a Python function, turning it into a DockerOperator that runs the function remotely.
```
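from airflow.decorators import task


# Runs f() inside the given image via a DockerOperator.
@task.docker(
    image="quay.io/bitnami/python:3.8.8",
    force_pull=True,
    docker_url="unix://var/run/docker.sock",
    network_mode="bridge",
    api_version="auto",
)
def f():
    # Import inside the function: the function body is what runs in the container.
    import random

    return [random.random() for _ in range(10000000)]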
```

One notable aspect of this architecture is that it makes as few assumptions about user setups as possible. We could not share a volume between the worker and the container, because this would break if the user runs the Airflow worker in a Docker container. We also could not assume that users have any specialized system libraries in their images (this implementation only requires Python 3 and bash).

To work within these requirements, we base64-encode a Jinja-generated Python file and its inputs (both generated using the same functions used by the PythonVirtualenvOperator) and hand them to the container as environment variables. Once the container starts, it decodes these environment variables, deserializes the inputs, runs the function, and stores the result in a file located at /tmp/script.out.
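The encoding step can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the provider's code: the environment variable names PY_SCRIPT_B64 and PY_INPUT_B64, the helper function names, and the script text are all made up for the example, and the "container" is simulated by an ordinary function call in the same process.

```python
import base64
import os
import pickle
import tempfile

# Hypothetical variable names; the commit message does not specify them.
SCRIPT_VAR = "PY_SCRIPT_B64"
INPUT_VAR = "PY_INPUT_B64"

# Stand-in for the Jinja-rendered script: it only defines the user function.
SCRIPT_SOURCE = "def f(x, y):\n    return x + y\n"


def build_environment(script_source, args, kwargs):
    """Worker side: pack the script and the pickled inputs into base64 strings."""
    payload = pickle.dumps({"args": args, "kwargs": kwargs})
    return {
        SCRIPT_VAR: base64.b64encode(script_source.encode("utf-8")).decode("ascii"),
        INPUT_VAR: base64.b64encode(payload).decode("ascii"),
    }


def run_in_container(environment, out_path):
    """Container side (simulated): decode, run the function, pickle the result to a file."""
    source = base64.b64decode(environment[SCRIPT_VAR]).decode("utf-8")
    inputs = pickle.loads(base64.b64decode(environment[INPUT_VAR]))
    namespace = {}
    exec(source, namespace)  # defines f inside the namespace dict
    result = namespace["f"](*inputs["args"], **inputs["kwargs"])
    with open(out_path, "wb") as out_file:
        pickle.dump(result, out_file)


def roundtrip(args, kwargs):
    """Run both sides and read the pickled result back, as the operator would."""
    environment = build_environment(SCRIPT_SOURCE, args, kwargs)
    with tempfile.TemporaryDirectory() as tmp_dir:
        out_path = os.path.join(tmp_dir, "script.out")
        run_in_container(environment, out_path)
        with open(out_path, "rb") as out_file:
            return pickle.load(out_file)


print(roundtrip((2, 3), {}))  # prints 5
```

Because both payloads are plain ASCII strings, nothing beyond an environment needs to be shared between the worker and the container, which is the point of the design described above.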

Once the function completes, the container runs a sleep loop until the DockerOperator retrieves the result via Docker's get_archive API. The result is then deserialized using pickle and pushed to Airflow's XCom in the same fashion as a python or python_virtualenv result.
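The retrieval step can be illustrated without a Docker daemon. In the Docker SDK for Python, get_archive returns a tuple of a tar data stream and a stat dict; the sketch below assumes that shape and fakes the stream with an in-memory tar archive. The helper names make_archive_chunks and load_result are hypothetical and exist only for this example.

```python
import io
import pickle
import tarfile


def make_archive_chunks(filename, data, chunk_size=512):
    """Simulate a chunked tar stream holding a single file with the given bytes."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w") as archive:
        info = tarfile.TarInfo(name=filename)
        info.size = len(data)
        archive.addfile(info, io.BytesIO(data))
    raw = buffer.getvalue()
    return [raw[i:i + chunk_size] for i in range(0, len(raw), chunk_size)]


def load_result(chunks, filename):
    """Join the chunks, open the tar archive, and unpickle the named member."""
    with tarfile.open(fileobj=io.BytesIO(b"".join(chunks)), mode="r") as archive:
        member = archive.extractfile(filename)
        if member is None:
            raise FileNotFoundError(filename)
        return pickle.load(member)


# The container would have written this pickle to /tmp/script.out.
chunks = make_archive_chunks("script.out", pickle.dumps({"total": 42}))
print(load_result(chunks, "script.out"))
```

Reading the file through the archive API rather than a mounted path is what lets the approach work when the worker itself runs in a container.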


Co-authored-by: Ash Berlin-Taylor <[email protected]>
3 people authored Sep 20, 2021
1 parent bada372 commit a9772cf
Showing 18 changed files with 845 additions and 175 deletions.
137 changes: 22 additions & 115 deletions airflow/decorators/__init__.py
@@ -15,129 +15,36 @@
# specific language governing permissions and limitations
# under the License.

from typing import Callable, Dict, Iterable, List, Optional, Union
from typing import TYPE_CHECKING

from airflow.decorators.python import python_task
from airflow.decorators.python_virtualenv import _virtualenv_task
from airflow.decorators.python import PythonDecoratorMixin, python_task # noqa
from airflow.decorators.python_virtualenv import PythonVirtualenvDecoratorMixin
from airflow.decorators.task_group import task_group # noqa
from airflow.models.dag import dag # noqa
from airflow.providers_manager import ProvidersManager


class _TaskDecorator:
def __call__(
self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
):
"""
Python operator decorator. Wraps a function into an Airflow operator.
Accepts kwargs for operator kwarg. This decorator can be reused in a single DAG.
class _TaskDecorator(PythonDecoratorMixin, PythonVirtualenvDecoratorMixin):
def __getattr__(self, name):
if name.startswith("__"):
raise AttributeError(f'{type(self).__name__} has no attribute {name!r}')
decorators = ProvidersManager().taskflow_decorators
if name not in decorators:
raise AttributeError(f"task decorator {name!r} not found")
return decorators[name]

:param python_callable: Function to decorate
:type python_callable: Optional[Callable]
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. List/Tuples will unroll to xcom values
with index as key. Dict will unroll to xcom values with keys as XCom keys.
Defaults to False.
:type multiple_outputs: bool
"""
return self.python(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs)

@staticmethod
def python(python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs):
"""
Python operator decorator. Wraps a function into an Airflow operator.
Accepts kwargs for operator kwarg. This decorator can be reused in a single DAG.
# [START mixin_for_autocomplete]
if TYPE_CHECKING:
try:
from airflow.providers.docker.decorators.docker import DockerDecoratorMixin

:param python_callable: Function to decorate
:type python_callable: Optional[Callable]
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. List/Tuples will unroll to xcom values
with index as key. Dict will unroll to xcom values with keys as XCom keys.
Defaults to False.
:type multiple_outputs: bool
"""
return python_task(python_callable=python_callable, multiple_outputs=multiple_outputs, **kwargs)

@staticmethod
def virtualenv(
python_callable: Optional[Callable] = None,
multiple_outputs: Optional[bool] = None,
requirements: Optional[Iterable[str]] = None,
python_version: Optional[Union[str, int, float]] = None,
use_dill: bool = False,
system_site_packages: bool = True,
string_args: Optional[Iterable[str]] = None,
templates_dict: Optional[Dict] = None,
templates_exts: Optional[List[str]] = None,
**kwargs,
):
"""
Allows one to run a function in a virtualenv that is
created and destroyed automatically (with certain caveats).
The function must be defined using def, and not be
part of a class. All imports must happen inside the function
and no variables outside of the scope may be referenced. A global scope
variable named virtualenv_string_args will be available (populated by
string_args). In addition, one can pass stuff through op_args and op_kwargs, and one
can use a return value.
Note that if your virtualenv runs in a different Python major version than Airflow,
you cannot use return values, op_args, op_kwargs, or use any macros that are being provided to
Airflow through plugins. You can use string_args though.
.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:PythonVirtualenvOperator`
:param python_callable: A python function with no references to outside variables,
defined with def, which will be run in a virtualenv
:type python_callable: function
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. List/Tuples will unroll to xcom values
with index as key. Dict will unroll to xcom values with keys as XCom keys.
Defaults to False.
:type multiple_outputs: bool
:param requirements: A list of requirements as specified in a pip install command
:type requirements: list[str]
:param python_version: The Python version to run the virtualenv with. Note that
both 2 and 2.7 are acceptable forms.
:type python_version: Optional[Union[str, int, float]]
:param use_dill: Whether to use dill to serialize
the args and result (pickle is default). This allow more complex types
but requires you to include dill in your requirements.
:type use_dill: bool
:param system_site_packages: Whether to include
system_site_packages in your virtualenv.
See virtualenv documentation for more information.
:type system_site_packages: bool
:param op_args: A list of positional arguments to pass to python_callable.
:type op_args: list
:param op_kwargs: A dict of keyword arguments to pass to python_callable.
:type op_kwargs: dict
:param string_args: Strings that are present in the global var virtualenv_string_args,
available to python_callable at runtime as a list[str]. Note that args are split
by newline.
:type string_args: list[str]
:param templates_dict: a dictionary where the values are templates that
will get templated by the Airflow engine sometime between
``__init__`` and ``execute`` takes place and are made available
in your callable's context after the template has been applied
:type templates_dict: dict of str
:param templates_exts: a list of file extensions to resolve while
processing templated fields, for examples ``['.sql', '.hql']``
:type templates_exts: list[str]
"""
return _virtualenv_task(
python_callable=python_callable,
multiple_outputs=multiple_outputs,
requirements=requirements,
python_version=python_version,
use_dill=use_dill,
system_site_packages=system_site_packages,
string_args=string_args,
templates_dict=templates_dict,
templates_exts=templates_exts,
**kwargs,
)
class _DockerTask(_TaskDecorator, DockerDecoratorMixin):
pass

_TaskDecorator = _DockerTask
except ImportError:
pass
# [END mixin_for_autocomplete]

task = _TaskDecorator()
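
The __getattr__ lookup above can be illustrated in isolation. The sketch below is not Airflow code: DecoratorRegistry stands in for the provider lookup, and TaskNamespace, tagged, and hello are hypothetical names chosen for the example. It only shows how an attribute access such as task.docker can be resolved from a name-to-callable mapping at call time.

```python
class DecoratorRegistry:
    """Hypothetical stand-in for the provider lookup: maps names to decorators."""

    def __init__(self):
        self._decorators = {}

    def register(self, name, decorator):
        self._decorators[name] = decorator

    @property
    def decorators(self):
        return dict(self._decorators)


class TaskNamespace:
    """Resolves unknown attributes such as .docker from the registry."""

    def __init__(self, registry):
        self._registry = registry

    def __getattr__(self, name):
        # __getattr__ runs only when normal attribute lookup fails.
        # Private and dunder names are rejected up front so that a missing
        # _registry can never trigger infinite recursion.
        if name.startswith("_"):
            raise AttributeError(f"{type(self).__name__} has no attribute {name!r}")
        decorators = self._registry.decorators
        if name not in decorators:
            raise AttributeError(f"task decorator {name!r} not found")
        return decorators[name]


def tagged(func):
    """Toy decorator: marks the function instead of building an operator."""
    func.runs_in = "docker"
    return func


registry = DecoratorRegistry()
registry.register("docker", tagged)
task = TaskNamespace(registry)


@task.docker
def hello():
    return "hi"


print(hello.runs_in)  # prints docker
```

Looking the decorator up on each access means a decorator registered after the namespace object was created is still found.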
3 changes: 1 addition & 2 deletions airflow/decorators/base.py
@@ -132,8 +132,7 @@ def __init__(

def execute(self, context: Dict):
return_value = super().execute(context)
self._handle_output(return_value=return_value, context=context, xcom_push=self.xcom_push)
return return_value
return self._handle_output(return_value=return_value, context=context, xcom_push=self.xcom_push)

def _handle_output(self, return_value: Any, context: Dict, xcom_push: Callable):
"""
40 changes: 39 additions & 1 deletion airflow/decorators/python.py
@@ -62,11 +62,49 @@ def __init__(
T = TypeVar("T", bound=Callable)


class PythonDecoratorMixin:
"""
Helper class for inheritance. This class is only used for the __init__.pyi so that IDEs
will autocomplete docker decorator functions
:meta private:
"""

def __call__(
self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
):
return self.python(python_callable, multiple_outputs, **kwargs)

def python(
self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
):
"""
Python operator decorator. Wraps a function into an Airflow operator.
Accepts kwargs for operator kwarg. Can be reused in a single DAG.
:param python_callable: Function to decorate
:type python_callable: Optional[Callable]
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. List/Tuples will unroll to xcom values
with index as key. Dict will unroll to xcom values with keys as XCom keys.
Defaults to False.
:type multiple_outputs: bool
"""
return task_decorator_factory(
python_callable=python_callable,
multiple_outputs=multiple_outputs,
decorated_operator_class=_PythonDecoratedOperator,
**kwargs,
)


def python_task(
python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
):
"""
Python operator decorator. Wraps a function into an Airflow operator.
Wraps a function into an Airflow operator.
Accepts kwargs for operator kwarg. Can be reused in a single DAG.
:param python_callable: Function to decorate
45 changes: 27 additions & 18 deletions airflow/decorators/python_virtualenv.py
@@ -70,24 +70,33 @@ def get_python_source(self):
T = TypeVar("T", bound=Callable)


def _virtualenv_task(
python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
):
class PythonVirtualenvDecoratorMixin:
"""
Python operator decorator. Wraps a function into an Airflow operator.
Accepts kwargs for operator kwarg. Can be reused in a single DAG.
Helper class for inheritance. This class is only used for the __init__.pyi so that IDEs
will autocomplete docker decorator functions
:param python_callable: Function to decorate
:type python_callable: Optional[Callable]
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. List/Tuples will unroll to xcom values
with index as key. Dict will unroll to xcom values with keys as XCom keys.
Defaults to False.
:type multiple_outputs: bool
:meta private:
"""
return task_decorator_factory(
python_callable=python_callable,
multiple_outputs=multiple_outputs,
decorated_operator_class=_PythonVirtualenvDecoratedOperator,
**kwargs,
)

def virtualenv(
self, python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
):
"""
Wraps a python function into an Airflow operator to run via a Python virtual environment.
Accepts kwargs for operator kwarg. Can be reused in a single DAG.
:param python_callable: Function to decorate
:type python_callable: Optional[Callable]
:param multiple_outputs: if set, function return value will be
unrolled to multiple XCom values. List/Tuples will unroll to xcom values
with index as key. Dict will unroll to xcom values with keys as XCom keys.
Defaults to False.
:type multiple_outputs: bool
"""
return task_decorator_factory(
python_callable=python_callable,
multiple_outputs=multiple_outputs,
decorated_operator_class=_PythonVirtualenvDecoratedOperator,
**kwargs,
)
Original file line number Diff line number Diff line change
@@ -62,7 +62,7 @@ def extract():
# [END extract_virtualenv]

# [START transform_docker]
@task.virtualenv(multiple_outputs=True)
@task.docker(image='python:3.9-slim-buster', multiple_outputs=True)
def transform(order_data_dict: dict):
"""
#### Transform task
12 changes: 12 additions & 0 deletions airflow/provider.yaml.schema.json
@@ -233,6 +233,18 @@
"type": "object",
"description": "Additional extras that the provider should have"
},
"task-decorators": {
"type": "array",
"description": "Decorators to use with the TaskFlow API. Can be accessed by users via '@task.<name>'",
"items": {
"name": {
"type": "string"
},
"path": {
"type": "string"
}
}
},
"secrets-backends": {
"type": "array",
"description": "Secrets Backend class names",
12 changes: 12 additions & 0 deletions airflow/provider_info.schema.json
@@ -70,6 +70,18 @@
"items": {
"type": "string"
}
},
"task-decorators": {
"type": "array",
"description": "Apply custom decorators to the TaskFlow API. Can be accessed by users via '@task.<name>'",
"items": {
"name": {
"type": "string"
},
"path": {
"type": "string"
}
}
}
},
"required": [
17 changes: 17 additions & 0 deletions airflow/providers/docker/decorators/__init__.py
@@ -0,0 +1,17 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.