From 5b014f998cff254542ae3ec268c5da77f18f2318 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Wed, 18 Oct 2023 23:44:51 +0200 Subject: [PATCH 1/4] Add decorators for external and venv python branching --- airflow/decorators/__init__.py | 6 ++ airflow/decorators/__init__.pyi | 97 +++++++++++++++++++ airflow/decorators/branch_external_python.py | 56 +++++++++++ airflow/decorators/branch_virtualenv.py | 56 +++++++++++ docs/apache-airflow/core-concepts/dags.rst | 2 + .../decorators/test_branch_external_python.py | 90 +++++++++++++++++ tests/decorators/test_branch_virtualenv.py | 94 ++++++++++++++++++ 7 files changed, 401 insertions(+) create mode 100644 airflow/decorators/branch_external_python.py create mode 100644 airflow/decorators/branch_virtualenv.py create mode 100644 tests/decorators/test_branch_external_python.py create mode 100644 tests/decorators/test_branch_virtualenv.py diff --git a/airflow/decorators/__init__.py b/airflow/decorators/__init__.py index 2b2fccf8fa5aa7..31bcfb263c5c17 100644 --- a/airflow/decorators/__init__.py +++ b/airflow/decorators/__init__.py @@ -19,7 +19,9 @@ from typing import Any, Callable from airflow.decorators.base import TaskDecorator +from airflow.decorators.branch_external_python import branch_external_python_task from airflow.decorators.branch_python import branch_task +from airflow.decorators.branch_virtualenv import branch_virtualenv_task from airflow.decorators.external_python import external_python_task from airflow.decorators.python import python_task from airflow.decorators.python_virtualenv import virtualenv_task @@ -41,6 +43,8 @@ "virtualenv_task", "external_python_task", "branch_task", + "branch_virtualenv_task", + "branch_external_python_task", "short_circuit_task", "sensor_task", "setup", @@ -55,6 +59,8 @@ class TaskDecoratorCollection: virtualenv = staticmethod(virtualenv_task) external_python = staticmethod(external_python_task) branch = staticmethod(branch_task) + branch_virtualenv = staticmethod(branch_virtualenv_task) + branch_external_python = staticmethod(branch_external_python_task) short_circuit = staticmethod(short_circuit_task) sensor = staticmethod(sensor_task) diff --git a/airflow/decorators/__init__.pyi b/airflow/decorators/__init__.pyi index e41ae930e14dbb..0c3e94bf5c38ca 100644 --- a/airflow/decorators/__init__.pyi +++ b/airflow/decorators/__init__.pyi @@ -26,7 +26,9 @@ from typing import Any, Callable, Collection, Container, Iterable, Mapping, over from kubernetes.client import models as k8s from airflow.decorators.base import FParams, FReturn, Task, TaskDecorator +from airflow.decorators.branch_external_python import branch_external_python_task from airflow.decorators.branch_python import branch_task +from airflow.decorators.branch_virtualenv import branch_virtualenv_task from airflow.decorators.external_python import external_python_task from airflow.decorators.python import python_task from airflow.decorators.python_virtualenv import virtualenv_task @@ -47,6 +49,8 @@ __all__ = [ "virtualenv_task", "external_python_task", "branch_task", + "branch_virtualenv_task", + "branch_external_python_task", "short_circuit_task", "sensor_task", "setup", @@ -194,6 +198,99 @@ class TaskDecoratorCollection: @overload def branch(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ... @overload + def branch_virtualenv( + self, + *, + multiple_outputs: bool | None = None, + # 'python_callable', 'op_args' and 'op_kwargs' since they are filled by + # _PythonVirtualenvDecoratedOperator. + requirements: None | Iterable[str] | str = None, + python_version: None | str | int | float = None, + use_dill: bool = False, + system_site_packages: bool = True, + templates_dict: Mapping[str, Any] | None = None, + pip_install_options: list[str] | None = None, + skip_on_exit_code: int | Container[int] | None = None, + index_urls: None | Collection[str] | str = None, + venv_cache_path: None | str = None, + show_return_value_in_logs: bool = True, + **kwargs, + ) -> TaskDecorator: + """Create a decorator to wrap the decorated callable into a BranchPythonVirtualenvOperator. + + For more information on how to use this decorator, see :ref:`concepts:branching`. + Accepts arbitrary for operator kwarg. Can be reused in a single DAG. + + :param multiple_outputs: If set, function return value will be unrolled to multiple XCom values. + Dict will unroll to XCom values with keys as XCom keys. Defaults to False. + :param requirements: Either a list of requirement strings, or a (templated) + "requirements file" as specified by pip. + :param python_version: The Python version to run the virtual environment with. Note that + both 2 and 2.7 are acceptable forms. + :param use_dill: Whether to use dill to serialize + the args and result (pickle is default). This allow more complex types + but requires you to include dill in your requirements. + :param system_site_packages: Whether to include + system_site_packages in your virtual environment. + See virtualenv documentation for more information. + :param pip_install_options: a list of pip install options when installing requirements + See 'pip install -h' for available options + :param skip_on_exit_code: If python_callable exits with this exit code, leave the task + in ``skipped`` state (default: None). If set to ``None``, any non-zero + exit code will be treated as a failure. + :param index_urls: an optional list of index urls to load Python packages from. + If not provided the system pip conf will be used to source packages from. + :param venv_cache_path: Optional path to the virtual environment parent folder in which the + virtual environment will be cached, creates a sub-folder venv-{hash} whereas hash will be replaced + with a checksum of requirements. If not provided the virtual environment will be created and + deleted in a temp folder for every execution. + :param show_return_value_in_logs: a bool value whether to show return_value + logs. Defaults to True, which allows return value log output. + It can be set to False to prevent log output of return value when you return huge data + such as transmission a large amount of XCom to TaskAPI. + """ + @overload + def branch_virtualenv(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ... + @overload + def branch_external_python( + self, + *, + python: str, + multiple_outputs: bool | None = None, + # 'python_callable', 'op_args' and 'op_kwargs' since they are filled by + # _PythonVirtualenvDecoratedOperator. + use_dill: bool = False, + templates_dict: Mapping[str, Any] | None = None, + show_return_value_in_logs: bool = True, + **kwargs, + ) -> TaskDecorator: + """Create a decorator to wrap the decorated callable into a BranchExternalPythonOperator. + + For more information on how to use this decorator, see :ref:`concepts:branching`. + Accepts arbitrary for operator kwarg. Can be reused in a single DAG. + + :param python: Full path string (file-system specific) that points to a Python binary inside + a virtual environment that should be used (in ``VENV/bin`` folder). Should be absolute path + (so usually start with "/" or "X:/" depending on the filesystem/os used). + :param multiple_outputs: If set, function return value will be unrolled to multiple XCom values. + Dict will unroll to XCom values with keys as XCom keys. Defaults to False. + :param use_dill: Whether to use dill to serialize + the args and result (pickle is default). This allow more complex types + but requires you to include dill in your requirements. + :param templates_dict: a dictionary where the values are templates that + will get templated by the Airflow engine sometime between + ``__init__`` and ``execute`` takes place and are made available + in your callable's context after the template has been applied. + :param show_return_value_in_logs: a bool value whether to show return_value + logs. Defaults to True, which allows return value log output. + It can be set to False to prevent log output of return value when you return huge data + such as transmission a large amount of XCom to TaskAPI. + """ + @overload + def branch_external_python( + self, python_callable: Callable[FParams, FReturn] + ) -> Task[FParams, FReturn]: ... + @overload def short_circuit( self, *, diff --git a/airflow/decorators/branch_external_python.py b/airflow/decorators/branch_external_python.py new file mode 100644 index 00000000000000..8e945541c594e5 --- /dev/null +++ b/airflow/decorators/branch_external_python.py @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING, Callable + +from airflow.decorators.base import task_decorator_factory +from airflow.decorators.python import _PythonDecoratedOperator +from airflow.operators.python import BranchExternalPythonOperator + +if TYPE_CHECKING: + from airflow.decorators.base import TaskDecorator + + +class _BranchExternalPythonDecoratedOperator(_PythonDecoratedOperator, BranchExternalPythonOperator): + """Wraps a Python callable and captures args/kwargs when called for execution.""" + + custom_operator_name: str = "@task.branch_external_python" + + +def branch_external_python_task( + python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs +) -> TaskDecorator: + """ + Wrap a python function into a BranchExternalPythonOperator. + + For more information on how to use this operator, take a look at the guide: + :ref:`concepts:branching` + + Accepts kwargs for operator kwarg. Can be reused in a single DAG. + + :param python_callable: Function to decorate + :param multiple_outputs: if set, function return value will be + unrolled to multiple XCom values. Dict will unroll to xcom values with keys as XCom keys. + Defaults to False. + """ + return task_decorator_factory( + python_callable=python_callable, + multiple_outputs=multiple_outputs, + decorated_operator_class=_BranchExternalPythonDecoratedOperator, + **kwargs, + ) diff --git a/airflow/decorators/branch_virtualenv.py b/airflow/decorators/branch_virtualenv.py new file mode 100644 index 00000000000000..3e4c3fcaf1b8ed --- /dev/null +++ b/airflow/decorators/branch_virtualenv.py @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING, Callable + +from airflow.decorators.base import task_decorator_factory +from airflow.decorators.python import _PythonDecoratedOperator +from airflow.operators.python import BranchPythonVirtualenvOperator + +if TYPE_CHECKING: + from airflow.decorators.base import TaskDecorator + + +class _BranchPythonVirtualenvDecoratedOperator(_PythonDecoratedOperator, BranchPythonVirtualenvOperator): + """Wraps a Python callable and captures args/kwargs when called for execution.""" + + custom_operator_name: str = "@task.branch_virtualenv" + + +def branch_virtualenv_task( + python_callable: Callable | None = None, multiple_outputs: bool | None = None, **kwargs +) -> TaskDecorator: + """ + Wrap a python function into a BranchPythonVirtualenvOperator. + + For more information on how to use this operator, take a look at the guide: + :ref:`concepts:branching` + + Accepts kwargs for operator kwarg. Can be reused in a single DAG. + + :param python_callable: Function to decorate + :param multiple_outputs: if set, function return value will be + unrolled to multiple XCom values. Dict will unroll to xcom values with keys as XCom keys. + Defaults to False. + """ + return task_decorator_factory( + python_callable=python_callable, + multiple_outputs=multiple_outputs, + decorated_operator_class=_BranchPythonVirtualenvDecoratedOperator, + **kwargs, + ) diff --git a/docs/apache-airflow/core-concepts/dags.rst b/docs/apache-airflow/core-concepts/dags.rst index 2cd3886be25e84..d446988e76b20c 100644 --- a/docs/apache-airflow/core-concepts/dags.rst +++ b/docs/apache-airflow/core-concepts/dags.rst @@ -371,6 +371,8 @@ As with the callable for ``@task.branch``, this method can return the ID of a do else: return None +Similar like ``@task.branch`` decorator for regular Python code there are also branch decorators which use a virtual environment called ``@task.branch_virtualenv`` or external python called ``@task.branch_external_python``. + .. _concepts:latest-only: diff --git a/tests/decorators/test_branch_external_python.py b/tests/decorators/test_branch_external_python.py new file mode 100644 index 00000000000000..01e1f9b9683ece --- /dev/null +++ b/tests/decorators/test_branch_external_python.py @@ -0,0 +1,90 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import sys + +import pytest + +from airflow.decorators import task +from airflow.utils.state import State + + +class Test_BranchPythonDecoratedOperator: + @pytest.mark.parametrize("branch_task_name", ["task_1", "task_2"]) + def test_branch_one(self, dag_maker, branch_task_name): + @task + def dummy_f(): + pass + + @task + def task_1(): + pass + + @task + def task_2(): + pass + + if ( + branch_task_name == "task_1" + ): # Note we manually need to carry the literal value into the venc code :-( + + @task.branch_external_python(task_id="branching", python=sys.executable) + def branch_operator(): + return "task_1" + + else: + + @task.branch_external_python(task_id="branching", python=sys.executable) + def branch_operator(): + return "task_2" + + with dag_maker(): + branchoperator = branch_operator() + df = dummy_f() + task_1 = task_1() + task_2 = task_2() + + df.set_downstream(branchoperator) + branchoperator.set_downstream(task_1) + branchoperator.set_downstream(task_2) + + dr = dag_maker.create_dagrun() + df.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True) + branchoperator.operator.run( + start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True + ) + task_1.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True) + task_2.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True) + tis = dr.get_task_instances() + + for ti in tis: + if ti.task_id == "dummy_f": + assert ti.state == State.SUCCESS + if ti.task_id == "branching": + assert ti.state == State.SUCCESS + + if ti.task_id == "task_1" and branch_task_name == "task_1": + assert ti.state == State.SUCCESS + elif ti.task_id == "task_1": + assert ti.state == State.SKIPPED + + if ti.task_id == "task_2" and branch_task_name == "task_2": + assert ti.state == State.SUCCESS + elif ti.task_id == "task_2": + assert ti.state == State.SKIPPED diff --git a/tests/decorators/test_branch_virtualenv.py b/tests/decorators/test_branch_virtualenv.py new file mode 100644 index 00000000000000..861ba154af177e --- /dev/null +++ b/tests/decorators/test_branch_virtualenv.py @@ -0,0 +1,94 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import pytest + +from airflow.decorators import task +from airflow.utils.state import State + + +class Test_BranchPythonDecoratedOperator: + @pytest.mark.parametrize("branch_task_name", ["task_1", "task_2"]) + def test_branch_one(self, dag_maker, branch_task_name): + @task + def dummy_f(): + pass + + @task + def task_1(): + pass + + @task + def task_2(): + pass + + if ( + branch_task_name == "task_1" + ): # Note we manually need to carry the literal value into the venc code :-( + + @task.branch_virtualenv(task_id="branching", requirements=["funcsigs"]) + def branch_operator(): + import funcsigs + + print(f"We successfully imported funcsigs version {funcsigs.__version__}") + return "task_1" + + else: + + @task.branch_virtualenv(task_id="branching", requirements=["funcsigs"]) + def branch_operator(): + import funcsigs + + print(f"We successfully imported funcsigs version {funcsigs.__version__}") + return "task_2" + + with dag_maker(): + branchoperator = branch_operator() + df = dummy_f() + task_1 = task_1() + task_2 = task_2() + + df.set_downstream(branchoperator) + branchoperator.set_downstream(task_1) + branchoperator.set_downstream(task_2) + + dr = dag_maker.create_dagrun() + df.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True) + branchoperator.operator.run( + start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True + ) + task_1.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True) + task_2.operator.run(start_date=dr.execution_date, end_date=dr.execution_date, ignore_ti_state=True) + tis = dr.get_task_instances() + + for ti in tis: + if ti.task_id == "dummy_f": + assert ti.state == State.SUCCESS + if ti.task_id == "branching": + assert ti.state == State.SUCCESS + + if ti.task_id == "task_1" and branch_task_name == "task_1": + assert ti.state == State.SUCCESS + elif ti.task_id == "task_1": + assert ti.state == State.SKIPPED + + if ti.task_id == "task_2" and branch_task_name == "task_2": + assert ti.state == State.SUCCESS + elif ti.task_id == "task_2": + assert ti.state == State.SKIPPED From 78f5e852ef1f64a096993156397acb664aeeac4d Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Thu, 19 Oct 2023 23:19:22 +0200 Subject: [PATCH 2/4] Add examples and documentation for decorators --- .../example_dags/example_branch_operator.py | 114 +++++++++++++++++- .../example_branch_operator_decorator.py | 104 ++++++++++++++-- docs/apache-airflow/howto/operator/python.rst | 104 +++++++++++++++- 3 files changed, 306 insertions(+), 16 deletions(-) diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py index 92fdf3b250c894..594c6a4cb14953 100644 --- a/airflow/example_dags/example_branch_operator.py +++ b/airflow/example_dags/example_branch_operator.py @@ -15,36 +15,56 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""Example DAG demonstrating the usage of the BranchPythonOperator.""" +"""Example DAG demonstrating the usage of the Classic branching Python operators. + +It is showcasing the basic BranchPythonOperator and its sisters BranchExternalPythonOperator +and BranchPythonVirtualenvOperator.""" from __future__ import annotations import random +import sys +import tempfile +from pathlib import Path import pendulum from airflow.models.dag import DAG from airflow.operators.empty import EmptyOperator -from airflow.operators.python import BranchPythonOperator +from airflow.operators.python import ( + BranchExternalPythonOperator, + BranchPythonOperator, + BranchPythonVirtualenvOperator, + ExternalPythonOperator, + PythonOperator, + PythonVirtualenvOperator, +) from airflow.utils.edgemodifier import Label from airflow.utils.trigger_rule import TriggerRule +PATH_TO_PYTHON_BINARY = sys.executable + with DAG( dag_id="example_branch_operator", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule="@daily", tags=["example", "example2"], + orientation="TB", ) as dag: run_this_first = EmptyOperator( task_id="run_this_first", ) - options = ["branch_a", "branch_b", "branch_c", "branch_d"] + options = ["a", "b", "c", "d"] + + # Example branching on standard Python tasks + # [START howto_operator_branch_python] branching = BranchPythonOperator( task_id="branching", - python_callable=lambda: random.choice(options), + python_callable=lambda: f"branch_{random.choice(options)}", ) + # [END howto_operator_branch_python] run_this_first >> branching join = EmptyOperator( @@ -53,8 +73,9 @@ ) for option in options: - t = EmptyOperator( - task_id=option, + t = PythonOperator( + task_id=f"branch_{option}", + python_callable=lambda: print("Hello World"), ) empty_follow = EmptyOperator( @@ -63,3 +84,84 @@ # Label is optional here, but it can help identify more complex branches branching >> Label(option) >> t >> empty_follow >> join + + # Example the same with external Python calls + + # [START howto_operator_branch_ext_py] + def branch_with_external_python(choices): + import random + + return f"ext_py_{random.choice(choices)}" + + branching_ext_py = BranchExternalPythonOperator( + task_id="branching_ext_python", + python=PATH_TO_PYTHON_BINARY, + python_callable=branch_with_external_python, + op_args=[options], + ) + # [END howto_operator_branch_ext_py] + join >> branching_ext_py + + join_ext_py = EmptyOperator( + task_id="join_ext_python", + trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + ) + + def hello_world_with_external_python(): + print("Hello World from external Python") + + for option in options: + t = ExternalPythonOperator( + task_id=f"ext_py_{option}", + python=PATH_TO_PYTHON_BINARY, + python_callable=hello_world_with_external_python, + ) + + # Label is optional here, but it can help identify more complex branches + branching_ext_py >> Label(option) >> t >> join_ext_py + + # Example the same with Python virtual environments + + # [START howto_operator_branch_virtualenv] + # Note: Passing a caching dir allows to keep the virtual environment over multiple runs + # Run the example a second time and see that it re-uses it and is faster. + VENV_CACHE_PATH = Path(tempfile.gettempdir()) + + def branch_with_venv(choices): + import random + + import numpy as np + + print(f"Some numpy stuff: {np.arange(6)}") + return f"venv_{random.choice(choices)}" + + branching_venv = BranchPythonVirtualenvOperator( + task_id="branching_venv", + requirements=["numpy~=1.24.4"], + venv_cache_path=VENV_CACHE_PATH, + python_callable=branch_with_venv, + op_args=[options], + ) + # [END howto_operator_branch_virtualenv] + join_ext_py >> branching_venv + + join_venv = EmptyOperator( + task_id="join_venv", + trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, + ) + + def hello_world_with_venv(): + import numpy as np + + print(f"Hello World with some numpy stuff: {np.arange(6)}") + + for option in options: + t = PythonVirtualenvOperator( + task_id=f"venv_{option}", + requirements=["numpy~=1.24.4"], + venv_cache_path=VENV_CACHE_PATH, + python_callable=hello_world_with_venv, + ) + + # Label is optional here, but it can help identify more complex branches + branching_venv >> Label(option) >> t >> join_venv diff --git a/airflow/example_dags/example_branch_operator_decorator.py b/airflow/example_dags/example_branch_operator_decorator.py index b250c1207338c3..5d42ff6b2726fc 100644 --- a/airflow/example_dags/example_branch_operator_decorator.py +++ b/airflow/example_dags/example_branch_operator_decorator.py @@ -15,10 +15,17 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""Example DAG demonstrating the usage of the ``@task.branch`` TaskFlow API decorator.""" +"""Example DAG demonstrating the usage of the branching TaskFlow API decorators. + +It shows how to use standard Python ``@task.branch`` as well as the external Python +version ``@task.branch_external_python`` which calls an external Python interpreter and +the ``@task.branch_virtualenv`` which builds a temporary Python virtual environment. +""" from __future__ import annotations import random +import sys +import tempfile import pendulum @@ -28,31 +35,110 @@ from airflow.utils.edgemodifier import Label from airflow.utils.trigger_rule import TriggerRule +PATH_TO_PYTHON_BINARY = sys.executable + with DAG( dag_id="example_branch_python_operator_decorator", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule="@daily", tags=["example", "example2"], + orientation="TB", ) as dag: run_this_first = EmptyOperator(task_id="run_this_first") - options = ["branch_a", "branch_b", "branch_c", "branch_d"] + options = ["a", "b", "c", "d"] + + # Example branching on standard Python tasks - @task.branch(task_id="branching") - def random_choice(choices: list[str]) -> str: - return random.choice(choices) + # [START howto_operator_branch_python] + @task.branch() + def branching(choices: list[str]) -> str: + return f"branch_{random.choice(choices)}" - random_choice_instance = random_choice(choices=options) + # [END howto_operator_branch_python] + + random_choice_instance = branching(choices=options) run_this_first >> random_choice_instance join = EmptyOperator(task_id="join", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) for option in options: - t = EmptyOperator(task_id=option) - empty_follow = EmptyOperator(task_id="follow_" + option) + @task(task_id=f"branch_{option}") + def some_task(): + print("doing something in Python") + + t = some_task() + empty = EmptyOperator(task_id=f"follow_{option}") + + # Label is optional here, but it can help identify more complex branches + random_choice_instance >> Label(option) >> t >> empty >> join + + # Example the same with external Python calls + + # [START howto_operator_branch_ext_py] + @task.branch_external_python(python=PATH_TO_PYTHON_BINARY) + def branching_ext_python(choices) -> str: + import random + + return f"ext_py_{random.choice(choices)}" + + # [END howto_operator_branch_ext_py] + + random_choice_ext_py = branching_ext_python(choices=options) + + join >> random_choice_ext_py + + join_ext_py = EmptyOperator(task_id="join_ext_py", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + + for option in options: + + @task.external_python(task_id=f"ext_py_{option}", python=PATH_TO_PYTHON_BINARY) + def some_ext_py_task(): + print("doing something in external Python") + + t = some_ext_py_task() + + # Label is optional here, but it can help identify more complex branches + random_choice_ext_py >> Label(option) >> t >> join_ext_py + + # Example the same with Python virtual environments + + # [START howto_operator_branch_virtualenv] + # Note: Passing a caching dir allows to keep the virtual environment over multiple runs + # Run the example a second time and see that it re-uses it and is faster. + VENV_CACHE_PATH = tempfile.gettempdir() + + @task.branch_virtualenv(requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH) + def branching_virtualenv(choices) -> str: + import random + + import numpy as np + + print(f"Some numpy stuff: {np.arange(6)}") + return f"venv_{random.choice(choices)}" + + # [END howto_operator_branch_virtualenv] + + random_choice_venv = branching_virtualenv(choices=options) + + join_ext_py >> random_choice_venv + + join_venv = EmptyOperator(task_id="join_venv", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) + + for option in options: + + @task.virtualenv( + task_id=f"venv_{option}", requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH + ) + def some_venv_task(): + import numpy as np + + print(f"Some numpy stuff: {np.arange(6)}") + + t = some_venv_task() # Label is optional here, but it can help identify more complex branches - random_choice_instance >> Label(option) >> t >> empty_follow >> join + random_choice_venv >> Label(option) >> t >> join_venv diff --git a/docs/apache-airflow/howto/operator/python.rst b/docs/apache-airflow/howto/operator/python.rst index 6adbbe8b9e6775..02302d59692768 100644 --- a/docs/apache-airflow/howto/operator/python.rst +++ b/docs/apache-airflow/howto/operator/python.rst @@ -103,7 +103,15 @@ Otherwise you won't have access to the most context variables of Airflow in ``op If you want the context related to datetime objects like ``data_interval_start`` you can add ``pendulum`` and ``lazy_object_proxy``. -If additional parameters for package installation are needed pass them in ``requirements.txt`` as in the example below: +.. warning:: + The Python function body defined to be executed is cut out of the DAG into a temporary file w/o surrounding code. + As in the examples you need to add all imports again and you can not rely on variables from the global Python context. + + If you want to pass variables into the classic :class:`~airflow.operators.python.PythonVirtualenvOperator` use + ``op_args`` and ``op_kwargs``. + +If additional parameters for package installation are needed pass them in via the ``pip_install_options`` parameter or use a +``requirements.txt`` as in the example below: .. code-block:: @@ -196,6 +204,100 @@ Otherwise you won't have access to the most context variables of Airflow in ``op If you want the context related to datetime objects like ``data_interval_start`` you can add ``pendulum`` and ``lazy_object_proxy`` to your virtual environment. +.. warning:: + The Python function body defined to be executed is cut out of the DAG into a temporary file w/o surrounding code. + As in the examples you need to add all imports again and you can not rely on variables from the global Python context. + + If you want to pass variables into the classic :class:`~airflow.operators.python.ExternalPythonOperator` use + ``op_args`` and ``op_kwargs``. + +.. _howto/operator:PythonBranchOperator: + +PythonBranchOperator +==================== + +Use the ``@task.branch`` decorator to execute Python :ref:`branching ` tasks. + +.. warning:: + The ``@task.branch`` decorator is recommended over the classic :class:`~airflow.operators.python.PythonBranchOperator` + to execute Python code. + +TaskFlow example of using the operator: + +.. exampleinclude:: /../../airflow/example_dags/example_branch_operator_decorator.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_branch_python] + :end-before: [END howto_operator_branch_python] + +Classic example of using the operator: + +.. exampleinclude:: /../../airflow/example_dags/example_branch_operator.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_branch_python] + :end-before: [END howto_operator_branch_python] + +Argument passing and templating options are the same like with :ref:`howto/operator:PythonOperator`. + +.. _howto/operator:PythonBranchOperator: + +BranchPythonVirtualenvOperator +============================== + +Use the ``@task.branch_virtualenv`` decorator to execute Python :ref:`branching ` tasks and is a hybrid of +the branch decorator with execution in a virtual environment. + +.. warning:: + The ``@task.branch_virtualenv`` decorator is recommended over the classic + :class:`~airflow.operators.python.BranchPythonVirtualenvOperator` to execute Python code. + +TaskFlow example of using the operator: + +.. exampleinclude:: /../../airflow/example_dags/example_branch_operator_decorator.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_branch_virtualenv] + :end-before: [END howto_operator_branch_virtualenv] + +Classic example of using the operator: + +.. exampleinclude:: /../../airflow/example_dags/example_branch_operator.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_branch_virtualenv] + :end-before: [END howto_operator_branch_virtualenv] + +Argument passing and templating options are the same like with :ref:`howto/operator:PythonVirtualenvOperator`. + +BranchExternalPythonOperator +============================ + +Use the ``@task.branch_external_python`` decorator to execute Python :ref:`branching ` tasks and is a hybrid of +the branch decorator with execution in an external Python environment. + +.. warning:: + The ``@task.branch_external_python`` decorator is recommended over the classic + :class:`~airflow.operators.python.BranchExternalPythonOperator` to execute Python code. + +TaskFlow example of using the operator: + +.. exampleinclude:: /../../airflow/example_dags/example_branch_operator_decorator.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_branch_ext_py] + :end-before: [END howto_operator_branch_ext_py] + +Classic example of using the operator: + +.. exampleinclude:: /../../airflow/example_dags/example_branch_operator.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_branch_ext_py] + :end-before: [END howto_operator_branch_ext_py] + +Argument passing and templating options are the same like with :ref:`howto/operator:ExternalPythonOperator`. + .. _howto/operator:ShortCircuitOperator: ShortCircuitOperator From 265442ef34a00d16cf32ddaa498816bbe6b7eb7e Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Thu, 19 Oct 2023 23:52:26 +0200 Subject: [PATCH 3/4] Fix documentation build and references --- airflow/operators/python.py | 8 ++++++++ docs/apache-airflow/howto/operator/python.rst | 4 +++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/airflow/operators/python.py b/airflow/operators/python.py index 118d1d4bdb1c95..0eadb5441ada67 100644 --- a/airflow/operators/python.py +++ b/airflow/operators/python.py @@ -729,6 +729,10 @@ class BranchPythonVirtualenvOperator(PythonVirtualenvOperator, BranchMixIn): these paths can't move forward. The ``skipped`` states are propagated downstream to allow for the DAG state to fill up and the DAG run's state to be inferred. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:BranchPythonVirtualenvOperator` """ def execute(self, context: Context) -> Any: @@ -910,6 +914,10 @@ class BranchExternalPythonOperator(ExternalPythonOperator, BranchMixIn): Extends ExternalPythonOperator, so expects to get Python: virtual environment that should be used (in ``VENV/bin`` folder). Should be absolute path, so it can run on separate virtual environment similarly to ExternalPythonOperator. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:BranchExternalPythonOperator` """ def execute(self, context: Context) -> Any: diff --git a/docs/apache-airflow/howto/operator/python.rst b/docs/apache-airflow/howto/operator/python.rst index 02302d59692768..6cfb5a335d24a8 100644 --- a/docs/apache-airflow/howto/operator/python.rst +++ b/docs/apache-airflow/howto/operator/python.rst @@ -240,7 +240,7 @@ Classic example of using the operator: Argument passing and templating options are the same like with :ref:`howto/operator:PythonOperator`. -.. _howto/operator:PythonBranchOperator: +.. _howto/operator:BranchPythonVirtualenvOperator: BranchPythonVirtualenvOperator ============================== @@ -270,6 +270,8 @@ Classic example of using the operator: Argument passing and templating options are the same like with :ref:`howto/operator:PythonVirtualenvOperator`. +.. _howto/operator:BranchExternalPythonOperator: + BranchExternalPythonOperator ============================ From 6608c5681afd1f9e25668b38ed4c2497aaa2ecc7 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Fri, 20 Oct 2023 23:42:32 +0200 Subject: [PATCH 4/4] Fix pytest checking exact layout of example DAG --- tests/jobs/test_backfill_job.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/tests/jobs/test_backfill_job.py b/tests/jobs/test_backfill_job.py index 8d592dfe65c9db..195a959ee9b1cc 100644 --- a/tests/jobs/test_backfill_job.py +++ b/tests/jobs/test_backfill_job.py @@ -244,11 +244,23 @@ def test_backfill_multi_dates(self): "branch_b", "branch_c", "branch_d", - "follow_branch_a", - "follow_branch_b", - "follow_branch_c", - "follow_branch_d", + "follow_a", + "follow_b", + "follow_c", + "follow_d", "join", + "branching_ext_python", + "ext_py_a", + "ext_py_b", + "ext_py_c", + "ext_py_d", + "join_ext_python", + "branching_venv", + "venv_a", + "venv_b", + "venv_c", + "venv_d", + "join_venv", ), ], [