Skip to content

Commit

Permalink
Merge branch 'master' of github.com:iterative/dvc into 4504-conf-job-limit
Browse files Browse the repository at this point in the history

* 'master' of github.com:iterative/dvc:
  dag: add --outs option (iterative#4739)
  Add test server and tests for webdav (iterative#4827)
  Simpler param updates with python-benedict (iterative#4780)
  checkpoints: set DVC_ROOT environment variable (iterative#4877)
  api: add support for simple wildcards (iterative#4864)
  tests: mark azure test as flaky (iterative#4881)
  setup.py: limit responses version for moto (iterative#4879)
  remote: avoid chunking on webdav. Fixes iterative#4796 (iterative#4828)
  checkpoints: `exp run` and `exp res[ume]` refactor (iterative#4855)
  • Loading branch information
I159 committed Nov 12, 2020
2 parents 05da696 + eeba567 commit df03f57
Show file tree
Hide file tree
Showing 25 changed files with 770 additions and 522 deletions.
5 changes: 3 additions & 2 deletions dvc/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,13 @@ def make_checkpoint():
import builtins
from time import sleep

from dvc.env import DVC_CHECKPOINT, DVC_ROOT
from dvc.stage.run import CHECKPOINT_SIGNAL_FILE

if os.getenv("DVC_CHECKPOINT") is None:
if os.getenv(DVC_CHECKPOINT) is None:
return

root_dir = Repo.find_root()
root_dir = os.getenv(DVC_ROOT, Repo.find_root())
signal_file = os.path.join(
root_dir, Repo.DVC_DIR, "tmp", CHECKPOINT_SIGNAL_FILE
)
Expand Down
7 changes: 7 additions & 0 deletions dvc/command/add.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def run(self):
no_commit=self.args.no_commit,
fname=self.args.file,
external=self.args.external,
glob=self.args.glob,
)

except DvcException:
Expand Down Expand Up @@ -57,6 +58,12 @@ def add_parser(subparsers, parent_parser):
default=False,
help="Allow targets that are outside of the DVC repository.",
)
parser.add_argument(
"--glob",
action="store_true",
default=False,
help="Allows targets containing shell-style wildcards.",
)
parser.add_argument(
"--file",
help="Specify name of the DVC-file this command will generate.",
Expand Down
37 changes: 33 additions & 4 deletions dvc/command/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def _show_dot(G):
return dot_file.getvalue()


def _build(G, target=None, full=False):
def _build(G, target=None, full=False, outs=False):
import networkx as nx

from dvc.repo.graph import get_pipeline, get_pipelines
Expand All @@ -44,8 +44,25 @@ def _build(G, target=None, full=False):
else:
H = G

def _relabel(stage):
return stage.addressing
if outs:
G = nx.DiGraph()
for stage in H.nodes:
G.add_nodes_from(stage.outs)

for from_stage, to_stage in nx.edge_dfs(H):
G.add_edges_from(
[
(from_out, to_out)
for from_out in from_stage.outs
for to_out in to_stage.outs
]
)
H = G

def _relabel(node):
from dvc.stage import Stage

return node.addressing if isinstance(node, Stage) else str(node)

return nx.relabel_nodes(H, _relabel, copy=False)

Expand All @@ -64,7 +81,12 @@ def run(self):
return 1
target = stages[0]

G = _build(self.repo.graph, target=target, full=self.args.full,)
G = _build(
self.repo.graph,
target=target,
full=self.args.full,
outs=self.args.outs,
)

if self.args.dot:
logger.info(_show_dot(G))
Expand Down Expand Up @@ -108,6 +130,13 @@ def add_parser(subparsers, parent_parser):
"showing DAG consisting only of ancestors."
),
)
dag_parser.add_argument(
"-o",
"--outs",
action="store_true",
default=False,
help="Print output files instead of stages.",
)
dag_parser.add_argument(
"target",
nargs="?",
Expand Down
122 changes: 57 additions & 65 deletions dvc/command/experiments.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from dvc.command.repro import CmdRepro
from dvc.command.repro import add_arguments as add_repro_arguments
from dvc.exceptions import DvcException, InvalidArgumentError
from dvc.repo.experiments import Experiments
from dvc.utils.flatten import flatten

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -443,14 +444,6 @@ def run(self):
elif not self.args.targets:
self.args.targets = self.default_targets

if (
self.args.checkpoint_reset
and self.args.checkpoint_continue is not None
):
raise InvalidArgumentError(
"--continue and --reset cannot be used together"
)

ret = 0
for target in self.args.targets:
try:
Expand All @@ -460,13 +453,7 @@ def run(self):
run_all=self.args.run_all,
jobs=self.args.jobs,
params=self.args.params,
checkpoint=(
self.args.checkpoint
or self.args.checkpoint_continue is not None
or self.args.checkpoint_reset
),
checkpoint_continue=self.args.checkpoint_continue,
checkpoint_reset=self.args.checkpoint_reset,
checkpoint_resume=self.args.checkpoint_resume,
**self._repro_kwargs,
)
except DvcException:
Expand Down Expand Up @@ -738,65 +725,38 @@ def add_parser(subparsers, parent_parser):
help=EXPERIMENTS_RUN_HELP,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
# inherit arguments from `dvc repro`
add_repro_arguments(experiments_run_parser)
experiments_run_parser.add_argument(
"--params",
action="append",
default=[],
help="Use the specified param values when reproducing pipelines.",
metavar="[<filename>:]<params_list>",
)
experiments_run_parser.add_argument(
"--queue",
action="store_true",
default=False,
help="Stage this experiment in the run queue for future execution.",
)
_add_run_common(experiments_run_parser)
experiments_run_parser.add_argument(
"--run-all",
action="store_true",
default=False,
help="Execute all experiments in the run queue.",
)
experiments_run_parser.add_argument(
"-j",
"--jobs",
type=int,
help="Run the specified number of experiments at a time in parallel.",
metavar="<number>",
"--checkpoint-resume", type=str, default=None, help=argparse.SUPPRESS,
)
experiments_run_parser.add_argument(
"--checkpoint",
action="store_true",
default=False,
help="Reproduce pipelines as a checkpoint experiment.",
experiments_run_parser.set_defaults(func=CmdExperimentsRun)

EXPERIMENTS_RESUME_HELP = "Resume checkpoint experiments."
experiments_resume_parser = experiments_subparsers.add_parser(
"resume",
parents=[parent_parser],
aliases=["res"],
description=append_doc_link(
EXPERIMENTS_RESUME_HELP, "experiments/resume"
),
help=EXPERIMENTS_RESUME_HELP,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
experiments_run_parser.add_argument(
"--continue",
_add_run_common(experiments_resume_parser)
experiments_resume_parser.add_argument(
"-r",
"--rev",
type=str,
nargs="?",
default=None,
const=":last",
dest="checkpoint_continue",
default=Experiments.LAST_CHECKPOINT,
dest="checkpoint_resume",
help=(
"Continue from the specified checkpoint experiment "
"(implies --checkpoint). If no experiment revision is provided, "
"Continue the specified checkpoint experiment. "
"If no experiment revision is provided, "
"the most recently run checkpoint experiment will be used."
),
metavar="<experiment_rev>",
)
experiments_run_parser.add_argument(
"--reset",
action="store_true",
default=False,
dest="checkpoint_reset",
help=(
"Reset checkpoint experiment if it already exists "
"(implies --checkpoint)."
),
)
experiments_run_parser.set_defaults(func=CmdExperimentsRun)
experiments_resume_parser.set_defaults(func=CmdExperimentsRun)

EXPERIMENTS_GC_HELP = "Garbage collect unneeded experiments."
EXPERIMENTS_GC_DESCRIPTION = (
Expand Down Expand Up @@ -856,3 +816,35 @@ def add_parser(subparsers, parent_parser):
help="Force garbage collection - automatically agree to all prompts.",
)
experiments_gc_parser.set_defaults(func=CmdExperimentsGC)


def _add_run_common(parser):
    """Register the options shared by 'exp run' and 'exp resume'.

    The parser first receives every `dvc repro` argument, then the
    experiment-specific --params, --queue, --run-all and -j/--jobs options.
    """
    # Both subcommands start from the full `dvc repro` argument set.
    add_repro_arguments(parser)

    register = parser.add_argument
    # Keyword arguments common to the plain on/off switches below.
    switch = {"action": "store_true", "default": False}

    register(
        "--params",
        default=[],
        action="append",
        metavar="[<filename>:]<params_list>",
        help="Use the specified param values when reproducing pipelines.",
    )
    register(
        "--queue",
        help="Stage this experiment in the run queue for future execution.",
        **switch,
    )
    register(
        "--run-all",
        help="Execute all experiments in the run queue.",
        **switch,
    )
    register(
        "-j",
        "--jobs",
        metavar="<number>",
        type=int,
        help="Run the specified number of experiments at a time in parallel.",
    )
2 changes: 2 additions & 0 deletions dvc/env.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
# Environment variable names used by DVC, defined once so callers can pass
# the constant to os.getenv() instead of repeating the string literal.
# DVC_CHECKPOINT: when unset, make_checkpoint() in dvc/api.py returns early.
# DVC_ROOT: when set, make_checkpoint() uses it as the repo root instead of
# the value of Repo.find_root().
# NOTE(review): the consumers of DVC_DAEMON and DVC_PAGER are not visible
# here — confirm their usage before documenting them further.
DVC_CHECKPOINT = "DVC_CHECKPOINT"
DVC_DAEMON = "DVC_DAEMON"
DVC_PAGER = "DVC_PAGER"
DVC_ROOT = "DVC_ROOT"
35 changes: 29 additions & 6 deletions dvc/repo/add.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@
@locked
@scm_context
def add(
repo, targets, recursive=False, no_commit=False, fname=None, external=False
repo,
targets,
recursive=False,
no_commit=False,
fname=None,
external=False,
glob=False,
):
if recursive and fname:
raise RecursiveAddingWhileUsingFilename()
Expand Down Expand Up @@ -57,7 +63,12 @@ def add(
)

stages = _create_stages(
repo, sub_targets, fname, pbar=pbar, external=external
repo,
sub_targets,
fname,
pbar=pbar,
external=external,
glob=glob,
)

try:
Expand Down Expand Up @@ -149,15 +160,27 @@ def _find_all_targets(repo, target, recursive):
return [target]


def _create_stages(repo, targets, fname, pbar=None, external=False):
def _create_stages(
repo, targets, fname, pbar=None, external=False, glob=False
):
from glob import iglob

from dvc.stage import Stage, create_stage

stages = []
if glob:
expanded_targets = [
exp_target
for target in targets
for exp_target in iglob(target, recursive=True)
]
else:
expanded_targets = targets

stages = []
for out in Tqdm(
targets,
expanded_targets,
desc="Creating DVC-files",
disable=len(targets) < LARGE_DIR_SIZE,
disable=len(expanded_targets) < LARGE_DIR_SIZE,
unit="file",
):
path, wdir, out = resolve_paths(repo, out)
Expand Down
Loading

0 comments on commit df03f57

Please sign in to comment.