exp: use process manager dirs for serialized executors
- partially refactor the existing pidfile behavior to go through the process manager
- old pidfiles are now JSON-serialized executor info files placed in
  process manager subdirs (alongside the process manager pidfile, logs, etc.)
- move multiprocessing-based (attached) execution onto a separate
  execution path from the future "detached" execution support
pmrowla committed Dec 15, 2021
1 parent fa819b0 commit 25ef812
Showing 3 changed files with 100 additions and 54 deletions.
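
For orientation, a minimal sketch of the on-disk layout this commit moves to. The directory names and the ".run" info-file extension are assumptions inferred from the test paths (dvc.tmp_dir/EXEC_TMP_DIR/EXEC_PID_DIR) and BaseExecutor.INFOFILE_EXT, not values confirmed by this diff:

    # Hedged sketch: each executor gets its own subdir under the process
    # manager's pid_dir, holding the JSON-serialized ExecutorInfo next to
    # the process manager's own pidfile and logs for that executor.
    import json
    import os

    pid_dir = os.path.join(".dvc", "tmp", "exps", "run")   # assumed <tmp_dir>/EXEC_TMP_DIR/EXEC_PID_DIR
    name = "workspace"                                     # or a stash rev for temp-dir executors
    infofile = os.path.join(pid_dir, name, f"{name}.run")  # assumed INFOFILE_EXT == ".run"

    os.makedirs(os.path.dirname(infofile), exist_ok=True)
    with open(infofile, "w", encoding="utf-8") as fobj:
        json.dump({"git_url": None, "location": "workspace"}, fobj)  # ExecutorInfo.asdict()-style payload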
6 changes: 5 additions & 1 deletion dvc/repo/experiments/executor/base.py
@@ -479,6 +479,9 @@ def filter_pipeline(stages):
"\t%s",
", ".join(untracked),
)
info.result_hash = exp_hash
info.result_ref = ref
info.result_force = repro_force

# ideally we would return stages here like a normal repro() call, but
# stages is not currently picklable and cannot be returned across
@@ -528,7 +531,8 @@ def _repro_dvc(
raise
finally:
if infofile is not None:
remove(infofile)
with modify_json(infofile) as d:
d.update(info.asdict())
dvc.close()
os.chdir(old_cwd)

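The net effect of the base.py change: instead of deleting the infofile on completion, the executor now records its final state (result hash, ref, and force flag) back into it, which is what lets `exp show` report on running and just-finished executors. A minimal sketch of what a modify_json-style helper does, assuming load-mutate-write semantics (the real helper lives in dvc.utils.serialize; this stand-in is illustrative only):

    import json
    import os
    from contextlib import contextmanager

    @contextmanager
    def modify_json(path):
        # Load the existing JSON document if the file exists, else start empty.
        data = {}
        if os.path.exists(path):
            with open(path, encoding="utf-8") as fobj:
                data = json.load(fobj)
        # Let the caller mutate the dict in place...
        yield data
        # ...then persist whatever state it left behind.
        with open(path, "w", encoding="utf-8") as fobj:
            json.dump(data, fobj)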
130 changes: 82 additions & 48 deletions dvc/repo/experiments/executor/manager.py
@@ -2,7 +2,8 @@
import os
from abc import ABC
from collections import defaultdict, deque
from typing import TYPE_CHECKING, Deque, Dict, Optional, Tuple, Type
from collections.abc import Mapping
from typing import TYPE_CHECKING, Deque, Dict, Generator, Optional, Tuple, Type

from dvc.proc.manager import ProcessManager

@@ -27,7 +28,7 @@
logger = logging.getLogger(__name__)


class BaseExecutorManager(ABC):
class BaseExecutorManager(ABC, Mapping):
"""Manages executors for a collection of experiments to be run."""

EXECUTOR_CLS: Type = BaseExecutor
@@ -42,49 +43,68 @@ def __init__(
self.scm = scm
makedirs(wdir, exist_ok=True)
self.wdir = wdir
self.executors = self._load_infos()
self._queue: Deque[Tuple[str, "BaseExecutor"]] = deque()
self.proc = ProcessManager(self.pid_dir)
self._attached: Dict[str, "BaseExecutor"] = {}
self._detached: Dict[str, "BaseExecutor"] = dict(self._load_infos())
self._queue: Deque[Tuple[str, "BaseExecutor"]] = deque()

def __getitem__(self, key: str) -> "BaseExecutor":
try:
return self._attached[key]
except KeyError:
pass
return self._detached[key]

def __iter__(self):
yield from self._attached
yield from self._detached

def __len__(self):
return len(self._attached) + len(self._detached)

@property
def pid_dir(self) -> str:
return os.path.join(self.wdir, EXEC_PID_DIR)

def enqueue(self, rev: str, executor: "BaseExecutor"):
assert rev not in self
self._queue.append((rev, executor))

def _load_infos(self) -> Dict[str, "BaseExecutor"]:
# TODO: pending process manager changes
# import json
# from urllib.parse import urlparse
# from .base import ExecutorInfo
# from .ssh import SSHExecutor

# def make_executor(info: "ExecutorInfo"):
# if info.git_url:
# scheme = urlparse(info.git_url)
# if scheme == "file":
# cls = TempDirExecutor
# elif scheme == "ssh":
# cls = SSHExecutor
# else:
# raise NotImplementedError
# else:
# cls = WorkspaceExecutor
# return cls.from_info(info)

infos: Dict[str, "BaseExecutor"] = {}
# for name in self.proc.processes():
# infofile = os.path.join(
# name, f"{name}{BaseExecutor.INFOFILE_EXT}"
# )
# try:
# with open(infofile, encoding="utf-8") as fobj:
# info = ExecutorInfo.from_dict(json.load(fobj))
# infos[name] = make_executor(info)
# except OSError:
# continue
return infos
def _load_infos(self) -> Generator[Tuple[str, "BaseExecutor"], None, None]:
import json
from urllib.parse import urlparse

from .base import ExecutorInfo
from .ssh import SSHExecutor

def make_executor(info: "ExecutorInfo"):
if info.git_url:
scheme = urlparse(info.git_url).scheme
if scheme == "file":
cls: Type = TempDirExecutor
elif scheme == "ssh":
cls = SSHExecutor
else:
raise NotImplementedError
else:
cls = WorkspaceExecutor
return cls.from_info(info)

for name in self.proc:
infofile = self.get_infofile_path(name)
try:
with open(infofile, encoding="utf-8") as fobj:
info = ExecutorInfo.from_dict(json.load(fobj))
yield name, make_executor(info)
except OSError:
continue

def get_infofile_path(self, name: str) -> str:
return os.path.join(
self.pid_dir,
name,
f"{name}{BaseExecutor.INFOFILE_EXT}",
)

@classmethod
def from_stash_entries(
@@ -120,8 +140,14 @@ def from_stash_entries(
scm.remove_ref(ref)
return manager

def exec_queue(self, jobs: Optional[int] = 1):
def exec_queue(self, jobs: Optional[int] = 1, detach: bool = False):
"""Run dvc repro for queued executors in parallel."""
if detach:
raise NotImplementedError
# TODO use ProcessManager.spawn() to support detached runs
return self._exec_attached(jobs=jobs)

def _exec_attached(self, jobs: Optional[int] = 1):
import signal
from concurrent.futures import (
CancelledError,
@@ -141,10 +167,7 @@ def exec_queue(self, jobs: Optional[int] = 1):
futures = {}
while self._queue:
rev, executor = self._queue.popleft()
infofile = os.path.join(
self.pid_dir,
f"{rev}{executor.INFOFILE_EXT}",
)
infofile = self.get_infofile_path(rev)
future = workers.submit(
executor.reproduce,
info=executor.info,
@@ -154,6 +177,7 @@
log_level=logger.getEffectiveLevel(),
)
futures[future] = (rev, executor)
self._attached[rev] = executor

try:
wait(futures)
@@ -194,7 +218,7 @@ def exec_queue(self, jobs: Optional[int] = 1):
rev[:7],
)
finally:
executor.cleanup()
self.cleanup_executor(rev, executor)

return result

@@ -227,6 +251,16 @@ def on_diverged(ref: str, checkpoint: bool):

return results

def cleanup_executor(self, rev: str, executor: "BaseExecutor"):
from dvc.utils.fs import remove

executor.cleanup()
try:
self.proc.remove(rev)
except KeyError:
pass
remove(os.path.join(self.pid_dir, rev))


class TempDirExecutorManager(BaseExecutorManager):
EXECUTOR_CLS = TempDirExecutor
@@ -272,7 +306,7 @@ def _collect_executor(self, executor, exec_result) -> Dict[str, str]:
results[exp_rev] = exec_result.exp_hash
return results

def exec_queue(self, jobs: Optional[int] = 1):
def exec_queue(self, jobs: Optional[int] = 1, detach: bool = False):
"""Run a single WorkspaceExecutor.
Workspace execution is done within the main DVC process
@@ -282,12 +316,12 @@ def exec_queue(self, jobs: Optional[int] = 1):
from dvc.stage.monitor import CheckpointKilledError

assert len(self._queue) == 1
assert not detach
result: Dict[str, Dict[str, str]] = defaultdict(dict)
rev, executor = self._queue.popleft()
infofile = os.path.join(
self.pid_dir,
f"{rev}{executor.INFOFILE_EXT}",
)

exec_name = "workspace"
infofile = self.get_infofile_path(exec_name)
try:
exec_result = executor.reproduce(
info=executor.info,
@@ -313,5 +347,5 @@
f"Failed to reproduce experiment '{rev[:7]}'"
) from exc
finally:
executor.cleanup()
self.cleanup_executor(exec_name, executor)
return result
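
Because BaseExecutorManager now implements collections.abc.Mapping, the membership test in enqueue() (`assert rev not in self`), iteration, and `manager[rev]` lookups all come for free from the three methods it defines: __getitem__, __iter__, and __len__. A self-contained sketch of the two-dict pattern (a stand-in class, not DVC's):

    from collections.abc import Mapping

    class TwoDictMapping(Mapping):
        """Stand-in for the attached/detached split in BaseExecutorManager."""

        def __init__(self, attached, detached):
            self._attached = dict(attached)
            self._detached = dict(detached)

        def __getitem__(self, key):
            # Prefer attached (in-process) executors, fall back to detached.
            try:
                return self._attached[key]
            except KeyError:
                return self._detached[key]

        def __iter__(self):
            yield from self._attached
            yield from self._detached

        def __len__(self):
            return len(self._attached) + len(self._detached)

    m = TwoDictMapping({"abc1234": "attached"}, {"def5678": "detached"})
    assert "abc1234" in m and "def5678" in m  # __contains__ comes free from Mapping
    print(list(m), len(m))                    # ['abc1234', 'def5678'] 2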
18 changes: 13 additions & 5 deletions tests/func/experiments/test_show.py
@@ -387,9 +387,13 @@ def test_show_sort(tmp_dir, scm, dvc, exp_stage, caplog):

def test_show_running_workspace(tmp_dir, scm, dvc, exp_stage, capsys):
pid_dir = os.path.join(dvc.tmp_dir, EXEC_TMP_DIR, EXEC_PID_DIR)
makedirs(pid_dir, True)
info = make_executor_info(location=BaseExecutor.DEFAULT_LOCATION)
pidfile = os.path.join(pid_dir, f"workspace{BaseExecutor.INFOFILE_EXT}")
pidfile = os.path.join(
pid_dir,
"workspace",
f"workspace{BaseExecutor.INFOFILE_EXT}",
)
makedirs(os.path.dirname(pidfile), True)
(tmp_dir / pidfile).dump_json(info.asdict())

assert dvc.experiments.show()["workspace"] == {
@@ -418,9 +422,13 @@ def test_show_running_executor(tmp_dir, scm, dvc, exp_stage):
exp_rev = dvc.experiments.scm.resolve_rev(f"{EXPS_STASH}@{{0}}")

pid_dir = os.path.join(dvc.tmp_dir, EXEC_TMP_DIR, EXEC_PID_DIR)
makedirs(pid_dir, True)
info = make_executor_info(location=BaseExecutor.DEFAULT_LOCATION)
pidfile = os.path.join(pid_dir, f"{exp_rev}{BaseExecutor.INFOFILE_EXT}")
pidfile = os.path.join(
pid_dir,
exp_rev,
f"{exp_rev}{BaseExecutor.INFOFILE_EXT}",
)
makedirs(os.path.dirname(pidfile), True)
(tmp_dir / pidfile).dump_json(info.asdict())

results = dvc.experiments.show()
@@ -450,7 +458,6 @@ def test_show_running_checkpoint(
exp_ref = first(exp_refs_by_rev(scm, checkpoint_rev))

pid_dir = os.path.join(dvc.tmp_dir, EXEC_TMP_DIR, EXEC_PID_DIR)
makedirs(pid_dir, True)
executor = (
BaseExecutor.DEFAULT_LOCATION
if workspace
@@ -463,6 +470,7 @@
)
rev = "workspace" if workspace else stash_rev
pidfile = os.path.join(pid_dir, f"{rev}{BaseExecutor.INFOFILE_EXT}")
makedirs(os.path.dirname(pidfile), True)
(tmp_dir / pidfile).dump_json(info.asdict())

mocker.patch.object(
