Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#16037 Templated requirements.txt in Python operators #17349

Merged
merged 21 commits into from
Jan 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions airflow/operators/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,9 @@ class PythonVirtualenvOperator(PythonOperator):
:param python_callable: A python function with no references to outside variables,
defined with def, which will be run in a virtualenv
:type python_callable: function
:param requirements: A list of requirements as specified in a pip install command
:type requirements: list[str]
:param requirements: Either a list of requirement strings, or a (templated)
"requirements file" as specified by pip.
:type requirements: list[str] | str
:param python_version: The Python version to run the virtualenv with. Note that
both 2 and 2.7 are acceptable forms.
:type python_version: Optional[Union[str, int, float]]
Expand Down Expand Up @@ -316,6 +317,8 @@ class PythonVirtualenvOperator(PythonOperator):
:type templates_exts: list[str]
"""

template_fields = ('requirements',)
template_ext = ('.txt',)
BASE_SERIALIZABLE_CONTEXT_KEYS = {
'ds',
'ds_nodash',
Expand Down Expand Up @@ -354,7 +357,7 @@ def __init__(
self,
*,
python_callable: Callable,
requirements: Optional[Iterable[str]] = None,
requirements: Union[None, Iterable[str], str] = None,
python_version: Optional[Union[str, int, float]] = None,
use_dill: bool = False,
system_site_packages: bool = True,
Expand Down Expand Up @@ -390,14 +393,16 @@ def __init__(
templates_exts=templates_exts,
**kwargs,
)
self.requirements = list(requirements or [])
if not requirements:
self.requirements: Union[List[str], str] = []
elif isinstance(requirements, str):
self.requirements = requirements
else:
self.requirements = list(requirements)
self.string_args = string_args or []
self.python_version = python_version
self.use_dill = use_dill
self.system_site_packages = system_site_packages
if not self.system_site_packages:
if self.use_dill and 'dill' not in self.requirements:
self.requirements.append('dill')
self.pickling_library = dill if self.use_dill else pickle

def execute(self, context: Context) -> Any:
Expand All @@ -410,6 +415,19 @@ def determine_kwargs(self, context: Mapping[str, Any]) -> Mapping[str, Any]:

def execute_callable(self):
with TemporaryDirectory(prefix='venv') as tmp_dir:
requirements_file_name = f'{tmp_dir}/requirements.txt'

if not isinstance(self.requirements, str):
requirements_file_contents = "\n".join(str(dependency) for dependency in self.requirements)
else:
requirements_file_contents = self.requirements

if not self.system_site_packages and self.use_dill:
requirements_file_contents += '\ndill'

with open(requirements_file_name, 'w') as file:
file.write(requirements_file_contents)

if self.templates_dict:
self.op_kwargs['templates_dict'] = self.templates_dict

Expand All @@ -422,7 +440,7 @@ def execute_callable(self):
venv_directory=tmp_dir,
python_bin=f'python{self.python_version}' if self.python_version else None,
system_site_packages=self.system_site_packages,
requirements=self.requirements,
requirements_file_path=requirements_file_name,
)

self._write_args(input_filename)
Expand Down
40 changes: 28 additions & 12 deletions airflow/utils/python_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import os
import sys
from collections import deque
from typing import List, Optional
from typing import List

import jinja2

Expand All @@ -36,10 +36,12 @@ def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages
return cmd


def _generate_pip_install_cmd(tmp_dir: str, requirements: List[str]) -> Optional[List[str]]:
if not requirements:
return None
# direct path alleviates need to activate
def _generate_pip_install_cmd_from_file(tmp_dir: str, requirements_file_path: str) -> List[str]:
cmd = [f'{tmp_dir}/bin/pip', 'install', '-r']
return cmd + [requirements_file_path]


def _generate_pip_install_cmd_from_list(tmp_dir: str, requirements: List[str]) -> List[str]:
cmd = [f'{tmp_dir}/bin/pip', 'install']
return cmd + requirements

Expand Down Expand Up @@ -75,26 +77,40 @@ def remove_task_decorator(python_source: str, task_decorator_name: str) -> str:


def prepare_virtualenv(
venv_directory: str, python_bin: str, system_site_packages: bool, requirements: List[str]
venv_directory: str,
python_bin: str,
system_site_packages: bool,
requirements: List[str] = None,
requirements_file_path: str = None,
) -> str:
"""
Creates a virtual environment and installs the additional python packages
"""Creates a virtual environment and installs the additional python packages.

:param venv_directory: The path for directory where the environment will be created
:param venv_directory: The path for directory where the environment will be created.
:type venv_directory: str
:param python_bin: Path for python binary
:param python_bin: Path to the Python executable.
:type python_bin: str
:param system_site_packages: Whether to include system_site_packages in your virtualenv.
See virtualenv documentation for more information.
:type system_site_packages: bool
:param requirements: List of additional python packages
:param requirements: List of additional python packages.
:type requirements: List[str]
:param requirements_file_path: Path to the ``requirements.txt`` file.
:type requirements_file_path: str
:return: Path to a binary file with Python in a virtual environment.
:rtype: str
"""
virtualenv_cmd = _generate_virtualenv_cmd(venv_directory, python_bin, system_site_packages)
execute_in_subprocess(virtualenv_cmd)
pip_cmd = _generate_pip_install_cmd(venv_directory, requirements)

if requirements is not None and requirements_file_path is not None:
raise Exception("Either requirements OR requirements_file_path has to be passed, but not both")

pip_cmd = None
if requirements is not None and len(requirements) != 0:
pip_cmd = _generate_pip_install_cmd_from_list(venv_directory, requirements)
if requirements_file_path is not None and requirements_file_path:
pip_cmd = _generate_pip_install_cmd_from_file(venv_directory, requirements_file_path)

if pip_cmd:
execute_in_subprocess(pip_cmd)

Expand Down
14 changes: 14 additions & 0 deletions tests/config_templates/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# comments are not ---

pyparsing==2.4.7

# --- considered as packages

{% if params and params.environ and params.environ == 'templated_unit_test' %}
funcsigs==1.0.2
{% else %}
funcsigs==0.4
{% endif %}

python-dateutil==2.8.1 # including inline comments
pytz==2020.1
5 changes: 3 additions & 2 deletions tests/decorators/test_python_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@ class TestPythonVirtualenvDecorator(TestPythonBase):
def test_add_dill(self):
@task.virtualenv(use_dill=True, system_site_packages=False)
def f():
pass
"""Ensure dill is correctly installed."""
import dill # noqa: F401

with self.dag:
ret = f()

assert 'dill' in ret.operator.requirements
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

def test_no_requirements(self):
"""Tests that the python callable is invoked on task run."""
Expand Down
31 changes: 28 additions & 3 deletions tests/operators/test_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# under the License.
import copy
import logging
import os
import sys
import unittest.mock
import warnings
Expand Down Expand Up @@ -45,6 +46,7 @@
from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.utils.types import DagRunType
from tests.test_utils import AIRFLOW_MAIN_FOLDER
from tests.test_utils.db import clear_db_runs

DEFAULT_DATE = timezone.datetime(2016, 1, 1)
Expand All @@ -59,6 +61,8 @@
'AIRFLOW_CTX_DAG_RUN_ID',
]

TEMPLATE_SEARCHPATH = os.path.join(AIRFLOW_MAIN_FOLDER, 'tests', 'config_templates')


class Call:
def __init__(self, *args, **kwargs):
Expand Down Expand Up @@ -734,6 +738,7 @@ def setUp(self):
self.dag = DAG(
'test_dag',
default_args={'owner': 'airflow', 'start_date': DEFAULT_DATE},
template_searchpath=TEMPLATE_SEARCHPATH,
schedule_interval=INTERVAL,
)
self.dag.create_dagrun(
Expand All @@ -760,10 +765,10 @@ def _run_as_operator(self, fn, python_version=sys.version_info[0], **kwargs):

def test_add_dill(self):
def f():
pass
"""Ensure dill is correctly installed."""
import dill # noqa: F401

task = self._run_as_operator(f, use_dill=True, system_site_packages=False)
assert 'dill' in task.requirements
self._run_as_operator(f, use_dill=True, system_site_packages=False)

def test_no_requirements(self):
"""Tests that the python callable is invoked on task run."""
Expand Down Expand Up @@ -810,6 +815,26 @@ def f():

self._run_as_operator(f, requirements=['funcsigs>1.0', 'dill'], system_site_packages=False)

def test_requirements_file(self):
def f():
import funcsigs # noqa: F401

self._run_as_operator(f, requirements='requirements.txt', system_site_packages=False)

def test_templated_requirements_file(self):
def f():
import funcsigs

assert funcsigs.__version__ == '1.0.2'

self._run_as_operator(
f,
requirements='requirements.txt',
use_dill=True,
params={'environ': 'templated_unit_test'},
system_site_packages=False,
)

def test_fail(self):
def f():
raise Exception
Expand Down