From e02e5bf846f8e63e3e2b116edb1bc7ee54bacdbc Mon Sep 17 00:00:00 2001 From: Ralf Grubenmann Date: Thu, 25 May 2023 10:27:15 +0200 Subject: [PATCH] fix(cli): fix dataset update with external files (#3379) --- renku/core/dataset/dataset.py | 211 ++++++---------------- renku/core/dataset/providers/api.py | 95 +++++++++- renku/core/dataset/providers/azure.py | 1 + renku/core/dataset/providers/dataverse.py | 1 + renku/core/dataset/providers/doi.py | 1 + renku/core/dataset/providers/external.py | 1 + renku/core/dataset/providers/git.py | 77 +++++++- renku/core/dataset/providers/local.py | 60 +++++- renku/core/dataset/providers/models.py | 17 ++ renku/core/dataset/providers/olos.py | 1 + renku/core/dataset/providers/renku.py | 1 + renku/core/dataset/providers/s3.py | 1 + renku/core/dataset/providers/web.py | 83 ++++++++- renku/core/dataset/providers/zenodo.py | 1 + tests/cli/test_datasets.py | 149 ++++++++++++++- 15 files changed, 530 insertions(+), 170 deletions(-) diff --git a/renku/core/dataset/dataset.py b/renku/core/dataset/dataset.py index e6f2ec0af4..c8456ee8e8 100644 --- a/renku/core/dataset/dataset.py +++ b/renku/core/dataset/dataset.py @@ -18,6 +18,7 @@ import os import shutil import urllib +from collections import defaultdict from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast @@ -29,22 +30,19 @@ from renku.core import errors from renku.core.config import get_value, remove_value, set_value from renku.core.dataset.datasets_provenance import DatasetsProvenance -from renku.core.dataset.pointer_file import ( - create_external_file, - delete_external_file, - is_linked_file_updated, - update_linked_file, -) +from renku.core.dataset.pointer_file import delete_external_file, is_linked_file_updated, update_linked_file +from renku.core.dataset.providers.api import AddProviderInterface, ProviderApi from renku.core.dataset.providers.factory import ProviderFactory -from renku.core.dataset.providers.models import ProviderDataset +from renku.core.dataset.providers.git import GitProvider +from renku.core.dataset.providers.models import DatasetUpdateAction, ProviderDataset from renku.core.dataset.request_model import ImageRequestModel from renku.core.dataset.tag import get_dataset_by_tag, prompt_access_token, prompt_tag_selection from renku.core.interface.dataset_gateway import IDatasetGateway -from renku.core.storage import check_external_storage, pull_paths_from_storage, track_paths_in_storage +from renku.core.storage import check_external_storage, track_paths_in_storage from renku.core.util import communication from renku.core.util.datetime8601 import local_now -from renku.core.util.git import clone_repository, get_cache_directory_for_repository, get_git_user -from renku.core.util.metadata import is_linked_file, prompt_for_credentials, read_credentials, store_credentials +from renku.core.util.git import get_git_user +from renku.core.util.metadata import prompt_for_credentials, read_credentials, store_credentials from renku.core.util.os import ( create_symlink, delete_dataset_file, @@ -72,7 +70,6 @@ if TYPE_CHECKING: from renku.core.interface.storage import IStorage - from renku.infrastructure.repository import Repository @validate_arguments(config=dict(arbitrary_types_allowed=True)) @@ -571,6 +568,7 @@ def remove_files(dataset): total_size=calculate_total_size(importer.provider_dataset_files), clear_files_before=True, datadir=datadir, + storage=provider_dataset.storage, ) new_dataset.update_metadata_from(provider_dataset) @@ -714,19 +712,32 @@ def update_datasets( raise errors.ParameterError("No files matched the criteria.") return imported_dataset_updates_view_models, [] - git_files = [] + provider_files: Dict[AddProviderInterface, List[DynamicProxy]] = defaultdict(list) unique_remotes = set() linked_files = [] - local_files = [] for file in records: - if file.based_on: - git_files.append(file) - unique_remotes.add(file.based_on.url) - elif file.linked: + if file.linked: linked_files.append(file) else: - local_files.append(file) + if not getattr(file, "provider", None): + if file.based_on: + uri = file.dataset.same_as.value if file.dataset.same_as else file.based_on.url + else: + uri = file.source + try: + file.provider = cast( + AddProviderInterface, + ProviderFactory.get_add_provider(uri), + ) + except errors.DatasetProviderNotFound: + communication.warn(f"Couldn't find provider for file {file.path} in dataset {file.dataset.name}") + continue + + provider_files[file.provider].append(file) + + if isinstance(file.provider, GitProvider): + unique_remotes.add(file.based_on.url) if ref and len(unique_remotes) > 1: raise errors.ParameterError( @@ -741,18 +752,24 @@ def update_datasets( updated = update_linked_files(linked_files, dry_run=dry_run) updated_files.extend(updated) - if git_files and not no_remote: - updated, deleted = update_dataset_git_files(files=git_files, ref=ref, delete=delete, dry_run=dry_run) - updated_files.extend(updated) - deleted_files.extend(deleted) + provider_context: Dict[str, Any] = {} + + for provider, files in provider_files.items(): + if (no_remote and cast(ProviderApi, provider).is_remote) or ( + no_local and not cast(ProviderApi, provider).is_remote + ): + continue - if local_files and not no_local: - updated, deleted, new = update_dataset_local_files( - records=local_files, check_data_directory=check_data_directory + results = provider.update_files( + files=files, + dry_run=dry_run, + delete=delete, + context=provider_context, + ref=ref, + check_data_directory=check_data_directory, ) - updated_files.extend(updated) - deleted_files.extend(deleted) - updated_files.extend(new) + updated_files.extend(r.entity for r in results if r.action == DatasetUpdateAction.UPDATE) + deleted_files.extend(r.entity for r in results if r.action == DatasetUpdateAction.DELETE) if not dry_run: if deleted_files and not delete: @@ -974,62 +991,6 @@ def move_files(dataset_gateway: IDatasetGateway, files: Dict[Path, Path], to_dat datasets_provenance.add_or_update(to_dataset, creator=creator) -def update_dataset_local_files( - records: List[DynamicProxy], check_data_directory: bool -) -> Tuple[List[DynamicProxy], List[DynamicProxy], List[DynamicProxy]]: - """Update files metadata from the git history. - - Args: - records(List[DynamicProxy]): File records to update. - check_data_directory(bool): Whether to check the dataset's data directory for new files. - Returns: - Tuple[List[DynamicProxy], List[DynamicProxy]]: Tuple of updated and deleted file records. - """ - updated_files: List[DynamicProxy] = [] - deleted_files: List[DynamicProxy] = [] - new_files: List[DynamicProxy] = [] - progress_text = "Checking for local updates" - - try: - communication.start_progress(progress_text, len(records)) - check_paths = [] - records_to_check = [] - - for file in records: - communication.update_progress(progress_text, 1) - - if file.based_on or file.linked: - continue - - if not (project_context.path / file.entity.path).exists(): - deleted_files.append(file) - continue - - check_paths.append(file.entity.path) - records_to_check.append(file) - - checksums = project_context.repository.get_object_hashes(check_paths) - - for file in records_to_check: - current_checksum = checksums.get(file.entity.path) - if not current_checksum: - deleted_files.append(file) - elif current_checksum != file.entity.checksum: - updated_files.append(file) - elif check_data_directory and not any(file.entity.path == f.entity.path for f in file.dataset.files): - datadir = file.dataset.get_datadir() - try: - get_safe_relative_path(file.entity.path, datadir) - except ValueError: - continue - - new_files.append(file) - finally: - communication.finalize_progress(progress_text) - - return updated_files, deleted_files, new_files - - def _update_datasets_files_metadata(updated_files: List[DynamicProxy], deleted_files: List[DynamicProxy], delete: bool): modified_datasets = {} checksums = project_context.repository.get_object_hashes([file.entity.path for file in updated_files]) @@ -1037,12 +998,16 @@ def _update_datasets_files_metadata(updated_files: List[DynamicProxy], deleted_f new_file = DatasetFile.from_path( path=file.entity.path, based_on=file.based_on, source=file.source, checksum=checksums.get(file.entity.path) ) - modified_datasets[file.dataset.name] = file.dataset + modified_datasets[file.dataset.name] = ( + file.dataset._subject if isinstance(file.dataset, DynamicProxy) else file.dataset + ) file.dataset.add_or_update_files(new_file) if delete: for file in deleted_files: - modified_datasets[file.dataset.name] = file.dataset + modified_datasets[file.dataset.name] = ( + file.dataset._subject if isinstance(file.dataset, DynamicProxy) else file.dataset + ) file.dataset.unlink_file(file.entity.path) datasets_provenance = DatasetsProvenance() @@ -1050,78 +1015,6 @@ def _update_datasets_files_metadata(updated_files: List[DynamicProxy], deleted_f datasets_provenance.add_or_update(dataset, creator=get_git_user(repository=project_context.repository)) -def update_dataset_git_files( - files: List[DynamicProxy], ref: Optional[str], delete: bool, dry_run: bool -) -> Tuple[List[DynamicProxy], List[DynamicProxy]]: - """Update files and dataset metadata according to their remotes. - - Args: - files(List[DynamicProxy]): List of files to be updated. - ref(Optional[str]): Reference to use for update. - delete(bool, optional): Indicates whether to delete files or not (Default value = False). - dry_run(bool): Whether to perform update or only print changes. - - Returns: - Tuple[List[DynamicProxy], List[DynamicProxy]]: Tuple of updated and deleted file records. - """ - visited_repos: Dict[str, "Repository"] = {} - updated_files: List[DynamicProxy] = [] - deleted_files: List[DynamicProxy] = [] - - progress_text = "Checking files for updates" - - try: - communication.start_progress(progress_text, len(files)) - for file in files: - communication.update_progress(progress_text, 1) - if not file.based_on: - continue - - based_on = file.based_on - url = based_on.url - if url in visited_repos: - remote_repository = visited_repos[url] - else: - communication.echo(msg="Cloning remote repository...") - path = get_cache_directory_for_repository(url=url) - remote_repository = clone_repository(url=url, path=path, checkout_revision=ref) - visited_repos[url] = remote_repository - - checksum = remote_repository.get_object_hash(path=based_on.path, revision="HEAD") - found = checksum is not None - changed = found and based_on.checksum != checksum - - src = remote_repository.path / based_on.path - dst = project_context.metadata_path.parent / file.entity.path - - if not found: - if not dry_run and delete: - delete_dataset_file(dst, follow_symlinks=True) - project_context.repository.add(dst, force=True) - deleted_files.append(file) - elif changed: - if not dry_run: - # Fetch file if it is tracked by Git LFS - pull_paths_from_storage(remote_repository, remote_repository.path / based_on.path) - if is_linked_file(path=src, project_path=remote_repository.path): - delete_dataset_file(dst, follow_symlinks=True) - create_external_file(target=src.resolve(), path=dst) - else: - shutil.copy(src, dst) - file.based_on = RemoteEntity( - checksum=checksum, path=based_on.path, url=based_on.url # type: ignore - ) - updated_files.append(file) - finally: - communication.finalize_progress(progress_text) - - if not updated_files and (not delete or not deleted_files): - # Nothing to commit or update - return [], deleted_files - - return updated_files, deleted_files - - def update_linked_files(records: List[DynamicProxy], dry_run: bool) -> List[DynamicProxy]: """Update files linked to other files in the project. @@ -1230,7 +1123,7 @@ def should_include(filepath: Path) -> bool: continue record = DynamicProxy(file) - record.dataset = dataset + record.dataset = DynamicProxy(dataset) records.append(record) if not check_data_directory: diff --git a/renku/core/dataset/providers/api.py b/renku/core/dataset/providers/api.py index c4c44f405e..02ab716315 100644 --- a/renku/core/dataset/providers/api.py +++ b/renku/core/dataset/providers/api.py @@ -16,25 +16,32 @@ """API for providers.""" import abc -from collections import UserDict +from collections import UserDict, defaultdict from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, List, Optional, Protocol, Tuple, Type, Union from renku.core import errors from renku.core.constant import ProviderPriority from renku.core.plugin import hookimpl +from renku.core.util import communication +from renku.core.util.os import delete_dataset_file +from renku.core.util.urls import is_uri_subfolder, resolve_uri from renku.domain_model.constant import NO_VALUE, NoValueType +from renku.domain_model.dataset import RemoteEntity from renku.domain_model.dataset_provider import IDatasetProviderPlugin +from renku.domain_model.project_context import project_context if TYPE_CHECKING: from renku.core.dataset.providers.models import ( DatasetAddMetadata, + DatasetUpdateMetadata, ProviderDataset, ProviderDatasetFile, ProviderParameter, ) from renku.core.interface.storage import IStorage from renku.domain_model.dataset import Dataset, DatasetTag + from renku.infrastructure.immutable import DynamicProxy class ProviderApi(IDatasetProviderPlugin): @@ -42,12 +49,13 @@ class ProviderApi(IDatasetProviderPlugin): priority: Optional[ProviderPriority] = None name: Optional[str] = None + is_remote: Optional[bool] = None def __init__(self, uri: str, **kwargs): self._uri: str = uri or "" def __init_subclass__(cls, **kwargs): - for required_property in ("priority", "name"): + for required_property in ("priority", "name", "is_remote"): if getattr(cls, required_property, None) is None: raise NotImplementedError(f"{required_property} must be set for {cls}") @@ -85,6 +93,13 @@ def get_metadata(self, uri: str, destination: Path, **kwargs) -> List["DatasetAd """Get metadata of files that will be added to a dataset.""" raise NotImplementedError + @abc.abstractmethod + def update_files( + self, files: List["DynamicProxy"], dry_run: bool, delete: bool, context: Dict[str, Any], **kwargs + ) -> List["DatasetUpdateMetadata"]: + """Update dataset files from the remote provider.""" + raise NotImplementedError + class ExportProviderInterface(abc.ABC): """Interface defining export providers.""" @@ -143,6 +158,82 @@ def supports_storage(uri: str) -> bool: """Whether or not this provider supports a given URI storage.""" raise NotImplementedError + def update_files( + self, + files: List["DynamicProxy"], + dry_run: bool, + delete: bool, + context: Dict[str, Any], + **kwargs, + ) -> List["DatasetUpdateMetadata"]: + """Update dataset files from the remote provider.""" + from renku.core.dataset.providers.models import DatasetUpdateAction, DatasetUpdateMetadata + + progress_text = f"Checking remote files for updates in dataset {files[0].dataset.name}" + + results: List[DatasetUpdateMetadata] = [] + + try: + communication.start_progress(progress_text, len(files)) + + storage = self.get_storage() + + # group files by storage to efficiently compute hashes + storage_files_dict: Dict[str, List["DynamicProxy"]] = defaultdict(list) + + for file in files: + if file.dataset.storage: + storage_files_dict[file.dataset.storage].append(file) + elif file.based_on: + if not self.supports_storage(file.based_on.url): + raise ValueError( + f"Called {getattr(self, 'name', 'Storage')} provider with file {file.entity.path} " + "which is not supported by this provider" + ) + storage_files_dict[file.based_on.url].append(file) + + for file_storage, files in storage_files_dict.items(): + hashes = storage.get_hashes(uri=file_storage) + for file in files: + communication.update_progress(progress_text, 1) + if not file.based_on: + continue + + dst = project_context.metadata_path.parent / file.entity.path + + hash = next((h for h in hashes if h.uri == file.based_on.url), None) + + if hash: + if not dry_run and ( + not file.dataset.storage + or not is_uri_subfolder(resolve_uri(file.dataset.storage), file.based_on.url) + ): + # Redownload downloaded (not mounted) file + download_storage = self.get_storage() + download_storage.download(file.based_on.url, dst) + file.based_on = RemoteEntity( + checksum=hash.hash if hash.hash else "", url=hash.uri, path=hash.path + ) + results.append(DatasetUpdateMetadata(entity=file, action=DatasetUpdateAction.UPDATE)) + else: + if ( + not dry_run + and delete + and ( + not file.dataset.storage + or not is_uri_subfolder(resolve_uri(file.dataset.storage), file.based_on.url) + ) + ): + # Delete downloaded (not mounted) file + delete_dataset_file(dst, follow_symlinks=True) + project_context.repository.add(dst, force=True) + results.append(DatasetUpdateMetadata(entity=file, action=DatasetUpdateAction.DELETE)) + + finally: + communication.finalize_progress(progress_text) + + return results + class CloudStorageProviderType(Protocol): """Intersection type for ``mypy`` hinting in storage classes.""" diff --git a/renku/core/dataset/providers/azure.py b/renku/core/dataset/providers/azure.py index 985a03bae2..e7bc4fcbc5 100644 --- a/renku/core/dataset/providers/azure.py +++ b/renku/core/dataset/providers/azure.py @@ -45,6 +45,7 @@ class AzureProvider(ProviderApi, StorageProviderInterface, AddProviderInterface) priority = ProviderPriority.HIGHEST name = "Azure" + is_remote = True def __init__(self, uri: str): super().__init__(uri=uri) diff --git a/renku/core/dataset/providers/dataverse.py b/renku/core/dataset/providers/dataverse.py index bfdf5d053d..22b938d43e 100644 --- a/renku/core/dataset/providers/dataverse.py +++ b/renku/core/dataset/providers/dataverse.py @@ -82,6 +82,7 @@ class DataverseProvider(ProviderApi, ExportProviderInterface, ImportProviderInte priority = ProviderPriority.HIGH name = "Dataverse" + is_remote = True def __init__(self, uri: str, is_doi: bool = False): super().__init__(uri=uri) diff --git a/renku/core/dataset/providers/doi.py b/renku/core/dataset/providers/doi.py index 4fd2b74cda..eb3ee19f96 100644 --- a/renku/core/dataset/providers/doi.py +++ b/renku/core/dataset/providers/doi.py @@ -30,6 +30,7 @@ class DOIProvider(ProviderApi, ImportProviderInterface): priority = ProviderPriority.HIGHER name = "DOI" + is_remote = True def __init__(self, uri: str, headers=None, timeout=3): super().__init__(uri=uri) diff --git a/renku/core/dataset/providers/external.py b/renku/core/dataset/providers/external.py index 0767fe0213..6fb881d796 100644 --- a/renku/core/dataset/providers/external.py +++ b/renku/core/dataset/providers/external.py @@ -47,6 +47,7 @@ class ExternalProvider(ProviderApi, StorageProviderInterface, AddProviderInterfa priority = ProviderPriority.HIGHEST name = "External" + is_remote = True def __init__(self, uri: str): super().__init__(uri=get_uri_absolute_path(uri).rstrip("/")) diff --git a/renku/core/dataset/providers/git.py b/renku/core/dataset/providers/git.py index 43a88a758e..1ed53eecd8 100644 --- a/renku/core/dataset/providers/git.py +++ b/renku/core/dataset/providers/git.py @@ -17,22 +17,26 @@ import glob import os +import shutil from collections import defaultdict from pathlib import Path -from typing import TYPE_CHECKING, Dict, List, Optional, Set, Union +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Union from renku.core import errors +from renku.core.dataset.pointer_file import create_external_file from renku.core.dataset.providers.api import AddProviderInterface, ProviderApi, ProviderPriority from renku.core.storage import pull_paths_from_storage from renku.core.util import communication from renku.core.util.git import clone_repository, get_cache_directory_for_repository -from renku.core.util.os import get_files, is_subpath +from renku.core.util.metadata import is_linked_file +from renku.core.util.os import delete_dataset_file, get_files, is_subpath from renku.core.util.urls import check_url, remove_credentials from renku.domain_model.dataset import RemoteEntity from renku.domain_model.project_context import project_context +from renku.infrastructure.immutable import DynamicProxy if TYPE_CHECKING: - from renku.core.dataset.providers.models import DatasetAddMetadata, ProviderParameter + from renku.core.dataset.providers.models import DatasetAddMetadata, DatasetUpdateMetadata, ProviderParameter class GitProvider(ProviderApi, AddProviderInterface): @@ -40,6 +44,7 @@ class GitProvider(ProviderApi, AddProviderInterface): priority = ProviderPriority.NORMAL name = "Git" + is_remote = True @staticmethod def supports(uri: str) -> bool: @@ -178,3 +183,69 @@ def get_file_metadata(src: Path, dst: Path) -> Optional["DatasetAddMetadata"]: communication.warn(f"The following files overwrite each other in the destination project:/n/t{files_str}") return results + + def update_files( + self, + files: List[DynamicProxy], + dry_run: bool, + delete: bool, + context: Dict[str, Any], + ref: Optional[str] = None, + **kwargs, + ) -> List["DatasetUpdateMetadata"]: + """Update dataset files from the remote provider.""" + from renku.core.dataset.providers.models import DatasetUpdateAction, DatasetUpdateMetadata + + if "visited_repos" not in context: + context["visited_repos"] = {} + + progress_text = "Checking git files for updates" + + results: List[DatasetUpdateMetadata] = [] + + try: + communication.start_progress(progress_text, len(files)) + for file in files: + communication.update_progress(progress_text, 1) + if not file.based_on: + continue + + based_on = file.based_on + url = based_on.url + if url in context["visited_repos"]: + remote_repository = context["visited_repos"][url] + else: + communication.echo(msg="Cloning remote repository...") + path = get_cache_directory_for_repository(url=url) + remote_repository = clone_repository(url=url, path=path, checkout_revision=ref) + context["visited_repos"][url] = remote_repository + + checksum = remote_repository.get_object_hash(path=based_on.path, revision="HEAD") + found = checksum is not None + changed = found and based_on.checksum != checksum + + src = remote_repository.path / based_on.path + dst = project_context.metadata_path.parent / file.entity.path + + if not found: + if not dry_run and delete: + delete_dataset_file(dst, follow_symlinks=True) + project_context.repository.add(dst, force=True) + results.append(DatasetUpdateMetadata(entity=file, action=DatasetUpdateAction.DELETE)) + elif changed: + if not dry_run: + # Fetch file if it is tracked by Git LFS + pull_paths_from_storage(remote_repository, remote_repository.path / based_on.path) + if is_linked_file(path=src, project_path=remote_repository.path): + delete_dataset_file(dst, follow_symlinks=True) + create_external_file(target=src.resolve(), path=dst) + else: + shutil.copy(src, dst) + file.based_on = RemoteEntity( + checksum=checksum, path=based_on.path, url=based_on.url # type: ignore + ) + results.append(DatasetUpdateMetadata(entity=file, action=DatasetUpdateAction.UPDATE)) + finally: + communication.finalize_progress(progress_text) + + return results diff --git a/renku/core/dataset/providers/local.py b/renku/core/dataset/providers/local.py index 807dc658b5..e537e77958 100644 --- a/renku/core/dataset/providers/local.py +++ b/renku/core/dataset/providers/local.py @@ -19,7 +19,7 @@ import urllib import uuid from pathlib import Path -from typing import TYPE_CHECKING, List, Optional +from typing import TYPE_CHECKING, Any, Dict, List, Optional from renku.core import errors from renku.core.config import get_value @@ -33,12 +33,13 @@ from renku.core.storage import check_external_storage, track_paths_in_storage from renku.core.util import communication from renku.core.util.metadata import is_protected_path -from renku.core.util.os import get_absolute_path, is_path_empty, is_subpath +from renku.core.util.os import get_absolute_path, get_safe_relative_path, is_path_empty, is_subpath from renku.core.util.urls import check_url from renku.domain_model.project_context import project_context +from renku.infrastructure.immutable import DynamicProxy if TYPE_CHECKING: - from renku.core.dataset.providers.models import DatasetAddMetadata, ProviderParameter + from renku.core.dataset.providers.models import DatasetAddMetadata, DatasetUpdateMetadata, ProviderParameter from renku.domain_model.dataset import Dataset, DatasetTag @@ -47,6 +48,7 @@ class LocalProvider(ProviderApi, AddProviderInterface, ExportProviderInterface): priority = ProviderPriority.LOW name = "Local" + is_remote = False def __init__(self, uri: str): super().__init__(uri=uri) @@ -233,6 +235,58 @@ def get_file_metadata(src: Path) -> DatasetAddMetadata: return results + def update_files( + self, + files: List[DynamicProxy], + dry_run: bool, + delete: bool, + context: Dict[str, Any], + check_data_directory: bool = False, + **kwargs, + ) -> List["DatasetUpdateMetadata"]: + """Update dataset files from the remote provider.""" + from renku.core.dataset.providers.models import DatasetUpdateAction, DatasetUpdateMetadata + + progress_text = "Checking for local updates" + results: List[DatasetUpdateMetadata] = [] + + try: + communication.start_progress(progress_text, len(files)) + check_paths = [] + records_to_check = [] + for file in files: + communication.update_progress(progress_text, 1) + + if file.based_on or file.linked: + continue + + if not (project_context.path / file.entity.path).exists(): + results.append(DatasetUpdateMetadata(entity=file, action=DatasetUpdateAction.DELETE)) + continue + + check_paths.append(file.entity.path) + records_to_check.append(file) + + checksums = project_context.repository.get_object_hashes(check_paths) + + for file in records_to_check: + current_checksum = checksums.get(file.entity.path) + if not current_checksum: + results.append(DatasetUpdateMetadata(entity=file, action=DatasetUpdateAction.DELETE)) + elif current_checksum != file.entity.checksum: + results.append(DatasetUpdateMetadata(entity=file, action=DatasetUpdateAction.UPDATE)) + elif check_data_directory and not any(file.entity.path == f.entity.path for f in file.dataset.files): + datadir = file.dataset.get_datadir() + try: + get_safe_relative_path(file.entity.path, datadir) + except ValueError: + continue + + results.append(DatasetUpdateMetadata(entity=file, action=DatasetUpdateAction.UPDATE)) + finally: + communication.finalize_progress(progress_text) + return results + def get_exporter( self, dataset: "Dataset", *, tag: Optional["DatasetTag"], path: Optional[str] = None, **kwargs ) -> "LocalExporter": diff --git a/renku/core/dataset/providers/models.py b/renku/core/dataset/providers/models.py index eda44fdaa6..b198133d99 100644 --- a/renku/core/dataset/providers/models.py +++ b/renku/core/dataset/providers/models.py @@ -26,6 +26,7 @@ from renku.command.schema.dataset import DatasetSchema from renku.domain_model.dataset import Dataset +from renku.infrastructure.immutable import DynamicProxy if TYPE_CHECKING: from renku.core.dataset.providers.api import StorageProviderInterface @@ -44,6 +45,13 @@ class DatasetAddAction(Enum): REMOTE_STORAGE = auto() # For URIs that are from a remote storage provider +class DatasetUpdateAction(Enum): + """Types of action when updating a file in a dataset.""" + + UPDATE = auto() + DELETE = auto() + + @dataclasses.dataclass class DatasetAddMetadata: """Metadata for a new file that will be added to a dataset.""" @@ -77,6 +85,14 @@ def get_absolute_commit_path(self, project_path: Path) -> str: return os.path.join(project_path, self.entity_path) +@dataclasses.dataclass +class DatasetUpdateMetadata: + """Metadata for updating dataset files.""" + + entity: DynamicProxy + action: DatasetUpdateAction + + class ProviderParameter(NamedTuple): """Provider-specific parameters.""" @@ -131,6 +147,7 @@ def from_dataset(cls, dataset: "Dataset") -> "ProviderDataset": same_as=dataset.same_as, title=dataset.title, version=dataset.version, + storage=dataset.storage, ) @property diff --git a/renku/core/dataset/providers/olos.py b/renku/core/dataset/providers/olos.py index 5faa5b1188..72bd742db0 100644 --- a/renku/core/dataset/providers/olos.py +++ b/renku/core/dataset/providers/olos.py @@ -38,6 +38,7 @@ class OLOSProvider(ProviderApi, ExportProviderInterface): priority = ProviderPriority.HIGH name = "OLOS" + is_remote = True def __init__(self, uri: str, is_doi: bool = False): super().__init__(uri=uri) diff --git a/renku/core/dataset/providers/renku.py b/renku/core/dataset/providers/renku.py index a4f7e5b881..d6015ebf20 100644 --- a/renku/core/dataset/providers/renku.py +++ b/renku/core/dataset/providers/renku.py @@ -43,6 +43,7 @@ class RenkuProvider(ProviderApi, ImportProviderInterface): priority = ProviderPriority.HIGH name = "Renku" + is_remote = True def __init__(self, uri: str, **_): super().__init__(uri=uri) diff --git a/renku/core/dataset/providers/s3.py b/renku/core/dataset/providers/s3.py index d3b387e591..ad7adc31c0 100644 --- a/renku/core/dataset/providers/s3.py +++ b/renku/core/dataset/providers/s3.py @@ -45,6 +45,7 @@ class S3Provider(ProviderApi, StorageProviderInterface, AddProviderInterface): priority = ProviderPriority.HIGHEST name = "S3" + is_remote = True def __init__(self, uri: str): super().__init__(uri=uri) diff --git a/renku/core/dataset/providers/web.py b/renku/core/dataset/providers/web.py index 90d34cd67e..68e5b67c44 100644 --- a/renku/core/dataset/providers/web.py +++ b/renku/core/dataset/providers/web.py @@ -17,18 +17,22 @@ import urllib from pathlib import Path -from typing import TYPE_CHECKING, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union from urllib.parse import urlparse from renku.core import errors from renku.core.constant import CACHE +from renku.core.dataset.dataset_add import copy_file from renku.core.dataset.providers.api import AddProviderInterface, ProviderApi, ProviderPriority +from renku.core.util import communication +from renku.core.util.os import delete_dataset_file from renku.core.util.urls import check_url, remove_credentials from renku.core.util.util import parallel_execute from renku.domain_model.project_context import project_context +from renku.infrastructure.immutable import DynamicProxy if TYPE_CHECKING: - from renku.core.dataset.providers.models import DatasetAddMetadata + from renku.core.dataset.providers.models import DatasetAddMetadata, DatasetUpdateMetadata class WebProvider(ProviderApi, AddProviderInterface): @@ -36,6 +40,7 @@ class WebProvider(ProviderApi, AddProviderInterface): priority = ProviderPriority.LOWEST name = "Web" + is_remote = True @staticmethod def supports(uri: str) -> bool: @@ -69,6 +74,80 @@ def get_metadata( multiple=multiple, ) + def update_files( + self, + files: List[DynamicProxy], + dry_run: bool, + delete: bool, + context: Dict[str, Any], + **kwargs, + ) -> List["DatasetUpdateMetadata"]: + """Update dataset files from the remote provider.""" + from renku.core.dataset.providers.models import DatasetAddMetadata, DatasetUpdateAction, DatasetUpdateMetadata + + progress_text = "Checking for local updates" + results: List[DatasetUpdateMetadata] = [] + + download_cache: Dict[str, DatasetAddMetadata] = {} + potential_updates: List[Tuple[DatasetAddMetadata, DynamicProxy]] = [] + + try: + communication.start_progress(progress_text, len(files)) + for file in files: + if not file.source: + continue + destination = project_context.path / file.dataset.get_datadir() + try: + if file.entity.path not in download_cache: + downloaded_files = download_file( + project_path=project_context.path, uri=file.source, destination=destination + ) + + if not any(f.entity_path == file.entity.path for f in downloaded_files): + # File probably comes from an extracted download + downloaded_files = download_file( + project_path=project_context.path, + uri=file.source, + destination=destination, + extract=True, + ) + + download_cache.update({str(f.entity_path): f for f in downloaded_files}) + except errors.OperationError: + results.append(DatasetUpdateMetadata(entity=file, action=DatasetUpdateAction.DELETE)) + else: + metadata = download_cache.get(file.entity.path) + + if not metadata: + results.append(DatasetUpdateMetadata(entity=file, action=DatasetUpdateAction.DELETE)) + + if not dry_run and delete: + delete_dataset_file(file.entity.path, follow_symlinks=True) + project_context.repository.add(file.entity.path, force=True) + else: + potential_updates.append((metadata, file)) + + finally: + communication.finalize_progress(progress_text) + + if not potential_updates: + return results + + check_paths: List[Union[Path, str]] = [ + str(u[0].source.relative_to(project_context.path)) for u in potential_updates + ] + # Stage files temporarily so we can get hashes + project_context.repository.add(*check_paths, force=True) + hashes = project_context.repository.get_object_hashes(check_paths) + project_context.repository.remove(*check_paths, index=True) + + for metadata, file in potential_updates: + if file.entity.checksum != hashes.get(metadata.source): + results.append(DatasetUpdateMetadata(entity=file, action=DatasetUpdateAction.UPDATE)) + if not dry_run: + copy_file(metadata, file.dataset, storage=None) + return results + def _ensure_dropbox(url): """Ensure dropbox url is set for file download.""" diff --git a/renku/core/dataset/providers/zenodo.py b/renku/core/dataset/providers/zenodo.py index 8237ba73cd..26daf7452d 100644 --- a/renku/core/dataset/providers/zenodo.py +++ b/renku/core/dataset/providers/zenodo.py @@ -61,6 +61,7 @@ class ZenodoProvider(ProviderApi, ExportProviderInterface, ImportProviderInterfa priority = ProviderPriority.HIGH name = "Zenodo" + is_remote = True def __init__(self, uri: str, is_doi: bool = False): super().__init__(uri=uri) diff --git a/tests/cli/test_datasets.py b/tests/cli/test_datasets.py index 73f014d7be..a0d935c98b 100644 --- a/tests/cli/test_datasets.py +++ b/tests/cli/test_datasets.py @@ -34,6 +34,7 @@ from renku.core.dataset.providers.dataverse import DataverseProvider from renku.core.dataset.providers.factory import ProviderFactory from renku.core.dataset.providers.zenodo import ZenodoProvider +from renku.core.interface.storage import FileHash from renku.core.storage import track_paths_in_storage from renku.core.util.git import get_dirty_paths from renku.core.util.urls import get_slug @@ -2558,7 +2559,6 @@ def test_add_local_data_to_cloud_datasets(runner, project, mocker, directory_tre cloud_storage.upload.return_value = [] - uri = "s3://s3.endpoint/bucket/path" result = runner.invoke(cli, ["dataset", "create", "cloud-data", "--storage", uri]) assert 0 == result.exit_code, format_result_exception(result) @@ -2587,6 +2587,153 @@ def test_add_local_data_to_cloud_datasets(runner, project, mocker, directory_tre cloud_storage.upload.assert_has_calls(calls=calls, any_order=True) +@pytest.mark.parametrize("uri", ["s3://s3.endpoint/bucket/", "azure://renkupythontest1/test-private-1"]) +def test_dataset_update_remote_file(runner, project, mocker, uri): + """Test updating a file added from remote/cloud storage.""" + storage_factory = mocker.patch("renku.infrastructure.storage.factory.StorageFactory.get_storage", autospec=True) + cloud_storage = storage_factory.return_value + + uri = f"{uri}/path/myfile" + + def _fake_download(uri, destination): + with open(destination, "w") as f: + f.write("a") + + cloud_storage.get_hashes.return_value = [FileHash(uri=uri, path="path/myfile", size=5, hash="deadbeef")] + cloud_storage.download.side_effect = _fake_download + + result = runner.invoke(cli, ["dataset", "create", "local-data"]) + assert 0 == result.exit_code, format_result_exception(result) + + result = runner.invoke(cli, ["dataset", "add", "local-data", uri]) + assert 0 == result.exit_code, format_result_exception(result) + + dataset = get_dataset_with_injection("local-data") + + assert 1 == len(dataset.files) + assert dataset.files[0].based_on.url == uri + assert dataset.files[0].based_on.checksum == "deadbeef" + + # Updating without changes does nothing + result = runner.invoke(cli, ["dataset", "update", "local-data"]) + assert 0 == result.exit_code, format_result_exception(result) + + dataset = get_dataset_with_injection("local-data") + + assert 1 == len(dataset.files) + assert dataset.files[0].based_on.url == uri + assert dataset.files[0].based_on.checksum == "deadbeef" + + # Updating with changes works + def _fake_download2(uri, destination): + with open(destination, "w") as f: + f.write("b") + + cloud_storage.get_hashes.return_value = [FileHash(uri=uri, path="path/myfile", size=7, hash="8badf00d")] + cloud_storage.download.side_effect = _fake_download2 + + result = runner.invoke(cli, ["dataset", "update", "local-data"]) + assert 0 == result.exit_code, format_result_exception(result) + + dataset = get_dataset_with_injection("local-data") + + assert 1 == len(dataset.files) + assert dataset.files[0].based_on.url == uri + assert dataset.files[0].based_on.checksum == "8badf00d" + + cloud_storage.get_hashes.return_value = [] + + # check deletion doesn't happen without --delete + result = runner.invoke(cli, ["dataset", "update", "local-data"]) + assert 0 == result.exit_code, format_result_exception(result) + + dataset = get_dataset_with_injection("local-data") + + assert 1 == len(dataset.files) + + # check deletion + result = runner.invoke(cli, ["dataset", "update", "local-data", "--delete"]) + assert 0 == result.exit_code, format_result_exception(result) + + dataset = get_dataset_with_injection("local-data") + + assert 0 == len(dataset.files) + + +def test_dataset_update_web_file(runner, project, mocker): + """Test updating a file added from remote/cloud storage.""" + + uri = "http://www.example.com/myfile.txt" + + cache = project.path / ".renku" / "cache" + cache.mkdir(parents=True, exist_ok=True) + new_file = cache / "myfile.txt" + new_file.write_text("output") + + mocker.patch("renku.core.util.requests.get_redirect_url", lambda _: uri) + mocker.patch( + "renku.core.util.requests.download_file", + lambda base_directory, url, filename, extract: (cache, [Path(new_file)]), + ) + + result = runner.invoke(cli, ["dataset", "create", "local-data"]) + assert 0 == result.exit_code, format_result_exception(result) + + result = runner.invoke(cli, ["dataset", "add", "local-data", uri]) + assert 0 == result.exit_code, format_result_exception(result) + + dataset = get_dataset_with_injection("local-data") + + assert 1 == len(dataset.files) + assert dataset.files[0].source == uri + assert dataset.files[0].entity.checksum == "6caf68aff423350af0ef7b148fec2ed4243658e5" + + # Updating without changes does nothing + new_file.write_text("output") + + result = runner.invoke(cli, ["dataset", "update", "local-data"]) + assert 0 == result.exit_code, format_result_exception(result) + + dataset = get_dataset_with_injection("local-data") + + assert 1 == len(dataset.files) + assert dataset.files[0].source == uri + assert dataset.files[0].entity.checksum == "6caf68aff423350af0ef7b148fec2ed4243658e5" + + # Updating with changes works + new_file.write_text("output2") + + result = runner.invoke(cli, ["dataset", "update", "local-data"]) + assert 0 == result.exit_code, format_result_exception(result) + + dataset = get_dataset_with_injection("local-data") + + assert 1 == len(dataset.files) + assert dataset.files[0].source == uri + assert dataset.files[0].entity.checksum == "1bc6411450b62581e5cea1174c15269c249dd4ea" + + # check deletion doesn't happen without --delete + def _fake_raise(base_directory, url, filename, extract): + raise errors.RequestError + + mocker.patch("renku.core.util.requests.download_file", _fake_raise) + + result = runner.invoke(cli, ["dataset", "update", "local-data"]) + assert 0 == result.exit_code, format_result_exception(result) + + dataset = get_dataset_with_injection("local-data") + + assert 1 == len(dataset.files) + + # check deletion + result = runner.invoke(cli, ["dataset", "update", "local-data", "--delete"]) + assert 0 == result.exit_code, format_result_exception(result) + + dataset = get_dataset_with_injection("local-data") + + assert 0 == len(dataset.files) + + @pytest.mark.parametrize( "storage", ["s3://s3.endpoint/bucket/path", "azure://renkupythontest1/test-private-1", "/local/file/storage"] )