Skip to content

Commit

Permalink
Merge pull request #1400 from vernt/deterministic
Browse files Browse the repository at this point in the history
Add dvc run --deterministic option.
  • Loading branch information
efiop authored Dec 4, 2018
2 parents 2245ae0 + ee16d2f commit 9f4dd2d
Show file tree
Hide file tree
Showing 6 changed files with 429 additions and 53 deletions.
21 changes: 20 additions & 1 deletion dvc/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,28 @@ def parse_args(argv=None):
'--yes',
action='store_true',
default=False,
help="Automatic 'yes' answer to all prompts. E.g. "
help="(OBSOLETED, use --overwrite-dvcfile instead) "
"Automatic 'yes' answer to all prompts. E.g. "
"when '.dvc' file exists and dvc asks if you "
"want to overwrite it.")
run_parser.add_argument(
'--overwrite-dvcfile',
action='store_true',
default=False,
help="Overwrite existing dvc file without asking "
"for confirmation.")
run_parser.add_argument(
'--ignore-build-cache',
action='store_true',
default=False,
help="Run this stage even if it has been already "
"ran with the same command/dependencies/outputs/etc "
"before.")
run_parser.add_argument(
'--remove-outs',
action='store_true',
default=False,
help="Remove outputs before running the command.")
run_parser.add_argument(
'command',
nargs=argparse.REMAINDER,
Expand Down
9 changes: 5 additions & 4 deletions dvc/command/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,19 @@ def _joined_cmd(self):
return cmd

def run(self):
overwrite = (self.args.yes or self.args.overwrite_dvcfile)
try:
if self.args.yes:
self.project.prompt.default = True

self.project.run(cmd=self._joined_cmd(),
outs=self.args.outs,
outs_no_cache=self.args.outs_no_cache,
metrics_no_cache=self.args.metrics_no_cache,
deps=self.args.deps,
fname=self.args.file,
cwd=self.args.cwd,
no_exec=self.args.no_exec)
no_exec=self.args.no_exec,
overwrite=overwrite,
ignore_build_cache=self.args.ignore_build_cache,
remove_outs=self.args.remove_outs)
except DvcException as ex:
self.project.logger.error('Failed to run command', ex)
return 1
Expand Down
43 changes: 28 additions & 15 deletions dvc/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,16 +325,21 @@ def move(self, from_path, to_path):
def _unprotect_file(self, path):
import stat
import uuid
from dvc.system import System
from dvc.utils import copyfile, move, remove

self.logger.debug("Unprotecting '{}'".format(path))
if System.is_symlink(path) or System.is_hardlink(path):
self.logger.debug("Unprotecting '{}'".format(path))

tmp = os.path.join(os.path.dirname(path), '.' + str(uuid.uuid4()))
move(path, tmp)
tmp = os.path.join(os.path.dirname(path), '.' + str(uuid.uuid4()))
move(path, tmp)

copyfile(tmp, path)
copyfile(tmp, path)

remove(tmp)
remove(tmp)
else:
self.logger.debug("Skipping copying for '{}', since it is not "
"a symlink or a hardlink.".format(path))

os.chmod(path, os.stat(path).st_mode | stat.S_IWRITE)

Expand Down Expand Up @@ -363,18 +368,26 @@ def run(self,
fname=None,
cwd=os.curdir,
no_exec=False,
overwrite=False):
overwrite=False,
ignore_build_cache=False,
remove_outs=False):
from dvc.stage import Stage

stage = Stage.loads(project=self,
fname=fname,
cmd=cmd,
cwd=cwd,
outs=outs,
outs_no_cache=outs_no_cache,
metrics_no_cache=metrics_no_cache,
deps=deps,
overwrite=overwrite)
with self.state:
stage = Stage.loads(project=self,
fname=fname,
cmd=cmd,
cwd=cwd,
outs=outs,
outs_no_cache=outs_no_cache,
metrics_no_cache=metrics_no_cache,
deps=deps,
overwrite=overwrite,
ignore_build_cache=ignore_build_cache,
remove_outs=remove_outs)

if stage is None:
return None

all_stages = self.stages()

Expand Down
120 changes: 89 additions & 31 deletions dvc/stage.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ def __init__(self, fname):
super(StageFileDoesNotExistError, self).__init__(msg)


class StageFileAlreadyExistsError(DvcException):
    """Error reporting that a stage file already exists.

    Args:
        relpath: path of the existing stage file, as it should be shown
            in the error message.
    """

    def __init__(self, relpath):
        super(StageFileAlreadyExistsError, self).__init__(
            "Stage '{}' already exists".format(relpath))


class StageFileIsNotDvcFileError(DvcException):
def __init__(self, fname):
msg = "'{}' is not a dvc file".format(fname)
Expand Down Expand Up @@ -149,7 +155,10 @@ def is_import(self):
len(self.deps) == 1 and \
len(self.outs) == 1

def _changed_deps(self, print_info, log):
def _changed_deps(self, log):
if self.locked:
return False

if self.is_callback:
msg = "Dvc file '{}' is a 'callback' stage (has a command and " \
"no dependencies) and thus always considered as changed."
Expand All @@ -159,37 +168,35 @@ def _changed_deps(self, print_info, log):
for dep in self.deps:
if not dep.changed():
continue
if print_info:
msg = "Dependency '{}' of '{}' changed."
log(msg.format(dep, self.relpath))
log("Dependency '{}' of '{}' changed.".format(dep, self.relpath))
return True

return False

def changed(self, print_info=False):
ret = False
def _changed_outs(self, log):
for out in self.outs:
if not out.changed():
continue
log("Output '{}' of '{}' changed.".format(out, self.relpath))
return True

return False

def _changed_md5(self, log):
if self.changed_md5():
log("Dvc file '{}' changed.".format(self.relpath))
return True
return False

def changed(self, print_info=False):
if print_info:
log = self.project.logger.info
else:
log = self.project.logger.debug

if not self.locked:
ret = self._changed_deps(print_info, log)

for out in self.outs:
if not out.changed():
continue
if print_info:
msg = "Output '{}' of '{}' changed."
log(msg.format(out, self.relpath))
ret = True

if self.changed_md5():
if print_info:
msg = "Dvc file '{}' changed."
log(msg.format(self.relpath))
ret = True
ret = any([self._changed_deps(log),
self._changed_outs(log),
self._changed_md5(log)])

if ret:
msg = "Stage '{}' changed.".format(self.relpath)
Expand All @@ -206,6 +213,12 @@ def remove_outs(self, ignore_remove=False):
for out in self.outs:
out.remove(ignore_remove=ignore_remove)

def unprotect_outs(self):
    """Unprotect every existing output whose scheme is 'local'.

    Outputs with any other scheme, or that do not exist, are skipped.
    Each remaining output's path is handed to ``self.project.unprotect``.
    """
    for out in self.outs:
        # The scheme is tested first so `exists` is only queried for
        # local outputs.
        if out.path_info['scheme'] == 'local' and out.exists:
            self.project.unprotect(out.path)

def remove(self):
self.remove_outs(ignore_remove=True)
os.unlink(self.path)
Expand Down Expand Up @@ -290,6 +303,37 @@ def _check_inside_project(project, cwd):
if not os.path.realpath(cwd).startswith(proj_dir):
raise StageBadCwdError(cwd)

def is_cached(self):
    """
    Check whether this stage has already been run and saved to the same
    dvc file.

    The stage stored at ``self.path`` is loaded and its dumped dict is
    compared with this stage's dumped dict, ignoring checksums.

    Returns:
        bool: False if any output of the stored stage changed; otherwise
        whether the two checksum-stripped dicts are equal.
    """
    from dvc.remote.local import RemoteLOCAL
    from dvc.remote.s3 import RemoteS3

    saved = Stage.load(self.project, self.path)
    if saved._changed_outs(log=self.project.logger.debug):
        return False

    # NOTE: dependency checksums have to be saved first so they can be
    # compared with what the stored stage recorded.
    for dependency in self.deps:
        dependency.save()

    saved_dict = saved.dumpd()
    current_dict = self.dumpd()

    # NOTE: the new stage has no checksums yet, so drop the stage md5 from
    # both dicts and the output checksums from the stored one before
    # comparing.
    for dumped in (saved_dict, current_dict):
        dumped.pop(self.PARAM_MD5, None)

    for out_entry in saved_dict.get(self.PARAM_OUTS, []):
        for checksum_key in (RemoteLOCAL.PARAM_MD5, RemoteS3.PARAM_ETAG):
            out_entry.pop(checksum_key, None)

    return saved_dict == current_dict

@staticmethod
def loads(project=None,
cmd=None,
Expand All @@ -301,7 +345,10 @@ def loads(project=None,
cwd=os.curdir,
locked=False,
add=False,
overwrite=True):
overwrite=True,
ignore_build_cache=False,
remove_outs=False):

stage = Stage(project=project,
cwd=cwd,
cmd=cmd,
Expand All @@ -325,17 +372,28 @@ def loads(project=None,
cwd = os.path.abspath(cwd)
path = os.path.join(cwd, fname)

if os.path.exists(path):
relpath = os.path.relpath(path)
msg = "'{}' already exists. " \
"Do you wish to run the command and overwrite it?"
if not overwrite \
and not project.prompt.prompt(msg.format(relpath), False):
raise DvcException("'{}' already exists".format(relpath))

stage.cwd = cwd
stage.path = path

# NOTE: remove outs before we check build cache
if remove_outs:
stage.remove_outs(ignore_remove=False)
project.logger.warn("Build cache is ignored when using "
"--remove-outs.")
ignore_build_cache = True
else:
stage.unprotect_outs()

if os.path.exists(path):
if not ignore_build_cache and stage.is_cached():
Logger.info('Stage is cached, skipping.')
return None

msg = "'{}' already exists. Do you wish to run the command and " \
"overwrite it?".format(stage.relpath)
if not overwrite and not project.prompt.prompt(msg, False):
raise StageFileAlreadyExistsError(stage.relpath)

return stage

@staticmethod
Expand Down
26 changes: 25 additions & 1 deletion dvc/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ def getdirinfo(path):
from ctypes import c_void_p, c_wchar_p, Structure, WinError, POINTER
from ctypes.wintypes import DWORD, HANDLE, BOOL

# NOTE: use this flag to open symlink itself and not the target
# See https://docs.microsoft.com/en-us/windows/desktop/api/
# fileapi/nf-fileapi-createfilew#symbolic-link-behavior
FILE_FLAG_OPEN_REPARSE_POINT = 0x00200000

FILE_FLAG_BACKUP_SEMANTICS = 0x02000000
FILE_SHARE_READ = 0x00000001
OPEN_EXISTING = 3
Expand All @@ -147,7 +152,7 @@ class BY_HANDLE_FILE_INFORMATION(Structure):
("nFileIndexHigh", DWORD),
("nFileIndexLow", DWORD)]

flags = FILE_FLAG_BACKUP_SEMANTICS
flags = FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OPEN_REPARSE_POINT

func = ctypes.windll.kernel32.CreateFileW
func.argtypes = [c_wchar_p,
Expand Down Expand Up @@ -240,3 +245,22 @@ def wait_for_input(timeout):
return System._wait_for_input_posix(timeout)
else:
return System._wait_for_input_windows(timeout)

@staticmethod
def is_symlink(path):
    """Tell whether `path` is a symbolic link.

    On Unix this defers to ``os.path.islink``. On other platforms the
    file attributes reported by ``System.getdirinfo`` are tested for the
    reparse-point attribute.

    Args:
        path: filesystem path to inspect.

    Returns:
        bool: True if `path` is a symlink (carries the reparse-point
        attribute on non-Unix platforms), False otherwise.
    """
    if System.is_unix():
        return os.path.islink(path)

    # https://docs.microsoft.com/en-us/windows/desktop/fileio/
    # file-attribute-constants
    FILE_ATTRIBUTE_REPARSE_POINT = 0x400
    info = System.getdirinfo(path)
    # Normalize the masked bits to a real bool so that both branches
    # return the same type (the raw mask result would be 0 or 0x400).
    return bool(info.dwFileAttributes & FILE_ATTRIBUTE_REPARSE_POINT)

@staticmethod
def is_hardlink(path):
    """Tell whether `path` has more than one hard link.

    On Unix the link count comes from ``os.stat``; on other platforms it
    is read from the info returned by ``System.getdirinfo``.

    Args:
        path: filesystem path to inspect.

    Returns:
        bool: True if the link count of `path` is greater than one.
    """
    if not System.is_unix():
        return System.getdirinfo(path).nNumberOfLinks > 1

    return os.stat(path).st_nlink > 1
Loading

0 comments on commit 9f4dd2d

Please sign in to comment.