diff --git a/dvc/command/pipeline.py b/dvc/command/pipeline.py index a8d3d0008e..1eb7b281c5 100644 --- a/dvc/command/pipeline.py +++ b/dvc/command/pipeline.py @@ -8,25 +8,15 @@ logger = logging.getLogger(__name__) -def _stage_repr(stage): - from dvc.stage import PipelineStage - - return ( - "{}:{}".format(stage.relpath, stage.name) - if isinstance(stage, PipelineStage) - else stage.relpath - ) - - class CmdPipelineShow(CmdBase): def _show(self, target, commands, outs, locked): import networkx from dvc import dvcfile from dvc.utils import parse_target - path, name = parse_target(target) - stage = dvcfile.Dvcfile(self.repo, path).stages[name] - G = self.repo.pipeline_graph + path, name, tag = parse_target(target) + stage = dvcfile.Dvcfile(self.repo, path, tag=tag).stages[name] + G = self.repo.graph stages = networkx.dfs_postorder_nodes(G, stage) if locked: stages = [s for s in stages if s.locked] @@ -40,7 +30,7 @@ def _show(self, target, commands, outs, locked): for out in stage.outs: logger.info(str(out)) else: - logger.info(_stage_repr(stage)) + logger.info(stage.addressing) def _build_graph(self, target, commands=False, outs=False): import networkx @@ -48,8 +38,8 @@ def _build_graph(self, target, commands=False, outs=False): from dvc.repo.graph import get_pipeline from dvc.utils import parse_target - path, name = parse_target(target) - target_stage = dvcfile.Dvcfile(self.repo, path).stages[name] + path, name, tag = parse_target(target) + target_stage = dvcfile.Dvcfile(self.repo, path, tag=tag).stages[name] G = get_pipeline(self.repo.pipelines, target_stage) nodes = set() @@ -62,7 +52,7 @@ def _build_graph(self, target, commands=False, outs=False): for out in stage.outs: nodes.add(str(out)) else: - nodes.add(_stage_repr(stage)) + nodes.add(stage.addressing) edges = [] for from_stage, to_stage in networkx.edge_dfs(G, target_stage): @@ -75,7 +65,7 @@ def _build_graph(self, target, commands=False, outs=False): for to_out in to_stage.outs: edges.append((str(from_out), str(to_out))) else: - edges.append((_stage_repr(from_stage), _stage_repr(to_stage))) + edges.append((from_stage.addressing, to_stage.addressing)) return list(nodes), edges, networkx.is_tree(G) @@ -163,7 +153,7 @@ def run(self): pipelines = self.repo.pipelines for pipeline in pipelines: for stage in pipeline: - logger.info(_stage_repr(stage)) + logger.info(stage.addressing) if len(pipeline) != 0: logger.info("=" * 80) logger.info("{} pipelines total".format(len(pipelines))) diff --git a/dvc/dvcfile.py b/dvc/dvcfile.py index cd8d2c3cb2..9eb35d1942 100644 --- a/dvc/dvcfile.py +++ b/dvc/dvcfile.py @@ -1,5 +1,5 @@ +import contextlib import os -import re import logging import dvc.prompt as prompt @@ -7,11 +7,7 @@ from voluptuous import MultipleInvalid from dvc import serialize from dvc.exceptions import DvcException -from dvc.loader import SingleStageLoader, StageLoader -from dvc.schema import ( - COMPILED_SINGLE_STAGE_SCHEMA, - COMPILED_MULTI_STAGE_SCHEMA, -) +from dvc.stage.loader import SingleStageLoader, StageLoader from dvc.stage.exceptions import ( StageFileBadNameError, StageFileDoesNotExistError, @@ -20,6 +16,7 @@ StageFileAlreadyExistsError, ) from dvc.utils import relpath +from dvc.utils.collections import apply_diff from dvc.utils.stage import ( dump_stage_file, parse_stage, @@ -30,196 +27,244 @@ DVC_FILE = "Dvcfile" DVC_FILE_SUFFIX = ".dvc" -TAG_REGEX = r"^(?P.*)@(?P[^\\/@:]*)$" +PIPELINE_FILE = "pipelines.yaml" +PIPELINE_LOCK = "pipelines.lock" -class MultiStageFileLoadError(DvcException): - def __init__(self, file): - super().__init__("Cannot load multi-stage file: '{}'".format(file)) +class LockfileCorruptedError(DvcException): + def __init__(self, path): + super().__init__("Lockfile '{}' is corrupted.".format(path)) -class Dvcfile: - def __init__(self, repo, path): +def is_valid_filename(path): + return path.endswith(DVC_FILE_SUFFIX) or os.path.basename(path) in [ + DVC_FILE, + PIPELINE_FILE, + ] + + +def is_dvc_file(path): + return os.path.isfile(path) and ( + is_valid_filename(path) or os.path.basename(path) == PIPELINE_LOCK + ) + + +def check_dvc_filename(path): + if not is_valid_filename(path): + raise StageFileBadNameError( + "bad DVC-file name '{}'. DVC-files should be named " + "'Dvcfile' or have a '.dvc' suffix (e.g. '{}.dvc').".format( + relpath(path), os.path.basename(path) + ) + ) + + +class FileMixin: + SCHEMA = None + + def __init__(self, repo, path, **kwargs): self.repo = repo - self.path, self.tag = self._get_path_tag(path) + self.path = path def __repr__(self): return "{}: {}".format( - DVC_FILE, relpath(self.path, self.repo.root_dir) + self.__class__.__name__, relpath(self.path, self.repo.root_dir) ) + def __hash__(self): + return hash(self.path) + + def __eq__(self, other): + return self.repo == other.repo and os.path.abspath( + self.path + ) == os.path.abspath(other.path) + def __str__(self): - return "{}: {}".format(DVC_FILE, self.relpath) + return "{}: {}".format(self.__class__.__name__, self.relpath) @property def relpath(self): return relpath(self.path) + def exists(self): + return self.repo.tree.exists(self.path) + + def _load(self): + # it raises the proper exceptions by priority: + # 1. when the file doesn't exists + # 2. filename is not a DVC-file + # 3. path doesn't represent a regular file + if not self.exists(): + raise StageFileDoesNotExistError(self.path) + check_dvc_filename(self.path) + if not self.repo.tree.isfile(self.path): + raise StageFileIsNotDvcFileError(self.path) + + with self.repo.tree.open(self.path) as fd: + stage_text = fd.read() + d = parse_stage(stage_text, self.path) + self.validate(d, self.path) + return d, stage_text + + @classmethod + def validate(cls, d, fname=None): + assert cls.SCHEMA + try: + cls.SCHEMA(d) + except MultipleInvalid as exc: + raise StageFileFormatError(fname, exc) + + def remove_with_prompt(self, force=False): + raise NotImplementedError + + def remove(self, force=False): + with contextlib.suppress(FileNotFoundError): + os.unlink(self.path) + + def dump(self, stage, **kwargs): + raise NotImplementedError + + +class SingleStageFile(FileMixin): + from dvc.schema import COMPILED_SINGLE_STAGE_SCHEMA as SCHEMA + + def __init__(self, repo, path, tag=None): + super().__init__(repo, path) + self.tag = tag + @property def stage(self): data, raw = self._load() - if not self.is_multi_stage(data): - return SingleStageLoader.load_stage(self, data, raw) - raise MultiStageFileLoadError(self.path) - - @property - def lockfile(self): - return os.path.splitext(self.path)[0] + ".lock" + return SingleStageLoader.load_stage(self, data, raw, tag=self.tag) @property def stages(self): - from . import lockfile - data, raw = self._load() - if self.is_multi_stage(data): - lockfile_data = lockfile.load(self.repo, self.lockfile) - return StageLoader(self, data.get("stages", {}), lockfile_data) - return SingleStageLoader(self, data, raw) - - @classmethod - def is_valid_filename(cls, path): - return ( - path.endswith(DVC_FILE_SUFFIX) - or os.path.basename(path) == DVC_FILE - ) + return SingleStageLoader(self, data, raw, tag=self.tag) - @classmethod - def is_stage_file(cls, path): - return os.path.isfile(path) and cls.is_valid_filename(path) - - @classmethod - def check_dvc_filename(cls, path): - if not cls.is_valid_filename(path): - raise StageFileBadNameError( - "bad DVC-file name '{}'. DVC-files should be named " - "'Dvcfile' or have a '.dvc' suffix (e.g. '{}.dvc').".format( - relpath(path), os.path.basename(path) - ) - ) + def dump(self, stage, **kwargs): + """Dumps given stage appropriately in the dvcfile.""" + from dvc.stage import PipelineStage - def exists(self): - return self.repo.tree.exists(self.path) + assert not isinstance(stage, PipelineStage) + check_dvc_filename(self.path) + logger.debug( + "Saving information to '{file}'.".format(file=relpath(self.path)) + ) + dump_stage_file(self.path, serialize.to_single_stage_file(stage)) + self.repo.scm.track_file(relpath(self.path)) - def check_file_exists(self): + def remove_with_prompt(self, force=False): if not self.exists(): - raise StageFileDoesNotExistError(self.path) + return - def check_isfile(self): - if not self.repo.tree.isfile(self.path): - raise StageFileIsNotDvcFileError(self.path) + msg = ( + "'{}' already exists. Do you wish to run the command and " + "overwrite it?".format(relpath(self.path)) + ) + if not (force or prompt.confirm(msg)): + raise StageFileAlreadyExistsError(self.path) - @staticmethod - def _get_path_tag(s): - regex = re.compile(TAG_REGEX) - match = regex.match(s) - if not match: - return s, None - return match.group("path"), match.group("tag") + self.remove() - def dump(self, stage, update_dvcfile=False): - """Dumps given stage appropriately in the dvcfile.""" - from dvc.stage import create_stage, PipelineStage, Stage - if not isinstance(stage, PipelineStage): - self.dump_single_stage(stage) - return +class PipelineFile(FileMixin): + """Abstraction for pipelines file, .yaml + .lock combined.""" - self.dump_lockfile(stage) - if update_dvcfile and not stage.is_data_source: - self.dump_multistage_dvcfile(stage) + from dvc.schema import COMPILED_MULTI_STAGE_SCHEMA as SCHEMA - for out in filter(lambda o: o.use_cache, stage.outs): - s = create_stage( - Stage, - stage.repo, - os.path.join(stage.wdir, out.def_path + DVC_FILE_SUFFIX), - wdir=stage.wdir, - ) - s.outs = [out] - s.md5 = s._compute_md5() - Dvcfile(s.repo, s.path).dump_single_stage(s) + @property + def _lockfile(self): + return Lockfile(self.repo, os.path.splitext(self.path)[0] + ".lock") + + def dump(self, stage, update_pipeline=False, **kwargs): + """Dumps given stage appropriately in the dvcfile.""" + from dvc.stage import PipelineStage - def dump_lockfile(self, stage): - from . import lockfile + assert isinstance(stage, PipelineStage) + check_dvc_filename(self.path) + self._dump_lockfile(stage) + if update_pipeline and not stage.is_data_source: + self._dump_pipeline_file(stage) - lockfile.dump(self.repo, self.lockfile, serialize.to_lockfile(stage)) - self.repo.scm.track_file(relpath(self.lockfile)) + def _dump_lockfile(self, stage): + self._lockfile.dump(stage) - def dump_multistage_dvcfile(self, stage): + def _dump_pipeline_file(self, stage): data = {} if self.exists(): with open(self.path, "r") as fd: data = parse_stage_for_update(fd.read(), self.path) - if not self.is_multi_stage(data): - raise MultiStageFileLoadError(self.path) else: open(self.path, "w+").close() data["stages"] = data.get("stages", {}) - data["stages"].update(serialize.to_dvcfile(stage)) + stage_data = serialize.to_pipeline_file(stage) + if data["stages"].get(stage.name): + orig_stage_data = data["stages"][stage.name] + apply_diff(stage_data[stage.name], orig_stage_data) + else: + data["stages"].update(stage_data) - dump_stage_file(self.path, COMPILED_MULTI_STAGE_SCHEMA(data)) + dump_stage_file(self.path, data) self.repo.scm.track_file(relpath(self.path)) - def dump_single_stage(self, stage): - self.check_dvc_filename(self.path) - - logger.debug( - "Saving information to '{file}'.".format(file=relpath(self.path)) + @property + def stage(self): + raise DvcException( + "PipelineFile has multiple stages. Please specify it's name." ) - dump_stage_file(self.path, serialize.to_single_stage_file(stage)) - self.repo.scm.track_file(relpath(self.path)) + @property + def stages(self): + data, _ = self._load() + lockfile_data = self._lockfile.load() + return StageLoader(self, data.get("stages", {}), lockfile_data) - def _load(self): - # it raises the proper exceptions by priority: - # 1. when the file doesn't exists - # 2. filename is not a DVC-file - # 3. path doesn't represent a regular file - self.check_file_exists() - self.check_dvc_filename(self.path) - self.check_isfile() + def remove(self, force=False): + if not force: + logger.warning("Cannot remove pipeline file.") + return - with self.repo.tree.open(self.path) as fd: - stage_text = fd.read() - d = parse_stage(stage_text, self.path) - return d, stage_text + super().remove() + self._lockfile.remove() - @staticmethod - def validate_single_stage(d, fname=None): - Dvcfile._validate(COMPILED_SINGLE_STAGE_SCHEMA, d, fname) - @staticmethod - def validate_multi_stage(d, fname=None): - Dvcfile._validate(COMPILED_MULTI_STAGE_SCHEMA, d, fname) +class Lockfile(FileMixin): + from dvc.schema import COMPILED_LOCKFILE_SCHEMA as SCHEMA - @staticmethod - def _validate(schema, d, fname=None): + def load(self): + if not self.exists(): + return {} + with self.repo.tree.open(self.path) as fd: + data = parse_stage(fd.read(), self.path) try: - schema(d) - except MultipleInvalid as exc: - raise StageFileFormatError(fname, exc) - - def is_multi_stage(self, d=None): - if d is None: - d = self._load()[0] - check_multi_stage = d.get("stages") - if check_multi_stage: - self.validate_multi_stage(d, self.path) - return True - - self.validate_single_stage(d, self.path) - return False + self.validate(data, fname=self.path) + except StageFileFormatError: + raise LockfileCorruptedError(self.path) + return data - def remove_with_prompt(self, force=False): + def dump(self, stage, **kwargs): + stage_data = serialize.to_lockfile(stage) if not self.exists(): - return + data = stage_data + open(self.path, "w+").close() + else: + with self.repo.tree.open(self.path, "r") as fd: + data = parse_stage_for_update(fd.read(), self.path) + data.update(stage_data) + + dump_stage_file(self.path, data) + self.repo.scm.track_file(relpath(self.path)) - msg = ( - "'{}' already exists. Do you wish to run the command and " - "overwrite it?".format(relpath(self.path)) - ) - if not (force or prompt.confirm(msg)): - raise StageFileAlreadyExistsError(self.path) - os.unlink(self.path) +class Dvcfile: + def __new__(cls, repo, path, **kwargs): + assert path + assert repo + + _, ext = os.path.splitext(path) + if ext in [".yaml", ".yml"]: + return PipelineFile(repo, path, **kwargs) + # fallback to single stage file for better error messages + return SingleStageFile(repo, path, **kwargs) diff --git a/dvc/exceptions.py b/dvc/exceptions.py index 72d2154474..6e6a5e588e 100644 --- a/dvc/exceptions.py +++ b/dvc/exceptions.py @@ -25,10 +25,14 @@ def __init__(self, output, stages): assert all(hasattr(stage, "relpath") for stage in stages) msg = ( "file/directory '{}' is specified as an output in more than one " - "stage: {}\n" + "stage: \n{}\n" "This is not allowed. Consider using a different output name." - ).format(output, "\n ".join(s.relpath for s in stages)) + ).format( + output, "\n".join("\t{}".format(s.addressing) for s in stages) + ) super().__init__(msg) + self.stages = stages + self.output = output class OutputNotFoundError(DvcException): @@ -60,8 +64,8 @@ class StagePathAsOutputError(DvcException): def __init__(self, stage, output): assert isinstance(output, str) super().__init__( - "'{stage}' is within an output '{output}' of another stage".format( - stage=stage.relpath, output=output + "{stage} is within an output '{output}' of another stage".format( + stage=stage, output=output ) ) @@ -129,7 +133,9 @@ def __init__(self): class CyclicGraphError(DvcException): def __init__(self, stages): assert isinstance(stages, list) - stages = "\n".join("\t- {}".format(stage.relpath) for stage in stages) + stages = "\n".join( + "\t- {}".format(stage.addressing) for stage in stages + ) msg = ( "you've introduced a cycle in your pipeline that involves " "the following stages:" diff --git a/dvc/lockfile.py b/dvc/lockfile.py deleted file mode 100644 index 19b37317c3..0000000000 --- a/dvc/lockfile.py +++ /dev/null @@ -1,49 +0,0 @@ -import json - -from typing import TYPE_CHECKING -from collections import OrderedDict - -from dvc.exceptions import DvcException -from dvc.schema import COMPILED_LOCKFILE_SCHEMA -from voluptuous import MultipleInvalid - -if TYPE_CHECKING: - from dvc.repo import Repo - - -class LockfileCorruptedError(DvcException): - def __init__(self, path): - super().__init__("Lockfile '{}' is corrupted.".format(path)) - - -def exists(repo: "Repo", path: str) -> bool: - return repo.tree.exists(path) - - -def read(repo: "Repo", path: str) -> dict: - with repo.tree.open(path) as f: - return json.load(f, object_pairs_hook=OrderedDict) - - -def write(repo: "Repo", path: str, data: dict) -> None: - with repo.tree.open(path, "w+") as f: - json.dump(data, f) - - -def load(repo: "Repo", path: str) -> dict: - if not exists(repo, path): - return {} - try: - return COMPILED_LOCKFILE_SCHEMA(read(repo, path)) - except MultipleInvalid: - raise LockfileCorruptedError(path) - - -def dump(repo: "Repo", path: str, stage_data: dict): - if not exists(repo, path): - data = stage_data - else: - data = read(repo, path) - data.update(stage_data) - - write(repo, path, COMPILED_LOCKFILE_SCHEMA(data)) diff --git a/dvc/output/base.py b/dvc/output/base.py index 8f144bae37..191214136d 100644 --- a/dvc/output/base.py +++ b/dvc/output/base.py @@ -148,6 +148,10 @@ def supported(cls, url): def cache_path(self): return self.cache.checksum_to_path_info(self.checksum).url + @property + def checksum_type(self): + return self.remote.PARAM_CHECKSUM + @property def checksum(self): return self.info.get(self.remote.PARAM_CHECKSUM) @@ -156,6 +160,9 @@ def checksum(self): def checksum(self, checksum): self.info[self.remote.PARAM_CHECKSUM] = checksum + def get_checksum(self): + return self.remote.get_checksum(self.path_info) + @property def is_dir_checksum(self): return self.remote.is_dir_checksum(self.checksum) @@ -168,7 +175,7 @@ def save_info(self): return self.remote.save_info(self.path_info) def changed_checksum(self): - return self.checksum != self.remote.get_checksum(self.path_info) + return self.checksum != self.get_checksum() def changed_cache(self, filter_info=None): if not self.use_cache or not self.checksum: @@ -438,9 +445,9 @@ def get_used_cache(self, **kwargs): if self.exists: msg += ( "\n" - "You can also use `dvc commit {stage}` to associate " - "existing '{out}' with '{stage}'.".format( - out=self, stage=self.stage.relpath + "You can also use `dvc commit {stage.addressing}` " + "to associate existing '{out}' with {stage}.".format( + out=self, stage=self.stage ) ) logger.warning(msg) @@ -459,7 +466,7 @@ def get_used_cache(self, **kwargs): @classmethod def _validate_output_path(cls, path): - from dvc.dvcfile import Dvcfile + from dvc.dvcfile import is_valid_filename - if Dvcfile.is_valid_filename(path): + if is_valid_filename(path): raise cls.IsStageFileError(path) diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index c8d6631863..9040e0dffc 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -18,6 +18,7 @@ from dvc.remote.base import RemoteActionNotImplemented from dvc.utils.fs import path_isin from .graph import check_acyclic, get_pipeline, get_pipelines +from ..utils import parse_target def locked(f): @@ -175,7 +176,7 @@ def _ignore(self): self.scm.ignore_list(flist) - def check_modified_graph(self, new_stages, old_stages=None): + def check_modified_graph(self, new_stages): """Generate graph including the new stage to check for errors""" # Building graph might be costly for the ones with many DVC-files, # so we provide this undocumented hack to skip it. See [1] for @@ -190,30 +191,14 @@ def check_modified_graph(self, new_stages, old_stages=None): # # [1] https://github.com/iterative/dvc/issues/2671 if not getattr(self, "_skip_graph_checks", False): - self._collect_graph((old_stages or self.stages) + new_stages) + self._collect_graph(self.stages + new_stages) def _collect_inside(self, path, graph): import networkx as nx - stages = nx.dfs_postorder_nodes(graph or self.pipeline_graph) + stages = nx.dfs_postorder_nodes(graph) return [stage for stage in stages if path_isin(stage.path, path)] - def collect_for_pipelines( - self, path=None, name=None, recursive=False, graph=None - ): - from ..dvcfile import Dvcfile - - if not path: - return list(graph) if graph else self.pipeline_stages - - path = os.path.abspath(path) - if recursive and os.path.isdir(path): - return self._collect_inside(path, graph or self.pipeline_graph) - - dvcfile = Dvcfile(self, path) - dvcfile.check_file_exists() - return [dvcfile.stages[name]] - def collect(self, target, with_deps=False, recursive=False, graph=None): import networkx as nx from ..dvcfile import Dvcfile @@ -221,33 +206,38 @@ def collect(self, target, with_deps=False, recursive=False, graph=None): if not target: return list(graph) if graph else self.stages - target = os.path.abspath(target) - if recursive and os.path.isdir(target): - return self._collect_inside(target, graph or self.graph) - - stage = Dvcfile(self, target).stage + return self._collect_inside( + os.path.abspath(target), graph or self.graph + ) - # Optimization: do not collect the graph for a specific target + file, name, tag = parse_target(target) + dvcfile = Dvcfile(self, file, tag=tag) + stages = list(dvcfile.stages.filter(name).values()) if not with_deps: - return [stage] + return stages - pipeline = get_pipeline(get_pipelines(graph or self.graph), stage) - return list(nx.dfs_postorder_nodes(pipeline, stage)) + res = set() + for stage in stages: + pipeline = get_pipeline(get_pipelines(graph or self.graph), stage) + res.update(nx.dfs_postorder_nodes(pipeline, stage)) + return res def collect_granular(self, target, *args, **kwargs): - from ..dvcfile import Dvcfile + from ..dvcfile import Dvcfile, is_valid_filename if not target: return [(stage, None) for stage in self.stages] - # Optimization: do not collect the graph for a specific .dvc target - if Dvcfile.is_valid_filename(target) and not kwargs.get("with_deps"): - return [(Dvcfile(self, target).stage, None)] + file, name, tag = parse_target(target) + if is_valid_filename(file) and not kwargs.get("with_deps"): + # Optimization: do not collect the graph for a specific .dvc target + stages = Dvcfile(self, file, tag=tag).stages.filter(name) + return [(stage, None) for stage in stages.values()] try: - (out,) = self.find_outs_by_path(target, strict=False) - filter_info = PathInfo(os.path.abspath(target)) + (out,) = self.find_outs_by_path(file, strict=False) + filter_info = PathInfo(os.path.abspath(file)) return [(out.stage, filter_info)] except OutputNotFoundError: stages = self.collect(target, *args, **kwargs) @@ -383,9 +373,9 @@ def _collect_graph(self, stages): "rerun command with non overlapping outs paths." ).format( str(parent), - parent.stage.relpath, + parent.stage.addressing, str(overlapping), - overlapping.stage.relpath, + overlapping.stage.addressing, ) raise OverlappingOutputPathsError(parent, overlapping, msg) @@ -419,7 +409,7 @@ def graph(self): @cached_property def pipelines(self): - return get_pipelines(self.pipeline_graph) + return get_pipelines(self.graph) @cached_property def stages(self): @@ -432,60 +422,27 @@ def stages(self): NOTE: For large repos, this could be an expensive operation. Consider using some memoization. """ - return self._collect_stages()[0] - - @cached_property - def pipeline_stages(self): - return self._collect_stages()[1] - - @cached_property - def pipeline_graph(self): - return self._collect_graph(self.pipeline_stages) + return self._collect_stages() def _collect_stages(self): - from dvc.dvcfile import Dvcfile - from dvc.stage import PipelineStage + from dvc.dvcfile import Dvcfile, is_valid_filename - pipeline_stages = [] - output_stages = [] - ignored_outs = set() + stages = [] outs = set() for root, dirs, files in self.tree.walk(self.root_dir): - for fname in files: - path = os.path.join(root, fname) - if not Dvcfile.is_valid_filename(path): - continue - dvcfile = Dvcfile(self, path) - for stage in dvcfile.stages.values(): - if isinstance(stage, PipelineStage): - ignored_outs.update( - out.path_info for out in stage.outs - ) - pipeline_stages.append(stage) - else: - # Old single-stages are used for both - # outputs and pipelines. - output_stages.append(stage) - - outs.update( - out.fspath - for out in stage.outs - if out.scheme == "local" + for file_name in filter(is_valid_filename, files): + path = os.path.join(root, file_name) + stages.extend(list(Dvcfile(self, path).stages.values())) + outs.update( + out.fspath + for stage in stages + for out in ( + out for out in stage.outs if out.scheme == "local" ) - + ) dirs[:] = [d for d in dirs if os.path.join(root, d) not in outs] - - # DVC files are generated by multi-stage for data management. - # We need to ignore those stages for pipelines_stages, but still - # should be collected for output stages. - pipeline_stages.extend( - stage - for stage in output_stages - if not stage.outs - or all(out.path_info not in ignored_outs for out in stage.outs) - ) - return output_stages, pipeline_stages + return stages def find_outs_by_path(self, path, outs=None, recursive=False, strict=True): if not outs: @@ -582,5 +539,3 @@ def _reset(self): self.__dict__.pop("stages", None) self.__dict__.pop("pipelines", None) self.__dict__.pop("dvcignore", None) - self.__dict__.pop("pipeline_graph", None) - self.__dict__.pop("pipeline_stages", None) diff --git a/dvc/repo/add.py b/dvc/repo/add.py index 05fa020c1f..ae96520076 100644 --- a/dvc/repo/add.py +++ b/dvc/repo/add.py @@ -4,10 +4,11 @@ import colorama from . import locked -from ..dvcfile import Dvcfile +from dvc.dvcfile import Dvcfile, is_dvc_file from ..exceptions import ( RecursiveAddingWhileUsingFilename, OverlappingOutputPathsError, + OutputDuplicationError, ) from ..output.base import OutputDoesNotExistError from ..progress import Tqdm @@ -63,11 +64,15 @@ def add(repo, targets, recursive=False, no_commit=False, fname=None): ).format( out=exc.overlapping_out.path_info, parent=exc.parent.path_info, - parent_stage=exc.parent.stage.relpath, + parent_stage=exc.parent.stage.addressing, ) raise OverlappingOutputPathsError( exc.parent, exc.overlapping_out, msg ) + except OutputDuplicationError as exc: + raise OutputDuplicationError( + exc.output, set(exc.stages) - set(stages) + ) with Tqdm( total=len(stages), @@ -107,7 +112,7 @@ def _find_all_targets(repo, target, recursive): unit="file", ) if not repo.is_dvc_internal(fname) - if not Dvcfile.is_stage_file(fname) + if not is_dvc_file(fname) if not repo.scm.belongs_to_scm(fname) if not repo.scm.is_tracked(fname) ] diff --git a/dvc/repo/destroy.py b/dvc/repo/destroy.py index c180778beb..8fbdbee197 100644 --- a/dvc/repo/destroy.py +++ b/dvc/repo/destroy.py @@ -5,7 +5,8 @@ @locked def _destroy_stages(repo): for stage in repo.stages: - stage.remove(remove_outs=False) + stage.unprotect_outs() + stage.dvcfile.remove(force=True) # NOTE: not locking `destroy`, as `remove` will need to delete `.dvc` dir, diff --git a/dvc/repo/get.py b/dvc/repo/get.py index 81c90c7afb..0c450a1953 100644 --- a/dvc/repo/get.py +++ b/dvc/repo/get.py @@ -22,11 +22,11 @@ def __init__(self): @staticmethod def get(url, path, out=None, rev=None): from dvc.external_repo import external_repo - from dvc.dvcfile import Dvcfile + from dvc.dvcfile import is_valid_filename out = resolve_output(path, out) - if Dvcfile.is_valid_filename(out): + if is_valid_filename(out): raise GetDVCFileError() # Creating a directory right beside the output to make sure that they diff --git a/dvc/repo/imp_url.py b/dvc/repo/imp_url.py index dbf71c0aeb..a04581ff71 100644 --- a/dvc/repo/imp_url.py +++ b/dvc/repo/imp_url.py @@ -4,6 +4,7 @@ from dvc.repo.scm_context import scm_context from dvc.utils import resolve_output, resolve_paths, relpath from dvc.utils.fs import path_isin +from ..exceptions import OutputDuplicationError @locked_repo @@ -35,7 +36,10 @@ def imp_url(self, url, out=None, fname=None, erepo=None, locked=True): dvcfile = Dvcfile(self, stage.path) dvcfile.remove_with_prompt(force=True) - self.check_modified_graph([stage]) + try: + self.check_modified_graph([stage]) + except OutputDuplicationError as exc: + raise OutputDuplicationError(exc.output, set(exc.stages) - {stage}) stage.run() diff --git a/dvc/repo/lock.py b/dvc/repo/lock.py index 9e26941148..7845d34931 100644 --- a/dvc/repo/lock.py +++ b/dvc/repo/lock.py @@ -6,10 +6,10 @@ def lock(self, target, unlock=False): from .. import dvcfile from dvc.utils import parse_target - path, target = parse_target(target) - dvcfile = dvcfile.Dvcfile(self, path) - stage = dvcfile.stages[target] + path, name, tag = parse_target(target) + dvcfile = dvcfile.Dvcfile(self, path, tag=tag) + stage = dvcfile.stages[name] stage.locked = False if unlock else True - dvcfile.dump(stage, update_dvcfile=True) + dvcfile.dump(stage, update_pipeline=True) return stage diff --git a/dvc/repo/move.py b/dvc/repo/move.py index 4b27bc8175..aa58a42600 100644 --- a/dvc/repo/move.py +++ b/dvc/repo/move.py @@ -46,7 +46,7 @@ def move(self, from_path, to_path): stage = out.stage if not stage.is_data_source: - raise MoveNotDataSourceError(stage.relpath) + raise MoveNotDataSourceError(stage.addressing) stage_name = os.path.splitext(os.path.basename(stage.path))[0] from_name = os.path.basename(from_out.fspath) diff --git a/dvc/repo/remove.py b/dvc/repo/remove.py index c24ede47ef..f807270441 100644 --- a/dvc/repo/remove.py +++ b/dvc/repo/remove.py @@ -1,14 +1,22 @@ +import logging + from . import locked +from ..utils import parse_target + +logger = logging.getLogger(__name__) @locked def remove(self, target, outs_only=False): from ..dvcfile import Dvcfile - stage = Dvcfile(self, target).stage - if outs_only: + path, name, _ = parse_target(target) + dvcfile = Dvcfile(self, path) + stages = list(dvcfile.stages.filter(name).values()) + for stage in stages: stage.remove_outs(force=True) - else: - stage.remove(force=True) - return stage + if not outs_only: + dvcfile.remove() + + return stages diff --git a/dvc/repo/reproduce.py b/dvc/repo/reproduce.py index 8aa95af4a2..e77934e89e 100644 --- a/dvc/repo/reproduce.py +++ b/dvc/repo/reproduce.py @@ -12,8 +12,8 @@ def _reproduce_stage(stage, **kwargs): if stage.locked: logger.warning( - "DVC-file '{path}' is locked. Its dependencies are" - " not going to be reproduced.".format(path=stage.relpath) + "{} is locked. Its dependencies are" + " not going to be reproduced.".format(stage) ) stage = stage.reproduce(**kwargs) @@ -73,15 +73,15 @@ def reproduce( if not interactive: kwargs["interactive"] = self.config["core"].get("interactive", False) - active_graph = _get_active_graph(self.pipeline_graph) + active_graph = _get_active_graph(self.graph) active_pipelines = get_pipelines(active_graph) - path, name = parse_target(target) + path, name, tag = parse_target(target) if pipeline or all_pipelines: if all_pipelines: pipelines = active_pipelines else: - dvcfile = Dvcfile(self, path) + dvcfile = Dvcfile(self, path, tag=tag) stage = dvcfile.stages[name] pipelines = [get_pipeline(active_pipelines, stage)] @@ -91,9 +91,7 @@ def reproduce( if pipeline.in_degree(stage) == 0: targets.append(stage) else: - targets = self.collect_for_pipelines( - path, name=name, recursive=recursive, graph=active_graph - ) + targets = self.collect(target, recursive=recursive, graph=active_graph) return _reproduce_stages(active_graph, targets, **kwargs) diff --git a/dvc/repo/run.py b/dvc/repo/run.py index 806cc8da65..63e76b5c0f 100644 --- a/dvc/repo/run.py +++ b/dvc/repo/run.py @@ -2,13 +2,18 @@ from . import locked from .scm_context import scm_context -from ..exceptions import DvcException -from ..utils import relpath +from dvc.stage.exceptions import DuplicateStageName, InvalidStageName from funcy import first, concat +from ..exceptions import OutputDuplicationError -def _get_file_path(**kwargs): + +def is_valid_name(name: str): + return not {"\\", "/", "@", ":"} & set(name) + + +def _get_file_path(kwargs): from dvc.dvcfile import DVC_FILE_SUFFIX, DVC_FILE out = first( @@ -33,30 +38,36 @@ def _get_file_path(**kwargs): @scm_context def run(self, fname=None, no_exec=False, **kwargs): from dvc.stage import PipelineStage, Stage, create_stage - from dvc.dvcfile import DVC_FILE, Dvcfile + from dvc.dvcfile import Dvcfile, PIPELINE_FILE - stage_cls, path = PipelineStage, fname or DVC_FILE - if not kwargs.get("name"): + stage_cls = PipelineStage + path = PIPELINE_FILE + stage_name = kwargs.get("name") + if not stage_name: kwargs.pop("name", None) - stage_cls, path = Stage, fname or _get_file_path(**kwargs) + stage_cls = Stage + path = fname or _get_file_path(kwargs) + else: + if not is_valid_name(stage_name): + raise InvalidStageName stage = create_stage(stage_cls, repo=self, path=path, **kwargs) if stage is None: return None - dvcfile = Dvcfile(self, path) - if dvcfile.exists() and not dvcfile.is_multi_stage(): - if stage_cls == PipelineStage: - raise DvcException( - "'{}' is a single-stage dvcfile. Please use " - "`-f ` and try again.`.".format( - relpath(dvcfile.path) - ) - ) - dvcfile.remove_with_prompt(force=kwargs.get("overwrite", True)) - - self.check_modified_graph([stage], self.pipeline_stages) + dvcfile = Dvcfile(self, stage.path) + if dvcfile.exists(): + if stage_name and stage_name in dvcfile.stages: + raise DuplicateStageName(stage_name, dvcfile) + if stage_cls != PipelineStage: + dvcfile.remove_with_prompt(force=kwargs.get("overwrite", True)) + + try: + self.check_modified_graph([stage]) + except OutputDuplicationError as exc: + raise OutputDuplicationError(exc.output, set(exc.stages) - {stage}) + if not no_exec: stage.run(no_commit=kwargs.get("no_commit", False)) - dvcfile.dump(stage, update_dvcfile=True) + dvcfile.dump(stage, update_pipeline=True) return stage diff --git a/dvc/repo/status.py b/dvc/repo/status.py index 40780c66ec..402aca0996 100644 --- a/dvc/repo/status.py +++ b/dvc/repo/status.py @@ -16,10 +16,8 @@ def _joint_status(stages): for stage in stages: if stage.locked and not stage.is_repo_import: logger.warning( - "DVC-file '{path}' is locked. Its dependencies are" - " not going to be shown in the status output.".format( - path=stage.relpath - ) + "{} is locked. Its dependencies are" + " not going to be shown in the status output.".format(stage) ) status.update(stage.status(check_updates=True)) diff --git a/dvc/schema.py b/dvc/schema.py index fab8f17a00..3a1417542e 100644 --- a/dvc/schema.py +++ b/dvc/schema.py @@ -1,4 +1,5 @@ from dvc.stage.params import StageParams, OutputParams +from dvc.output import CHECKSUMS_SCHEMA from dvc import dependency, output from voluptuous import Any, Schema, Optional, Required @@ -16,10 +17,11 @@ StageParams.PARAM_ALWAYS_CHANGED: bool, } +DATA_SCHEMA = {**CHECKSUMS_SCHEMA, Required("path"): str} LOCK_FILE_STAGE_SCHEMA = { Required(StageParams.PARAM_CMD): str, - Required(StageParams.PARAM_DEPS): {str: output.CHECKSUM_SCHEMA}, - Required(StageParams.PARAM_OUTS): {str: output.CHECKSUM_SCHEMA}, + StageParams.PARAM_DEPS: [DATA_SCHEMA], + StageParams.PARAM_OUTS: [DATA_SCHEMA], } LOCKFILE_SCHEMA = {str: LOCK_FILE_STAGE_SCHEMA} diff --git a/dvc/serialize.py b/dvc/serialize.py index c6594aefbf..2e93729461 100644 --- a/dvc/serialize.py +++ b/dvc/serialize.py @@ -1,4 +1,3 @@ -from collections import OrderedDict from typing import TYPE_CHECKING from dvc.utils.collections import apply_diff @@ -22,7 +21,7 @@ def _get_outs(stage: "PipelineStage"): return outs_bucket -def to_dvcfile(stage: "PipelineStage"): +def to_pipeline_file(stage: "PipelineStage"): return { stage.name: { key: value @@ -39,34 +38,25 @@ def to_dvcfile(stage: "PipelineStage"): } -def to_lockfile(stage: "PipelineStage") -> OrderedDict: +def to_lockfile(stage: "PipelineStage") -> dict: assert stage.cmd assert stage.name - deps = OrderedDict( - [ - (dep.def_path, dep.remote.get_checksum(dep.path_info),) - for dep in stage.deps - if dep.remote.get_checksum(dep.path_info) - ] - ) - outs = OrderedDict( - [ - (out.def_path, out.remote.get_checksum(out.path_info),) - for out in stage.outs - if out.remote.get_checksum(out.path_info) - ] - ) - return OrderedDict( - [ - ( - stage.name, - OrderedDict( - [("cmd", stage.cmd), ("deps", deps,), ("outs", outs)] - ), - ) - ] - ) + res = {"cmd": stage.cmd} + deps = [ + {"path": dep.def_path, dep.checksum_type: dep.get_checksum()} + for dep in stage.deps + ] + outs = [ + {"path": out.def_path, out.checksum_type: out.get_checksum()} + for out in stage.outs + ] + if stage.deps: + res["deps"] = deps + if stage.outs: + res["outs"] = outs + + return {stage.name: res} def to_single_stage_file(stage: "Stage"): diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index 8a085c7c1b..c101262aa3 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -54,11 +54,11 @@ def loads_from(cls, repo, path, wdir, data): def create_stage(cls, repo, path, **kwargs): - from dvc.dvcfile import Dvcfile + from dvc.dvcfile import check_dvc_filename wdir = os.path.abspath(kwargs.get("wdir", None) or os.curdir) path = os.path.abspath(path) - Dvcfile.check_dvc_filename(path) + check_dvc_filename(path) cls._check_stage_path(repo, wdir, is_wdir=kwargs.get("wdir")) cls._check_stage_path(repo, os.path.dirname(path)) @@ -147,9 +147,22 @@ def dvcfile(self, dvcfile): def __repr__(self): return "Stage: '{path}'".format( + path=self.path_in_repo if self.path else "No path" + ) + + def __str__(self): + return "stage: '{path}'".format( path=self.relpath if self.path else "No path" ) + @property + def addressing(self): + """ + Useful for alternative presentations where we don't need + `Stage:` prefix. + """ + return self.relpath + def __hash__(self): return hash(self.path_in_repo) @@ -173,9 +186,6 @@ def is_data_source(self): """Whether the DVC-file was created with `dvc add` or `dvc import`""" return self.cmd is None - def changed_md5(self): - return self.md5 != self._compute_md5() - @property def is_callback(self): """ @@ -202,9 +212,9 @@ def _changed_deps(self): if self.is_callback: logger.warning( - "DVC-file '{fname}' is a \"callback\" stage " + '{stage} is a "callback" stage ' "(has a command and no dependencies) and thus always " - "considered as changed.".format(fname=self.relpath) + "considered as changed.".format(stage=self) ) return True @@ -215,9 +225,9 @@ def _changed_deps(self): status = dep.status() if status: logger.debug( - "Dependency '{dep}' of '{stage}' changed because it is " + "Dependency '{dep}' of {stage} changed because it is " "'{status}'.".format( - dep=dep, stage=self.relpath, status=status[str(dep)] + dep=dep, stage=self, status=status[str(dep)] ) ) return True @@ -229,20 +239,20 @@ def _changed_outs(self): status = out.status() if status: logger.debug( - "Output '{out}' of '{stage}' changed because it is " + "Output '{out}' of {stage} changed because it is " "'{status}'".format( - out=out, stage=self.relpath, status=status[str(out)] + out=out, stage=self, status=status[str(out)] ) ) return True return False - def _changed_md5(self): - if self.changed_md5(): + def stage_changed(self, warn=False): + changed = self.md5 != self._compute_md5() + if changed and warn: logger.warning("DVC-file '{}' changed.".format(self.relpath)) - return True - return False + return changed @rwlocked(read=["deps", "outs"]) def changed(self): @@ -256,7 +266,9 @@ def changed(self): def _changed(self): # Short-circuit order: stage md5 is fast, deps are expected to change return ( - self._changed_md5() or self._changed_deps() or self._changed_outs() + self.stage_changed(warn=True) + or self._changed_deps() + or self._changed_outs() ) @rwlocked(write=["outs"]) @@ -267,8 +279,8 @@ def remove_outs(self, ignore_remove=False, force=False): out.unprotect() else: logger.debug( - "Removing output '{out}' of '{stage}'.".format( - out=out, stage=self.relpath + "Removing output '{out}' of {stage}.".format( + out=out, stage=self ) ) out.remove(ignore_remove=ignore_remove) @@ -283,7 +295,7 @@ def remove(self, force=False, remove_outs=True): self.remove_outs(ignore_remove=True, force=force) else: self.unprotect_outs() - os.unlink(self.path) + self.dvcfile.remove() @rwlocked(read=["deps"], write=["outs"]) def reproduce(self, interactive=False, **kwargs): @@ -292,8 +304,8 @@ def reproduce(self, interactive=False, **kwargs): return None msg = ( - "Going to reproduce '{stage}'. " - "Are you sure you want to continue?".format(stage=self.relpath) + "Going to reproduce {stage}. " + "Are you sure you want to continue?".format(stage=self) ) if interactive and not prompt.confirm(msg): @@ -301,7 +313,7 @@ def reproduce(self, interactive=False, **kwargs): self.run(**kwargs) - logger.debug("'{stage}' was reproduced".format(stage=self.relpath)) + logger.debug("{stage} was reproduced".format(stage=self)) return self @@ -470,7 +482,7 @@ def _compute_md5(self): OutputBase.PARAM_PERSIST, ], ) - logger.debug("Computed stage '{}' md5: '{}'".format(self.relpath, m)) + logger.debug("Computed {} md5: '{}'".format(self, m)) return m def save(self): @@ -494,19 +506,19 @@ def check_can_commit(self, force): changed_deps = self._changed_entries(self.deps) changed_outs = self._changed_entries(self.outs) - if changed_deps or changed_outs or self.changed_md5(): + if changed_deps or changed_outs or self.stage_changed(): msg = ( "dependencies {}".format(changed_deps) if changed_deps else "" ) msg += " and " if (changed_deps and changed_outs) else "" msg += "outputs {}".format(changed_outs) if changed_outs else "" msg += "md5" if not (changed_deps or changed_outs) else "" - msg += " of '{}' changed. ".format(self.relpath) + msg += " of {} changed. ".format(self) msg += "Are you sure you want to commit it?" if not force and not prompt.confirm(msg): raise StageCommitError( - "unable to commit changed '{}'. Use `-f|--force` to " - "force.".format(self.relpath) + "unable to commit changed {}. Use `-f|--force` to " + "force.".format(self) ) self.save() @@ -614,9 +626,7 @@ def run(self, dry=False, no_commit=False, force=False): if self.locked: logger.info( - "Verifying outputs in locked stage '{stage}'".format( - stage=self.relpath - ) + "Verifying outputs in locked {stage}".format(stage=self) ) if not dry: self.check_missing_outputs() @@ -630,14 +640,14 @@ def run(self, dry=False, no_commit=False, force=False): if not dry: if ( not force - and not self._changed_md5() + and not self.stage_changed(warn=True) and self._already_cached() ): self.outs[0].checkout() else: self.deps[0].download(self.outs[0]) elif self.is_data_source: - msg = "Verifying data sources in '{}'".format(self.relpath) + msg = "Verifying data sources in {}".format(self) logger.info(msg) if not dry: self.check_missing_outputs() @@ -708,6 +718,9 @@ def _status(entries): return ret + def stage_status(self): + return ["changed checksum"] if self.stage_changed() else [] + @rwlocked(read=["deps", "outs"]) def status(self, check_updates=False): ret = [] @@ -723,14 +736,12 @@ def status(self, check_updates=False): if outs_status: ret.append({"changed outs": outs_status}) - if self.changed_md5(): - ret.append("changed checksum") - + ret.extend(self.stage_status()) if self.is_callback or self.always_changed: ret.append("always changed") if ret: - return {self.relpath: ret} + return {self.addressing: ret} return {} @@ -757,10 +768,12 @@ def get_used_cache(self, *args, **kwargs): class PipelineStage(Stage): - def __init__(self, name=None, **kwargs): - super().__init__(**kwargs) + def __init__(self, *args, name=None, meta=None, **kwargs): + super().__init__(*args, **kwargs) self.name = name self.cmd_changed = False + # This is how the Stage will discover any discrepancies + self.meta = meta or {} def __eq__(self, other): return super().__eq__(other) and self.name == other.name @@ -773,10 +786,14 @@ def __repr__(self): path=self.relpath if self.path else "No path", name=self.name ) - def _changed(self): - if self.cmd_changed: - logger.warning("'cmd' of {} has changed.".format(self)) - return self.cmd_changed or self._changed_deps() or self._changed_outs() + def __str__(self): + return "stage: '{path}:{name}'".format( + path=self.relpath if self.path else "No path", name=self.name + ) + + @property + def addressing(self): + return super().addressing + ":" + self.name def reload(self): return self.dvcfile.stages[self.name] @@ -784,3 +801,11 @@ def reload(self): @property def is_cached(self): return self.name in self.dvcfile.stages and super().is_cached + + def stage_status(self): + return ["changed command"] if self.cmd_changed else [] + + def stage_changed(self, warn=False): + if self.cmd_changed and warn: + logger.warning("'cmd' of {} has changed.".format(self)) + return self.cmd_changed diff --git a/dvc/stage/exceptions.py b/dvc/stage/exceptions.py index f5211ec856..5528b53dd9 100644 --- a/dvc/stage/exceptions.py +++ b/dvc/stage/exceptions.py @@ -3,7 +3,7 @@ class StageCmdFailedError(DvcException): def __init__(self, stage): - msg = "stage '{}' cmd '{}' failed".format(stage.relpath, stage.cmd) + msg = "{} cmd '{}' failed".format(stage, stage.cmd) super().__init__(msg) @@ -15,12 +15,12 @@ def __init__(self, fname, e): class StageFileDoesNotExistError(DvcException): def __init__(self, fname): - from dvc.dvcfile import DVC_FILE_SUFFIX, Dvcfile + from dvc.dvcfile import DVC_FILE_SUFFIX, is_dvc_file msg = "'{}' does not exist.".format(fname) sname = fname + DVC_FILE_SUFFIX - if Dvcfile.is_stage_file(sname): + if is_dvc_file(sname): msg += " Do you mean '{}'?".format(sname) super().__init__(msg) @@ -34,12 +34,12 @@ def __init__(self, relpath): class StageFileIsNotDvcFileError(DvcException): def __init__(self, fname): - from dvc.dvcfile import Dvcfile, DVC_FILE_SUFFIX + from dvc.dvcfile import DVC_FILE_SUFFIX, is_dvc_file msg = "'{}' is not a DVC-file".format(fname) sname = fname + DVC_FILE_SUFFIX - if Dvcfile.is_stage_file(sname): + if is_dvc_file(sname): msg += " Do you mean '{}'?".format(sname) super().__init__(msg) @@ -92,3 +92,39 @@ def __init__(self, missing_files): msg = "missing data '{}': {}".format(source, ", ".join(missing_files)) super().__init__(msg) + + +class StageNotFound(KeyError, DvcException): + def __init__(self, file, name): + super().__init__( + "Stage with '{}' name not found inside '{}' file".format( + name, file.relpath + ) + ) + + +class StageNameUnspecified(DvcException): + def __init__(self, file): + super().__init__( + "Stage name not provided." + "Please specify the name as: `{0}:stage_name`".format( + file.relpath + ) + ) + + +class DuplicateStageName(DvcException): + def __init__(self, name, file): + super().__init__( + "Stage with name '{name}' already exists in '{relpath}'.".format( + name=name, relpath=file.relpath + ) + ) + + +class InvalidStageName(DvcException): + def __init__(self,): + super().__init__( + "Stage name cannot contain invalid characters: " + "'\\', '/', '@' and ':'." + ) diff --git a/dvc/loader.py b/dvc/stage/loader.py similarity index 70% rename from dvc/loader.py rename to dvc/stage/loader.py index 6e850e5b9c..9636956010 100644 --- a/dvc/loader.py +++ b/dvc/stage/loader.py @@ -6,21 +6,11 @@ from itertools import chain from dvc import dependency, output -from dvc.exceptions import DvcException -from dvc.utils import relpath +from .exceptions import StageNameUnspecified, StageNotFound logger = logging.getLogger(__name__) -class StageNotFound(KeyError, DvcException): - def __init__(self, file, name): - super().__init__( - "Stage with '{}' name not found inside '{}' file".format( - name, relpath(file) - ) - ) - - def resolve_paths(path, wdir=None): path = os.path.abspath(path) wdir = wdir or os.curdir @@ -34,21 +24,39 @@ def __init__(self, dvcfile, stages_data, lockfile_data=None): self.stages_data = stages_data or {} self.lockfile_data = lockfile_data or {} + def filter(self, item=None): + if not item: + return self + + if item not in self: + raise StageNotFound(self.dvcfile, item) + + data = {item: self.stages_data[item]} if item in self else {} + return self.__class__(self.dvcfile, data, self.lockfile_data) + @staticmethod def _fill_lock_checksums(stage, lock_data): - from .stage import Stage + from .params import StageParams - outs = stage.outs if not stage.cmd_changed else [] items = chain( - ((Stage.PARAM_DEPS, dep) for dep in stage.deps), - ((Stage.PARAM_OUTS, out) for out in outs), + ((StageParams.PARAM_DEPS, dep) for dep in stage.deps), + ((StageParams.PARAM_OUTS, out) for out in stage.outs), ) + + checksums = { + key: {item["path"]: item for item in lock_data.get(key, {})} + for key in [StageParams.PARAM_DEPS, StageParams.PARAM_OUTS] + } for key, item in items: - item.checksum = lock_data.get(key, {}).get(item.def_path) + item.checksum = ( + checksums.get(key, {}) + .get(item.def_path, {}) + .get(item.checksum_type) + ) @classmethod def load_stage(cls, dvcfile, name, stage_data, lock_data): - from .stage import PipelineStage, Stage, loads_from + from . import PipelineStage, Stage, loads_from path, wdir = resolve_paths( dvcfile.path, stage_data.get(Stage.PARAM_WDIR) @@ -66,13 +74,17 @@ def load_stage(cls, dvcfile, name, stage_data, lock_data): return stage def __getitem__(self, name): + if not name: + raise StageNameUnspecified(self.dvcfile) + if name not in self: - raise StageNotFound(self.dvcfile.path, name) + raise StageNotFound(self.dvcfile, name) if not self.lockfile_data.get(name): logger.warning( "No lock entry found for '%s:%s'", self.dvcfile.relpath, name ) + return self.load_stage( self.dvcfile, name, @@ -91,10 +103,14 @@ def __contains__(self, name): class SingleStageLoader(collections.abc.Mapping): - def __init__(self, dvcfile, stage_data, stage_text=None): + def __init__(self, dvcfile, stage_data, stage_text=None, tag=None): self.dvcfile = dvcfile self.stage_data = stage_data or {} self.stage_text = stage_text + self.tag = tag + + def filter(self, item=None): + return self def __getitem__(self, item): if item: @@ -106,16 +122,16 @@ def __getitem__(self, item): # during `load`, we remove attributes from stage data, so as to # not duplicate, therefore, for MappingView, we need to deepcopy. return self.load_stage( - self.dvcfile, deepcopy(self.stage_data), self.stage_text + self.dvcfile, deepcopy(self.stage_data), self.stage_text, self.tag ) @classmethod - def load_stage(cls, dvcfile, d, stage_text): + def load_stage(cls, dvcfile, d, stage_text, tag=None): from dvc.stage import Stage, loads_from path, wdir = resolve_paths(dvcfile.path, d.get(Stage.PARAM_WDIR)) stage = loads_from(Stage, dvcfile.repo, path, wdir, d) - stage._stage_text, stage.tag = stage_text, dvcfile.tag + stage._stage_text, stage.tag = stage_text, tag stage.deps = dependency.loadd_from( stage, d.get(Stage.PARAM_DEPS) or [] ) diff --git a/dvc/utils/__init__.py b/dvc/utils/__init__.py index e824cf095e..3365646129 100644 --- a/dvc/utils/__init__.py +++ b/dvc/utils/__init__.py @@ -23,7 +23,9 @@ LOCAL_CHUNK_SIZE = 2 ** 20 # 1 MB LARGE_FILE_SIZE = 2 ** 30 # 1 GB LARGE_DIR_SIZE = 100 -TARGET_REGEX = re.compile(r"^(?P.*):(?P[^\\/@:]*)$") +TARGET_REGEX = re.compile( + r"(?P.*?)(:(?P[^\\/@:]*))??(@(?P[^\\/@:]*))??$" +) def dos2unix(data): @@ -377,14 +379,29 @@ def format_link(link): ) -def parse_target(target, default="Dvcfile"): +def parse_target(target, default=None): + from dvc.dvcfile import PIPELINE_FILE, PIPELINE_LOCK + from dvc.exceptions import DvcException + if not target: - return None, None + return None, None, None match = TARGET_REGEX.match(target) if not match: - return target, None - path, name = match.group("path"), match.group("name") + return target, None, None + + path, name, tag = ( + match.group("path"), + match.group("name"), + match.group("tag"), + ) if not path: - logger.warning("Assuming file to be '%s'", default) - return path or default, name + path = default or PIPELINE_FILE + logger.warning("Assuming file to be '%s'", path) + + if os.path.basename(path) == PIPELINE_LOCK: + raise DvcException( + "Did you mean: `{}`?".format(target.replace(".lock", ".yaml", 1)) + ) + + return path, name, tag diff --git a/tests/dir_helpers.py b/tests/dir_helpers.py index ad374a2dc8..1eac1e59c4 100644 --- a/tests/dir_helpers.py +++ b/tests/dir_helpers.py @@ -60,6 +60,7 @@ "tmp_dir", "scm", "dvc", + "local_remote", "run_copy", "erepo_dir", "git_dir", @@ -256,6 +257,17 @@ def _git_init(path): git.close() +@pytest.fixture +def local_remote(request, tmp_dir, dvc, make_tmp_dir): + path = make_tmp_dir("local-remote") + with dvc.config.edit() as conf: + conf["remote"]["upstream"] = {"url": fspath(path)} + conf["core"]["remote"] = "upstream" + if "scm" in request.fixturenames: + tmp_dir.scm_add([dvc.config.files["repo"]], commit="add remote") + return path + + @pytest.fixture def run_copy(tmp_dir, dvc): tmp_dir.gen( diff --git a/tests/func/test_add.py b/tests/func/test_add.py index 7458156500..2c6ee387c6 100644 --- a/tests/func/test_add.py +++ b/tests/func/test_add.py @@ -12,11 +12,15 @@ import dvc as dvc_module from dvc.cache import Cache from dvc.dvcfile import DVC_FILE_SUFFIX -from dvc.exceptions import DvcException, OverlappingOutputPathsError -from dvc.exceptions import RecursiveAddingWhileUsingFilename -from dvc.exceptions import StageFileCorruptedError +from dvc.exceptions import ( + DvcException, + OverlappingOutputPathsError, + OutputDuplicationError, + RecursiveAddingWhileUsingFilename, + StageFileCorruptedError, +) from dvc.main import main -from dvc.output.base import OutputAlreadyTrackedError +from dvc.output.base import OutputAlreadyTrackedError, OutputIsStageFileError from dvc.remote import LocalRemote from dvc.repo import Repo as DvcRepo from dvc.stage import Stage @@ -703,3 +707,20 @@ def test_add_optimization_for_hardlink_on_empty_files(tmp_dir, dvc, mocker): for stage in stages: assert os.path.exists(stage.path) assert os.path.exists(stage.outs[0].cache_path) + + +def test_output_duplication_for_pipeline_tracked(tmp_dir, dvc, run_copy): + tmp_dir.dvc_gen("foo", "foo") + run_copy("foo", "bar", name="copy-foo-bar") + with pytest.raises(OutputDuplicationError): + dvc.add("bar") + + +def test_add_pipeline_file(tmp_dir, dvc, run_copy): + from dvc.dvcfile import PIPELINE_FILE + + tmp_dir.dvc_gen("foo", "foo") + run_copy("foo", "bar", name="copy-foo-bar") + + with pytest.raises(OutputIsStageFileError): + dvc.add(PIPELINE_FILE) diff --git a/tests/func/test_checkout.py b/tests/func/test_checkout.py index c1a8966a94..56c58f23d4 100644 --- a/tests/func/test_checkout.py +++ b/tests/func/test_checkout.py @@ -758,3 +758,29 @@ def test_checkout_for_external_outputs(tmp_dir, dvc): ) stats = dvc.checkout(force=True) assert stats == {**empty_checkout, "modified": [str(file_path)]} + + +def test_checkouts_for_pipeline_tracked_outs(tmp_dir, dvc, scm, run_copy): + from dvc.dvcfile import PIPELINE_FILE + + tmp_dir.gen("foo", "foo") + stage1 = run_copy("foo", "bar", name="copy-foo-bar") + tmp_dir.gen("lorem", "lorem") + stage2 = run_copy("lorem", "ipsum", name="copy-lorem-ipsum") + + for out in ["bar", "ipsum"]: + (tmp_dir / out).unlink() + assert dvc.checkout(["bar"])["added"] == ["bar"] + + (tmp_dir / "bar").unlink() + assert set(dvc.checkout([PIPELINE_FILE])["added"]) == {"bar", "ipsum"} + + for out in ["bar", "ipsum"]: + (tmp_dir / out).unlink() + assert set(dvc.checkout([stage1.addressing])["added"]) == {"bar"} + + (tmp_dir / "bar").unlink() + assert set(dvc.checkout([stage2.addressing])["added"]) == {"ipsum"} + + (tmp_dir / "ipsum").unlink() + assert set(dvc.checkout()["added"]) == {"bar", "ipsum"} diff --git a/tests/func/test_commit.py b/tests/func/test_commit.py index 26f0f0fa69..51171caf01 100644 --- a/tests/func/test_commit.py +++ b/tests/func/test_commit.py @@ -38,13 +38,14 @@ def test_commit_force(tmp_dir, dvc): assert dvc.status([stage.path]) == {} -def test_commit_with_deps(tmp_dir, dvc, run_copy): +@pytest.mark.parametrize("run_kw", [{}, {"name": "copy"}]) +def test_commit_with_deps(tmp_dir, dvc, run_copy, run_kw): tmp_dir.gen("foo", "foo") (foo_stage,) = dvc.add("foo", no_commit=True) assert foo_stage is not None assert len(foo_stage.outs) == 1 - stage = run_copy("foo", "file", no_commit=True) + stage = run_copy("foo", "file", no_commit=True, **run_kw) assert stage is not None assert len(stage.outs) == 1 diff --git a/tests/func/test_data_cloud.py b/tests/func/test_data_cloud.py index 4bec99b058..d3d1d47ac0 100644 --- a/tests/func/test_data_cloud.py +++ b/tests/func/test_data_cloud.py @@ -7,7 +7,7 @@ import pytest -from dvc.compat import fspath +from dvc.compat import fspath, fspath_py35 from dvc.cache import NamedCache from dvc.data_cloud import DataCloud from dvc.main import main @@ -21,6 +21,7 @@ from dvc.remote import S3Remote from dvc.remote import SSHRemote from dvc.remote.base import STATUS_DELETED, STATUS_NEW, STATUS_OK +from dvc.stage.exceptions import StageNotFound from dvc.utils import file_md5 from dvc.utils.fs import remove from dvc.utils.stage import dump_stage_file, load_stage_file @@ -531,7 +532,7 @@ def _test(self): self._caplog.clear() self.main(["status", "-c"]) expected_warning = ( - "Output 'bar'(Stage: 'bar.dvc') is missing version info. " + "Output 'bar'(stage: 'bar.dvc') is missing version info. " "Cache for it will not be collected. " "Use `dvc repro` to get your pipeline up to date." ) @@ -752,10 +753,7 @@ def test_pull_external_dvc_imports(tmp_dir, dvc, scm, erepo_dir): assert dvc.pull()["downloaded"] == 0 - for item in ["foo", "new_dir", dvc.cache.local.cache_dir]: - remove(item) - os.makedirs(dvc.cache.local.cache_dir, exist_ok=True) - clean_repos() + clean(["foo", "new_dir"], dvc) assert dvc.pull(force=True)["downloaded"] == 2 @@ -764,3 +762,88 @@ def test_pull_external_dvc_imports(tmp_dir, dvc, scm, erepo_dir): assert (tmp_dir / "new_dir").exists() assert (tmp_dir / "new_dir" / "bar").read_text() == "bar" + + +def clean(outs, dvc=None): + if dvc: + outs = outs + [dvc.cache.local.cache_dir] + for path in outs: + print(path) + remove(path) + if dvc: + os.makedirs(dvc.cache.local.cache_dir, exist_ok=True) + clean_repos() + + +def recurse_list_dir(d): + return [ + os.path.join(d, f) for _, _, filenames in os.walk(d) for f in filenames + ] + + +def test_dvc_pull_pipeline_stages(tmp_dir, dvc, local_remote, run_copy): + (stage0,) = tmp_dir.dvc_gen("foo", "foo") + stage1 = run_copy("foo", "bar") + stage2 = run_copy("bar", "foobar", name="copy-bar-foobar") + outs = ["foo", "bar", "foobar"] + + dvc.push() + clean(outs, dvc) + dvc.pull() + assert all((tmp_dir / file).exists() for file in outs) + + for out, stage in zip(outs, [stage0, stage1, stage2]): + for target in [stage.addressing, out]: + clean(outs, dvc) + stats = dvc.pull([target]) + assert stats["downloaded"] == 1 + assert stats["added"] == [out] + assert os.path.exists(out) + assert not any(os.path.exists(out) for out in set(outs) - {out}) + + clean(outs, dvc) + stats = dvc.pull([stage2.addressing], with_deps=True) + assert len(stats["added"]) == 3 + assert set(stats["added"]) == set(outs) + + clean(outs, dvc) + stats = dvc.pull([os.curdir], recursive=True) + assert set(stats["added"]) == set(outs) + + +def test_pipeline_file_target_ops(tmp_dir, dvc, local_remote, run_copy): + tmp_dir.dvc_gen("foo", "foo") + run_copy("foo", "bar") + + tmp_dir.dvc_gen("lorem", "lorem") + run_copy("lorem", "lorem2", name="copy-lorem-lorem2") + + tmp_dir.dvc_gen("ipsum", "ipsum") + run_copy("ipsum", "baz", name="copy-ipsum-baz") + + outs = ["foo", "bar", "lorem", "ipsum", "baz", "lorem2"] + + dvc.push() + # each one's a copy of other, hence 3 + assert len(recurse_list_dir(fspath_py35(local_remote))) == 3 + + clean(outs, dvc) + assert set(dvc.pull(["pipelines.yaml"])["added"]) == {"lorem2", "baz"} + + clean(outs, dvc) + assert set(dvc.pull()["added"]) == set(outs) + + # clean everything in remote and push + clean(local_remote.iterdir()) + dvc.push(["pipelines.yaml:copy-ipsum-baz"]) + assert len(recurse_list_dir(fspath_py35(local_remote))) == 1 + + clean(local_remote.iterdir()) + dvc.push(["pipelines.yaml"]) + assert len(recurse_list_dir(fspath_py35(local_remote))) == 2 + + with pytest.raises(StageNotFound): + dvc.push(["pipelines.yaml:StageThatDoesNotExist"]) + + with pytest.raises(StageNotFound): + dvc.pull(["pipelines.yaml:StageThatDoesNotExist"]) diff --git a/tests/func/test_dvcfile.py b/tests/func/test_dvcfile.py index 3b871cce32..faee4065b1 100644 --- a/tests/func/test_dvcfile.py +++ b/tests/func/test_dvcfile.py @@ -1,9 +1,11 @@ import pytest -from dvc.dvcfile import Dvcfile -from dvc.stage import Stage -from dvc.loader import StageNotFound -from dvc.stage.exceptions import StageFileDoesNotExistError +from dvc.dvcfile import Dvcfile, PIPELINE_FILE +from dvc.stage.loader import StageNotFound +from dvc.stage.exceptions import ( + StageFileDoesNotExistError, + StageNameUnspecified, +) def test_run_load_one_for_multistage(tmp_dir, dvc): @@ -15,7 +17,7 @@ def test_run_load_one_for_multistage(tmp_dir, dvc): outs_persist_no_cache=["foo2"], always_changed=True, ) - stage2 = Dvcfile(dvc, "Dvcfile").stages["copy-foo-foo2"] + stage2 = Dvcfile(dvc, PIPELINE_FILE).stages["copy-foo-foo2"] assert stage1 == stage2 foo_out = stage2.outs[0] assert stage2.cmd == "cp foo foo2" @@ -29,7 +31,7 @@ def test_run_load_one_for_multistage(tmp_dir, dvc): def test_run_load_one_for_multistage_non_existing(tmp_dir, dvc): with pytest.raises(StageFileDoesNotExistError): - assert Dvcfile(dvc, "Dvcfile").stages.get("copy-foo-foo2") + assert Dvcfile(dvc, PIPELINE_FILE).stages.get("copy-foo-foo2") def test_run_load_one_for_multistage_non_existing_stage_name(tmp_dir, dvc): @@ -63,7 +65,7 @@ def test_has_stage_with_name(tmp_dir, dvc): metrics=["foo2"], always_changed=True, ) - dvcfile = Dvcfile(dvc, "Dvcfile") + dvcfile = Dvcfile(dvc, PIPELINE_FILE) assert "copy-foo-foo2" in dvcfile.stages assert "copy" not in dvcfile.stages @@ -77,7 +79,7 @@ def test_load_all_multistage(tmp_dir, dvc): metrics=["foo2"], always_changed=True, ) - stages = Dvcfile(dvc, "Dvcfile").stages.values() + stages = Dvcfile(dvc, PIPELINE_FILE).stages.values() assert len(stages) == 1 assert list(stages) == [stage1] @@ -89,7 +91,7 @@ def test_load_all_multistage(tmp_dir, dvc): metrics=["bar2"], always_changed=True, ) - assert set(Dvcfile(dvc, "Dvcfile").stages.values()) == {stage2, stage1} + assert set(Dvcfile(dvc, PIPELINE_FILE).stages.values()) == {stage2, stage1} def test_load_all_singlestage(tmp_dir, dvc): @@ -110,8 +112,8 @@ def test_load_singlestage(tmp_dir, dvc): assert Dvcfile(dvc, "foo2.dvc").stage == stage1 -def test_load_multistage(tmp_dir, dvc): - from dvc.dvcfile import MultiStageFileLoadError +def test_try_get_single_stage_from_pipeline_file(tmp_dir, dvc): + from dvc.dvcfile import DvcException tmp_dir.gen("foo", "foo") dvc.run( @@ -121,24 +123,8 @@ def test_load_multistage(tmp_dir, dvc): metrics=["foo2"], always_changed=True, ) - with pytest.raises(MultiStageFileLoadError): - Dvcfile(dvc, "Dvcfile").stage - - -def test_is_multistage(tmp_dir, dvc): - tmp_dir.gen({"foo": "foo", "bar": "bar"}) - stage1 = dvc.run( - cmd="cp foo foo2", - deps=["foo"], - name="copy-foo-foo2", - metrics=["foo2"], - always_changed=True, - ) - assert Dvcfile(dvc, stage1.path).is_multi_stage() - stage2 = dvc.run( - cmd="cp bar bar2", deps=["bar"], metrics=["bar2"], always_changed=True, - ) - assert not Dvcfile(dvc, stage2.path).is_multi_stage() + with pytest.raises(DvcException): + assert Dvcfile(dvc, PIPELINE_FILE).stage def test_stage_collection(tmp_dir, dvc): @@ -160,5 +146,38 @@ def test_stage_collection(tmp_dir, dvc): stage3 = dvc.run( cmd="cp bar bar2", deps=["bar"], metrics=["bar2"], always_changed=True, ) - assert {s for s in dvc.pipeline_stages} == {stage3, stage2, stage1} - assert {s for s in dvc.stages} == {Stage(dvc, "foo2.dvc"), stage1, stage3} + assert {s for s in dvc.stages} == {stage1, stage3, stage2} + + +def test_stage_filter(tmp_dir, dvc, run_copy): + tmp_dir.gen("foo", "foo") + stage1 = run_copy("foo", "bar", name="copy-foo-bar") + stage2 = run_copy("bar", "bax", name="copy-bar-bax") + stage3 = run_copy("bax", "baz", name="copy-bax-baz") + stages = Dvcfile(dvc, PIPELINE_FILE).stages + assert set(stages.filter(None).values()) == { + stage1, + stage2, + stage3, + } + assert set(stages.filter("copy-bar-bax").values()) == {stage2} + assert stages.filter("copy-bar-bax").get("copy-bar-bax") == stage2 + with pytest.raises(StageNameUnspecified): + stages.get(None) + + with pytest.raises(StageNotFound): + assert stages["unknown"] + + with pytest.raises(StageNotFound): + assert stages.filter("unknown") + + +def test_stage_filter_in_singlestage_file(tmp_dir, dvc, run_copy): + tmp_dir.gen("foo", "foo") + stage = run_copy("foo", "bar") + dvcfile = Dvcfile(dvc, stage.path) + assert set(dvcfile.stages.filter(None).values()) == {stage} + assert dvcfile.stages.filter(None).get(None) == stage + assert dvcfile.stages.filter("copy-bar-bax").get("copy-bar-bax") == stage + assert dvcfile.stages.filter("copy-bar-bax").get(None) == stage + assert set(dvcfile.stages.filter("unknown").values()) == {stage} diff --git a/tests/func/test_gc.py b/tests/func/test_gc.py index de4e61a787..e19d5dc41b 100644 --- a/tests/func/test_gc.py +++ b/tests/func/test_gc.py @@ -332,3 +332,20 @@ def test_gc_cloud_remove_order(tmp_dir, scm, dvc, tmp_path_factory, mocker): for args in mocked_remove.call_args_list[:4]: checksum = str(args[0][1]) assert checksum.endswith(".dir") or checksum.endswith(".dir.unpacked") + + +def test_gc_not_collect_pipeline_tracked_files(tmp_dir, dvc, run_copy): + from dvc.dvcfile import PIPELINE_FILE, Dvcfile + + tmp_dir.gen("foo", "foo") + tmp_dir.gen("bar", "bar") + + run_copy("foo", "foo2", name="copy") + assert _count_files(dvc.cache.local.cache_dir) == 1 + dvc.gc(workspace=True, force=True) + assert _count_files(dvc.cache.local.cache_dir) == 1 + + # remove pipeline file and lockfile and check + Dvcfile(dvc, PIPELINE_FILE).remove(force=True) + dvc.gc(workspace=True, force=True) + assert _count_files(dvc.cache.local.cache_dir) == 0 diff --git a/tests/func/test_get.py b/tests/func/test_get.py index e7e928ab4d..7185ed4c5b 100644 --- a/tests/func/test_get.py +++ b/tests/func/test_get.py @@ -220,3 +220,19 @@ def test_get_url_git_only_repo(tmp_dir, scm, caplog): with caplog.at_level(logging.ERROR): assert main(["get", fspath(tmp_dir), "foo", "--show-url"]) == 1 assert "failed to show URL" in caplog.text + + +def test_get_pipeline_tracked_outs( + tmp_dir, dvc, scm, git_dir, local_remote, run_copy +): + from dvc.dvcfile import PIPELINE_FILE, PIPELINE_LOCK + + tmp_dir.gen("foo", "foo") + run_copy("foo", "bar", name="copy-foo-bar") + dvc.scm.add([PIPELINE_FILE, PIPELINE_LOCK]) + dvc.scm.commit("add pipeline stage") + dvc.push() + + with git_dir.chdir(): + Repo.get("file:///{}".format(fspath(tmp_dir)), "bar", out="baz") + assert (git_dir / "baz").read_text() == "foo" diff --git a/tests/func/test_import.py b/tests/func/test_import.py index 4e3b911e34..ce49acc50f 100644 --- a/tests/func/test_import.py +++ b/tests/func/test_import.py @@ -322,3 +322,21 @@ def test_import_from_bare_git_repo(tmp_dir, make_tmp_dir, erepo_dir): dvc_repo = make_tmp_dir("dvc-repo", scm=True, dvc=True) with dvc_repo.chdir(): dvc_repo.dvc.imp(fspath(tmp_dir), "foo") + + +def test_import_pipeline_tracked_outs( + tmp_dir, dvc, scm, erepo_dir, local_remote, run_copy +): + from dvc.dvcfile import PIPELINE_FILE, PIPELINE_LOCK + + tmp_dir.gen("foo", "foo") + run_copy("foo", "bar", name="copy-foo-bar") + dvc.scm.add([PIPELINE_FILE, PIPELINE_LOCK]) + dvc.scm.commit("add pipeline stage") + dvc.push() + + with erepo_dir.chdir(): + erepo_dir.dvc.imp( + "file:///{}".format(fspath(tmp_dir)), "bar", out="baz" + ) + assert (erepo_dir / "baz").read_text() == "foo" diff --git a/tests/func/test_ls.py b/tests/func/test_ls.py index 73acf70213..af3842d69a 100644 --- a/tests/func/test_ls.py +++ b/tests/func/test_ls.py @@ -429,3 +429,15 @@ def test_ls_not_existed_url(): dirname = "__{}_{}".format("not_existed", time()) with pytest.raises(CloneError): Repo.ls(dirname, recursive=True) + + +def test_ls_shows_pipeline_tracked_outs(tmp_dir, dvc, scm, run_copy): + from dvc.dvcfile import PIPELINE_FILE, PIPELINE_LOCK + + tmp_dir.gen("foo", "foo") + run_copy("foo", "bar", name="copy-foo-bar") + dvc.scm.add([PIPELINE_FILE, PIPELINE_LOCK]) + dvc.scm.commit("add pipeline stage") + + files = Repo.ls(os.curdir, outs_only=True) + match_files(files, ((("bar",), True),)) diff --git a/tests/func/test_pipeline.py b/tests/func/test_pipeline.py index 4431a86f20..f9c43f2aa6 100644 --- a/tests/func/test_pipeline.py +++ b/tests/func/test_pipeline.py @@ -278,20 +278,20 @@ def test_pipeline_list_show_multistage(tmp_dir, dvc, run_copy, caplog): with caplog.at_level(logging.INFO, "dvc"): command._show("foobar.dvc", False, False, False) output = caplog.text.splitlines() - assert "Dvcfile:copy-foo-bar" in output[0] + assert "pipelines.yaml:copy-foo-bar" in output[0] assert "foobar.dvc" in output[1] caplog.clear() with caplog.at_level(logging.INFO, "dvc"): - command._show("Dvcfile:copy-foo-bar", False, False, False) - assert "Dvcfile:copy-foo-bar" in caplog.text + command._show("pipelines.yaml:copy-foo-bar", False, False, False) + assert "pipelines.yaml:copy-foo-bar" in caplog.text assert "foobar.dvc" not in caplog.text command = CmdPipelineList([]) caplog.clear() with caplog.at_level(logging.INFO, "dvc"): command.run() - assert "Dvcfile:copy-foo-bar" in caplog.text + assert "pipelines.yaml:copy-foo-bar" in caplog.text assert "foobar.dvc" in caplog.text assert "1 pipelines in total" @@ -302,10 +302,10 @@ def test_pipeline_ascii_multistage(tmp_dir, dvc, run_copy): run_copy("bar", "foobar") command = CmdPipelineShow([]) nodes, edges, is_tree = command._build_graph("foobar.dvc") - assert set(nodes) == {"Dvcfile:copy-foo-bar", "foobar.dvc"} + assert set(nodes) == {"pipelines.yaml:copy-foo-bar", "foobar.dvc"} assert set(edges) == { - ("foobar.dvc", "Dvcfile:copy-foo-bar"), + ("foobar.dvc", "pipelines.yaml:copy-foo-bar"), } - nodes, edges, is_tree = command._build_graph("Dvcfile:copy-foo-bar") - assert set(nodes) == {"Dvcfile:copy-foo-bar"} + nodes, edges, is_tree = command._build_graph("pipelines.yaml:copy-foo-bar") + assert set(nodes) == {"pipelines.yaml:copy-foo-bar"} diff --git a/tests/func/test_remove.py b/tests/func/test_remove.py index 7df47300bc..d65713a47e 100644 --- a/tests/func/test_remove.py +++ b/tests/func/test_remove.py @@ -17,13 +17,13 @@ def test(self): self.assertEqual(len(stages), 1) stage = stages[0] self.assertTrue(stage is not None) - stage_removed = self.dvc.remove(stage.path, outs_only=True) + (stage_removed,) = self.dvc.remove(stage.path, outs_only=True) self.assertIsInstance(stage_removed, Stage) self.assertEqual(stage.path, stage_removed.path) self.assertFalse(os.path.isfile(self.FOO)) - stage_removed = self.dvc.remove(stage.path) + (stage_removed,) = self.dvc.remove(stage.path) self.assertIsInstance(stage_removed, Stage) self.assertEqual(stage.path, stage_removed.path) self.assertFalse(os.path.isfile(self.FOO)) @@ -60,7 +60,7 @@ def test(self): self.assertEqual(len(stages), 1) stage_add = stages[0] self.assertTrue(stage_add is not None) - stage_removed = self.dvc.remove(stage_add.path) + (stage_removed,) = self.dvc.remove(stage_add.path) self.assertEqual(stage_add.path, stage_removed.path) self.assertFalse(os.path.exists(self.DATA_DIR)) self.assertFalse(os.path.exists(stage_removed.path)) @@ -96,3 +96,34 @@ def test_force(self, mock_prompt): mock_prompt.assert_called() self.assertEqual(ret, 1) self.assertRaises(DvcException) + + +def test_remove_pipeline_outs(tmp_dir, dvc, run_copy): + from dvc.dvcfile import PIPELINE_FILE + + tmp_dir.gen("foo", "foo") + stage = run_copy("foo", "bar", name="copy-foo-bar") + + assert main(["remove", stage.path]) == 0 + assert not (tmp_dir / "bar").exists() + assert (tmp_dir / PIPELINE_FILE).exists() + + stage = run_copy("foo", "foobar", name="copy-foo-foobar") + + assert main(["remove", stage.addressing]) == 0 + assert not (tmp_dir / "foobar").exists() + assert (tmp_dir / "foo").exists() + + stage = run_copy("foo", "baz", name="copy-foo-baz") + assert main(["remove", "--purge", "-f", stage.addressing]) == 0 + assert not (tmp_dir / "baz").exists() + assert (tmp_dir / "foo").exists() + assert (tmp_dir / PIPELINE_FILE).exists() + + dvc.reproduce(PIPELINE_FILE) + assert main(["remove", PIPELINE_FILE]) == 0 + for file in ["bar", "foobar", "baz"]: + assert not (tmp_dir / file).exists() + + assert (tmp_dir / "foo").exists() + assert (tmp_dir / PIPELINE_FILE).exists() diff --git a/tests/func/test_repo.py b/tests/func/test_repo.py index e907674027..20200cbd51 100644 --- a/tests/func/test_repo.py +++ b/tests/func/test_repo.py @@ -1,31 +1,59 @@ import os -from dvc.scm.git.tree import GitTree from dvc.cache import Cache from dvc.repo import Repo from dvc.system import System from dvc.compat import fspath -def test_destroy(tmp_dir, dvc): +def test_destroy(tmp_dir, dvc, run_copy): + from dvc.dvcfile import PIPELINE_FILE, PIPELINE_LOCK + dvc.config["cache"]["type"] = ["symlink"] dvc.cache = Cache(dvc) tmp_dir.dvc_gen("file", "text") tmp_dir.dvc_gen({"dir": {"file": "lorem", "subdir/file": "ipsum"}}) + run_copy("file", "file2") + run_copy("file2", "file3", name="copy-file2-file3") + run_copy("file3", "file4", name="copy-file3-file4") + dvc.destroy() # Remove all the files related to DVC - for path in [".dvc", "file.dvc", "dir.dvc"]: + for path in [ + ".dvc", + "file.dvc", + "file2.dvc", + "dir.dvc", + PIPELINE_FILE, + PIPELINE_LOCK, + ]: assert not (tmp_dir / path).exists() # Leave the rest of the files - for path in ["file", "dir/file", "dir/subdir/file"]: + for path in [ + "file", + "file2", + "file3", + "file4", + "dir/file", + "dir/subdir/file", + ]: assert (tmp_dir / path).is_file() # Make sure that data was unprotected after `destroy` - for path in ["file", "dir", "dir/file", "dir/subdir", "dir/subdir/file"]: + for path in [ + "file", + "file2", + "file3", + "file4", + "dir", + "dir/file", + "dir/subdir", + "dir/subdir/file", + ]: assert not System.is_symlink(fspath(tmp_dir / path)) @@ -49,12 +77,27 @@ def collect_outs(*args, **kwargs): scm.commit("Add buzz") assert collect_outs("bar.dvc", with_deps=True) == {"foo", "bar"} - - dvc.tree = GitTree(scm.repo, "new-branch") - assert collect_outs("buzz.dvc", with_deps=True) == {"foo", "bar", "buzz"} assert collect_outs("buzz.dvc", with_deps=False) == {"buzz"} + run_copy("foo", "foobar", name="copy-foo-foobar") + assert collect_outs(":copy-foo-foobar") == {"foobar"} + assert collect_outs(":copy-foo-foobar", with_deps=True) == { + "foobar", + "foo", + } + assert collect_outs("pipelines.yaml:copy-foo-foobar", recursive=True) == { + "foobar" + } + + run_copy("foobar", "baz", name="copy-foobar-baz") + assert collect_outs("pipelines.yaml") == {"foobar", "baz"} + assert collect_outs("pipelines.yaml", with_deps=True) == { + "foobar", + "baz", + "foo", + } + def test_stages(tmp_dir, dvc): def stages(): diff --git a/tests/func/test_repro.py b/tests/func/test_repro.py index 5de6974803..186c0a8290 100644 --- a/tests/func/test_repro.py +++ b/tests/func/test_repro.py @@ -191,7 +191,7 @@ def test_nested(self): # NOTE: os.walk() walks in a sorted order and we need dir2 subdirs to # be processed before dir1 to load error.dvc first. - self.dvc.pipeline_stages = [ + self.dvc.stages = [ nested_stage, Dvcfile(self.dvc, error_stage_path).stage, ] @@ -225,9 +225,6 @@ def test_similar_paths(self): self.fail("should not raise StagePathAsOutputError") -# TODO: Test ^ for multistage - - class TestReproDepUnderDir(SingleStageRun, TestDvc): def test(self): stages = self.dvc.add(self.DATA_DIR) @@ -560,7 +557,13 @@ def test(self): def test_non_existing(self): with self.assertRaises(StageFileDoesNotExistError): - self.dvc.lock_stage("non-existing-stage") + self.dvc.lock_stage("Dvcfile") + self.dvc.lock_stage("pipelines.yaml") + self.dvc.lock_stage("pipelines.yaml:name") + self.dvc.lock_stage("Dvcfile:name") + self.dvc.lock_stage("stage.dvc") + self.dvc.lock_stage("stage.dvc:name") + self.dvc.lock_stage("not-existing-stage.json") ret = main(["lock", "non-existing-stage"]) self.assertNotEqual(ret, 0) @@ -808,7 +811,7 @@ def test(self): self.assertTrue(filecmp.cmp(foo, bar, shallow=False)) -class TestReproExternalBase(TestDvc): +class TestReproExternalBase(SingleStageRun, TestDvc): cache_type = None @staticmethod @@ -923,13 +926,14 @@ def test(self, mock_prompt): ) self.assertEqual(self.dvc.status([import_remote_stage.path]), {}) - cmd_stage = self.dvc.run( + cmd_stage = self._run( outs=[out_bar_path], deps=[out_foo_path], cmd=self.cmd(foo_path, bar_path), + name="external-base", ) - self.assertEqual(self.dvc.status([cmd_stage.path]), {}) + self.assertEqual(self.dvc.status([cmd_stage.addressing]), {}) self.assertEqual(self.dvc.status(), {}) self.check_already_cached(cmd_stage) @@ -945,19 +949,19 @@ def test(self, mock_prompt): self.dvc.update(import_remote_stage.path) self.assertEqual(self.dvc.status([import_remote_stage.path]), {}) - stages = self.dvc.reproduce(cmd_stage.path) + stages = self.dvc.reproduce(cmd_stage.addressing) self.assertEqual(len(stages), 1) - self.assertEqual(self.dvc.status([cmd_stage.path]), {}) + self.assertEqual(self.dvc.status([cmd_stage.addressing]), {}) self.assertEqual(self.dvc.status(), {}) self.dvc.gc(workspace=True) self.assertEqual(self.dvc.status(), {}) self.dvc.remove(cmd_stage.path, outs_only=True) - self.assertNotEqual(self.dvc.status([cmd_stage.path]), {}) + self.assertNotEqual(self.dvc.status([cmd_stage.addressing]), {}) self.dvc.checkout([cmd_stage.path], force=True) - self.assertEqual(self.dvc.status([cmd_stage.path]), {}) + self.assertEqual(self.dvc.status([cmd_stage.addressing]), {}) @pytest.mark.skipif(os.name == "nt", reason="temporarily disabled on windows") @@ -1196,10 +1200,11 @@ def test(self): with open("create-output.py", "w") as fd: fd.write(cmd) - run_stage = self.dvc.run( + run_stage = self._run( deps=[run_dependency], outs=[run_output], cmd="python create-output.py", + name="http_run", ) self.assertTrue(run_stage is not None) diff --git a/tests/func/test_repro_multistage.py b/tests/func/test_repro_multistage.py index 78c6756732..08d059326f 100644 --- a/tests/func/test_repro_multistage.py +++ b/tests/func/test_repro_multistage.py @@ -3,9 +3,11 @@ import pytest +from dvc.dvcfile import PIPELINE_FILE, PIPELINE_LOCK from dvc.exceptions import CyclicGraphError from dvc.stage import PipelineStage -from dvc.utils.stage import dump_stage_file +from dvc.utils.stage import dump_stage_file, parse_stage + from tests.func import test_repro from dvc.main import main @@ -157,6 +159,42 @@ class TestReproChangedDirDataMultiStage( pass +class TestReproExternalS3MultiStage( + MultiStageRun, test_repro.TestReproExternalS3 +): + pass + + +class TestReproExternalGSMultiStage( + MultiStageRun, test_repro.TestReproExternalGS +): + pass + + +class TestReproExternalHDFSMultiStage( + MultiStageRun, test_repro.TestReproExternalHDFS +): + pass + + +class TestReproExternalHTTPMultiStage( + MultiStageRun, test_repro.TestReproExternalHTTP +): + pass + + +class TestReproExternalLOCALMultiStage( + MultiStageRun, test_repro.TestReproExternalLOCAL +): + pass + + +class TestReproExternalSSHMultiStage( + MultiStageRun, test_repro.TestReproExternalSSH +): + pass + + def test_non_existing_stage_name(tmp_dir, dvc, run_copy): from dvc.exceptions import DvcException @@ -169,9 +207,6 @@ def test_non_existing_stage_name(tmp_dir, dvc, run_copy): assert main(["lock", ":copy-file1-file3"]) != 0 -# TODO: TestReproWorkingDirectoryAsOutput - - def test_downstream(tmp_dir, dvc): # The dependency graph should look like this: # @@ -204,17 +239,19 @@ def test_downstream(tmp_dir, dvc): # / # B # - evaluation = dvc.reproduce("Dvcfile:B-gen", downstream=True, force=True) + evaluation = dvc.reproduce( + PIPELINE_FILE + ":B-gen", downstream=True, force=True + ) assert len(evaluation) == 3 assert ( isinstance(evaluation[0], PipelineStage) - and evaluation[0].relpath == "Dvcfile" + and evaluation[0].relpath == PIPELINE_FILE and evaluation[0].name == "B-gen" ) assert ( isinstance(evaluation[1], PipelineStage) - and evaluation[1].relpath == "Dvcfile" + and evaluation[1].relpath == PIPELINE_FILE and evaluation[1].name == "D-gen" ) assert ( @@ -224,16 +261,19 @@ def test_downstream(tmp_dir, dvc): def test_repro_when_cmd_changes(tmp_dir, dvc, run_copy): - from dvc.dvcfile import Dvcfile + from dvc.dvcfile import PipelineFile tmp_dir.gen("foo", "foo") - stage = run_copy("foo", "bar", fname="copy-process.dvc", name="copy-file") - target = "copy-process.dvc:copy-file" + stage = run_copy("foo", "bar", name="copy-file") + target = ":copy-file" assert not dvc.reproduce(target) stage.cmd = " ".join(stage.cmd.split()) # change cmd spacing by two - Dvcfile(dvc, "copy-process.dvc").dump_multistage_dvcfile(stage) + PipelineFile(dvc, PIPELINE_FILE)._dump_pipeline_file(stage) + assert dvc.status([target]) == { + PIPELINE_FILE + target: ["changed command"] + } assert dvc.reproduce(target)[0] == stage @@ -246,10 +286,9 @@ def test_repro_when_new_deps_is_added_in_dvcfile(tmp_dir, dvc, run_copy): cmd="python copy.py {} {}".format("foo", "foobar"), outs=["foobar"], deps=["foo"], - fname="copy-process.dvc", name="copy-file", ) - target = "copy-process.dvc:copy-file" + target = PIPELINE_FILE + ":copy-file" assert not dvc.reproduce(target) dvcfile = Dvcfile(dvc, stage.path) @@ -269,10 +308,9 @@ def test_repro_when_new_outs_is_added_in_dvcfile(tmp_dir, dvc): cmd="python copy.py {} {}".format("foo", "foobar"), outs=[], # scenario where user forgot to add deps=["foo"], - fname="copy-process.dvc", name="copy-file", ) - target = "copy-process.dvc:copy-file" + target = ":copy-file" assert not dvc.reproduce(target) dvcfile = Dvcfile(dvc, stage.path) @@ -292,10 +330,9 @@ def test_repro_when_new_deps_is_moved(tmp_dir, dvc): cmd="python copy.py {} {}".format("foo", "foobar"), outs=["foobar"], deps=["foo"], - fname="copy-process.dvc", name="copy-file", ) - target = "copy-process.dvc:copy-file" + target = ":copy-file" assert not dvc.reproduce(target) tmp_dir.gen("copy.py", COPY_SCRIPT_FORMAT.format("'bar'", "'foobar'")) @@ -317,7 +354,7 @@ def test_repro_when_new_out_overlaps_others_stage_outs(tmp_dir, dvc): tmp_dir.gen({"dir": {"file1": "file1"}, "foo": "foo"}) dvc.add("dir") dump_stage_file( - "Dvcfile", + PIPELINE_FILE, { "stages": { "run-copy": { @@ -338,7 +375,7 @@ def test_repro_when_new_deps_added_does_not_exist(tmp_dir, dvc): tmp_dir.gen("copy.py", COPY_SCRIPT) tmp_dir.gen("foo", "foo") dump_stage_file( - "Dvcfile", + PIPELINE_FILE, { "stages": { "run-copy": { @@ -359,7 +396,7 @@ def test_repro_when_new_outs_added_does_not_exist(tmp_dir, dvc): tmp_dir.gen("copy.py", COPY_SCRIPT) tmp_dir.gen("foo", "foo") dump_stage_file( - "Dvcfile", + PIPELINE_FILE, { "stages": { "run-copy": { @@ -378,7 +415,7 @@ def test_repro_when_lockfile_gets_deleted(tmp_dir, dvc): tmp_dir.gen("copy.py", COPY_SCRIPT) tmp_dir.gen("foo", "foo") dump_stage_file( - "Dvcfile", + PIPELINE_FILE, { "stages": { "run-copy": { @@ -390,20 +427,17 @@ def test_repro_when_lockfile_gets_deleted(tmp_dir, dvc): }, ) assert dvc.reproduce(":run-copy") - assert os.path.exists("Dvcfile.lock") - assert os.path.exists("foobar.dvc") + assert os.path.exists(PIPELINE_LOCK) assert not dvc.reproduce(":run-copy") - os.unlink("Dvcfile.lock") + os.unlink(PIPELINE_LOCK) stages = dvc.reproduce(":run-copy") assert ( stages - and stages[0].relpath == "Dvcfile" + and stages[0].relpath == PIPELINE_FILE and stages[0].name == "run-copy" ) - assert os.path.exists("foobar.dvc") - def test_cyclic_graph_error(tmp_dir, dvc, run_copy): tmp_dir.gen("foo", "foo") @@ -411,16 +445,13 @@ def test_cyclic_graph_error(tmp_dir, dvc, run_copy): run_copy("bar", "baz", name="copy-bar-baz") run_copy("baz", "foobar", name="copy-baz-foobar") - stage_dump = { - "stages": { - "copy-baz-foo": { - "cmd": "echo baz > foo", - "deps": ["baz"], - "outs": ["foo"], - } + with open(PIPELINE_FILE, "r") as f: + data = parse_stage(f.read(), PIPELINE_FILE) + data["stages"]["copy-baz-foo"] = { + "cmd": "echo baz > foo", + "deps": ["baz"], + "outs": ["foo"], } - } - dump_stage_file("cycle.dvc", stage_dump) - + dump_stage_file(PIPELINE_FILE, data) with pytest.raises(CyclicGraphError): - dvc.reproduce("cycle.dvc:copy-baz-foo") + dvc.reproduce(":copy-baz-foo") diff --git a/tests/func/test_run_multistage.py b/tests/func/test_run_multistage.py index 35594c1268..f258622738 100644 --- a/tests/func/test_run_multistage.py +++ b/tests/func/test_run_multistage.py @@ -1,19 +1,20 @@ import pytest import os +from dvc.stage.exceptions import InvalidStageName, DuplicateStageName + def test_run_with_name(tmp_dir, dvc, run_copy): from dvc.stage import PipelineStage - from dvc.dvcfile import DVC_FILE, DVC_FILE_SUFFIX + from dvc.dvcfile import PIPELINE_FILE, PIPELINE_LOCK tmp_dir.dvc_gen("foo", "foo") - assert not os.path.exists(DVC_FILE) + assert not os.path.exists(PIPELINE_FILE) stage = run_copy("foo", "bar", name="copy-foo-to-bar") assert isinstance(stage, PipelineStage) assert stage.name == "copy-foo-to-bar" - assert os.path.exists(DVC_FILE) - assert os.path.exists(DVC_FILE + ".lock") - assert os.path.exists("foo" + DVC_FILE_SUFFIX) + assert os.path.exists(PIPELINE_FILE) + assert os.path.exists(PIPELINE_LOCK) def test_run_with_multistage_and_single_stage(tmp_dir, dvc, run_copy): @@ -32,40 +33,20 @@ def test_run_with_multistage_and_single_stage(tmp_dir, dvc, run_copy): def test_run_multi_stage_repeat(tmp_dir, dvc, run_copy): from dvc.stage import PipelineStage - from dvc.dvcfile import Dvcfile, DVC_FILE, DVC_FILE_SUFFIX + from dvc.dvcfile import Dvcfile, PIPELINE_FILE tmp_dir.dvc_gen("foo", "foo") run_copy("foo", "foo1", name="copy-foo-foo1") run_copy("foo1", "foo2", name="copy-foo1-foo2") run_copy("foo2", "foo3") - stages = list(Dvcfile(dvc, DVC_FILE).stages.values()) + stages = list(Dvcfile(dvc, PIPELINE_FILE).stages.values()) assert len(stages) == 2 assert all(isinstance(stage, PipelineStage) for stage in stages) assert set(stage.name for stage in stages) == { "copy-foo-foo1", "copy-foo1-foo2", } - assert all( - os.path.exists(file + DVC_FILE_SUFFIX) - for file in ["foo1", "foo2", "foo3"] - ) - - -def test_multi_stage_try_writing_on_single_stage_file(tmp_dir, dvc, run_copy): - from dvc.exceptions import DvcException - from dvc.dvcfile import MultiStageFileLoadError - - tmp_dir.dvc_gen("foo") - dvc.run(cmd="echo foo", deps=["foo"]) - - with pytest.raises(DvcException): - run_copy("foo", "foo2", name="copy-foo1-foo2") - - run_copy("foo", "foo2", name="copy-foo1-foo2", fname="DIFFERENT-FILE.dvc") - - with pytest.raises(MultiStageFileLoadError): - run_copy("foo2", "foo3", fname="DIFFERENT-FILE.dvc") def test_multi_stage_run_cached(tmp_dir, dvc, run_copy): @@ -78,8 +59,6 @@ def test_multi_stage_run_cached(tmp_dir, dvc, run_copy): def test_multistage_dump_on_non_cached_outputs(tmp_dir, dvc): - from dvc.dvcfile import DVC_FILE_SUFFIX - tmp_dir.dvc_gen("foo") dvc.run( cmd="cp foo foo1", @@ -87,7 +66,6 @@ def test_multistage_dump_on_non_cached_outputs(tmp_dir, dvc): name="copy-foo1-foo2", outs_no_cache=["foo1"], ) - assert not os.path.exists("foo1" + DVC_FILE_SUFFIX) def test_multistage_with_wdir(tmp_dir, dvc): @@ -136,7 +114,7 @@ def test_graph(tmp_dir, dvc): def test_run_dump_on_multistage(tmp_dir, dvc): - from dvc.dvcfile import Dvcfile + from dvc.dvcfile import Dvcfile, PIPELINE_FILE tmp_dir.gen({"dir": {"foo": "foo", "bar": "bar"}}) dvc.run( @@ -146,7 +124,7 @@ def test_run_dump_on_multistage(tmp_dir, dvc): outs=["foo1"], wdir="dir", ) - data, _ = Dvcfile(dvc, "Dvcfile")._load() + data, _ = Dvcfile(dvc, PIPELINE_FILE)._load() assert data == { "stages": { "copy-foo-foo1": { @@ -166,7 +144,7 @@ def test_run_dump_on_multistage(tmp_dir, dvc): outs_persist=["foo2"], always_changed=True, ) - assert Dvcfile(dvc, "Dvcfile")._load()[0] == { + assert Dvcfile(dvc, PIPELINE_FILE)._load()[0] == { "stages": { "copy-foo-foo2": { "cmd": "cp foo foo2", @@ -178,3 +156,16 @@ def test_run_dump_on_multistage(tmp_dir, dvc): **data["stages"], } } + + +def test_run_with_invalid_stage_name(tmp_dir, dvc, run_copy): + tmp_dir.dvc_gen("foo", "foo") + with pytest.raises(InvalidStageName): + run_copy("foo", "bar", name="email@https://dvc.org") + + +def test_run_already_exists(tmp_dir, dvc, run_copy): + tmp_dir.dvc_gen("foo", "foo") + run_copy("foo", "bar", name="copy") + with pytest.raises(DuplicateStageName): + run_copy("bar", "foobar", name="copy") diff --git a/tests/func/test_stage.py b/tests/func/test_stage.py index 9a827dbe97..e1852fd8d1 100644 --- a/tests/func/test_stage.py +++ b/tests/func/test_stage.py @@ -7,7 +7,7 @@ from dvc.remote.local import LocalRemote from dvc.repo import Repo from dvc.stage import Stage -from dvc.dvcfile import Dvcfile +from dvc.dvcfile import SingleStageFile from dvc.stage.exceptions import StageFileFormatError from dvc.utils.stage import dump_stage_file from dvc.utils.stage import load_stage_file @@ -16,40 +16,40 @@ def test_cmd_obj(): with pytest.raises(StageFileFormatError): - Dvcfile.validate_single_stage({Stage.PARAM_CMD: {}}) + SingleStageFile.validate({Stage.PARAM_CMD: {}}) def test_cmd_none(): - Dvcfile.validate_single_stage({Stage.PARAM_CMD: None}) + SingleStageFile.validate({Stage.PARAM_CMD: None}) def test_no_cmd(): - Dvcfile.validate_single_stage({}) + SingleStageFile.validate({}) def test_cmd_str(): - Dvcfile.validate_single_stage({Stage.PARAM_CMD: "cmd"}) + SingleStageFile.validate({Stage.PARAM_CMD: "cmd"}) def test_object(): with pytest.raises(StageFileFormatError): - Dvcfile.validate_single_stage({Stage.PARAM_DEPS: {}}) + SingleStageFile.validate({Stage.PARAM_DEPS: {}}) with pytest.raises(StageFileFormatError): - Dvcfile.validate_single_stage({Stage.PARAM_OUTS: {}}) + SingleStageFile.validate({Stage.PARAM_OUTS: {}}) def test_none(): - Dvcfile.validate_single_stage({Stage.PARAM_DEPS: None}) - Dvcfile.validate_single_stage({Stage.PARAM_OUTS: None}) + SingleStageFile.validate({Stage.PARAM_DEPS: None}) + SingleStageFile.validate({Stage.PARAM_OUTS: None}) def test_empty_list(): d = {Stage.PARAM_DEPS: []} - Dvcfile.validate_single_stage(d) + SingleStageFile.validate(d) d = {Stage.PARAM_OUTS: []} - Dvcfile.validate_single_stage(d) + SingleStageFile.validate(d) def test_list(): @@ -59,12 +59,12 @@ def test_list(): {OutputLOCAL.PARAM_PATH: "baz"}, ] d = {Stage.PARAM_DEPS: lst} - Dvcfile.validate_single_stage(d) + SingleStageFile.validate(d) lst[0][OutputLOCAL.PARAM_CACHE] = True lst[1][OutputLOCAL.PARAM_CACHE] = False d = {Stage.PARAM_OUTS: lst} - Dvcfile.validate_single_stage(d) + SingleStageFile.validate(d) class TestReload(TestDvc): @@ -81,7 +81,7 @@ def test(self): d[stage.PARAM_MD5] = md5 dump_stage_file(stage.relpath, d) - dvcfile = Dvcfile(self.dvc, stage.relpath) + dvcfile = SingleStageFile(self.dvc, stage.relpath) stage = dvcfile.stage self.assertTrue(stage is not None) @@ -106,7 +106,7 @@ def test_ignored_in_checksum(self): self.assertNotIn(Stage.PARAM_WDIR, d.keys()) with self.dvc.lock, self.dvc.state: - stage = Dvcfile(self.dvc, stage.relpath).stage + stage = SingleStageFile(self.dvc, stage.relpath).stage self.assertFalse(stage.changed()) @@ -157,8 +157,8 @@ def test_md5_ignores_comments(tmp_dir, dvc): with open(stage.path, "a") as f: f.write("# End comment\n") - new_stage = Dvcfile(dvc, stage.path).stage - assert not new_stage.changed_md5() + new_stage = SingleStageFile(dvc, stage.path).stage + assert not new_stage.stage_changed() def test_meta_is_preserved(tmp_dir, dvc): @@ -170,7 +170,7 @@ def test_meta_is_preserved(tmp_dir, dvc): dump_stage_file(stage.path, data) # Loading and dumping to test that it works and meta is retained - dvcfile = Dvcfile(dvc, stage.path) + dvcfile = SingleStageFile(dvc, stage.path) new_stage = dvcfile.stage dvcfile.dump(new_stage) @@ -192,3 +192,21 @@ def test_parent_repo_collect_stages(tmp_dir, scm, dvc): assert stages == [] assert subrepo_stages != [] + + +def test_stage_addressing(tmp_dir, dvc, run_copy): + tmp_dir.dvc_gen("foo", "foo") + stage1 = run_copy("foo", "bar") + assert stage1.addressing == "bar.dvc" + + stage2 = run_copy("bar", "baz", name="copy-bar-baz") + assert stage2.addressing == "pipelines.yaml:copy-bar-baz" + + folder = tmp_dir / "dir" + folder.mkdir() + with folder.chdir(): + assert stage1.addressing == os.path.relpath(stage1.path) + assert ( + stage2.addressing + == os.path.relpath(stage2.path) + ":" + stage2.name + ) diff --git a/tests/func/test_status.py b/tests/func/test_status.py index a34a355f3d..b958d74013 100644 --- a/tests/func/test_status.py +++ b/tests/func/test_status.py @@ -66,3 +66,30 @@ def test_status_before_and_after_dvc_init(tmp_dir, dvc, git_dir): "file ({})".format(fspath(git_dir)): "update available" } } + + +def test_status_on_pipeline_stages(tmp_dir, dvc, run_copy): + tmp_dir.dvc_gen("foo", "foo") + stage = run_copy("foo", "bar", name="copy-foo-bar") + + stage.cmd = " ".join(stage.cmd.split()) + stage.dvcfile._dump_pipeline_file(stage) + assert dvc.status() == {"pipelines.yaml:copy-foo-bar": ["changed command"]} + + # delete outputs + (tmp_dir / "bar").unlink() + assert dvc.status() == { + "pipelines.yaml:copy-foo-bar": [ + {"changed outs": {"bar": "deleted"}}, + "changed command", + ] + } + (tmp_dir / "foo").unlink() + assert dvc.status() == { + "foo.dvc": [{"changed outs": {"foo": "deleted"}}], + "pipelines.yaml:copy-foo-bar": [ + {"changed deps": {"foo": "deleted"}}, + {"changed outs": {"bar": "deleted"}}, + "changed command", + ], + } diff --git a/tests/unit/output/test_output.py b/tests/unit/output/test_output.py index fb9325471b..ed4f637f95 100644 --- a/tests/unit/output/test_output.py +++ b/tests/unit/output/test_output.py @@ -52,7 +52,7 @@ def test_checksum_schema_fail(value): ( False, ( - "Output 'path'(Stage stage.dvc) is missing version info. " + "Output 'path'(stage: 'stage.dvc') is missing version info. " "Cache for it will not be collected. " "Use `dvc repro` to get your pipeline up to date." ), @@ -60,19 +60,19 @@ def test_checksum_schema_fail(value): ( True, ( - "Output 'path'(Stage stage.dvc) is missing version info. " + "Output 'path'(stage: 'stage.dvc') is missing version info. " "Cache for it will not be collected. " "Use `dvc repro` to get your pipeline up to date.\n" "You can also use `dvc commit stage.dvc` to associate " - "existing 'path' with 'stage.dvc'." + "existing 'path' with stage: 'stage.dvc'." ), ), ], ) def test_get_used_cache(exists, expected_message, mocker, caplog): stage = mocker.MagicMock() - mocker.patch.object(stage, "__str__", return_value="Stage stage.dvc") - mocker.patch.object(stage, "relpath", "stage.dvc") + mocker.patch.object(stage, "__str__", return_value="stage: 'stage.dvc'") + mocker.patch.object(stage, "addressing", "stage.dvc") output = OutputBase(stage, "path") diff --git a/tests/unit/test_dvcfile.py b/tests/unit/test_dvcfile.py new file mode 100644 index 0000000000..41982e5ca2 --- /dev/null +++ b/tests/unit/test_dvcfile.py @@ -0,0 +1,26 @@ +import pytest + +from dvc.dvcfile import Dvcfile, PipelineFile, SingleStageFile + + +@pytest.mark.parametrize( + "path", + [ + "pipelines.yaml", + "pipelines.yml", + "custom-pipelines.yml", + "custom-pipelines.yaml", + "../models/pipelines.yml", + ], +) +def test_pipelines_file(path): + file_obj = Dvcfile(object(), path) + assert isinstance(file_obj, PipelineFile) + + +@pytest.mark.parametrize( + "path", ["Dvcfile", "stage.dvc", "../models/stage.dvc"] +) +def test_pipelines_single_stage_file(path): + file_obj = Dvcfile(object(), path) + assert isinstance(file_obj, SingleStageFile) diff --git a/tests/unit/test_lockfile.py b/tests/unit/test_lockfile.py index 294e2fd333..d0b559e02d 100644 --- a/tests/unit/test_lockfile.py +++ b/tests/unit/test_lockfile.py @@ -1,27 +1,26 @@ from dvc.stage import PipelineStage -from dvc import lockfile, serialize -import json +from dvc.dvcfile import Lockfile, LockfileCorruptedError +import yaml import pytest def test_stage_dump_no_outs_deps(tmp_dir, dvc): stage = PipelineStage(name="s1", repo=dvc, path="path", cmd="command") - lockfile.dump(dvc, "path.lock", serialize.to_lockfile(stage)) - assert lockfile.load(dvc, "path.lock") == { - "s1": {"cmd": "command", "deps": {}, "outs": {}} - } + lockfile = Lockfile(dvc, "path.lock") + lockfile.dump(stage) + assert lockfile.load() == {"s1": {"cmd": "command"}} def test_stage_dump_when_already_exists(tmp_dir, dvc): - data = {"s1": {"cmd": "command", "deps": {}, "outs": {}}} + data = {"s1": {"cmd": "command", "deps": [], "outs": []}} with open("path.lock", "w+") as f: - json.dump(data, f) + yaml.dump(data, f) stage = PipelineStage(name="s2", repo=dvc, path="path", cmd="command2") - - lockfile.dump(dvc, "path.lock", serialize.to_lockfile(stage)) - assert lockfile.load(dvc, "path.lock") == { + lockfile = Lockfile(dvc, "path.lock") + lockfile.dump(stage) + assert lockfile.load() == { **data, - "s2": {"cmd": "command2", "deps": {}, "outs": {}}, + "s2": {"cmd": "command2"}, } @@ -29,49 +28,57 @@ def test_stage_dump_with_deps_and_outs(tmp_dir, dvc): data = { "s1": { "cmd": "command", - "deps": {"1.txt": "checksum"}, - "outs": {"2.txt": "checksum"}, + "deps": [{"md5": "1.txt", "path": "checksum"}], + "outs": [{"md5": "2.txt", "path": "checksum"}], } } with open("path.lock", "w+") as f: - json.dump(data, f) + yaml.dump(data, f) + lockfile = Lockfile(dvc, "path.lock") stage = PipelineStage(name="s2", repo=dvc, path="path", cmd="command2") - lockfile.dump(dvc, "path.lock", serialize.to_lockfile(stage)) - assert lockfile.load(dvc, "path.lock") == { + lockfile.dump(stage) + assert lockfile.load() == { **data, - "s2": {"cmd": "command2", "deps": {}, "outs": {}}, + "s2": {"cmd": "command2"}, } def test_stage_overwrites_if_already_exists(tmp_dir, dvc): + lockfile = Lockfile(dvc, "path.lock",) stage = PipelineStage(name="s2", repo=dvc, path="path", cmd="command2") - lockfile.dump(dvc, "path.lock", serialize.to_lockfile(stage)) + lockfile.dump(stage) stage = PipelineStage(name="s2", repo=dvc, path="path", cmd="command3") - lockfile.dump(dvc, "path.lock", serialize.to_lockfile(stage)) - assert lockfile.load(dvc, "path.lock") == { - "s2": {"cmd": "command3", "deps": {}, "outs": {}}, + lockfile.dump(stage) + assert lockfile.load() == { + "s2": {"cmd": "command3"}, } def test_load_when_lockfile_does_not_exist(tmp_dir, dvc): - assert {} == lockfile.load(dvc, "dvcfile.lock") + assert {} == Lockfile(dvc, "pipelines.lock").load() @pytest.mark.parametrize( "corrupt_data", [ - {"s1": {"cmd": "command", "outs": {}}}, - {"s1": {"outs": {}}}, - {"s1": {"cmd": "command", "deps": {}}}, + {"s1": {"outs": []}}, {"s1": {}}, - {"s1": {"cmd": "command", "outs": {"file": "checksum"}}}, - {"s1": {"cmd": "command", "deps": {"file": "checksum"}}}, + { + "s1": { + "cmd": "command", + "outs": [ + {"md5": "checksum", "path": "path", "random": "value"} + ], + } + }, + {"s1": {"cmd": "command", "deps": [{"md5": "checksum"}]}}, ], ) def test_load_when_lockfile_is_corrupted(tmp_dir, dvc, corrupt_data): with open("Dvcfile.lock", "w+") as f: - json.dump(corrupt_data, f) - with pytest.raises(lockfile.LockfileCorruptedError) as exc_info: - lockfile.load(dvc, "Dvcfile.lock") + yaml.dump(corrupt_data, f) + lockfile = Lockfile(dvc, "Dvcfile.lock") + with pytest.raises(LockfileCorruptedError) as exc_info: + lockfile.load() assert "Dvcfile.lock" in str(exc_info.value) diff --git a/tests/unit/utils/test_utils.py b/tests/unit/utils/test_utils.py index 196da51656..da7194d5f9 100644 --- a/tests/unit/utils/test_utils.py +++ b/tests/unit/utils/test_utils.py @@ -4,11 +4,15 @@ import pytest from dvc.path_info import PathInfo -from dvc.utils import file_md5, resolve_output -from dvc.utils import fix_env -from dvc.utils import relpath -from dvc.utils import to_chunks -from dvc.utils import tmp_fname +from dvc.utils import ( + file_md5, + resolve_output, + fix_env, + relpath, + to_chunks, + tmp_fname, + parse_target, +) @pytest.mark.parametrize( @@ -128,3 +132,26 @@ def test_resolve_output(inp, out, is_dir, expected, mocker): mocker.patch("os.path.isdir", return_value=is_dir) result = resolve_output(inp, out) assert result == expected + + +@pytest.mark.parametrize( + "inp,out, default", + [ + ["pipelines.yaml", ("pipelines.yaml", None, None), None], + ["pipelines.yaml:name", ("pipelines.yaml", "name", None), None], + [":name", ("pipelines.yaml", "name", None), None], + ["stage.dvc", ("stage.dvc", None, None), None], + ["pipelines.yaml:name@v1", ("pipelines.yaml", "name", "v1"), None], + ["../models/stage.dvc", ("../models/stage.dvc", None, None), "def"], + [":name", ("default", "name", None), "default"], + [":name@v2", ("default", "name", "v2"), "default"], + ], +) +def test_parse_target(inp, out, default): + assert parse_target(inp, default) == out + + +def test_hint_on_lockfile(): + with pytest.raises(Exception) as exc: + assert parse_target("pipelines.lock:name@v223") + assert "pipelines.yaml:name@v223" in str(exc.value)