Options for installing extra packages at runtime #31

Draft · wants to merge 5 commits into base: master
65 changes: 59 additions & 6 deletions dask_chtc/cluster.py
@@ -2,12 +2,15 @@
import datetime
import logging
import math
import os
import random
from pathlib import Path
from typing import Any, Dict, Iterable, Mapping, Optional, Set, Union
from typing import Any, Dict, Iterable, List, Mapping, Optional, Set, Union

import classad
import dask
import psutil
import yaml
from dask_jobqueue import HTCondorCluster
from dask_jobqueue.htcondor import HTCondorJob
from distributed.security import Security
@@ -21,6 +24,8 @@
ENTRYPOINT_SCRIPT_PATH = (PACKAGE_DIR / "entrypoint.sh").absolute()

PORT_INSIDE_CONTAINER = 8787

# This is bespoke knowledge about submit3's configuration
SCHEDULER_PORTS = set(range(3000, 4000))


@@ -35,7 +40,7 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)


T_PORT_ARG = Union[int, Iterable[int]]
T_CONDA_ENV = Optional[Union[str, os.PathLike, Dict[str, Union[str, List[str]]]]]


class CHTCCluster(HTCondorCluster):
@@ -56,10 +61,13 @@ def __init__(
self,
*,
worker_image: Optional[str] = None,
conda_env: T_CONDA_ENV = None,
conda_packages: Optional[List[str]] = None,
pip_packages: Optional[List[str]] = None,
gpu_lab: bool = False,
gpus: Optional[int] = None,
batch_name: Optional[str] = None,
python: str = "./entrypoint.sh python3",
python: str = "$_CONDOR_SCRATCH_DIR/entrypoint.sh python3",
**kwargs: Any,
):
"""
@@ -71,6 +79,16 @@ def __init__(
(`Dockerfile <https://hub.docker.com/r/daskdev/dask/dockerfile>`_).
See :ref:`this page <docker>`
for advice on building Docker images for use with Dask-CHTC.
conda_env
A conda environment, which will be used to update
(i.e., ``conda env update --file environment.yml``)
the worker's environment before it starts.
Accepts either a path to a conda ``environment.yml`` file,
or a dictionary representation of such a file.
conda_packages
A list of additional packages to install via ``conda``.
pip_packages
A list of additional packages to install via ``pip``.
gpu_lab
If ``True``, workers will be allowed to run on GPULab nodes.
If this is ``True``, the default value of ``gpus`` becomes ``1``.
@@ -93,7 +111,14 @@ def __init__(
"""

kwargs = self._modify_kwargs(
kwargs, worker_image=worker_image, gpu_lab=gpu_lab, gpus=gpus, batch_name=batch_name,
kwargs,
worker_image=worker_image,
conda_env=conda_env,
conda_packages=conda_packages,
pip_packages=pip_packages,
gpu_lab=gpu_lab,
gpus=gpus,
batch_name=batch_name,
)

super().__init__(python=python, **kwargs)
@@ -104,6 +129,9 @@ def _modify_kwargs(
kwargs: Dict[str, Any],
*,
worker_image: Optional[str] = None,
conda_env: T_CONDA_ENV = None,
conda_packages: Optional[List[str]] = None,
pip_packages: Optional[List[str]] = None,
gpu_lab: bool = False,
gpus: Optional[int] = None,
batch_name: Optional[str] = None,
@@ -156,6 +184,18 @@ def _modify_kwargs(
# These get put in the HTCondor job submit description.
gpus = gpus or dask.config.get(f"jobqueue.{cls.config_name}.gpus")
gpu_lab = gpu_lab or dask.config.get(f"jobqueue.{cls.config_name}.gpu-lab")

# Produce a string containing the contents of a conda environment.yml
# file, which entrypoint.sh will turn into an actual file at runtime
if conda_env is None:
conda_env_val = ""
elif isinstance(conda_env, (str, os.PathLike)):
conda_env_val = Path(conda_env).read_text()
elif isinstance(conda_env, dict):
conda_env_val = yaml.safe_dump(conda_env)
else:
raise TypeError("Unexpected type for conda_env argument.")

modified["job_extra"] = merge(
# Run workers in Docker universe.
{
@@ -169,7 +209,12 @@ def _modify_kwargs(
# See --listen-address below for telling Dask to actually listen to this port.
{"container_service_names": "dask", "dask_container_port": PORT_INSIDE_CONTAINER},
# Transfer our internals and whatever else the user requested.
{"transfer_input_files": tif, "encrypt_input_files": eif},
{
"transfer_input_files": tif,
"encrypt_input_files": eif,
# Always transfer input files, even if we're on the same FileSystemDomain
"should_transfer_files": "yes",
},
# TODO: turn on encrypt_execute_directory ?
# Do not transfer any output files, ever.
{"transfer_output_files": '""'},
@@ -187,12 +232,20 @@ def _modify_kwargs(
{"requirements": "(Target.HasCHTCStaging)"},
# Support attributes to gather usage data.
{"My.IsDaskWorker": "true"},
# Packages the user wants installed before the worker starts up.
{
"My.CondaEnv": classad.quote(conda_env_val),
"My.ExtraCondaPackages": classad.quote(" ".join(conda_packages or [])),
"My.ExtraPipPackages": classad.quote(" ".join(pip_packages or [])),
},
# Capture anything the user passed in.
kwargs.get("job_extra", dask.config.get(f"jobqueue.{cls.config_name}.job-extra")),
# Overrideable utility/convenience attributes.
{
# This will cause the workers to be grouped in condor_q, with a reasonable name.
"JobBatchName": f'"{batch_name or dask.config.get(f"jobqueue.{cls.config_name}.batch-name")}"',
"JobBatchName": classad.quote(
batch_name or dask.config.get(f"jobqueue.{cls.config_name}.batch-name")
),
# Keep worker claims idle briefly for fast restarts.
"keep_claim_idle": seconds(minutes=10),
# Higher-than-default job priority.
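
For context, a minimal usage sketch (not part of this diff) of how the new keyword arguments might be passed; the image, environment, and package names are only illustrative, and it assumes CHTCCluster is importable from the top-level dask_chtc package.

from dask.distributed import Client

from dask_chtc import CHTCCluster

# Workers clone and update their conda environment, then install a few extra
# conda and pip packages, before the Dask worker process starts.
cluster = CHTCCluster(
    worker_image="daskdev/dask:latest",  # hypothetical worker image
    conda_env={"name": "base", "dependencies": ["numba"]},  # or a path to an environment.yml
    conda_packages=["scikit-learn"],
    pip_packages=["xgboost"],
)
cluster.scale(4)
client = Client(cluster)
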
45 changes: 44 additions & 1 deletion dask_chtc/entrypoint.sh
@@ -1,12 +1,55 @@
#!/usr/bin/env bash

set -x
set -xu

echo "Dask-CHTC entrypoint executing..."
echo "Incoming command is:"
echo "$@"
echo

# Make sure we're in the _CONDOR_SCRATCH_DIR
cd "$_CONDOR_SCRATCH_DIR" || exit 1
pwd

# shellcheck disable=SC1091
. /opt/conda/etc/profile.d/conda.sh
conda activate base

export CONDA_PKGS_DIRS="$_CONDOR_SCRATCH_DIR/.pkgs"

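# Clone the image's base environment into the job's scratch directory (where the
# package cache now lives too) so extra packages go into a writable, local copy.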
conda create --yes --clone base --prefix "$_CONDOR_SCRATCH_DIR"/.env
conda activate "$_CONDOR_SCRATCH_DIR"/.env

# Install extra user-specified packages
conda_env=$(grep CondaEnv "$_CONDOR_JOB_AD" | cut -d'"' -f2)
if [ -n "$conda_env" ]; then
echo "Updating conda environment..."
env_file="$_CONDOR_SCRATCH_DIR"/environment.yml
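# The job ad stores the environment as a ClassAd string, so newlines arrive as
# literal \n escape sequences; echo -e expands them back into real newlines.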
echo -e "$conda_env" > "$env_file"
echo "Conda environment for update:"
cat "$env_file"
echo
conda env update --file "$env_file"
fi

conda_packages=$(grep ExtraCondaPackages "$_CONDOR_JOB_AD" | cut -d'"' -f2)
if [ -n "$conda_packages" ]; then
echo "Installing extra conda packages..."
# We want the packages split into separate args
# shellcheck disable=SC2086
conda install --yes $conda_packages
fi

conda clean --all --yes

pip_packages=$(grep ExtraPipPackages "$_CONDOR_JOB_AD" | cut -d'"' -f2)
if [ -n "$pip_packages" ]; then
echo "Installing extra pip packages..."
# We want the packages split into separate args
# shellcheck disable=SC2086
pip install --user --no-cache-dir $pip_packages
fi

# Wait for the job ad to be updated with <service>_HostPort
# This happens during the first update, usually a few seconds after the job starts
echo "Waiting for HostPort information..."
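
To make the hand-off between the two files concrete, here is a small round-trip sketch (not part of this diff; it assumes the classad bindings and PyYAML are installed). cluster.py serializes the environment with yaml.safe_dump and classad.quote, and entrypoint.sh recovers it from the job ad by taking the text between the first pair of double quotes (cut -d'"' -f2) and expanding the \n escapes (echo -e).

import classad
import yaml

env = {"name": "worker", "dependencies": ["python=3.8", {"pip": ["xgboost"]}]}

# Roughly what CHTCCluster would put into the submit description as My.CondaEnv:
# a double-quoted ClassAd string literal with newlines escaped as \n, which is
# what the grep / cut / echo -e pipeline in entrypoint.sh relies on.
quoted = classad.quote(yaml.safe_dump(env))
print(quoted)

# Caveat: a YAML document that itself contains double quotes would likely defeat
# the cut-based extraction, since cut splits on every quote character it sees.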