diff --git a/dvc/output/base.py b/dvc/output/base.py index 420fc65166..d97887e5c3 100644 --- a/dvc/output/base.py +++ b/dvc/output/base.py @@ -1,15 +1,13 @@ import logging import os from copy import copy -from typing import Dict, Type +from typing import Type from urllib.parse import urlparse -from funcy import cached_property, project from voluptuous import Any import dvc.prompt as prompt from dvc.cache import NamedCache -from dvc.env import DVCLIVE_PATH, DVCLIVE_REPORT, DVCLIVE_SUMMARY from dvc.exceptions import ( CheckoutError, CollectCacheError, @@ -607,22 +605,3 @@ def merge(self, ancestor, other): self.hash_info = self.cache.merge( ancestor_info, self.hash_info, other.hash_info ) - - @cached_property - def env(self) -> Dict[str, str]: - if self.live: - from dvc.schema import LIVE_PROPS - - env = {DVCLIVE_PATH: str(self.path_info)} - if isinstance(self.live, dict): - - config = project(self.live, LIVE_PROPS) - - env[DVCLIVE_SUMMARY] = str( - int(config.get(BaseOutput.PARAM_LIVE_SUMMARY, True)) - ) - env[DVCLIVE_REPORT] = str( - int(config.get(BaseOutput.PARAM_LIVE_REPORT, True)) - ) - return env - return {} diff --git a/dvc/stage/__init__.py b/dvc/stage/__init__.py index e6ca0cf56d..990d3016dc 100644 --- a/dvc/stage/__init__.py +++ b/dvc/stage/__init__.py @@ -255,13 +255,41 @@ def is_checkpoint(self): """ return any(out.checkpoint for out in self.outs) - @property - def env(self) -> Env: + def _read_env(self, out, checkpoint_func=None) -> Env: env: Env = {} + if out.live: + from dvc.env import DVCLIVE_PATH, DVCLIVE_REPORT, DVCLIVE_SUMMARY + from dvc.output import BaseOutput + from dvc.schema import LIVE_PROPS + + env[DVCLIVE_PATH] = str(out.path_info) + if isinstance(out.live, dict): + config = project(out.live, LIVE_PROPS) + + env[DVCLIVE_SUMMARY] = str( + int(config.get(BaseOutput.PARAM_LIVE_SUMMARY, True)) + ) + env[DVCLIVE_REPORT] = str( + int(config.get(BaseOutput.PARAM_LIVE_REPORT, True)) + ) + elif out.checkpoint and checkpoint_func: + from dvc.env import DVC_CHECKPOINT + + env.update({DVC_CHECKPOINT: "1"}) + return env + + def env(self, checkpoint_func=None) -> Env: + from dvc.env import DVC_ROOT + + env: Env = {} + if self.repo: + env.update({DVC_ROOT: self.repo.root_dir}) + for out in self.outs: - if any(out.env.keys() and env.keys()): + current = self._read_env(out, checkpoint_func=checkpoint_func) + if set(env.keys()).intersection(set(current.keys())): raise DvcException("Duplicated env variable") - env.update(out.env) + env.update(current) return env def changed_deps(self): diff --git a/dvc/stage/run.py b/dvc/stage/run.py index d747409e17..afdb6f31bf 100644 --- a/dvc/stage/run.py +++ b/dvc/stage/run.py @@ -5,7 +5,6 @@ import threading from contextlib import contextmanager -from dvc.env import DVC_CHECKPOINT, DVC_ROOT from dvc.utils import fix_env from .decorators import relock_repo, unlocked_repo @@ -52,11 +51,8 @@ def _enforce_cmd_list(cmd): def prepare_kwargs(stage, checkpoint_func=None): kwargs = {"cwd": stage.wdir, "env": fix_env(None), "close_fds": True} - if checkpoint_func: - # indicate that checkpoint cmd is being run inside DVC - kwargs["env"].update(_checkpoint_env(stage)) - kwargs["env"].update(stage.env) + kwargs["env"].update(stage.env(checkpoint_func=checkpoint_func)) # NOTE: when you specify `shell=True`, `Popen` [1] will default to # `/bin/sh` on *nix and will add ["/bin/sh", "-c"] to your command. @@ -143,10 +139,6 @@ def run_stage(stage, dry=False, force=False, checkpoint_func=None, **kwargs): run(stage, dry=dry, checkpoint_func=checkpoint_func) -def _checkpoint_env(stage): - return {DVC_CHECKPOINT: "1", DVC_ROOT: stage.repo.root_dir} - - @contextmanager def checkpoint_monitor(stage, callback_func, proc, killed): if not callback_func: