Skip to content

Commit

Permalink
run/repro: show command on dry-run, add ">" to script output (#5041)
Browse files Browse the repository at this point in the history
* run/repro: show command on dry-run, add ">" to script output

This rolls back the earlier commit that stopped showing the command
during a dry run, and also prepends ">" to each command in the script
output instead of the "$" sign.

* fix tests
  • Loading branch information
skshetry authored Dec 7, 2020
1 parent 69eb86a commit 5290739
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 58 deletions.
121 changes: 73 additions & 48 deletions dvc/stage/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,61 +45,90 @@ def warn_if_fish(executable):
)


@unlocked_repo
def cmd_run(stage, *args, checkpoint_func=None, **kwargs):
def _enforce_cmd_list(cmd):
assert cmd
return cmd if isinstance(cmd, list) else [cmd]


def prepare_kwargs(stage, checkpoint_func=None):
kwargs = {"cwd": stage.wdir, "env": fix_env(None), "close_fds": True}
cmd = stage.cmd if isinstance(stage.cmd, list) else [stage.cmd]
if checkpoint_func:
# indicate that checkpoint cmd is being run inside DVC
kwargs["env"].update(_checkpoint_env(stage))

if os.name == "nt":
kwargs["shell"] = True
executable = None
else:
# NOTE: when you specify `shell=True`, `Popen` [1] will default to
# `/bin/sh` on *nix and will add ["/bin/sh", "-c"] to your command.
# But we actually want to run the same shell that we are running
# from right now, which is usually determined by the `SHELL` env
# var. So instead, we compose our command on our own, making sure
# to include special flags to prevent shell from reading any
# configs and modifying env, which may change the behavior or the
# command we are running. See [2] for more info.
#
# [1] https://github.com/python/cpython/blob/3.7/Lib/subprocess.py
# #L1426
# [2] https://github.com/iterative/dvc/issues/2506
# #issuecomment-535396799
kwargs["shell"] = False
executable = os.getenv("SHELL") or "/bin/sh"
warn_if_fish(executable)
# NOTE: when you specify `shell=True`, `Popen` [1] will default to
# `/bin/sh` on *nix and will add ["/bin/sh", "-c"] to your command.
# But we actually want to run the same shell that we are running
# from right now, which is usually determined by the `SHELL` env
# var. So instead, we compose our command on our own, making sure
# to include special flags to prevent shell from reading any
# configs and modifying env, which may change the behavior or the
# command we are running. See [2] for more info.
#
# [1] https://github.com/python/cpython/blob/3.7/Lib/subprocess.py
# #L1426
# [2] https://github.com/iterative/dvc/issues/2506
# #issuecomment-535396799
kwargs["shell"] = True if os.name == "nt" else False
return kwargs


def display_command(cmd):
    """Log ``cmd`` at INFO level, prefixed with ``">"`` and a space."""
    prompt = ">"
    logger.info("%s %s", prompt, cmd)


def get_executable():
    """Return the shell used to run stage commands.

    Returns:
        ``None`` on Windows (``os.name == "nt"``). Otherwise the value of
        the ``SHELL`` environment variable, falling back to ``"/bin/sh"``
        when the variable is unset or empty.
    """
    if os.name == "nt":
        return None
    return os.getenv("SHELL") or "/bin/sh"


def _run(stage, executable, cmd, checkpoint_func, **kwargs):
main_thread = isinstance(
threading.current_thread(),
threading._MainThread, # pylint: disable=protected-access
)
for _cmd in cmd:
logger.info("$ %s", _cmd)
old_handler = None
p = None

try:
p = subprocess.Popen(_make_cmd(executable, _cmd), **kwargs)
if main_thread:
old_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
exec_cmd = _make_cmd(executable, cmd)
old_handler = None
p = None

killed = threading.Event()
with checkpoint_monitor(stage, checkpoint_func, p, killed):
p.communicate()
finally:
if old_handler:
signal.signal(signal.SIGINT, old_handler)
try:
p = subprocess.Popen(exec_cmd, **kwargs)
if main_thread:
old_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)

killed = threading.Event()
with checkpoint_monitor(stage, checkpoint_func, p, killed):
p.communicate()
finally:
if old_handler:
signal.signal(signal.SIGINT, old_handler)

retcode = None if not p else p.returncode
if retcode != 0:
if killed.is_set():
raise CheckpointKilledError(_cmd, retcode)
raise StageCmdFailedError(_cmd, retcode)
retcode = None if not p else p.returncode
if retcode != 0:
if killed.is_set():
raise CheckpointKilledError(cmd, retcode)
raise StageCmdFailedError(cmd, retcode)


def cmd_run(stage, dry=False, checkpoint_func=None):
    """Announce and run every command of ``stage``.

    Logs a "Running ... stage" header, then logs each command via
    ``display_command``. Unless ``dry`` is true, each command is also
    executed through ``_run`` right after it is displayed.

    Args:
        stage: the stage whose ``cmd`` (a single command or a list of
            commands) is run; ``is_callback`` and ``addressing`` are read
            for the header line.
        dry: when true, only display the commands; nothing is executed
            and the fish-shell warning check is skipped.
        checkpoint_func: forwarded to ``prepare_kwargs`` and ``_run``.
    """
    callback_prefix = "callback " if stage.is_callback else ""
    logger.info("Running %sstage '%s':", callback_prefix, stage.addressing)

    command_list = _enforce_cmd_list(stage.cmd)
    run_kwargs = prepare_kwargs(stage, checkpoint_func=checkpoint_func)
    shell = get_executable()

    if not dry:
        warn_if_fish(shell)

    for command in command_list:
        display_command(command)
        if not dry:
            _run(
                stage,
                shell,
                command,
                checkpoint_func=checkpoint_func,
                **run_kwargs,
            )


def run_stage(stage, dry=False, force=False, checkpoint_func=None, **kwargs):
Expand All @@ -112,12 +141,8 @@ def run_stage(stage, dry=False, force=False, checkpoint_func=None, **kwargs):
except RunCacheNotFoundError:
pass

callback_str = "callback " if stage.is_callback else ""
logger.info(
"Running %s" "stage '%s':", callback_str, stage.addressing,
)
if not dry:
cmd_run(stage, checkpoint_func=checkpoint_func)
run = cmd_run if dry else unlocked_repo(cmd_run)
run(stage, dry=dry, checkpoint_func=checkpoint_func)


def _checkpoint_env(stage):
Expand Down
2 changes: 1 addition & 1 deletion tests/func/test_repro.py
Original file line number Diff line number Diff line change
Expand Up @@ -1307,4 +1307,4 @@ def test_repro_when_cmd_changes(tmp_dir, dvc, run_copy, mocker):
stage.addressing: ["changed checksum"]
}
assert dvc.reproduce(stage.addressing)[0] == stage
m.assert_called_once_with(stage, checkpoint_func=None)
m.assert_called_once_with(stage, checkpoint_func=None, dry=False)
2 changes: 1 addition & 1 deletion tests/func/test_repro_multistage.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ def test_repro_when_cmd_changes(tmp_dir, dvc, run_copy, mocker):

assert dvc.status([target]) == {target: ["changed command"]}
assert dvc.reproduce(target)[0] == stage
m.assert_called_once_with(stage, checkpoint_func=None)
m.assert_called_once_with(stage, checkpoint_func=None, dry=False)


def test_repro_when_new_deps_is_added_in_dvcfile(tmp_dir, dvc, run_copy):
Expand Down
8 changes: 5 additions & 3 deletions tests/func/test_stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dvc.dvcfile import SingleStageFile
from dvc.main import main
from dvc.output.local import LocalOutput
from dvc.repo import Repo
from dvc.repo import Repo, lock_repo
from dvc.stage import PipelineStage, Stage
from dvc.stage.exceptions import StageFileFormatError
from dvc.stage.run import run_stage
Expand Down Expand Up @@ -306,5 +306,7 @@ def test_stage_run_checkpoint(tmp_dir, dvc, mocker, checkpoint):
callback = mocker.Mock()
else:
callback = None
run_stage(stage, checkpoint_func=callback)
mock_cmd_run.assert_called_with(stage, checkpoint_func=callback)

with lock_repo(dvc):
run_stage(stage, checkpoint_func=callback)
mock_cmd_run.assert_called_with(stage, checkpoint_func=callback, dry=False)
21 changes: 16 additions & 5 deletions tests/unit/stage/test_run.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,24 @@
import logging

import pytest

from dvc.stage import Stage
from dvc.stage.run import run_stage


def test_run_stage_dry(caplog):
@pytest.mark.parametrize(
"cmd, expected",
[
("mycmd arg1 arg2", ["> mycmd arg1 arg2"]),
(["mycmd1 arg1", "mycmd2 arg2"], ["> mycmd1 arg1", "> mycmd2 arg2"]),
],
)
def test_run_stage_dry(caplog, cmd, expected):
with caplog.at_level(level=logging.INFO, logger="dvc"):
stage = Stage(None, "stage.dvc", cmd="mycmd arg1 arg2")
stage = Stage(None, "stage.dvc", cmd=cmd)
run_stage(stage, dry=True)
assert caplog.messages == [
"Running callback stage 'stage.dvc':",
]

expected.insert(
0, "Running callback stage 'stage.dvc':",
)
assert caplog.messages == expected

0 comments on commit 5290739

Please sign in to comment.