Skip to content

Commit

Permalink
Merge pull request #7 from gpetretto/develop
Browse files Browse the repository at this point in the history
split io and manager
  • Loading branch information
davidwaroquiers authored Apr 12, 2023
2 parents 543b09b + 46ad014 commit 6d054d8
Show file tree
Hide file tree
Showing 14 changed files with 1,996 additions and 48 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ maintain = [
"git-changelog>=0.6",
]
strict = []
remote = ["fabric>=3.0.0"]
msonable = ["monty>=2022.9.9",]

[project.scripts]
Expand Down
172 changes: 142 additions & 30 deletions src/qtoolkit/core/data_objects.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from __future__ import annotations

import abc
from dataclasses import dataclass
from datetime import timedelta
from pathlib import Path

from qtoolkit.core.base import QBase, QEnum
from qtoolkit.core.exceptions import UnsupportedResourcesError


class SubmissionStatus(QEnum):
Expand Down Expand Up @@ -83,6 +87,18 @@ def __repr__(self):
", ".join([repr(v) for v in self._all_values]),
)

@property
@abc.abstractmethod
def qstate(self) -> QState:
raise NotImplementedError


class ProcessPlacement(QEnum):
NO_CONSTRAINTS = "NO_CONSTRAINTS"
SCATTERED = "SCATTERED"
SAME_NODE = "SAME_NODE"
EVENLY_DISTRIBUTED = "EVENLY_DISTRIBUTED"


@dataclass
class QResources(QBase):
Expand All @@ -97,49 +113,145 @@ class QResources(QBase):
Maximum amount of memory requested for a job.
nodes : int
Number of nodes requested for a job.
cpus_per_node : int
Number of cpus for each node requested for a job.
cores_per_cpu : int
Number of cores for each cpu requested for a job.
hyperthreading : int
Number of threads to be used (hyperthreading).
TODO: check this and how to combine with OpenMP environment. Also is it
something that needs to be passed down somewhere to the queueing system
(and thus, is it worth putting it here in the resources ?) ?
On PBS (zenobe) if you use to many processes with respect
to what you asked (in the case of a "shared" node), you get killed.
"""

queue_name: str = None
memory: int = 1024
nodes: int | list = 1
cpus_per_node: int = 1
cores_per_cpu: int = 1
hyperthreading: int = 1
queue_name: str | None = None
job_name: str | None = None
memory_per_thread: int | None = None
nodes: int | None = None
processes: int | None = None
processes_per_node: int | None = None
threads_per_process: int | None = None
gpus_per_job: int | None = None
time_limit: int | timedelta | None = None
account: str | None = None
qos: str | None = None
priority: int | str | None = None
output_filepath: str | Path | None = None
error_filepath: str | Path | None = None
process_placement: ProcessPlacement | None = None
email_address: str | None = None
rerunnable: bool | None = None

# TODO: how to allow heterogeneous resources (e.g. 1 node with 12 cores and
# 1 node with 4 cores or heterogeous memory requirements, e.g. "master"
# core needs more memory than the other ones)
project: str | None = None
njobs: int | None = None # for job arrays

kwargs: dict | None = None

class QJobInfo(QBase):
pass
def __post_init__(self):
if self.process_placement is None:
if self.processes and not self.processes_per_node and not self.nodes:
self.process_placement = ProcessPlacement.NO_CONSTRAINTS # type: ignore # due to QEnum
elif self.nodes and self.processes_per_node and not self.processes:
self.process_placement = ProcessPlacement.EVENLY_DISTRIBUTED
else:
msg = "When process_placement is None either define only nodes plus processes_per_node or only processes"
raise UnsupportedResourcesError(msg)

@classmethod
def no_constraints(cls, processes, **kwargs):
if "nodes" in kwargs or "processes_per_node" in kwargs:
msg = (
"nodes and processes_per_node are incompatible with no constraints jobs"
)
raise UnsupportedResourcesError(msg)
kwargs["process_placement"] = ProcessPlacement.NO_CONSTRAINTS
return cls(processes=processes, **kwargs)

@classmethod
def evenly_distributed(cls, nodes, processes_per_node, **kwargs):
if "processes" in kwargs:
msg = "processes is incompatible with evenly distributed jobs"
raise UnsupportedResourcesError(msg)
kwargs["process_placement"] = ProcessPlacement.EVENLY_DISTRIBUTED
return cls(nodes=nodes, processes_per_node=processes_per_node, **kwargs)

@classmethod
def scattered(cls, processes, **kwargs):
if "nodes" in kwargs or "processes_per_node" in kwargs:
msg = "nodes and processes_per_node are incompatible with scattered jobs"
raise UnsupportedResourcesError(msg)
kwargs["process_placement"] = ProcessPlacement.SCATTERED
return cls(processes=processes, **kwargs)

@classmethod
def same_node(cls, processes, **kwargs):
if "nodes" in kwargs or "processes_per_node" in kwargs:
msg = "nodes and processes_per_node are incompatible with same node jobs"
raise UnsupportedResourcesError(msg)
kwargs["process_placement"] = ProcessPlacement.SAME_NODE
return cls(processes=processes, **kwargs)

def get_processes_distribution(self) -> list:
# TODO consider moving this to the __post_init__
nodes = self.nodes
processes = self.processes
processes_per_node = self.processes_per_node
if self.process_placement == ProcessPlacement.SCATTERED:
if nodes is None:
nodes = processes
elif processes is None:
processes = nodes
elif nodes != processes:
msg = "ProcessPlacement.SCATTERED is incompatible with different values of nodes and processes"
raise UnsupportedResourcesError(msg)
if not nodes and not processes:
nodes = processes = 1

if processes_per_node not in (None, 1):
msg = f"ProcessPlacement.SCATTERED is incompatible with {self.processes_per_node} processes_per_node"
raise UnsupportedResourcesError(msg)
processes_per_node = 1
elif self.process_placement == ProcessPlacement.SAME_NODE:
if nodes not in (None, 1):
msg = f"ProcessPlacement.SAME_NODE is incompatible with {self.nodes} nodes"
raise UnsupportedResourcesError(msg)
nodes = 1
if processes is None:
processes = processes_per_node
elif processes_per_node is None:
processes_per_node = processes
elif processes_per_node != processes:
msg = "ProcessPlacement.SAME_NODE is incompatible with different values of nodes and processes"
raise UnsupportedResourcesError(msg)
if not processes_per_node and not processes:
processes_per_node = processes = 1
elif self.process_placement == ProcessPlacement.EVENLY_DISTRIBUTED:
if nodes is None:
nodes = 1
if processes:
msg = "ProcessPlacement.EVENLY_DISTRIBUTED is incompatible with processes attribute"
raise UnsupportedResourcesError(msg)
processes_per_node = processes_per_node or 1
elif self.process_placement == ProcessPlacement.NO_CONSTRAINTS:
if processes_per_node or nodes:
msg = "ProcessPlacement.NO_CONSTRAINTS is incompatible with processes_per_node and nodes attribute"
raise UnsupportedResourcesError(msg)
if not processes:
processes = 1

return [nodes, processes, processes_per_node]


@dataclass
class QOptions(QBase):
hold: bool = False
account: str = None
qos: str = None
priority: int = None
class QJobInfo(QBase):
memory: int | None = None # in Kb
memory_per_cpu: int | None = None # in Kb
nodes: int | None = None
cpus: int | None = None
threads_per_process: int | None = None
time_limit: int | None = None


@dataclass
class QJob(QBase):
name: str | None = None
qid: str | None = None
job_id: str | None = None
exit_status: int | None = None
state: QState | None = None # Standard
sub_state: QSubState | None = None
resources: QResources | None = None
job_info: QJobInfo | None = None
info: QJobInfo | None = None
account: str | None = None
runtime: int | None = None
queue_name: str | None = None
25 changes: 25 additions & 0 deletions src/qtoolkit/core/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
class QTKException(Exception):
"""
Base class for all the exceptions generated by qtoolkit.
"""


class CommandFailedError(QTKException):
"""
Exception raised when the execution of a command has failed,
typically by a non-zero return code.
"""


class OutputParsingError(QTKException):
"""
Exception raised when errors are recognized during the parsing
of the outputs of command.
"""


class UnsupportedResourcesError(QTKException):
"""
Exception raised when the resources requested are not supported
in qtoolkit for the chosen scheduler.
"""
24 changes: 22 additions & 2 deletions src/qtoolkit/host/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,22 @@ def __init__(self, config: HostConfig | None = None) -> None:
# self.user = user

@abc.abstractmethod
def execute(self, command: str | list[str], stdin=None, stdout=None, stderr=None):
def execute(
self,
command: str | list[str],
workdir: str | Path | None = None,
# stdin=None,
# stdout=None,
# stderr=None,
):
"""Execute the given command on the host
Parameters
----------
command: str or list of str
Command to execute, as a str or list of str
workdir: str or None
path where the command will be executed.
stdin: None, PIPE or file-like
Standard input, /dev/null if None
stdout: None, PIPE or file-like
Expand All @@ -42,9 +51,20 @@ def execute(self, command: str | list[str], stdin=None, stdout=None, stderr=None
Local process object associated to the connection, if dryrun is False,
else None
"""
# TODO: define a common error that is raised or a returned in case the procedure
# fails to avoid handling different kind of errors for the different hosts
raise NotImplementedError

@abc.abstractmethod
def mkdir(self, directory, recursive: bool = True, exist_ok: bool = True):
def mkdir(self, directory, recursive: bool = True, exist_ok: bool = True) -> bool:
"""Create directory on the host."""
# TODO: define a common error that is raised or a returned in case the procedure
# fails to avoid handling different kind of errors for the different hosts
raise NotImplementedError

@abc.abstractmethod
def write_text_file(self, filepath, content):
"""Write content to a file on the host."""
# TODO: define a common error that is raised or a returned in case the procedure
# fails to avoid handling different kind of errors for the different hosts
raise NotImplementedError
28 changes: 21 additions & 7 deletions src/qtoolkit/host/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@
from pathlib import Path

from qtoolkit.host.base import BaseHost
from qtoolkit.utils import cd


class LocalHost(BaseHost):
# def __init__(self, config):
# self.config = config
def execute(self, command):
def execute(self, command: str | list[str], workdir: str | Path | None = None):
"""Execute the given command on the host
Note that the command is executed with shell=True, so commands can
be exposed to command injection. Consider whether to escape part of
the input if it comes from external users.
Parameters
----------
command: str or list of str
Expand All @@ -26,13 +31,22 @@ def execute(self, command):
exit_code : int
Exit code of the command.
"""
if isinstance(command, str):
command = command.split()
proc = subprocess.run(command, capture_output=True)
if isinstance(command, (list, tuple)):
command = " ".join(command)
if not workdir:
workdir = Path.cwd()
else:
workdir = str(workdir)
with cd(workdir):
proc = subprocess.run(command, capture_output=True, shell=True)
return proc.stdout.decode(), proc.stderr.decode(), proc.returncode

def mkdir(self, directory, recursive=True, exist_ok=True) -> None:
Path(directory).mkdir(parents=recursive, exist_ok=exist_ok)
def mkdir(self, directory, recursive=True, exist_ok=True) -> bool:
try:
Path(directory).mkdir(parents=recursive, exist_ok=exist_ok)
except OSError:
return False
return True

def write_file(self, filepath, content) -> None:
def write_text_file(self, filepath, content) -> None:
Path(filepath).write_text(content)
Loading

0 comments on commit 6d054d8

Please sign in to comment.