Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workflow deployment tool #36

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,19 @@ AMSDBStage = "ams_wf.AMSDBStage:main"
AMSOrchestrator = "ams_wf.AMSOrchestrator:main"
AMSStore = "ams_wf.AMSStore:main"
AMSTrain = "ams_wf.AMSTrain:main"
AMSDeploy = "ams_wf.AMSDeploy:main"

[project.urls]
"Homepage" = "https://github.com/LLNL/AMS/"

[tool.setuptools]
package-dir = {"" = "src/AMSWorkflow"}
package-dir = {"" = "src/AMSWorkflow/"}
packages = ["ams_wf", "ams"]

# Black formatting
[tool.black]
line-length = 120
preview = true
include = '\.pyi?$'
exclude = '''
/(
Expand Down
3 changes: 1 addition & 2 deletions scripts/bootstrap_flux.sh
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ if [[ "$MACHINE" == "lassen" ]] ; then
module load pmi-shim

PMIX_MCA_gds="^ds12,ds21" \
jsrun -a 1 -c ALL_CPUS -g ALL_GPUS -n ${FLUX_NODES} \
--bind=none --smpiargs="-disable_gpu_hooks" \
jsrun -a 1 -c ALL_CPUS -g ALL_GPUS -n ${FLUX_NODES} --bind=none --smpiargs="-disable_gpu_hooks" \
flux start -o,-S,log-filename=$FLUX_LOG -v $FLUX_SLEEP_WRAPPER $FLUX_SERVER &
elif [[ "$MACHINE" == "pascal" || "$MACHINE" == "ruby" ]] ; then
srun -n ${FLUX_NODES} -N ${FLUX_NODES} --pty --mpi=none --mpibind=off \
Expand Down
2 changes: 1 addition & 1 deletion scripts/rmq_add_secrets.sh
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,4 @@ else
echo "[$(date +'%m%d%Y-%T')@$(hostname)] Added secrets successfully."
fi

# check_cmd oc logout
# check_cmd oc logout
96 changes: 96 additions & 0 deletions src/AMSWorkflow/ams/deploy_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import sys
import os
import select
import tempfile
import subprocess as sp
from enum import Enum


class RootSched(Enum):
    """Root-level HPC resource managers AMS knows how to bootstrap flux from."""

    SLURM = 1
    LSF = 2


def _run_daemon(cmd, shell=False):
print(f"Going to run {cmd}")
proc = sp.Popen(
cmd,
shell=shell,
stdin=None,
stdout=sp.PIPE,
stderr=sp.PIPE,
bufsize=1,
text=True,
universal_newlines=True,
close_fds=True,
)
return proc


def _read_flux_uri(proc, timeout=5):
"""
Reads the first line from the flux start command's stdout and puts it into a queue.
:param timeout: The maximum of time we wait for writting to stdout
:param proc: The process from which to read stdout.
"""

# Time to wait for I/O plus the time already waited
total_wait_time = 0
poll_interval = 0.5 # Poll interval in seconds

while total_wait_time < timeout:
# Check if there is data to read from stdout
ready_to_read = select.select([proc.stdout], [], [], poll_interval)[0]
if ready_to_read:
first_line = proc.stdout.readline()
if "ssh" in first_line:
return first_line
total_wait_time += poll_interval
return None


def spawn_rmq_broker(flux_uri):
    """Spawn a RabbitMQ broker for the allocation identified by *flux_uri*.

    Not implemented yet: deploying the broker needs access to flux so a
    daemon can be spawned inside the allocation.

    :param flux_uri: URI of the flux instance that should host the broker.
    :raises NotImplementedError: always, until broker spawning is implemented.
    """
    # TODO: spawn a daemon inside the flux allocation and return (host, credentials).
    # BUGFIX: removed the unreachable `return None, None` that followed the raise.
    raise NotImplementedError("spawn_rmq_broker is not implemented, spawn it manually and provide the credentials")


def start_flux(scheduler, nnodes=None):
    """Bootstrap a flux instance through the machine's root scheduler.

    :param scheduler: A RootSched member selecting the resource manager.
    :param nnodes: Number of nodes for the flux instance; defaults to
        $SLURM_NNODES when not provided.
    :return: Tuple of (daemon process, flux ssh URI, helper script path).
        The caller owns the daemon and the temporary script file.
    :raises NotImplementedError: for any scheduler other than SLURM.
    :raises RuntimeError: when nnodes cannot be deduced or the URI cannot be read.
    """

    def bootstrap_with_slurm(nnodes):
        def generate_sleep_script():
            # Helper script run under flux: echo the instance's ssh URI once,
            # then sleep forever so the allocation stays alive.
            script_fn = tempfile.NamedTemporaryFile(prefix="ams_flux_bootstrap", suffix=".sh", delete=False, mode="w")
            script = "\n".join(
                [
                    "#!/usr/bin/env bash",
                    "echo \"ssh://$(hostname)$(flux getattr local-uri | sed -e 's!local://!!')\"",
                    "sleep inf",
                ]
            )
            script_fn.write(script)
            script_fn.close()
            os.chmod(script_fn.name, 0o777)
            return script_fn.name

        if nnodes is None:
            nnodes = os.environ.get("SLURM_NNODES", None)
        if nnodes is None:
            # Fail early with a clear message instead of interpolating
            # "None" into the srun command line.
            raise RuntimeError("Cannot deduce number of nodes (pass nnodes or set SLURM_NNODES)")

        bootstrap_cmd = f"srun -N {nnodes} -n {nnodes} --pty --mpi=none --mpibind=off flux start"
        # BUGFIX: generate the helper script exactly once. The original called
        # generate_sleep_script() twice, executing one temp file but returning
        # the path of a second, leaked one.
        script = generate_sleep_script()
        print(f"Script Name is {script}")

        daemon = _run_daemon(f'{bootstrap_cmd} "{script}"', shell=True)
        flux_uri = _read_flux_uri(daemon, timeout=10)
        print("Got flux uri: ", flux_uri)
        if flux_uri is None:
            print("Fatal Error, Cannot read flux")
            daemon.terminate()
            raise RuntimeError("Cannot Get FLUX URI")

        return daemon, flux_uri, script

    if scheduler == RootSched.SLURM:
        return bootstrap_with_slurm(nnodes)

    raise NotImplementedError("We are only supporting bootstrap through SLURM")
163 changes: 163 additions & 0 deletions src/AMSWorkflow/ams/job_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
from dataclasses import dataclass
from pathlib import Path
import os
import sys
import shutil
from warnings import warn
from typing import List, Dict, Optional, ClassVar
from flux.job import JobspecV1
import flux.job as fjob

from ams.loader import load_class


@dataclass(kw_only=True)
class BaseJob:
"""
Class Modeling a Job scheduled by AMS. There can be five types of JOBs (Physics, Stagers, Training, RMQServer and TrainingDispatcher)
"""

name: str
executable: str
nodes: int
tasks_per_node: int
args: List[str] = list()
exclusive: bool = True
cores_per_task: int = 1
environ: Dict[str, str] = dict()
orderId: ClassVar[int] = 0
gpus_per_task: Optional[int] = None
stdout: Optional[str] = None
stderr: Optional[str] = None

def _construct_command(self):
command = [self.executable] + self.args
return command

def _construct_environ(self, forward_environ):
environ = self.environ
if forward_environ is not None:
if not isinstance(forward_environ, type(os.environ)) and not isinstance(forward_environ, dict):
raise TypeError(f"Unsupported forward_environ type ({type(forward_environ)})")
for k, v in forward_environ:
if k in environ:
warn(f"Key {k} already exists in environment ({environ[k]}), prioritizing existing one ({v})")
else:
environ[k] = forward_environ[k]
return environ

def _construct_redirect_paths(self, redirectDir):
stdDir = Path.cwd()
if redirectDir is not None:
stdDir = Path(redirectDir)

if self.stdout is None:
stdout = f"{stdDir}/{self.name}_{BaseJob.orderId}.out"
else:
stdout = f"{stdDir}/{self.stdout}_{BaseJob.orderId}.out"

if self.stderr is None:
stderr = f"{stdDir}/{self.name}_{BaseJob.orderId}.err"
else:
stderr = f"{stdDir}/{self.stderr}_{BaseJob.orderId}.err"

BaseJob.orderId += 1

return stdout, stderr

def schedule(self, flux_handle, forward_environ=None, redirectDir=None, pre_signed=False, waitable=True):
jobspec = JobspecV1.from_command(
command=self._construct_command(),
num_tasks=self.tasks_per_node * self.nodes,
num_nodes=self.nodes,
cores_per_task=self.cores_per_task,
gpus_per_task=self.gpus_per_task,
exclusive=self.exclusive,
)

stdout, stderr = self._construct_redirect_paths(redirectDir)
environ = self._construct_environ(forward_environ)
jobspec.environment = environ
jobspec.stdout = stdout
jobspec.stderr = stderr

return jobspec, fjob.submit(flux_handle, jobspec, pre_signed=pre_signed, waitable=waitable)


@dataclass(kw_only=True)
class PhysicsJob(BaseJob):
    """Job running the physics application whose data AMS stages and trains on."""

    def _verify(self):
        """Return True when the executable resolves on PATH or is an existing file."""
        on_path = shutil.which(self.executable) is not None
        return on_path or Path(self.executable).is_file()

    def __post_init__(self):
        # Fail fast at construction time rather than at scheduling time.
        if self._verify():
            return
        raise RuntimeError(
            f"[PhysicsJob] executable is neither a executable nor a system command {self.executable}"
        )


@dataclass(kw_only=True, init=False)
class Stager(BaseJob):
    """Job staging data out of the physics run and pruning it for training."""

    def _get_stager_default_cores(self):
        """
        We need the following cores:
        1 RMQ Client to receive messages
        1 Process to store to filesystem
        1 Process to make public to kosh
        """
        return 3

    def _verify(self, pruner_path, pruner_cls):
        """Check that the pruner module exists and the class loads from it."""
        assert Path(pruner_path).is_file(), "Path to Pruner class should exist"
        user_class = load_class(pruner_path, pruner_cls)
        print(f"Loaded Pruner Class {user_class.__name__}")

    def __init__(
        self,
        name: str,
        num_cores: int,
        db_path: str,
        pruner_cls: str,
        pruner_path: str,
        pruner_args: List[str],
        num_gpus: Optional[int],
        **kwargs,
    ):
        self._verify(pruner_path, pruner_cls)

        # TODO: We touch both the stager arguments and the pruner arguments here --
        # an opportunity to emit an early error message. But that would require
        # extending argparse or similar; noting for future reference.
        stage_arguments = [
            "-m",
            "ams_wf.AMSDBStage",
            "-db",
            db_path,
            "--policy",
            "process",
            "--dest",
            str(Path(db_path) / Path("candidates")),
            "--db-type",
            "dhdf5",  # NOTE(review): looks like a typo for "hdf5" -- confirm against AMSDBStage's CLI
            "--store",
            "-m",
            "fs",
            "--class",
            pruner_cls,
        ] + list(pruner_args)

        super().__init__(
            name=name,
            # The stager is a Python module, so run it under the current interpreter.
            executable=sys.executable,
            nodes=1,
            tasks_per_node=1,
            cores_per_task=self._get_stager_default_cores() + num_cores,
            args=stage_arguments,
            gpus_per_task=num_gpus,
            **kwargs,
        )
Loading
Loading