diff --git a/dvc/command/pipeline.py b/dvc/command/pipeline.py index d788031660..1eb7b281c5 100644 --- a/dvc/command/pipeline.py +++ b/dvc/command/pipeline.py @@ -14,8 +14,8 @@ def _show(self, target, commands, outs, locked): from dvc import dvcfile from dvc.utils import parse_target - path, name = parse_target(target) - stage = dvcfile.Dvcfile(self.repo, path).stages[name] + path, name, tag = parse_target(target) + stage = dvcfile.Dvcfile(self.repo, path, tag=tag).stages[name] G = self.repo.graph stages = networkx.dfs_postorder_nodes(G, stage) if locked: @@ -38,8 +38,8 @@ def _build_graph(self, target, commands=False, outs=False): from dvc.repo.graph import get_pipeline from dvc.utils import parse_target - path, name = parse_target(target) - target_stage = dvcfile.Dvcfile(self.repo, path).stages[name] + path, name, tag = parse_target(target) + target_stage = dvcfile.Dvcfile(self.repo, path, tag=tag).stages[name] G = get_pipeline(self.repo.pipelines, target_stage) nodes = set() diff --git a/dvc/dvcfile.py b/dvc/dvcfile.py index 8757d4c030..e97ef86005 100644 --- a/dvc/dvcfile.py +++ b/dvc/dvcfile.py @@ -1,5 +1,5 @@ +import contextlib import os -import re import logging import dvc.prompt as prompt @@ -7,7 +7,7 @@ from voluptuous import MultipleInvalid from dvc import serialize from dvc.exceptions import DvcException -from dvc.loader import SingleStageLoader, StageLoader +from dvc.stage.loader import SingleStageLoader, StageLoader from dvc.stage.exceptions import ( StageFileBadNameError, StageFileDoesNotExistError, @@ -16,6 +16,7 @@ StageFileAlreadyExistsError, ) from dvc.utils import relpath +from dvc.utils.collections import apply_diff from dvc.utils.stage import ( dump_stage_file, parse_stage, @@ -28,7 +29,6 @@ DVC_FILE_SUFFIX = ".dvc" PIPELINE_FILE = "pipelines.yaml" PIPELINE_LOCK = "pipelines.lock" -TAG_REGEX = r"^(?P.*)@(?P[^\\/@:]*)$" def is_valid_filename(path): @@ -39,7 +39,9 @@ def is_valid_filename(path): def is_dvc_file(path): - return os.path.isfile(path) and is_valid_filename(path) + return os.path.isfile(path) and ( + is_valid_filename(path) or os.path.basename(path) == PIPELINE_LOCK + ) def check_dvc_filename(path): @@ -52,59 +54,46 @@ def check_dvc_filename(path): ) -def _get_path_tag(s): - regex = re.compile(TAG_REGEX) - match = regex.match(s) - if not match: - return s, None - return match.group("path"), match.group("tag") - - -class MultiStageFileLoadError(DvcException): - def __init__(self, file): - super().__init__("Cannot load multi-stage file: '{}'".format(file)) - - class FileMixin: SCHEMA = None - def __init__(self, repo, path): + def __init__(self, repo, path, **kwargs): self.repo = repo - self.path, self.tag = _get_path_tag(path) + self.path = path def __repr__(self): return "{}: {}".format( - DVC_FILE, relpath(self.path, self.repo.root_dir) + self.__class__.__name__, relpath(self.path, self.repo.root_dir) ) + def __hash__(self): + return hash(self.path) + + def __eq__(self, other): + return self.repo == other.repo and os.path.abspath( + self.path + ) == os.path.abspath(other.path) + def __str__(self): - return "{}: {}".format(DVC_FILE, self.relpath) + return "{}: {}".format(self.__class__.__name__, self.relpath) + @property def relpath(self): return relpath(self.path) def exists(self): return self.repo.tree.exists(self.path) - def check_file_exists(self): - if not self.exists(): - raise StageFileDoesNotExistError(self.path) - - def check_isfile(self): - if not self.repo.tree.isfile(self.path): - raise StageFileIsNotDvcFileError(self.path) - - def check_filename(self): - raise NotImplementedError - def _load(self): # it raises the proper exceptions by priority: # 1. when the file doesn't exists # 2. filename is not a DVC-file # 3. path doesn't represent a regular file - self.check_file_exists() + if not self.exists(): + raise StageFileDoesNotExistError(self.path) check_dvc_filename(self.path) - self.check_isfile() + if not self.repo.tree.isfile(self.path): + raise StageFileIsNotDvcFileError(self.path) with self.repo.tree.open(self.path) as fd: stage_text = fd.read() @@ -121,31 +110,28 @@ def validate(cls, d, fname=None): raise StageFileFormatError(fname, exc) def remove_with_prompt(self, force=False): - if not self.exists(): - return - - msg = ( - "'{}' already exists. Do you wish to run the command and " - "overwrite it?".format(relpath(self.path)) - ) - if not (force or prompt.confirm(msg)): - raise StageFileAlreadyExistsError(self.path) + raise NotImplementedError - os.unlink(self.path) + def remove(self): + raise NotImplementedError class SingleStageFile(FileMixin): from dvc.schema import COMPILED_SINGLE_STAGE_SCHEMA as SCHEMA + def __init__(self, repo, path, tag=None): + super().__init__(repo, path) + self.tag = tag + @property def stage(self): data, raw = self._load() - return SingleStageLoader.load_stage(self, data, raw) + return SingleStageLoader.load_stage(self, data, raw, tag=self.tag) @property def stages(self): data, raw = self._load() - return SingleStageLoader(self, data, raw) + return SingleStageLoader(self, data, raw, tag=self.tag) def dump(self, stage, **kwargs): """Dumps given stage appropriately in the dvcfile.""" @@ -159,8 +145,27 @@ def dump(self, stage, **kwargs): dump_stage_file(self.path, serialize.to_single_stage_file(stage)) self.repo.scm.track_file(relpath(self.path)) + def remove_with_prompt(self, force=False): + if not self.exists(): + return + + msg = ( + "'{}' already exists. Do you wish to run the command and " + "overwrite it?".format(relpath(self.path)) + ) + if not (force or prompt.confirm(msg)): + raise StageFileAlreadyExistsError(self.path) + + self.remove() + + def remove(self, force=False): + with contextlib.suppress(FileNotFoundError): + os.unlink(self.path) + class PipelineFile(FileMixin): + """Abstraction for pipelines file, .yaml + .lock combined.""" + from dvc.schema import COMPILED_MULTI_STAGE_SCHEMA as SCHEMA @property @@ -172,6 +177,7 @@ def dump(self, stage, update_pipeline=False): from dvc.stage import PipelineStage assert isinstance(stage, PipelineStage) + check_dvc_filename(self.path) self._dump_lockfile(stage) if update_pipeline and not stage.is_data_source: self._dump_pipeline_file(stage) @@ -191,14 +197,21 @@ def _dump_pipeline_file(self, stage): open(self.path, "w+").close() data["stages"] = data.get("stages", {}) - data["stages"].update(serialize.to_dvcfile(stage)) + stage_data = serialize.to_dvcfile(stage) + if data["stages"].get(stage.name): + orig_stage_data = data["stages"][stage.name] + apply_diff(stage_data[stage.name], orig_stage_data) + else: + data["stages"].update(stage_data) - dump_stage_file(self.path, self.SCHEMA(data)) + dump_stage_file(self.path, data) self.repo.scm.track_file(relpath(self.path)) @property def stage(self): - raise MultiStageFileLoadError(self.path) + raise DvcException( + "PipelineFile has multiple stages. Please specify it's name." + ) @property def stages(self): @@ -208,16 +221,23 @@ def stages(self): lockfile_data = lockfile.load(self.repo, self._lockfile) return StageLoader(self, data.get("stages", {}), lockfile_data) + def remove(self, force=False): + if not force: + logger.warning("Cannot remove pipeline file.") + return + + for file in [self.path, self._lockfile]: + with contextlib.suppress(FileNotFoundError): + os.unlink(file) + class Dvcfile: - def __new__(cls, repo, path): + def __new__(cls, repo, path, **kwargs): assert path assert repo - file, _ = _get_path_tag(path) - _, ext = os.path.splitext(file) - assert not ext or ext in [".yml", ".yaml", ".dvc"] - - if not ext or ext == DVC_FILE_SUFFIX: - return SingleStageFile(repo, path) - return PipelineFile(repo, path) + _, ext = os.path.splitext(path) + if ext in [".yaml", ".yml"]: + return PipelineFile(repo, path, **kwargs) + # fallback to single stage file for better error messages + return SingleStageFile(repo, path, **kwargs) diff --git a/dvc/exceptions.py b/dvc/exceptions.py index 19035f1866..6e6a5e588e 100644 --- a/dvc/exceptions.py +++ b/dvc/exceptions.py @@ -31,6 +31,8 @@ def __init__(self, output, stages): output, "\n".join("\t{}".format(s.addressing) for s in stages) ) super().__init__(msg) + self.stages = stages + self.output = output class OutputNotFoundError(DvcException): diff --git a/dvc/repo/__init__.py b/dvc/repo/__init__.py index 89701c1b72..9040e0dffc 100644 --- a/dvc/repo/__init__.py +++ b/dvc/repo/__init__.py @@ -211,8 +211,8 @@ def collect(self, target, with_deps=False, recursive=False, graph=None): os.path.abspath(target), graph or self.graph ) - file, name, = parse_target(target) - dvcfile = Dvcfile(self, file) + file, name, tag = parse_target(target) + dvcfile = Dvcfile(self, file, tag=tag) stages = list(dvcfile.stages.filter(name).values()) if not with_deps: return stages @@ -229,10 +229,10 @@ def collect_granular(self, target, *args, **kwargs): if not target: return [(stage, None) for stage in self.stages] - file, name = parse_target(target) + file, name, tag = parse_target(target) if is_valid_filename(file) and not kwargs.get("with_deps"): # Optimization: do not collect the graph for a specific .dvc target - stages = Dvcfile(self, file).stages.filter(name) + stages = Dvcfile(self, file, tag=tag).stages.filter(name) return [(stage, None) for stage in stages.values()] try: diff --git a/dvc/repo/add.py b/dvc/repo/add.py index 6a9accce21..ae96520076 100644 --- a/dvc/repo/add.py +++ b/dvc/repo/add.py @@ -8,6 +8,7 @@ from ..exceptions import ( RecursiveAddingWhileUsingFilename, OverlappingOutputPathsError, + OutputDuplicationError, ) from ..output.base import OutputDoesNotExistError from ..progress import Tqdm @@ -68,6 +69,10 @@ def add(repo, targets, recursive=False, no_commit=False, fname=None): raise OverlappingOutputPathsError( exc.parent, exc.overlapping_out, msg ) + except OutputDuplicationError as exc: + raise OutputDuplicationError( + exc.output, set(exc.stages) - set(stages) + ) with Tqdm( total=len(stages), diff --git a/dvc/repo/destroy.py b/dvc/repo/destroy.py index c180778beb..8fbdbee197 100644 --- a/dvc/repo/destroy.py +++ b/dvc/repo/destroy.py @@ -5,7 +5,8 @@ @locked def _destroy_stages(repo): for stage in repo.stages: - stage.remove(remove_outs=False) + stage.unprotect_outs() + stage.dvcfile.remove(force=True) # NOTE: not locking `destroy`, as `remove` will need to delete `.dvc` dir, diff --git a/dvc/repo/imp_url.py b/dvc/repo/imp_url.py index dbf71c0aeb..d2b33f55a9 100644 --- a/dvc/repo/imp_url.py +++ b/dvc/repo/imp_url.py @@ -4,6 +4,7 @@ from dvc.repo.scm_context import scm_context from dvc.utils import resolve_output, resolve_paths, relpath from dvc.utils.fs import path_isin +from ..exceptions import OutputDuplicationError @locked_repo @@ -35,7 +36,12 @@ def imp_url(self, url, out=None, fname=None, erepo=None, locked=True): dvcfile = Dvcfile(self, stage.path) dvcfile.remove_with_prompt(force=True) - self.check_modified_graph([stage]) + try: + self.check_modified_graph([stage]) + except OutputDuplicationError as exc: + raise OutputDuplicationError( + exc.output, set(exc.stages) - set([stage]) + ) stage.run() diff --git a/dvc/repo/lock.py b/dvc/repo/lock.py index f7920c2538..7845d34931 100644 --- a/dvc/repo/lock.py +++ b/dvc/repo/lock.py @@ -6,9 +6,9 @@ def lock(self, target, unlock=False): from .. import dvcfile from dvc.utils import parse_target - path, target = parse_target(target) - dvcfile = dvcfile.Dvcfile(self, path) - stage = dvcfile.stages[target] + path, name, tag = parse_target(target) + dvcfile = dvcfile.Dvcfile(self, path, tag=tag) + stage = dvcfile.stages[name] stage.locked = False if unlock else True dvcfile.dump(stage, update_pipeline=True) diff --git a/dvc/repo/remove.py b/dvc/repo/remove.py index c24ede47ef..f807270441 100644 --- a/dvc/repo/remove.py +++ b/dvc/repo/remove.py @@ -1,14 +1,22 @@ +import logging + from . import locked +from ..utils import parse_target + +logger = logging.getLogger(__name__) @locked def remove(self, target, outs_only=False): from ..dvcfile import Dvcfile - stage = Dvcfile(self, target).stage - if outs_only: + path, name, _ = parse_target(target) + dvcfile = Dvcfile(self, path) + stages = list(dvcfile.stages.filter(name).values()) + for stage in stages: stage.remove_outs(force=True) - else: - stage.remove(force=True) - return stage + if not outs_only: + dvcfile.remove() + + return stages diff --git a/dvc/repo/reproduce.py b/dvc/repo/reproduce.py index a0f90cdc67..e77934e89e 100644 --- a/dvc/repo/reproduce.py +++ b/dvc/repo/reproduce.py @@ -76,12 +76,12 @@ def reproduce( active_graph = _get_active_graph(self.graph) active_pipelines = get_pipelines(active_graph) - path, name = parse_target(target) + path, name, tag = parse_target(target) if pipeline or all_pipelines: if all_pipelines: pipelines = active_pipelines else: - dvcfile = Dvcfile(self, path) + dvcfile = Dvcfile(self, path, tag=tag) stage = dvcfile.stages[name] pipelines = [get_pipeline(active_pipelines, stage)] diff --git a/dvc/repo/run.py b/dvc/repo/run.py index cef9c7d302..49924d8e20 100644 --- a/dvc/repo/run.py +++ b/dvc/repo/run.py @@ -2,9 +2,16 @@ from . import locked from .scm_context import scm_context +from dvc.stage.exceptions import DuplicateStageName, InvalidStageName from funcy import first, concat +from ..exceptions import OutputDuplicationError + + +def is_valid_name(name: str): + return not {"\\", "/", "@", ":"} & set(name) + def _get_file_path(kwargs): from dvc.dvcfile import DVC_FILE_SUFFIX, DVC_FILE @@ -34,19 +41,32 @@ def run(self, fname=None, no_exec=False, **kwargs): from dvc.dvcfile import Dvcfile, PIPELINE_FILE stage_cls, path = PipelineStage, PIPELINE_FILE - if not kwargs.get("name"): + stage_name = kwargs.get("name") + if not stage_name: kwargs.pop("name", None) stage_cls, path = Stage, fname or _get_file_path(kwargs) + else: + if not is_valid_name(stage_name): + raise InvalidStageName stage = create_stage(stage_cls, repo=self, path=path, **kwargs) if stage is None: return None dvcfile = Dvcfile(self, stage.path) - if stage_cls != PipelineStage and dvcfile.exists(): - dvcfile.remove_with_prompt(force=kwargs.get("overwrite", True)) + if dvcfile.exists(): + if stage_name and stage_name in dvcfile.stages: + raise DuplicateStageName(stage_name, dvcfile) + if stage_cls != PipelineStage: + dvcfile.remove_with_prompt(force=kwargs.get("overwrite", True)) + + try: + self.check_modified_graph([stage]) + except OutputDuplicationError as exc: + raise OutputDuplicationError( + exc.output, set(exc.stages) - set([stage]) + ) - self.check_modified_graph([stage]) if not no_exec: stage.run(no_commit=kwargs.get("no_commit", False)) dvcfile.dump(stage, update_pipeline=True) diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index 7e453f0c83..c101262aa3 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -186,9 +186,6 @@ def is_data_source(self): """Whether the DVC-file was created with `dvc add` or `dvc import`""" return self.cmd is None - def changed_md5(self): - return self.md5 != self._compute_md5() - @property def is_callback(self): """ @@ -251,11 +248,11 @@ def _changed_outs(self): return False - def _changed_md5(self): - if self.changed_md5(): + def stage_changed(self, warn=False): + changed = self.md5 != self._compute_md5() + if changed and warn: logger.warning("DVC-file '{}' changed.".format(self.relpath)) - return True - return False + return changed @rwlocked(read=["deps", "outs"]) def changed(self): @@ -269,7 +266,9 @@ def changed(self): def _changed(self): # Short-circuit order: stage md5 is fast, deps are expected to change return ( - self._changed_md5() or self._changed_deps() or self._changed_outs() + self.stage_changed(warn=True) + or self._changed_deps() + or self._changed_outs() ) @rwlocked(write=["outs"]) @@ -280,8 +279,8 @@ def remove_outs(self, ignore_remove=False, force=False): out.unprotect() else: logger.debug( - "Removing output '{out}' of '{stage}'.".format( - out=out, stage=self.relpath + "Removing output '{out}' of {stage}.".format( + out=out, stage=self ) ) out.remove(ignore_remove=ignore_remove) @@ -296,7 +295,7 @@ def remove(self, force=False, remove_outs=True): self.remove_outs(ignore_remove=True, force=force) else: self.unprotect_outs() - os.unlink(self.path) + self.dvcfile.remove() @rwlocked(read=["deps"], write=["outs"]) def reproduce(self, interactive=False, **kwargs): @@ -507,14 +506,14 @@ def check_can_commit(self, force): changed_deps = self._changed_entries(self.deps) changed_outs = self._changed_entries(self.outs) - if changed_deps or changed_outs or self.changed_md5(): + if changed_deps or changed_outs or self.stage_changed(): msg = ( "dependencies {}".format(changed_deps) if changed_deps else "" ) msg += " and " if (changed_deps and changed_outs) else "" msg += "outputs {}".format(changed_outs) if changed_outs else "" msg += "md5" if not (changed_deps or changed_outs) else "" - msg += " of '{}' changed. ".format(self.relpath) + msg += " of {} changed. ".format(self) msg += "Are you sure you want to commit it?" if not force and not prompt.confirm(msg): raise StageCommitError( @@ -641,7 +640,7 @@ def run(self, dry=False, no_commit=False, force=False): if not dry: if ( not force - and not self._changed_md5() + and not self.stage_changed(warn=True) and self._already_cached() ): self.outs[0].checkout() @@ -719,6 +718,9 @@ def _status(entries): return ret + def stage_status(self): + return ["changed checksum"] if self.stage_changed() else [] + @rwlocked(read=["deps", "outs"]) def status(self, check_updates=False): ret = [] @@ -734,14 +736,12 @@ def status(self, check_updates=False): if outs_status: ret.append({"changed outs": outs_status}) - if self.changed_md5(): - ret.append("changed checksum") - + ret.extend(self.stage_status()) if self.is_callback or self.always_changed: ret.append("always changed") if ret: - return {self.relpath: ret} + return {self.addressing: ret} return {} @@ -795,14 +795,17 @@ def __str__(self): def addressing(self): return super().addressing + ":" + self.name - def _changed(self): - if self.cmd_changed: - logger.warning("'cmd' of {} has changed.".format(self)) - return self.cmd_changed or self._changed_deps() or self._changed_outs() - def reload(self): return self.dvcfile.stages[self.name] @property def is_cached(self): return self.name in self.dvcfile.stages and super().is_cached + + def stage_status(self): + return ["changed command"] if self.cmd_changed else [] + + def stage_changed(self, warn=False): + if self.cmd_changed and warn: + logger.warning("'cmd' of {} has changed.".format(self)) + return self.cmd_changed diff --git a/dvc/stage/exceptions.py b/dvc/stage/exceptions.py index 495d60f658..cd6912ab36 100644 --- a/dvc/stage/exceptions.py +++ b/dvc/stage/exceptions.py @@ -92,3 +92,39 @@ def __init__(self, missing_files): msg = "missing data '{}': {}".format(source, ", ".join(missing_files)) super().__init__(msg) + + +class StageNotFound(KeyError, DvcException): + def __init__(self, file, name): + super().__init__( + "Stage with '{}' name not found inside '{}' file".format( + name, file.relpath + ) + ) + + +class StageNameUnspecified(DvcException): + def __init__(self, file): + super().__init__( + "Stage name not provided. " + "\nPlease specify the name as: `{0}:stage_name`".format( + file.relpath + ) + ) + + +class DuplicateStageName(DvcException): + def __init__(self, name, file): + super().__init__( + "Stage with name '{name}' already exists in '{relpath}'.".format( + name=name, relpath=file.relpath + ) + ) + + +class InvalidStageName(DvcException): + def __init__(self,): + super().__init__( + "Stage name cannot contain invalid characters: " + "'\\', '/', '@' and ':'." + ) diff --git a/dvc/loader.py b/dvc/stage/loader.py similarity index 78% rename from dvc/loader.py rename to dvc/stage/loader.py index f2f849c9cc..1d8a9f0271 100644 --- a/dvc/loader.py +++ b/dvc/stage/loader.py @@ -6,21 +6,11 @@ from itertools import chain from dvc import dependency, output -from dvc.exceptions import DvcException -from dvc.utils import relpath +from .exceptions import StageNameUnspecified, StageNotFound logger = logging.getLogger(__name__) -class StageNotFound(KeyError, DvcException): - def __init__(self, file, name): - super().__init__( - "Stage with '{}' name not found inside '{}' file".format( - name, relpath(file) - ) - ) - - def resolve_paths(path, wdir=None): path = os.path.abspath(path) wdir = wdir or os.curdir @@ -35,31 +25,31 @@ def __init__(self, dvcfile, stages_data, lockfile_data=None): self.lockfile_data = lockfile_data or {} def filter(self, item=None): - data = self.stages_data - if item: - data = {item: data.get(item, {})} - return self.__class__(self.dvcfile, data, self.lockfile_data,) + if not item: + return self + + data = {item: self.stages_data[item]} if item in self else {} + return self.__class__(self.dvcfile, data, self.lockfile_data) @staticmethod def _fill_lock_checksums(stage, lock_data): - from .stage import Stage + from .params import StageParams - outs = stage.outs if not stage.cmd_changed else [] items = chain( - ((Stage.PARAM_DEPS, dep) for dep in stage.deps), - ((Stage.PARAM_OUTS, out) for out in outs), + ((StageParams.PARAM_DEPS, dep) for dep in stage.deps), + ((StageParams.PARAM_OUTS, out) for out in stage.outs), ) checksums = { key: {item["path"]: item["md5"] for item in lock_data.get(key, {})} - for key in [Stage.PARAM_DEPS, Stage.PARAM_OUTS] + for key in [StageParams.PARAM_DEPS, StageParams.PARAM_OUTS] } for key, item in items: item.checksum = checksums.get(key, {}).get(item.def_path) @classmethod def load_stage(cls, dvcfile, name, stage_data, lock_data): - from .stage import PipelineStage, Stage, loads_from + from . import PipelineStage, Stage, loads_from path, wdir = resolve_paths( dvcfile.path, stage_data.get(Stage.PARAM_WDIR) @@ -77,13 +67,17 @@ def load_stage(cls, dvcfile, name, stage_data, lock_data): return stage def __getitem__(self, name): + if not name: + raise StageNameUnspecified(self.dvcfile) + if name not in self: - raise StageNotFound(self.dvcfile.path, name) + raise StageNotFound(self.dvcfile, name) if not self.lockfile_data.get(name): logger.warning( "No lock entry found for '%s:%s'", self.dvcfile.relpath, name ) + return self.load_stage( self.dvcfile, name, @@ -102,10 +96,11 @@ def __contains__(self, name): class SingleStageLoader(collections.abc.Mapping): - def __init__(self, dvcfile, stage_data, stage_text=None): + def __init__(self, dvcfile, stage_data, stage_text=None, tag=None): self.dvcfile = dvcfile self.stage_data = stage_data or {} self.stage_text = stage_text + self.tag = tag def filter(self, item=None): return self @@ -120,16 +115,16 @@ def __getitem__(self, item): # during `load`, we remove attributes from stage data, so as to # not duplicate, therefore, for MappingView, we need to deepcopy. return self.load_stage( - self.dvcfile, deepcopy(self.stage_data), self.stage_text + self.dvcfile, deepcopy(self.stage_data), self.stage_text, self.tag ) @classmethod - def load_stage(cls, dvcfile, d, stage_text): + def load_stage(cls, dvcfile, d, stage_text, tag=None): from dvc.stage import Stage, loads_from path, wdir = resolve_paths(dvcfile.path, d.get(Stage.PARAM_WDIR)) stage = loads_from(Stage, dvcfile.repo, path, wdir, d) - stage._stage_text, stage.tag = stage_text, dvcfile.tag + stage._stage_text, stage.tag = stage_text, tag stage.deps = dependency.loadd_from( stage, d.get(Stage.PARAM_DEPS) or [] ) diff --git a/dvc/utils/__init__.py b/dvc/utils/__init__.py index 7524cf692a..08a3e11442 100644 --- a/dvc/utils/__init__.py +++ b/dvc/utils/__init__.py @@ -23,7 +23,9 @@ LOCAL_CHUNK_SIZE = 2 ** 20 # 1 MB LARGE_FILE_SIZE = 2 ** 30 # 1 GB LARGE_DIR_SIZE = 100 -TARGET_REGEX = re.compile(r"^(?P.*):(?P[^\\/@:]*)$") +TARGET_REGEX = re.compile( + r"(?P.*?)(:(?P[^\\/@:]*))??(@(?P[^\\/@:]*))??$" +) def dos2unix(data): @@ -378,15 +380,27 @@ def format_link(link): def parse_target(target, default=None): - from dvc.dvcfile import PIPELINE_FILE + from dvc.dvcfile import PIPELINE_FILE, PIPELINE_LOCK if not target: - return None, None + return None, None, None match = TARGET_REGEX.match(target) if not match: - return target, None - path, name = match.group("path"), match.group("name") + return target, None, None + + path, name, tag = ( + match.group("path"), + match.group("name"), + match.group("tag"), + ) if not path: - logger.warning("Assuming file to be '%s'", default) - return path or default or PIPELINE_FILE, name + path = default or PIPELINE_FILE + logger.warning("Assuming file to be '%s'", path) + + if os.path.basename(path) == PIPELINE_LOCK: + raise Exception( + "Did you mean: `{}`?".format(target.replace(".lock", ".yaml", 1)) + ) + + return path, name, tag diff --git a/tests/dir_helpers.py b/tests/dir_helpers.py index ad374a2dc8..1eac1e59c4 100644 --- a/tests/dir_helpers.py +++ b/tests/dir_helpers.py @@ -60,6 +60,7 @@ "tmp_dir", "scm", "dvc", + "local_remote", "run_copy", "erepo_dir", "git_dir", @@ -256,6 +257,17 @@ def _git_init(path): git.close() +@pytest.fixture +def local_remote(request, tmp_dir, dvc, make_tmp_dir): + path = make_tmp_dir("local-remote") + with dvc.config.edit() as conf: + conf["remote"]["upstream"] = {"url": fspath(path)} + conf["core"]["remote"] = "upstream" + if "scm" in request.fixturenames: + tmp_dir.scm_add([dvc.config.files["repo"]], commit="add remote") + return path + + @pytest.fixture def run_copy(tmp_dir, dvc): tmp_dir.gen( diff --git a/tests/func/test_add.py b/tests/func/test_add.py index 7458156500..2c6ee387c6 100644 --- a/tests/func/test_add.py +++ b/tests/func/test_add.py @@ -12,11 +12,15 @@ import dvc as dvc_module from dvc.cache import Cache from dvc.dvcfile import DVC_FILE_SUFFIX -from dvc.exceptions import DvcException, OverlappingOutputPathsError -from dvc.exceptions import RecursiveAddingWhileUsingFilename -from dvc.exceptions import StageFileCorruptedError +from dvc.exceptions import ( + DvcException, + OverlappingOutputPathsError, + OutputDuplicationError, + RecursiveAddingWhileUsingFilename, + StageFileCorruptedError, +) from dvc.main import main -from dvc.output.base import OutputAlreadyTrackedError +from dvc.output.base import OutputAlreadyTrackedError, OutputIsStageFileError from dvc.remote import LocalRemote from dvc.repo import Repo as DvcRepo from dvc.stage import Stage @@ -703,3 +707,20 @@ def test_add_optimization_for_hardlink_on_empty_files(tmp_dir, dvc, mocker): for stage in stages: assert os.path.exists(stage.path) assert os.path.exists(stage.outs[0].cache_path) + + +def test_output_duplication_for_pipeline_tracked(tmp_dir, dvc, run_copy): + tmp_dir.dvc_gen("foo", "foo") + run_copy("foo", "bar", name="copy-foo-bar") + with pytest.raises(OutputDuplicationError): + dvc.add("bar") + + +def test_add_pipeline_file(tmp_dir, dvc, run_copy): + from dvc.dvcfile import PIPELINE_FILE + + tmp_dir.dvc_gen("foo", "foo") + run_copy("foo", "bar", name="copy-foo-bar") + + with pytest.raises(OutputIsStageFileError): + dvc.add(PIPELINE_FILE) diff --git a/tests/func/test_data_cloud.py b/tests/func/test_data_cloud.py index cea969b123..6a753c7b77 100644 --- a/tests/func/test_data_cloud.py +++ b/tests/func/test_data_cloud.py @@ -764,3 +764,38 @@ def test_pull_external_dvc_imports(tmp_dir, dvc, scm, erepo_dir): assert (tmp_dir / "new_dir").exists() assert (tmp_dir / "new_dir" / "bar").read_text() == "bar" + + +def test_dvc_pull_pipeline_stages(tmp_dir, dvc, local_remote, run_copy): + (stage0,) = tmp_dir.dvc_gen("foo", "foo") + stage1 = run_copy("foo", "bar") + stage2 = run_copy("bar", "foobar", name="copy-bar-foobar") + outs = ["foo", "bar", "foobar"] + + def clean(): + for path in [*outs, dvc.cache.local.cache_dir]: + remove(path) + os.makedirs(dvc.cache.local.cache_dir, exist_ok=True) + + dvc.push() + clean() + dvc.pull() + assert all((tmp_dir / file).exists() for file in outs) + + for out, stage in zip(outs, [stage0, stage1, stage2]): + for target in [stage.addressing, out]: + clean() + stats = dvc.pull([target]) + assert stats["downloaded"] == 1 + assert stats["added"] == [out] + assert os.path.exists(out) + assert not any(os.path.exists(out) for out in set(outs) - {out}) + + clean() + stats = dvc.pull([stage2.addressing], with_deps=True) + assert len(stats["added"]) == 3 + assert set(stats["added"]) == set(outs) + + clean() + stats = dvc.pull([os.curdir], recursive=True) + assert set(stats["added"]) == set(outs) diff --git a/tests/func/test_dvcfile.py b/tests/func/test_dvcfile.py index 8ae0752507..9eda67757e 100644 --- a/tests/func/test_dvcfile.py +++ b/tests/func/test_dvcfile.py @@ -1,8 +1,11 @@ import pytest from dvc.dvcfile import Dvcfile, PIPELINE_FILE -from dvc.loader import StageNotFound -from dvc.stage.exceptions import StageFileDoesNotExistError +from dvc.stage.loader import StageNotFound +from dvc.stage.exceptions import ( + StageFileDoesNotExistError, + StageNameUnspecified, +) def test_run_load_one_for_multistage(tmp_dir, dvc): @@ -109,8 +112,8 @@ def test_load_singlestage(tmp_dir, dvc): assert Dvcfile(dvc, "foo2.dvc").stage == stage1 -def test_load_multistage(tmp_dir, dvc): - from dvc.dvcfile import MultiStageFileLoadError +def test_try_get_single_stage_from_pipeline_file(tmp_dir, dvc): + from dvc.dvcfile import DvcException tmp_dir.gen("foo", "foo") dvc.run( @@ -120,8 +123,8 @@ def test_load_multistage(tmp_dir, dvc): metrics=["foo2"], always_changed=True, ) - with pytest.raises(MultiStageFileLoadError): - Dvcfile(dvc, PIPELINE_FILE).stage + with pytest.raises(DvcException): + assert Dvcfile(dvc, PIPELINE_FILE).stage def test_stage_collection(tmp_dir, dvc): @@ -144,3 +147,36 @@ def test_stage_collection(tmp_dir, dvc): cmd="cp bar bar2", deps=["bar"], metrics=["bar2"], always_changed=True, ) assert {s for s in dvc.stages} == {stage1, stage3, stage2} + + +def test_stage_filter(tmp_dir, dvc, run_copy): + tmp_dir.gen("foo", "foo") + stage1 = run_copy("foo", "bar", name="copy-foo-bar") + stage2 = run_copy("bar", "bax", name="copy-bar-bax") + stage3 = run_copy("bax", "baz", name="copy-bax-baz") + stages = Dvcfile(dvc, PIPELINE_FILE).stages + assert set(stages.filter(None).values()) == { + stage1, + stage2, + stage3, + } + assert set(stages.filter("copy-bar-bax").values()) == {stage2} + assert stages.filter("copy-bar-bax").get("copy-bar-bax") == stage2 + with pytest.raises(StageNameUnspecified): + stages.get(None) + + with pytest.raises(StageNotFound): + assert stages["unknown"] + + assert not stages.filter("unknown").values() + + +def test_stage_filter_in_singlestage_file(tmp_dir, dvc, run_copy): + tmp_dir.gen("foo", "foo") + stage = run_copy("foo", "bar") + dvcfile = Dvcfile(dvc, stage.path) + assert set(dvcfile.stages.filter(None).values()) == {stage} + assert dvcfile.stages.filter(None).get(None) == stage + assert dvcfile.stages.filter("copy-bar-bax").get("copy-bar-bax") == stage + assert dvcfile.stages.filter("copy-bar-bax").get(None) == stage + assert set(dvcfile.stages.filter("unknown").values()) == {stage} diff --git a/tests/func/test_remove.py b/tests/func/test_remove.py index 7df47300bc..8ce0e0e490 100644 --- a/tests/func/test_remove.py +++ b/tests/func/test_remove.py @@ -17,13 +17,13 @@ def test(self): self.assertEqual(len(stages), 1) stage = stages[0] self.assertTrue(stage is not None) - stage_removed = self.dvc.remove(stage.path, outs_only=True) + (stage_removed,) = self.dvc.remove(stage.path, outs_only=True) self.assertIsInstance(stage_removed, Stage) self.assertEqual(stage.path, stage_removed.path) self.assertFalse(os.path.isfile(self.FOO)) - stage_removed = self.dvc.remove(stage.path) + (stage_removed,) = self.dvc.remove(stage.path) self.assertIsInstance(stage_removed, Stage) self.assertEqual(stage.path, stage_removed.path) self.assertFalse(os.path.isfile(self.FOO)) @@ -60,7 +60,7 @@ def test(self): self.assertEqual(len(stages), 1) stage_add = stages[0] self.assertTrue(stage_add is not None) - stage_removed = self.dvc.remove(stage_add.path) + (stage_removed,) = self.dvc.remove(stage_add.path) self.assertEqual(stage_add.path, stage_removed.path) self.assertFalse(os.path.exists(self.DATA_DIR)) self.assertFalse(os.path.exists(stage_removed.path)) diff --git a/tests/func/test_repo.py b/tests/func/test_repo.py index e907674027..b9abf6dc22 100644 --- a/tests/func/test_repo.py +++ b/tests/func/test_repo.py @@ -1,6 +1,5 @@ import os -from dvc.scm.git.tree import GitTree from dvc.cache import Cache from dvc.repo import Repo from dvc.system import System @@ -49,12 +48,27 @@ def collect_outs(*args, **kwargs): scm.commit("Add buzz") assert collect_outs("bar.dvc", with_deps=True) == {"foo", "bar"} - - dvc.tree = GitTree(scm.repo, "new-branch") - assert collect_outs("buzz.dvc", with_deps=True) == {"foo", "bar", "buzz"} assert collect_outs("buzz.dvc", with_deps=False) == {"buzz"} + run_copy("foo", "foobar", name="copy-foo-foobar") + assert collect_outs(":copy-foo-foobar") == {"foobar"} + assert collect_outs(":copy-foo-foobar", with_deps=True) == { + "foobar", + "foo", + } + assert collect_outs("pipelines.yaml:copy-foo-foobar", recursive=True) == { + "foobar" + } + + run_copy("foobar", "baz", name="copy-foobar-baz") + assert collect_outs("pipelines.yaml") == {"foobar", "baz"} + assert collect_outs("pipelines.yaml", with_deps=True) == { + "foobar", + "baz", + "foo", + } + def test_stages(tmp_dir, dvc): def stages(): diff --git a/tests/func/test_repro.py b/tests/func/test_repro.py index 98743b598a..d45c60beba 100644 --- a/tests/func/test_repro.py +++ b/tests/func/test_repro.py @@ -560,7 +560,13 @@ def test(self): def test_non_existing(self): with self.assertRaises(StageFileDoesNotExistError): - self.dvc.lock_stage("non-existing-stage") + self.dvc.lock_stage("Dvcfile") + self.dvc.lock_stage("pipelines.yaml") + self.dvc.lock_stage("pipelines.yaml:name") + self.dvc.lock_stage("Dvcfile:name") + self.dvc.lock_stage("stage.dvc") + self.dvc.lock_stage("stage.dvc:name") + self.dvc.lock_stage("not-existing-stage.json") ret = main(["lock", "non-existing-stage"]) self.assertNotEqual(ret, 0) diff --git a/tests/func/test_repro_multistage.py b/tests/func/test_repro_multistage.py index d648cd9d41..73f31e0547 100644 --- a/tests/func/test_repro_multistage.py +++ b/tests/func/test_repro_multistage.py @@ -228,7 +228,7 @@ def test_downstream(tmp_dir, dvc): def test_repro_when_cmd_changes(tmp_dir, dvc, run_copy): - from dvc.dvcfile import PipelineFile + from dvc.dvcfile import PipelineFile, PIPELINE_FILE tmp_dir.gen("foo", "foo") stage = run_copy("foo", "bar", name="copy-file") @@ -238,6 +238,9 @@ def test_repro_when_cmd_changes(tmp_dir, dvc, run_copy): stage.cmd = " ".join(stage.cmd.split()) # change cmd spacing by two PipelineFile(dvc, PIPELINE_FILE)._dump_pipeline_file(stage) + assert dvc.status([target]) == { + PIPELINE_FILE + target: ["changed command"] + } assert dvc.reproduce(target)[0] == stage diff --git a/tests/func/test_run_multistage.py b/tests/func/test_run_multistage.py index 8a1c642a24..2f72722327 100644 --- a/tests/func/test_run_multistage.py +++ b/tests/func/test_run_multistage.py @@ -1,6 +1,8 @@ import pytest import os +from dvc.stage.exceptions import InvalidStageName, DuplicateStageName + def test_run_with_name(tmp_dir, dvc, run_copy): from dvc.stage import PipelineStage @@ -155,3 +157,16 @@ def test_run_dump_on_multistage(tmp_dir, dvc): **data["stages"], } } + + +def test_run_with_invalid_stage_name(tmp_dir, dvc, run_copy): + tmp_dir.dvc_gen("foo", "foo") + with pytest.raises(InvalidStageName): + run_copy("foo", "bar", name="email@https://dvc.org") + + +def test_run_already_exists(tmp_dir, dvc, run_copy): + tmp_dir.dvc_gen("foo", "foo") + run_copy("foo", "bar", name="copy") + with pytest.raises(DuplicateStageName): + run_copy("bar", "foobar", name="copy") diff --git a/tests/func/test_stage.py b/tests/func/test_stage.py index de59911251..a0e03a7967 100644 --- a/tests/func/test_stage.py +++ b/tests/func/test_stage.py @@ -158,7 +158,7 @@ def test_md5_ignores_comments(tmp_dir, dvc): f.write("# End comment\n") new_stage = SingleStageFile(dvc, stage.path).stage - assert not new_stage.changed_md5() + assert not new_stage.stage_changed() def test_meta_is_preserved(tmp_dir, dvc): @@ -192,3 +192,18 @@ def test_parent_repo_collect_stages(tmp_dir, scm, dvc): assert stages == [] assert subrepo_stages != [] + + +def test_stage_addressing(tmp_dir, dvc, run_copy): + tmp_dir.dvc_gen("foo", "foo") + stage1 = run_copy("foo", "bar") + assert stage1.addressing == "bar.dvc" + + stage2 = run_copy("bar", "baz", name="copy-bar-baz") + assert stage2.addressing == "pipelines.yaml:copy-bar-baz" + + folder = tmp_dir / "dir" + folder.mkdir() + with folder.chdir(): + assert stage1.addressing == "../bar.dvc" + assert stage2.addressing == "../pipelines.yaml:copy-bar-baz" diff --git a/tests/func/test_status.py b/tests/func/test_status.py index a34a355f3d..b958d74013 100644 --- a/tests/func/test_status.py +++ b/tests/func/test_status.py @@ -66,3 +66,30 @@ def test_status_before_and_after_dvc_init(tmp_dir, dvc, git_dir): "file ({})".format(fspath(git_dir)): "update available" } } + + +def test_status_on_pipeline_stages(tmp_dir, dvc, run_copy): + tmp_dir.dvc_gen("foo", "foo") + stage = run_copy("foo", "bar", name="copy-foo-bar") + + stage.cmd = " ".join(stage.cmd.split()) + stage.dvcfile._dump_pipeline_file(stage) + assert dvc.status() == {"pipelines.yaml:copy-foo-bar": ["changed command"]} + + # delete outputs + (tmp_dir / "bar").unlink() + assert dvc.status() == { + "pipelines.yaml:copy-foo-bar": [ + {"changed outs": {"bar": "deleted"}}, + "changed command", + ] + } + (tmp_dir / "foo").unlink() + assert dvc.status() == { + "foo.dvc": [{"changed outs": {"foo": "deleted"}}], + "pipelines.yaml:copy-foo-bar": [ + {"changed deps": {"foo": "deleted"}}, + {"changed outs": {"bar": "deleted"}}, + "changed command", + ], + } diff --git a/tests/unit/test_dvcfile.py b/tests/unit/test_dvcfile.py new file mode 100644 index 0000000000..41982e5ca2 --- /dev/null +++ b/tests/unit/test_dvcfile.py @@ -0,0 +1,26 @@ +import pytest + +from dvc.dvcfile import Dvcfile, PipelineFile, SingleStageFile + + +@pytest.mark.parametrize( + "path", + [ + "pipelines.yaml", + "pipelines.yml", + "custom-pipelines.yml", + "custom-pipelines.yaml", + "../models/pipelines.yml", + ], +) +def test_pipelines_file(path): + file_obj = Dvcfile(object(), path) + assert isinstance(file_obj, PipelineFile) + + +@pytest.mark.parametrize( + "path", ["Dvcfile", "stage.dvc", "../models/stage.dvc"] +) +def test_pipelines_single_stage_file(path): + file_obj = Dvcfile(object(), path) + assert isinstance(file_obj, SingleStageFile) diff --git a/tests/unit/utils/test_utils.py b/tests/unit/utils/test_utils.py index 196da51656..5f05be088d 100644 --- a/tests/unit/utils/test_utils.py +++ b/tests/unit/utils/test_utils.py @@ -4,11 +4,15 @@ import pytest from dvc.path_info import PathInfo -from dvc.utils import file_md5, resolve_output -from dvc.utils import fix_env -from dvc.utils import relpath -from dvc.utils import to_chunks -from dvc.utils import tmp_fname +from dvc.utils import ( + file_md5, + resolve_output, + fix_env, + relpath, + to_chunks, + tmp_fname, + parse_target, +) @pytest.mark.parametrize( @@ -128,3 +132,26 @@ def test_resolve_output(inp, out, is_dir, expected, mocker): mocker.patch("os.path.isdir", return_value=is_dir) result = resolve_output(inp, out) assert result == expected + + +@pytest.mark.parametrize( + "inp,out, default", + [ + ["pipelines.yaml", ("pipelines.yaml", None, None), None], + ["pipelines.yaml:name", ("pipelines.yaml", "name", None), None], + [":name", ("pipelines.yaml", "name", None), None], + ["stage.dvc", ("stage.dvc", None, None), None], + ["pipelines.yaml:name@v1", ("pipelines.yaml", "name", "v1"), None], + ["../models/stage.dvc", ("../models/stage.dvc", None, None), "def"], + [":name", ("default", "name", None), "default"], + [":name@v2", ("default", "name", "v2"), "default"], + ], +) +def test_parse_target(inp, out, default): + assert parse_target(inp, default) == out + + +def test_hint_on_lockfile(): + with pytest.raises(Exception) as exc: + assert parse_target("pipelines.lock:name@v223") + assert "pipelines.yaml:name@v223" in exc.value