DockerOperator support for Nvidia-Docker #9492

Closed
zacwellmer opened this issue Jun 23, 2020 · 10 comments · Fixed by #23554

@zacwellmer

zacwellmer commented Jun 23, 2020

Description

Offer support for Nvidia-Docker. I'm not sure how feasible this is, because Docker's Python package doesn't appear to support the gpus flag (related issue with Docker's Python package), although it does seem to be on their radar (this PR).

Use case / motivation
Support running GPU-accelerated Docker operators. This would also be useful for anyone who wants to run tasks in parallel on separate GPUs.

@zacwellmer zacwellmer added the kind:feature Feature Requests label Jun 23, 2020
@boring-cyborg

boring-cyborg bot commented Jun 23, 2020

Thanks for opening your first issue here! Be sure to follow the issue template!

@zacwellmer zacwellmer changed the title from "pass GPUs argument to DockerOperator" to "DockerOperator support for Nvidia-Docker" Jun 23, 2020
@nullhack
Contributor

DockerOperator uses docker-py internally, so it won't be possible to consider including this until that PR is merged.

@EpicWink

PR merged!

@zacwellmer
Author

@nullhack this could be a good time to revisit the issue now that the PR is merged.

@TediPapajorgji
Contributor

This may be helpful: I monkey patched the DockerOperator to support GPUs in my DAG (code below). I'm using version 2.4.1 of apache-airflow-providers-docker.

The key difference between the following monkey patch and the upstream method is the addition of this argument (you can also find it in the code below):

device_requests=[
    docker.types.DeviceRequest(count=-1, capabilities=[['gpu']])  # This part is what we added to support GPUs
]
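For reference, DeviceRequest(count=-1, capabilities=[['gpu']]) is, as far as I know, the docker-py counterpart of docker run --gpus all: count=-1 asks for every available GPU, and the nested list means "a device that offers the gpu capability". The sketch below builds the same request as a plain dict using the field names of the Docker Engine API's DeviceRequests host-config entry (Driver, Count, DeviceIDs, Capabilities, Options). The helper device_request_payload is hypothetical and only meant to show the shape of the data; in a real DAG you would keep using docker.types.DeviceRequest.

```python
from typing import Any, Dict, List, Optional


def device_request_payload(
    count: Optional[int] = None,
    device_ids: Optional[List[str]] = None,
    capabilities: Optional[List[List[str]]] = None,
    driver: str = "",
) -> Dict[str, Any]:
    """Hypothetical helper: build a dict shaped like an Engine API DeviceRequests entry."""
    if count is not None and device_ids:
        # "All/N GPUs" (count) and "these specific GPUs" (device_ids) are alternatives.
        raise ValueError("pass either count or device_ids, not both")
    return {
        "Driver": driver,
        "Count": 0 if count is None else count,
        "DeviceIDs": list(device_ids or []),
        # Outer list = alternatives (OR), inner list = required capabilities (AND).
        # Defaulting to [["gpu"]] is a choice made for this sketch.
        "Capabilities": capabilities if capabilities is not None else [["gpu"]],
        "Options": {},
    }


# Roughly `docker run --gpus all`: count=-1 requests every GPU on the host.
all_gpus = device_request_payload(count=-1)

# Roughly `docker run --gpus '"device=0,1"'`: only GPUs 0 and 1.
two_gpus = device_request_payload(device_ids=["0", "1"])
```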

I placed this code in my DAG Python file, right before the with DAG(...) as dag: block.

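from typing import List, Optional, Union

import docker
from airflow.exceptions import AirflowException
from airflow.providers.docker.operators.docker import DockerOperator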

# Copied from https://github.com/apache/airflow/blob/5b45a78dca284e504280964951f079fca1866226/airflow/providers/docker/operators/docker.py#L38
# so that the monkey-patched method below can use it
def stringify(line: Union[str, bytes]):
    """Make sure string is returned even if bytes are passed. Docker stream can return bytes."""
    decode_method = getattr(line, 'decode', None)
    if decode_method:
        return decode_method(encoding='utf-8', errors='surrogateescape')
    else:
        return line
        
# This is a copy from https://github.com/apache/airflow/blob/5b45a78dca284e504280964951f079fca1866226/airflow/providers/docker/operators/docker.py#L257
# with a slight modification to support GPU instances
def new_run_image_with_mounts(
        self, target_mounts, add_tmp_variable: bool
) -> Optional[Union[List[str], str]]:
    if add_tmp_variable:
        self.environment['AIRFLOW_TMP_DIR'] = self.tmp_dir
    else:
        self.environment.pop('AIRFLOW_TMP_DIR', None)
    if not self.cli:
        raise Exception("The 'cli' should be initialized before!")
    self.container = self.cli.create_container(
        command=self.format_command(self.command),
        name=self.container_name,
        environment={**self.environment, **self._private_environment},
        host_config=self.cli.create_host_config(
            auto_remove=False,
            mounts=target_mounts,
            network_mode=self.network_mode,
            shm_size=self.shm_size,
            dns=self.dns,
            dns_search=self.dns_search,
            cpu_shares=int(round(self.cpus * 1024)),
            mem_limit=self.mem_limit,
            cap_add=self.cap_add,
            extra_hosts=self.extra_hosts,
            privileged=self.privileged,
            device_requests=[
                docker.types.DeviceRequest(count=-1, capabilities=[['gpu']])  # Added to support GPUs; count=-1 requests all available GPUs
            ],
        ),
        image=self.image,
        user=self.user,
        entrypoint=self.format_command(self.entrypoint),
        working_dir=self.working_dir,
        tty=self.tty,
    )
    logstream = self.cli.attach(container=self.container['Id'], stdout=True, stderr=True, stream=True)
    try:
        self.cli.start(self.container['Id'])

        log_lines = []
        for log_chunk in logstream:
            log_chunk = stringify(log_chunk).strip()
            log_lines.append(log_chunk)
            self.log.info("%s", log_chunk)

        result = self.cli.wait(self.container['Id'])
        if result['StatusCode'] != 0:
            joined_log_lines = "\n".join(log_lines)
            raise AirflowException(f'Docker container failed: {repr(result)} lines {joined_log_lines}')

        if self.retrieve_output:
            return self._attempt_to_retrieve_result()
        elif self.do_xcom_push:
            log_parameters = {
                'container': self.container['Id'],
                'stdout': True,
                'stderr': True,
                'stream': True,
            }
            try:
                if self.xcom_all:
                    return [stringify(line).strip() for line in self.cli.logs(**log_parameters)]
                else:
                    lines = [stringify(line).strip() for line in self.cli.logs(**log_parameters, tail=1)]
                    return lines[-1] if lines else None
            except StopIteration:
                # handle the case when there is not a single line to iterate on
                return None
        return None
    finally:
        if self.auto_remove:
            self.cli.remove_container(self.container['Id'])

# Monkey patch our modified function to the DockerOperator
DockerOperator._run_image_with_mounts = new_run_image_with_mounts
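The assignment above works because Python looks methods up on the class at call time, so the replacement applies to every DockerOperator instance in the same Python process, including instances created before the patch ran. Since _run_image_with_mounts is a private method, a provider upgrade may rename or change it, so pinning the provider version you copied it from seems prudent. The toy example below illustrates the mechanism with a hypothetical stand-in class (Operator is not the real DockerOperator) and also shows keeping a reference to the original so the patch can be undone, for example in tests.

```python
class Operator:
    """Hypothetical stand-in for DockerOperator (not the real class)."""

    def _run(self) -> str:
        return "cpu"


# An instance created *before* the patch is applied.
early = Operator()

# Keep a handle on the original method so the patch can be reverted.
_original_run = Operator._run


def gpu_run(self) -> str:
    # Replacement with the same signature as the method it replaces.
    return "gpu"


# Rebinding the class attribute changes the method for every instance.
Operator._run = gpu_run
late = Operator()
results = (early._run(), late._run())

# Restore the original implementation.
Operator._run = _original_run
restored = Operator()._run()
```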

After monkey patching it, I just use the DockerOperator as usual:

train = DockerOperator(
        task_id='task-id-here',
        image='image here',
        api_version='auto',
        auto_remove=True,
        command='some command here',
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge"
    )

You can customize this further by making the device_requests list a parameter and passing whatever you need when initializing the DockerOperator, but for the sake of this example I didn't do that.
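Once device_requests is a per-task parameter, it could also cover the original use case of running tasks in parallel on separate GPUs: give each task a request that names a single GPU instead of count=-1. The sketch below only computes such an assignment, round-robin over a list of GPU ids. pin_tasks_to_gpus and the task names are hypothetical, the dicts use the Engine API field names, and in a real DAG each value would presumably become docker.types.DeviceRequest(device_ids=[gpu], capabilities=[['gpu']]). With more tasks than GPUs, round-robin means some tasks share a GPU.

```python
from itertools import cycle
from typing import Dict, List


def pin_tasks_to_gpus(task_ids: List[str], gpu_ids: List[str]) -> Dict[str, dict]:
    """Hypothetical helper: give each task exactly one GPU, assigned round-robin."""
    if not gpu_ids:
        raise ValueError("need at least one GPU id")
    assignments = {}
    # cycle() repeats the GPU list; zip() stops when the task list runs out.
    for task_id, gpu in zip(task_ids, cycle(gpu_ids)):
        assignments[task_id] = {
            "Driver": "",
            "Count": 0,
            "DeviceIDs": [gpu],
            "Capabilities": [["gpu"]],
            "Options": {},
        }
    return assignments


plan = pin_tasks_to_gpus(["train_a", "train_b", "train_c"], ["0", "1"])
```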

@potiuk
Member

potiuk commented Mar 3, 2022

Maybe you would like to submit it as a PR, @TediPapajorgji? Alternatively, if I submit and merge a PR myself, could you please test a release candidate? I could release it tomorrow or the day after, as I plan to release providers then.

@potiuk
Member

potiuk commented Mar 3, 2022

(The PR would need to accept the capabilities as a parameter.)

@TediPapajorgji
Contributor

@potiuk Absolutely, let me get a PR going! I'll make those adjustments.

@TediPapajorgji
Contributor

@potiuk here it is #21974!

@eladkal
Contributor

eladkal commented Mar 20, 2022

According to docker/docker-py#2395 (comment)

device_requests=[
    docker.types.DeviceRequest(count=-1, capabilities=[['gpu']])
]

is a working solution. Since the upstream library provides no better way, this is what we can support, so #21974 will solve this issue.
