fix(cli): fix dataset update with external files (#3379)
Panaetius authored May 25, 2023
1 parent 9fa63dd commit e02e5bf
Showing 15 changed files with 530 additions and 170 deletions.
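The core of this diff replaces the type-specific helpers update_dataset_local_files and update_dataset_git_files with a single per-provider dispatch in update_datasets: files are grouped by the provider returned from ProviderFactory.get_add_provider(uri), and each provider's update_files call reports per-file DatasetUpdateAction results (UPDATE or DELETE) that are collected uniformly. The following is a minimal, hypothetical sketch of that dispatch pattern; UpdateAction, UpdateResult, Provider, and dispatch_updates are simplified stand-ins, not the real Renku classes.

# Minimal, hypothetical sketch of the provider-dispatch pattern introduced in
# update_datasets(): group files per provider, let each provider report results.
from dataclasses import dataclass
from enum import Enum, auto
from typing import Dict, List


class UpdateAction(Enum):
    UPDATE = auto()
    DELETE = auto()


@dataclass
class UpdateResult:
    path: str
    action: UpdateAction


class Provider:
    """Stand-in for a dataset provider; real providers compare checksums, clone remotes, etc."""

    is_remote = False

    def update_files(self, files: List[str], dry_run: bool) -> List[UpdateResult]:
        # Pretend every file changed upstream.
        return [UpdateResult(path=f, action=UpdateAction.UPDATE) for f in files]


def dispatch_updates(
    files_by_provider: Dict[Provider, List[str]], no_remote: bool = False, no_local: bool = False
):
    updated: List[str] = []
    deleted: List[str] = []
    for provider, files in files_by_provider.items():
        # Honor --no-remote / --no-local style flags by skipping whole providers.
        if (no_remote and provider.is_remote) or (no_local and not provider.is_remote):
            continue
        results = provider.update_files(files=files, dry_run=True)
        updated.extend(r.path for r in results if r.action == UpdateAction.UPDATE)
        deleted.extend(r.path for r in results if r.action == UpdateAction.DELETE)
    return updated, deleted


if __name__ == "__main__":
    print(dispatch_updates({Provider(): ["data/a.csv", "data/b.csv"]}))

Compared to the previous code, update_datasets no longer needs separate local/git/linked branches; each provider decides how to detect and apply its own updates.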
211 changes: 52 additions & 159 deletions renku/core/dataset/dataset.py
@@ -18,6 +18,7 @@
import os
import shutil
import urllib
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast

@@ -29,22 +30,19 @@
from renku.core import errors
from renku.core.config import get_value, remove_value, set_value
from renku.core.dataset.datasets_provenance import DatasetsProvenance
from renku.core.dataset.pointer_file import (
create_external_file,
delete_external_file,
is_linked_file_updated,
update_linked_file,
)
from renku.core.dataset.pointer_file import delete_external_file, is_linked_file_updated, update_linked_file
from renku.core.dataset.providers.api import AddProviderInterface, ProviderApi
from renku.core.dataset.providers.factory import ProviderFactory
from renku.core.dataset.providers.models import ProviderDataset
from renku.core.dataset.providers.git import GitProvider
from renku.core.dataset.providers.models import DatasetUpdateAction, ProviderDataset
from renku.core.dataset.request_model import ImageRequestModel
from renku.core.dataset.tag import get_dataset_by_tag, prompt_access_token, prompt_tag_selection
from renku.core.interface.dataset_gateway import IDatasetGateway
from renku.core.storage import check_external_storage, pull_paths_from_storage, track_paths_in_storage
from renku.core.storage import check_external_storage, track_paths_in_storage
from renku.core.util import communication
from renku.core.util.datetime8601 import local_now
from renku.core.util.git import clone_repository, get_cache_directory_for_repository, get_git_user
from renku.core.util.metadata import is_linked_file, prompt_for_credentials, read_credentials, store_credentials
from renku.core.util.git import get_git_user
from renku.core.util.metadata import prompt_for_credentials, read_credentials, store_credentials
from renku.core.util.os import (
create_symlink,
delete_dataset_file,
@@ -72,7 +70,6 @@

if TYPE_CHECKING:
from renku.core.interface.storage import IStorage
from renku.infrastructure.repository import Repository


@validate_arguments(config=dict(arbitrary_types_allowed=True))
@@ -571,6 +568,7 @@ def remove_files(dataset):
total_size=calculate_total_size(importer.provider_dataset_files),
clear_files_before=True,
datadir=datadir,
storage=provider_dataset.storage,
)

new_dataset.update_metadata_from(provider_dataset)
@@ -714,19 +712,32 @@ def update_datasets(
raise errors.ParameterError("No files matched the criteria.")
return imported_dataset_updates_view_models, []

git_files = []
provider_files: Dict[AddProviderInterface, List[DynamicProxy]] = defaultdict(list)
unique_remotes = set()
linked_files = []
local_files = []

for file in records:
if file.based_on:
git_files.append(file)
unique_remotes.add(file.based_on.url)
elif file.linked:
if file.linked:
linked_files.append(file)
else:
local_files.append(file)
if not getattr(file, "provider", None):
if file.based_on:
uri = file.dataset.same_as.value if file.dataset.same_as else file.based_on.url
else:
uri = file.source
try:
file.provider = cast(
AddProviderInterface,
ProviderFactory.get_add_provider(uri),
)
except errors.DatasetProviderNotFound:
communication.warn(f"Couldn't find provider for file {file.path} in dataset {file.dataset.name}")
continue

provider_files[file.provider].append(file)

if isinstance(file.provider, GitProvider):
unique_remotes.add(file.based_on.url)

if ref and len(unique_remotes) > 1:
raise errors.ParameterError(
@@ -741,18 +752,24 @@
updated = update_linked_files(linked_files, dry_run=dry_run)
updated_files.extend(updated)

if git_files and not no_remote:
updated, deleted = update_dataset_git_files(files=git_files, ref=ref, delete=delete, dry_run=dry_run)
updated_files.extend(updated)
deleted_files.extend(deleted)
provider_context: Dict[str, Any] = {}

for provider, files in provider_files.items():
if (no_remote and cast(ProviderApi, provider).is_remote) or (
no_local and not cast(ProviderApi, provider).is_remote
):
continue

if local_files and not no_local:
updated, deleted, new = update_dataset_local_files(
records=local_files, check_data_directory=check_data_directory
results = provider.update_files(
files=files,
dry_run=dry_run,
delete=delete,
context=provider_context,
ref=ref,
check_data_directory=check_data_directory,
)
updated_files.extend(updated)
deleted_files.extend(deleted)
updated_files.extend(new)
updated_files.extend(r.entity for r in results if r.action == DatasetUpdateAction.UPDATE)
deleted_files.extend(r.entity for r in results if r.action == DatasetUpdateAction.DELETE)

if not dry_run:
if deleted_files and not delete:
@@ -974,154 +991,30 @@ def move_files(dataset_gateway: IDatasetGateway, files: Dict[Path, Path], to_dat
datasets_provenance.add_or_update(to_dataset, creator=creator)


def update_dataset_local_files(
records: List[DynamicProxy], check_data_directory: bool
) -> Tuple[List[DynamicProxy], List[DynamicProxy], List[DynamicProxy]]:
"""Update files metadata from the git history.
Args:
records(List[DynamicProxy]): File records to update.
check_data_directory(bool): Whether to check the dataset's data directory for new files.
Returns:
Tuple[List[DynamicProxy], List[DynamicProxy]]: Tuple of updated and deleted file records.
"""
updated_files: List[DynamicProxy] = []
deleted_files: List[DynamicProxy] = []
new_files: List[DynamicProxy] = []
progress_text = "Checking for local updates"

try:
communication.start_progress(progress_text, len(records))
check_paths = []
records_to_check = []

for file in records:
communication.update_progress(progress_text, 1)

if file.based_on or file.linked:
continue

if not (project_context.path / file.entity.path).exists():
deleted_files.append(file)
continue

check_paths.append(file.entity.path)
records_to_check.append(file)

checksums = project_context.repository.get_object_hashes(check_paths)

for file in records_to_check:
current_checksum = checksums.get(file.entity.path)
if not current_checksum:
deleted_files.append(file)
elif current_checksum != file.entity.checksum:
updated_files.append(file)
elif check_data_directory and not any(file.entity.path == f.entity.path for f in file.dataset.files):
datadir = file.dataset.get_datadir()
try:
get_safe_relative_path(file.entity.path, datadir)
except ValueError:
continue

new_files.append(file)
finally:
communication.finalize_progress(progress_text)

return updated_files, deleted_files, new_files


def _update_datasets_files_metadata(updated_files: List[DynamicProxy], deleted_files: List[DynamicProxy], delete: bool):
modified_datasets = {}
checksums = project_context.repository.get_object_hashes([file.entity.path for file in updated_files])
for file in updated_files:
new_file = DatasetFile.from_path(
path=file.entity.path, based_on=file.based_on, source=file.source, checksum=checksums.get(file.entity.path)
)
modified_datasets[file.dataset.name] = file.dataset
modified_datasets[file.dataset.name] = (
file.dataset._subject if isinstance(file.dataset, DynamicProxy) else file.dataset
)
file.dataset.add_or_update_files(new_file)

if delete:
for file in deleted_files:
modified_datasets[file.dataset.name] = file.dataset
modified_datasets[file.dataset.name] = (
file.dataset._subject if isinstance(file.dataset, DynamicProxy) else file.dataset
)
file.dataset.unlink_file(file.entity.path)

datasets_provenance = DatasetsProvenance()
for dataset in modified_datasets.values():
datasets_provenance.add_or_update(dataset, creator=get_git_user(repository=project_context.repository))


def update_dataset_git_files(
files: List[DynamicProxy], ref: Optional[str], delete: bool, dry_run: bool
) -> Tuple[List[DynamicProxy], List[DynamicProxy]]:
"""Update files and dataset metadata according to their remotes.
Args:
files(List[DynamicProxy]): List of files to be updated.
ref(Optional[str]): Reference to use for update.
delete(bool, optional): Indicates whether to delete files or not (Default value = False).
dry_run(bool): Whether to perform update or only print changes.
Returns:
Tuple[List[DynamicProxy], List[DynamicProxy]]: Tuple of updated and deleted file records.
"""
visited_repos: Dict[str, "Repository"] = {}
updated_files: List[DynamicProxy] = []
deleted_files: List[DynamicProxy] = []

progress_text = "Checking files for updates"

try:
communication.start_progress(progress_text, len(files))
for file in files:
communication.update_progress(progress_text, 1)
if not file.based_on:
continue

based_on = file.based_on
url = based_on.url
if url in visited_repos:
remote_repository = visited_repos[url]
else:
communication.echo(msg="Cloning remote repository...")
path = get_cache_directory_for_repository(url=url)
remote_repository = clone_repository(url=url, path=path, checkout_revision=ref)
visited_repos[url] = remote_repository

checksum = remote_repository.get_object_hash(path=based_on.path, revision="HEAD")
found = checksum is not None
changed = found and based_on.checksum != checksum

src = remote_repository.path / based_on.path
dst = project_context.metadata_path.parent / file.entity.path

if not found:
if not dry_run and delete:
delete_dataset_file(dst, follow_symlinks=True)
project_context.repository.add(dst, force=True)
deleted_files.append(file)
elif changed:
if not dry_run:
# Fetch file if it is tracked by Git LFS
pull_paths_from_storage(remote_repository, remote_repository.path / based_on.path)
if is_linked_file(path=src, project_path=remote_repository.path):
delete_dataset_file(dst, follow_symlinks=True)
create_external_file(target=src.resolve(), path=dst)
else:
shutil.copy(src, dst)
file.based_on = RemoteEntity(
checksum=checksum, path=based_on.path, url=based_on.url # type: ignore
)
updated_files.append(file)
finally:
communication.finalize_progress(progress_text)

if not updated_files and (not delete or not deleted_files):
# Nothing to commit or update
return [], deleted_files

return updated_files, deleted_files


def update_linked_files(records: List[DynamicProxy], dry_run: bool) -> List[DynamicProxy]:
"""Update files linked to other files in the project.
@@ -1230,7 +1123,7 @@ def should_include(filepath: Path) -> bool:
continue

record = DynamicProxy(file)
record.dataset = dataset
record.dataset = DynamicProxy(dataset)
records.append(record)

if not check_data_directory:
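One further detail worth illustrating: dataset records are now wrapped in DynamicProxy when file records are built (record.dataset = DynamicProxy(dataset) near the end of the diff), so _update_datasets_files_metadata unwraps the proxy via _subject before handing datasets to the provenance layer. Below is a tiny, hypothetical illustration of that unwrapping; it is not the real DynamicProxy implementation.

# Hypothetical illustration of the proxy unwrapping shown above: datasets are
# proxied while records are built, but provenance should receive the real object.
class DynamicProxy:
    def __init__(self, subject):
        self._subject = subject

    def __getattr__(self, name):
        # Fall back to the wrapped object for any attribute the proxy lacks.
        return getattr(self._subject, name)


class Dataset:
    def __init__(self, name: str):
        self.name = name


def unwrap(dataset):
    """Return the underlying dataset whether or not it is proxied."""
    return dataset._subject if isinstance(dataset, DynamicProxy) else dataset


dataset = Dataset("my-data")
proxied = DynamicProxy(dataset)
assert proxied.name == "my-data"   # attribute access is delegated to the subject
assert unwrap(proxied) is dataset  # provenance gets the real dataset object
assert unwrap(dataset) is dataset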