Skip to content

Commit

Permalink
Adds option to disable mounting temporary folder in DockerOperator (#…
Browse files Browse the repository at this point in the history
…16932)

* Adds option to disable mounting temporary folder in DockerOperator

The DockerOperator by default mounts temporary folder to inside
the container in order to allow to store files bigger than
default size of disk for the container, however this did not work
when remote Docker engine or Docker-In-Docker solution was used.

This worked before the #15843 change, because the /tmp has
been ignored, however when we change to "Mounts", the "/tmp"
mount fails when using remote docker engine.

This PR adds parameter that allows to disable this temporary
directory mounting (and adds a note that it can be replaced
with mounting existing volumes). Also it prints a warning
if the directory cannot be mounted and attempts to re-run
such failed attempt without mounting the temporary
directory which brings back backwards-compatible behaviour
for remote engines and docker-in-docker.

Fixes: #16803
Fixes: #16806
  • Loading branch information
potiuk authored Jul 15, 2021
1 parent fc0250f commit bc00415
Show file tree
Hide file tree
Showing 2 changed files with 258 additions and 61 deletions.
151 changes: 90 additions & 61 deletions airflow/providers/docker/operators/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from typing import Dict, Iterable, List, Optional, Union

from docker import APIClient, tls
from docker.errors import APIError
from docker.types import Mount

from airflow.exceptions import AirflowException
Expand All @@ -32,12 +33,23 @@ class DockerOperator(BaseOperator):
"""
Execute a command inside a docker container.
A temporary directory is created on the host and
mounted into a container to allow storing files
By default, a temporary directory is
created on the host and mounted into a container to allow storing files
that together exceed the default disk size of 10GB in a container.
The path to the mounted directory can be accessed
In this case The path to the mounted directory can be accessed
via the environment variable ``AIRFLOW_TMP_DIR``.
If the volume cannot be mounted, warning is printed and an attempt is made to execute the docker
command without the temporary folder mounted. This is to make it works by default with remote docker
engine or when you run docker-in-docker solution and temporary directory is not shared with the
docker engine. Warning is printed in logs in this case.
If you know you run DockerOperator with remote engine or via docker-in-docker
you should set ``mount_tmp_dir`` parameter to False. In this case, you can still use
``mounts`` parameter to mount already existing named volumes in your Docker Engine
to achieve similar capability where you can store files exceeding default disk size
of the container,
If a login to a private registry is required prior to pulling the image, a
Docker connection needs to be configured in Airflow and the connection ID
be provided with the parameter ``docker_conn_id``.
Expand Down Expand Up @@ -88,6 +100,9 @@ class DockerOperator(BaseOperator):
:type tls_hostname: str or bool
:param tls_ssl_version: Version of SSL to use when communicating with docker daemon.
:type tls_ssl_version: str
:param mount_tmp_dir: Specify whether the temporary directory should be bind-mounted
from the host to the container. Defaults to True
:type mount_tmp_dir: bool
:param tmp_dir: Mount point inside the container to
a temporary directory created on the host by the operator.
The path is also made available via the environment variable
Expand Down Expand Up @@ -154,6 +169,7 @@ def __init__(
tls_client_key: Optional[str] = None,
tls_hostname: Optional[Union[str, bool]] = None,
tls_ssl_version: Optional[str] = None,
mount_tmp_dir: bool = True,
tmp_dir: str = '/tmp/airflow',
user: Optional[Union[str, int]] = None,
mounts: Optional[List[Mount]] = None,
Expand Down Expand Up @@ -193,6 +209,7 @@ def __init__(
self.tls_client_key = tls_client_key
self.tls_hostname = tls_hostname
self.tls_ssl_version = tls_ssl_version
self.mount_tmp_dir = mount_tmp_dir
self.tmp_dir = tmp_dir
self.user = user
self.mounts = mounts or []
Expand Down Expand Up @@ -227,66 +244,80 @@ def get_hook(self) -> DockerHook:
def _run_image(self) -> Optional[str]:
"""Run a Docker container with the provided image"""
self.log.info('Starting docker container from image %s', self.image)
if not self.cli:
raise Exception("The 'cli' should be initialized before!")
if self.mount_tmp_dir:
with TemporaryDirectory(prefix='airflowtmp', dir=self.host_tmp_dir) as host_tmp_dir:
tmp_mount = Mount(self.tmp_dir, host_tmp_dir, "bind")
try:
return self._run_image_with_mounts(self.mounts + [tmp_mount], add_tmp_variable=True)
except APIError as e:
if self.host_tmp_dir in str(e):
self.log.warning(
"Using remote engine or docker-in-docker and mounting temporary "
"volume from host is not supported. Falling back to "
"`mount_tmp_dir=False` mode. You can set `mount_tmp_dir` parameter"
" to False to disable mounting and remove the warning"
)
return self._run_image_with_mounts(self.mounts, add_tmp_variable=False)
raise
else:
return self._run_image_with_mounts(self.mounts, add_tmp_variable=False)

with TemporaryDirectory(prefix='airflowtmp', dir=self.host_tmp_dir) as host_tmp_dir:
if not self.cli:
raise Exception("The 'cli' should be initialized before!")
tmp_mount = Mount(self.tmp_dir, host_tmp_dir, "bind")
self.container = self.cli.create_container(
command=self.format_command(self.command),
name=self.container_name,
environment={**self.environment, **self._private_environment},
host_config=self.cli.create_host_config(
auto_remove=False,
mounts=self.mounts + [tmp_mount],
network_mode=self.network_mode,
shm_size=self.shm_size,
dns=self.dns,
dns_search=self.dns_search,
cpu_shares=int(round(self.cpus * 1024)),
mem_limit=self.mem_limit,
cap_add=self.cap_add,
extra_hosts=self.extra_hosts,
privileged=self.privileged,
),
image=self.image,
user=self.user,
entrypoint=self.format_command(self.entrypoint),
working_dir=self.working_dir,
tty=self.tty,
)

lines = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)

try:
self.cli.start(self.container['Id'])
def _run_image_with_mounts(self, target_mounts, add_tmp_variable: bool) -> Optional[str]:
if add_tmp_variable:
self.environment['AIRFLOW_TMP_DIR'] = self.tmp_dir
else:
self.environment.pop('AIRFLOW_TMP_DIR', None)
self.container = self.cli.create_container(
command=self.format_command(self.command),
name=self.container_name,
environment={**self.environment, **self._private_environment},
host_config=self.cli.create_host_config(
auto_remove=False,
mounts=target_mounts,
network_mode=self.network_mode,
shm_size=self.shm_size,
dns=self.dns,
dns_search=self.dns_search,
cpu_shares=int(round(self.cpus * 1024)),
mem_limit=self.mem_limit,
cap_add=self.cap_add,
extra_hosts=self.extra_hosts,
privileged=self.privileged,
),
image=self.image,
user=self.user,
entrypoint=self.format_command(self.entrypoint),
working_dir=self.working_dir,
tty=self.tty,
)
lines = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
try:
self.cli.start(self.container['Id'])

line = ''
res_lines = []
for line in lines:
line = line.strip()
if hasattr(line, 'decode'):
# Note that lines returned can also be byte sequences so we have to handle decode here
line = line.decode('utf-8')
res_lines.append(line)
self.log.info(line)
line = ''
res_lines = []
for line in lines:
line = line.strip()
if hasattr(line, 'decode'):
# Note that lines returned can also be byte sequences so we have to handle decode here
line = line.decode('utf-8')
res_lines.append(line)
self.log.info(line)

result = self.cli.wait(self.container['Id'])
if result['StatusCode'] != 0:
res_lines = "\n".join(res_lines)
raise AirflowException('docker container failed: ' + repr(result) + f"lines {res_lines}")
result = self.cli.wait(self.container['Id'])
if result['StatusCode'] != 0:
res_lines = "\n".join(res_lines)
raise AirflowException('docker container failed: ' + repr(result) + f"lines {res_lines}")

ret = None
if self.do_xcom_push:
ret = (
self.cli.logs(container=self.container['Id'])
if self.xcom_all
else line.encode('utf-8')
)
return ret
finally:
if self.auto_remove:
self.cli.remove_container(self.container['Id'])
ret = None
if self.do_xcom_push:
ret = self.cli.logs(container=self.container['Id']) if self.xcom_all else line.encode('utf-8')
return ret
finally:
if self.auto_remove:
self.cli.remove_container(self.container['Id'])

def execute(self, context) -> Optional[str]:
self.cli = self._get_cli()
Expand All @@ -312,8 +343,6 @@ def execute(self, context) -> Optional[str]:
if latest_status.get(output_id) != output_status:
self.log.info("%s: %s", output_id, output_status)
latest_status[output_id] = output_status

self.environment['AIRFLOW_TMP_DIR'] = self.tmp_dir
return self._run_image()

def _get_cli(self) -> APIClient:
Expand Down
168 changes: 168 additions & 0 deletions tests/providers/docker/operators/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
import logging
import unittest
from unittest import mock
from unittest.mock import call

import pytest
from docker.errors import APIError

from airflow.exceptions import AirflowException

Expand Down Expand Up @@ -119,6 +121,172 @@ def test_execute(self):
operator.cli.pull('ubuntu:latest', stream=True, decode=True) == self.client_mock.pull.return_value
)

def test_execute_no_temp_dir(self):
operator = DockerOperator(
api_version='1.19',
command='env',
environment={'UNIT': 'TEST'},
private_environment={'PRIVATE': 'MESSAGE'},
image='ubuntu:latest',
network_mode='bridge',
owner='unittest',
task_id='unittest',
mounts=[Mount(source='/host/path', target='/container/path', type='bind')],
mount_tmp_dir=False,
entrypoint='["sh", "-c"]',
working_dir='/container/path',
shm_size=1000,
host_tmp_dir='/host/airflow',
container_name='test_container',
tty=True,
)
operator.execute(None)

self.client_class_mock.assert_called_once_with(
base_url='unix://var/run/docker.sock', tls=None, version='1.19'
)

self.client_mock.create_container.assert_called_once_with(
command='env',
name='test_container',
environment={'UNIT': 'TEST', 'PRIVATE': 'MESSAGE'},
host_config=self.client_mock.create_host_config.return_value,
image='ubuntu:latest',
user=None,
entrypoint=['sh', '-c'],
working_dir='/container/path',
tty=True,
)
self.client_mock.create_host_config.assert_called_once_with(
mounts=[
Mount(source='/host/path', target='/container/path', type='bind'),
],
network_mode='bridge',
shm_size=1000,
cpu_shares=1024,
mem_limit=None,
auto_remove=False,
dns=None,
dns_search=None,
cap_add=None,
extra_hosts=None,
privileged=False,
)
self.tempdir_mock.assert_not_called()
self.client_mock.images.assert_called_once_with(name='ubuntu:latest')
self.client_mock.attach.assert_called_once_with(
container='some_id', stdout=True, stderr=True, stream=True
)
self.client_mock.pull.assert_called_once_with('ubuntu:latest', stream=True, decode=True)
self.client_mock.wait.assert_called_once_with('some_id')
assert (
operator.cli.pull('ubuntu:latest', stream=True, decode=True) == self.client_mock.pull.return_value
)

def test_execute_fallback_temp_dir(self):
self.client_mock.create_container.side_effect = [
APIError(message="wrong path: " + "/host/airflow"),
{'Id': 'some_id'},
]
operator = DockerOperator(
api_version='1.19',
command='env',
environment={'UNIT': 'TEST'},
private_environment={'PRIVATE': 'MESSAGE'},
image='ubuntu:latest',
network_mode='bridge',
owner='unittest',
task_id='unittest',
mounts=[Mount(source='/host/path', target='/container/path', type='bind')],
mount_tmp_dir=True,
entrypoint='["sh", "-c"]',
working_dir='/container/path',
shm_size=1000,
host_tmp_dir='/host/airflow',
container_name='test_container',
tty=True,
)
with self.assertLogs(operator.log, level=logging.WARNING) as captured:
operator.execute(None)
assert (
"WARNING:airflow.task.operators:Using remote engine or docker-in-docker "
"and mounting temporary volume from host is not supported" in captured.output[0]
)
self.client_class_mock.assert_called_once_with(
base_url='unix://var/run/docker.sock', tls=None, version='1.19'
)
self.client_mock.create_container.assert_has_calls(
[
call(
command='env',
name='test_container',
environment={'AIRFLOW_TMP_DIR': '/tmp/airflow', 'UNIT': 'TEST', 'PRIVATE': 'MESSAGE'},
host_config=self.client_mock.create_host_config.return_value,
image='ubuntu:latest',
user=None,
entrypoint=['sh', '-c'],
working_dir='/container/path',
tty=True,
),
call(
command='env',
name='test_container',
environment={'UNIT': 'TEST', 'PRIVATE': 'MESSAGE'},
host_config=self.client_mock.create_host_config.return_value,
image='ubuntu:latest',
user=None,
entrypoint=['sh', '-c'],
working_dir='/container/path',
tty=True,
),
]
)
self.client_mock.create_host_config.assert_has_calls(
[
call(
mounts=[
Mount(source='/host/path', target='/container/path', type='bind'),
Mount(source='/mkdtemp', target='/tmp/airflow', type='bind'),
],
network_mode='bridge',
shm_size=1000,
cpu_shares=1024,
mem_limit=None,
auto_remove=False,
dns=None,
dns_search=None,
cap_add=None,
extra_hosts=None,
privileged=False,
),
call(
mounts=[
Mount(source='/host/path', target='/container/path', type='bind'),
],
network_mode='bridge',
shm_size=1000,
cpu_shares=1024,
mem_limit=None,
auto_remove=False,
dns=None,
dns_search=None,
cap_add=None,
extra_hosts=None,
privileged=False,
),
]
)
self.tempdir_mock.assert_called_once_with(dir='/host/airflow', prefix='airflowtmp')
self.client_mock.images.assert_called_once_with(name='ubuntu:latest')
self.client_mock.attach.assert_called_once_with(
container='some_id', stdout=True, stderr=True, stream=True
)
self.client_mock.pull.assert_called_once_with('ubuntu:latest', stream=True, decode=True)
self.client_mock.wait.assert_called_once_with('some_id')
assert (
operator.cli.pull('ubuntu:latest', stream=True, decode=True) == self.client_mock.pull.return_value
)

def test_private_environment_is_private(self):
operator = DockerOperator(
private_environment={'PRIVATE': 'MESSAGE'}, image='ubuntu:latest', task_id='unittest'
Expand Down

0 comments on commit bc00415

Please sign in to comment.