Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proc: initial (local) process manager implementation #7048

Merged
merged 2 commits into from
Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added dvc/proc/__init__.py
Empty file.
20 changes: 20 additions & 0 deletions dvc/proc/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from dvc.exceptions import DvcException


class ProcessNotTerminatedError(DvcException):
def __init__(self, name):
super().__init__(f"Managed process '{name}' has not been terminated.")


class ProcessNotFoundError(DvcException):
def __init__(self, name):
super().__init__(f"Managed process '{name}' does not exist.")


class TimeoutExpired(DvcException):
def __init__(self, cmd, timeout):
super().__init__(
f"'{cmd}' did not complete before timeout '{timeout}'"
)
self.cmd = cmd
self.timeout = timeout
92 changes: 92 additions & 0 deletions dvc/proc/manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""Serverless process manager."""

import json
import logging
import os
from typing import Generator, List, Optional, Union

from shortuuid import uuid

from .process import ManagedProcess, ProcessInfo

logger = logging.getLogger(__name__)


class ProcessManager:
"""Manager for controlling background ManagedProcess(es).

Spawned process entries are kept in the manager directory until they
are explicitly removed (with remove() or cleanup()) so that return
value and log information can be accessed after a process has completed.
"""

def __init__(self, wdir: Optional[str] = None):
self.wdir = wdir or "."

def __iter__(self):
return self.processes()

def __getitem__(self, key: str) -> "ProcessInfo":
info_path = os.path.join(self.wdir, key, f"{key}.json")
try:
with open(info_path, encoding="utf-8") as fobj:
return ProcessInfo.from_dict(json.load(fobj))
except FileNotFoundError:
raise KeyError

def get(self, key: str, default=None):
try:
return self[key]
except KeyError:
return default
Comment on lines +26 to +41
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also in the future we probably want this to just mirror the standard dict/Mapping interface (and in processes()) so that we can return the paired name + info as needed instead of just the info from this initial implementation


def processes(self) -> Generator["ProcessInfo", None, None]:
if not os.path.exists(self.wdir):
return
for name in os.listdir(self.wdir):
try:
yield self[name]
except KeyError:
continue

def spawn(self, args: Union[str, List[str]], name: Optional[str] = None):
"""Run the given command in the background."""
name = name or uuid()
pid = ManagedProcess.spawn(
args,
wdir=os.path.join(self.wdir, name),
name=name,
)
logger.debug(
"Spawned managed process '%s' (PID: '%d')",
name,
pid,
)

def send_signal(self, name: str, signal: int):
"""Send `signal` to the specified named process."""
raise NotImplementedError

def kill(self, name: str):
"""Kill the specified named process."""
raise NotImplementedError

def terminate(self, name: str):
"""Terminate the specified named process."""
raise NotImplementedError

def remove(self, name: str, force: bool = False):
"""Remove the specified named process from this manager.

If the specified process is still running, it will be forcefully killed
if `force` is True`, otherwise an exception will be raised.

Raises:
ProcessNotTerminatedError if the specified process is still
running and was not forcefully killed.
"""
raise NotImplementedError

def cleanup(self):
"""Remove stale (terminated) processes from this manager."""
raise NotImplementedError
Comment on lines +66 to +92
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@karajan1001 the next set of tasks will be to fill out these methods.

  • send_signal, terminate, kill are all related and would be grouped together in the first task. Implementation/behavior for these methods should work the same as they do in subprocess and multiprocessing (https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal).
  • remove and cleanup can be implemented once terminate/kill are done (since force-removing a child process directory will require killing the process if it is still running)

177 changes: 177 additions & 0 deletions dvc/proc/process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
import json
import logging
import os
import shlex
import subprocess
from contextlib import AbstractContextManager
from dataclasses import asdict, dataclass
from typing import List, Optional, TextIO, Union

from funcy import cached_property
from shortuuid import uuid

from dvc.utils.fs import makedirs

from .exceptions import TimeoutExpired

logger = logging.getLogger(__name__)


@dataclass
class ProcessInfo:
pid: int
stdin: Optional[str]
stdout: Optional[str]
stderr: Optional[str]
returncode: Optional[int]

@classmethod
def from_dict(cls, d):
pmrowla marked this conversation as resolved.
Show resolved Hide resolved
return cls(**d)


class ManagedProcess(AbstractContextManager):
"""Run the specified command with redirected output.

stdout and stderr will both be redirected to <name>.out.
Interactive processes (requiring stdin input) are currently unsupported.

Parameters:
args: Command to be run.
wdir: If specified, redirected output files will be placed in `wdir`.
name: Name to use for this process, if not specified a UUID will be
generated instead.
"""

def __init__(
self,
args: Union[str, List[str]],
wdir: Optional[str] = None,
name: Optional[str] = None,
):
self.args: List[str] = (
shlex.split(args, posix=os.name == "posix")
if isinstance(args, str)
else list(args)
)
self.wdir = wdir
self.name = name or uuid()
self.returncode: Optional[int] = None
self._stdout: Optional[TextIO] = None
self._stderr: Optional[TextIO] = None

self._run()

def __exit__(self, exc_type, exc_value, traceback):
self.wait()

def _close_fds(self):
if self._stdout:
self._stdout.close()
self._stdout = None
if self._stderr:
self._stderr.close()
self._stderr = None

def _make_path(self, path: str) -> str:
return os.path.join(self.wdir, path) if self.wdir else path

@cached_property
def stdout_path(self) -> str:
return self._make_path(f"{self.name}.out")

@cached_property
def info_path(self) -> str:
return self._make_path(f"{self.name}.json")

@cached_property
def pidfile_path(self) -> str:
return self._make_path(f"{self.name}.pid")

@property
def info(self) -> "ProcessInfo":
return ProcessInfo(
pid=self.pid,
stdin=None,
stdout=self.stdout_path,
stderr=None,
returncode=self.returncode,
)

def _make_wdir(self):
if self.wdir:
makedirs(self.wdir, exist_ok=True)

def _dump(self):
self._make_wdir()
with open(self.info_path, "w", encoding="utf-8") as fobj:
json.dump(asdict(self.info), fobj)
with open(self.pidfile_path, "w", encoding="utf-8") as fobj:
fobj.write(str(self.pid))

def _run(self):
self._make_wdir()
logger.debug(
"Appending output to '%s'",
self.stdout_path,
)
self._stdout = open(self.stdout_path, "ab")
try:
self._proc = subprocess.Popen(
self.args,
stdin=subprocess.DEVNULL,
stdout=self._stdout,
stderr=subprocess.STDOUT,
close_fds=True,
shell=False,
)
self.pid: int = self._proc.pid
self._dump()
except Exception:
if self._proc is not None:
self._proc.kill()
self._close_fds()
raise

def wait(self, timeout: Optional[int] = None) -> Optional[int]:
"""Block until a process started with `run` has completed.

Raises:
TimeoutExpired if `timeout` was set and the process
did not terminate after `timeout` seconds.
"""
if self.returncode is not None:
return self.returncode
try:
self._proc.wait(timeout=timeout)
except subprocess.TimeoutExpired as exc:
raise TimeoutExpired(exc.cmd, exc.timeout) from exc
self.returncode = self._proc.returncode
self._close_fds()
self._dump()
return self.returncode

@classmethod
def spawn(cls, *args, **kwargs) -> Optional[int]:
"""Spawn a ManagedProcess command in the background.

Returns: The spawned process PID.
"""
import multiprocessing as mp

proc = mp.Process(
target=cls._spawn,
args=args,
kwargs=kwargs,
daemon=True,
)
proc.start()
# Do not terminate the child daemon when the main process exits
# pylint: disable=protected-access
mp.process._children.discard(proc) # type: ignore[attr-defined]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is kind of a hack, but even if we subclassed multiprocessing.Process we would just end up with an overridden

def start(self):
    super().start()
    _children.discard(self)

which I'm not sure is any cleaner

But in general, the idea behind using multiprocessing is that it allows us to use spawn instead of fork on windows + mac (#4294), and also allows us to directly run the target python function we want instead of adding the overhead from using dvc.daemon + a new dedicated CLI command

return proc.pid

@classmethod
def _spawn(cls, *args, **kwargs):
with cls(*args, **kwargs):
pass
Empty file added tests/unit/proc/__init__.py
Empty file.
60 changes: 60 additions & 0 deletions tests/unit/proc/test_process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import json
import subprocess

import pytest

from dvc.proc.process import ManagedProcess, ProcessInfo

TEST_PID = 1234


@pytest.fixture(autouse=True)
def mock_popen(mocker):
mocker.patch(
"subprocess.Popen",
return_value=mocker.MagicMock(pid=TEST_PID, returncode=None),
)


@pytest.mark.parametrize(
"args",
[
"/bin/foo -o option",
["/bin/foo", "-o", "option"],
],
)
def test_init_args(tmp_dir, args, mocker):
expected = ["/bin/foo", "-o", "option"]
proc = ManagedProcess(args)
assert expected == proc.args


def test_run(tmp_dir, mocker):
proc = ManagedProcess("/bin/foo")
assert TEST_PID == proc.pid

with open(proc.info_path, encoding="utf-8") as fobj:
info = ProcessInfo.from_dict(json.load(fobj))
assert TEST_PID == info.pid


def test_wait(tmp_dir, mocker):
from dvc.proc.exceptions import TimeoutExpired

proc = ManagedProcess("/bin/foo")
proc._proc.wait = mocker.Mock(
side_effect=subprocess.TimeoutExpired("/bin/foo", 5)
)
with pytest.raises(TimeoutExpired):
proc.wait(5)

proc._proc.wait = mocker.Mock(return_value=None)
proc._proc.returncode = 0
assert 0 == proc.wait()
proc._proc.wait.assert_called_once()

# once subprocess return code is set, future ManagedProcess.wait() calls
# should not block
proc._proc.wait.reset_mock()
assert 0 == proc.wait()
proc._proc.wait.assert_not_called()