diff --git a/renku/command/command_builder/command.py b/renku/command/command_builder/command.py index 5920ddb632..38627b6b1a 100644 --- a/renku/command/command_builder/command.py +++ b/renku/command/command_builder/command.py @@ -171,6 +171,7 @@ def __init__(self) -> None: self._track_std_streams: bool = False self._working_directory: Optional[str] = None self._client: Optional["LocalClient"] = None + self._client_was_created: bool = False def __getattr__(self, name: str) -> Any: """Bubble up attributes of wrapped builders.""" @@ -205,6 +206,7 @@ def _injection_pre_hook(self, builder: "Command", context: dict, *args, **kwargs dispatcher.push_created_client_to_stack(self._client) else: self._client = dispatcher.push_client_to_stack(path=default_path(self._working_directory or ".")) + self._client_was_created = True ctx = click.Context(click.Command(builder._operation)) # type: ignore else: if not self._client: @@ -237,6 +239,9 @@ def _post_hook(self, builder: "Command", context: dict, result: "CommandResult", """ remove_injector() + if self._client_was_created and self._client and self._client.repository is not None: + self._client.repository.close() + if result.error: raise result.error diff --git a/renku/core/management/git.py b/renku/core/management/git.py index f93353bbdb..7988745b77 100644 --- a/renku/core/management/git.py +++ b/renku/core/management/git.py @@ -295,6 +295,10 @@ def __attrs_post_init__(self): except errors.GitError: self.repository = None + def __del__(self): + if self.repository: + self.repository.close() + @property def modified_paths(self): """Return paths of modified files.""" diff --git a/renku/infrastructure/repository.py b/renku/infrastructure/repository.py index 3793ac5da1..fa5f984a1a 100644 --- a/renku/infrastructure/repository.py +++ b/renku/infrastructure/repository.py @@ -86,6 +86,15 @@ def __init__(self, path: Union[Path, str] = ".", repository: Optional[git.Repo] def __repr__(self) -> str: return f"<{self.__class__.__name__} {self.path}>" + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + + def __del__(self): + self.close() + @property def path(self) -> Path: """Absolute path to the repository's root.""" @@ -675,6 +684,16 @@ def get_user(self) -> "Actor": configuration = self.get_configuration() return Repository._get_user_from_configuration(configuration) + def close(self) -> None: + """Close the underlying repository. + + Cleans up dangling processes. + """ + if getattr(self, "_repository", None) is not None: + self._repository.close() # type:ignore + del self._repository + self._repository = None + @staticmethod def get_global_user() -> "Actor": """Return the global git user.""" @@ -795,6 +814,7 @@ def __init__( self, path: Union[Path, str] = ".", search_parent_directories: bool = False, repository: git.Repo = None ): repo = repository or _create_repository(path, search_parent_directories) + super().__init__(path=Path(repo.working_dir).resolve(), repository=repo) # type: ignore @classmethod @@ -864,7 +884,7 @@ def __init__(self, parent: git.Repo, name: str, path: Union[Path, str], url: str self._name: str = name self._url: str = url try: - self._repository: git.Repo = _create_repository(path, search_parent_directories=False) + self._repository: Optional[git.Repo] = _create_repository(path, search_parent_directories=False) except errors.GitError: # NOTE: Submodule directory doesn't exist yet, so, we ignore the error pass @@ -881,6 +901,12 @@ def __str__(self) -> str: def __repr__(self) -> str: return f"" + def __del__(self) -> None: + if getattr(self, "_repository", None) is not None: + self._repository.close() # type:ignore + del self._repository + self._repository = None + @property def name(self) -> str: """Return submodule's name.""" @@ -901,25 +927,47 @@ class SubmoduleManager: """Manage submodules of a Repository.""" def __init__(self, repository: git.Repo): - self._repository = repository + self._repository: Optional[git.Repo] = repository + self._submodule_cache: Dict[str, Submodule] = {} # type: ignore try: self.update() except errors.GitError: # NOTE: Update fails if submodule repo cannot be cloned. Repository still works but submodules are broken. pass + def _get_submodule(self, submodule: git.Submodule) -> Submodule: # type: ignore + """Get a submodule from local cache.""" + if self._repository is None: + raise errors.ParameterError("Repository not set.") + + if submodule.name not in self._submodule_cache: + submodule_result = Submodule.from_submodule(self._repository, submodule) + self._submodule_cache[submodule.name] = submodule_result + return self._submodule_cache[submodule.name] + def __getitem__(self, name: str) -> Submodule: + if self._repository is None: + raise errors.ParameterError("Repository not set.") + try: submodule = self._repository.submodules[name] except IndexError: raise errors.GitError(f"Submodule '{name}' not found") else: - return Submodule.from_submodule(self._repository, submodule) + return self._get_submodule(submodule) def __iter__(self): - return (Submodule.from_submodule(self._repository, s) for s in self._repository.submodules) + if self._repository is None: + raise errors.ParameterError("Repository not set.") + + for s in self._repository.submodules: + + yield self._get_submodule(s) def __len__(self) -> int: + if self._repository is None: + raise errors.ParameterError("Repository not set.") + return len(self._repository.submodules) def __repr__(self) -> str: @@ -927,16 +975,27 @@ def __repr__(self) -> str: def remove(self, submodule: Union[Submodule, str], force: bool = False): """Remove an existing submodule.""" + if self._repository is None: + raise errors.ParameterError("Repository not set.") + name = submodule if isinstance(submodule, str) else submodule.name try: - submodule = self._repository.submodules[name] - submodule.remove(force=force) + git_submodule = self._repository.submodules[name] + git_submodule.remove(force=force) + + if name in self._submodule_cache: + submodule = self._submodule_cache[name] + del self._submodule_cache[name] + submodule.close() except git.GitError as e: raise errors.GitError(f"Cannot delete submodule '{submodule}'") from e def update(self, initialize: bool = True): """Update all submodule.""" + if self._repository is None: + raise errors.ParameterError("Repository not set.") + # NOTE: Git complains if ``--init`` comes before ``update`` args = ("update", "--init") if initialize else ("update",) _run_git_command(self._repository, "submodule", *args) diff --git a/renku/ui/service/controllers/api/mixins.py b/renku/ui/service/controllers/api/mixins.py index 099ee54747..90acd8b313 100644 --- a/renku/ui/service/controllers/api/mixins.py +++ b/renku/ui/service/controllers/api/mixins.py @@ -190,37 +190,37 @@ def execute_op(self): ref = self.request_data.get("ref", None) if ref: - repository = Repository(project.abs_path) - if ref != repository.active_branch.name: - # NOTE: Command called for different branch than the one used in cache, change branch - if len(repository.remotes) != 1: - raise RenkuException("Couldn't find remote for project in cache.") - origin = repository.remotes[0] - remote_branch = f"{origin}/{ref}" - - with project.write_lock(): - # NOTE: Add new ref to remote branches - repository.run_git_command("remote", "set-branches", "--add", origin, ref) - if self.migrate_project or self.clone_depth == PROJECT_CLONE_NO_DEPTH: - repository.fetch(origin, ref) - else: - repository.fetch(origin, ref, depth=self.clone_depth) - - # NOTE: Switch to new ref - repository.run_git_command("checkout", "--track", "-f", "-b", ref, remote_branch) - - # NOTE: cleanup remote branches in case a remote was deleted (fetch fails otherwise) - repository.run_git_command("remote", "prune", origin) - - for branch in repository.branches: - if branch.remote_branch and not branch.remote_branch.is_valid(): - repository.branches.remove(branch, force=True) - # NOTE: Remove left-over refspec - try: - with repository.get_configuration(writable=True) as config: - config.remove_value(f"remote.{origin}.fetch", f"origin.{branch}$") - except GitConfigurationError: - pass + with Repository(project.abs_path) as repository: + if ref != repository.active_branch.name: + # NOTE: Command called for different branch than the one used in cache, change branch + if len(repository.remotes) != 1: + raise RenkuException("Couldn't find remote for project in cache.") + origin = repository.remotes[0] + remote_branch = f"{origin}/{ref}" + + with project.write_lock(): + # NOTE: Add new ref to remote branches + repository.run_git_command("remote", "set-branches", "--add", origin, ref) + if self.migrate_project or self.clone_depth == PROJECT_CLONE_NO_DEPTH: + repository.fetch(origin, ref) + else: + repository.fetch(origin, ref, depth=self.clone_depth) + + # NOTE: Switch to new ref + repository.run_git_command("checkout", "--track", "-f", "-b", ref, remote_branch) + + # NOTE: cleanup remote branches in case a remote was deleted (fetch fails otherwise) + repository.run_git_command("remote", "prune", origin) + + for branch in repository.branches: + if branch.remote_branch and not branch.remote_branch.is_valid(): + repository.branches.remove(branch, force=True) + # NOTE: Remove left-over refspec + try: + with repository.get_configuration(writable=True) as config: + config.remove_value(f"remote.{origin}.fetch", f"origin.{branch}$") + except GitConfigurationError: + pass else: self.reset_local_repo(project) @@ -250,33 +250,33 @@ def reset_local_repo(self, project): # NOTE: return immediately in case of multiple writers waiting return - repository = Repository(project.abs_path) - origin = None - tracking_branch = repository.active_branch.remote_branch - if tracking_branch: - origin = tracking_branch.remote - elif len(repository.remotes) == 1: - origin = repository.remotes[0] - - if origin: - unshallow = self.migrate_project or self.clone_depth == PROJECT_CLONE_NO_DEPTH - if unshallow: - try: - # NOTE: It could happen that repository is already un-shallowed, - # in this case we don't want to leak git exception, but still want to fetch. - repository.fetch("origin", repository.active_branch, unshallow=True) - except GitCommandError: - repository.fetch("origin", repository.active_branch) - - repository.reset(f"{origin}/{repository.active_branch}", hard=True) - else: - try: - # NOTE: it rarely happens that origin is not reachable. Try again if it fails. - repository.fetch("origin", repository.active_branch) + with Repository(project.abs_path) as repository: + origin = None + tracking_branch = repository.active_branch.remote_branch + if tracking_branch: + origin = tracking_branch.remote + elif len(repository.remotes) == 1: + origin = repository.remotes[0] + + if origin: + unshallow = self.migrate_project or self.clone_depth == PROJECT_CLONE_NO_DEPTH + if unshallow: + try: + # NOTE: It could happen that repository is already un-shallowed, + # in this case we don't want to leak git exception, but still want to fetch. + repository.fetch("origin", repository.active_branch, unshallow=True) + except GitCommandError: + repository.fetch("origin", repository.active_branch) + repository.reset(f"{origin}/{repository.active_branch}", hard=True) - except GitCommandError as e: - project.purge() - raise IntermittentCacheError(e) + else: + try: + # NOTE: it rarely happens that origin is not reachable. Try again if it fails. + repository.fetch("origin", repository.active_branch) + repository.reset(f"{origin}/{repository.active_branch}", hard=True) + except GitCommandError as e: + project.purge() + raise IntermittentCacheError(e) project.last_fetched_at = datetime.utcnow() project.save() except (portalocker.LockException, portalocker.AlreadyLocked) as e: @@ -346,7 +346,8 @@ def sync(self, remote="origin"): if self.project_path is None: raise RenkuException("unable to sync with remote since no operation has been executed") - return push_changes(Repository(self.project_path), remote=remote) + with Repository(self.project_path) as repository: + return push_changes(repository, remote=remote) def execute_and_sync(self, remote="origin"): """Execute operation which controller implements and sync with the remote."""