From e0d514a85d7768710c4c3f7b9b6ee77e5ec52c3c Mon Sep 17 00:00:00 2001 From: David Leong <116610336+leongdl@users.noreply.github.com> Date: Wed, 24 Jul 2024 13:20:16 -0700 Subject: [PATCH] feat: move job attachments time stamp to after download. Add a retry to ensure timestamp is updated Signed-off-by: David Leong <116610336+leongdl@users.noreply.github.com> --- pyproject.toml | 2 ++ src/deadline/job_attachments/download.py | 24 +++++++++++++++---- .../deadline_job_attachments/test_download.py | 4 ++-- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index d983b3b2c..c30a98d08 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,6 +47,8 @@ dependencies = [ "jsonschema == 4.17.*", "pywin32 == 306; sys_platform == 'win32'", "QtPy == 2.4.*", + "retry >= 0.9.2", + "types-retry >= 0.9.2", ] [project.urls] diff --git a/src/deadline/job_attachments/download.py b/src/deadline/job_attachments/download.py index ef16cca66..ecd3eca98 100644 --- a/src/deadline/job_attachments/download.py +++ b/src/deadline/job_attachments/download.py @@ -14,6 +14,7 @@ from itertools import chain from logging import Logger, LoggerAdapter, getLogger from pathlib import Path +from retry import retry from tempfile import NamedTemporaryFile from typing import Any, Callable, DefaultDict, List, Optional, Tuple, Union @@ -386,7 +387,7 @@ def download_file( modified_time_override: Optional[float] = None, progress_tracker: Optional[ProgressTracker] = None, file_conflict_resolution: Optional[FileConflictResolution] = FileConflictResolution.CREATE_COPY, -) -> Tuple[int, Optional[Path]]: +) -> Tuple[int, Optional[Path], float]: """ Downloads a file from the S3 bucket to the local directory. `modified_time_override` is ignored if the manifest version used supports timestamps. @@ -416,7 +417,7 @@ def download_file( # If the file name already exists, resolve the conflict based on the file_conflict_resolution if local_file_name.is_file(): if file_conflict_resolution == FileConflictResolution.SKIP: - return (file_bytes, None) + return (file_bytes, None, 0) elif file_conflict_resolution == FileConflictResolution.OVERWRITE: pass elif file_conflict_resolution == FileConflictResolution.CREATE_COPY: @@ -541,9 +542,17 @@ def process_client_error(exc: ClientError, status_code: int): raise AssetSyncError(e) from e download_logger.debug(f"Downloaded {file.path} to {str(local_file_name)}") - os.utime(local_file_name, (modified_time_override, modified_time_override)) # type: ignore[arg-type] - return (file_bytes, local_file_name) + return (file_bytes, local_file_name, modified_time_override) + + +@retry(FileNotFoundError, delay=1, tries=3) +def _update_file_time_stamps(files: List[Tuple]): + """ + Updates the file time stamp for all input files. Retries if the file is not found due to file system consistency. + """ + for local_file_name, modified_time_override in files: + os.utime(local_file_name, (modified_time_override, modified_time_override)) def _download_files_parallel( @@ -564,6 +573,7 @@ def _download_files_parallel( Returns a list of local paths of downloaded files. """ downloaded_file_names: list[str] = [] + downloaded_file_name_timestamps: list[tuple] = [] with concurrent.futures.ThreadPoolExecutor(max_workers=num_download_workers) as executor: futures = { @@ -584,9 +594,10 @@ def _download_files_parallel( } # surfaces any exceptions in the thread for future in concurrent.futures.as_completed(futures): - (file_bytes, local_file_name) = future.result() + (file_bytes, local_file_name, modified_time_override) = future.result() if local_file_name: downloaded_file_names.append(str(local_file_name.resolve())) + downloaded_file_name_timestamps.append((local_file_name, modified_time_override)) if progress_tracker: progress_tracker.increase_processed(1, 0) progress_tracker.report_progress() @@ -595,6 +606,9 @@ def _download_files_parallel( progress_tracker.increase_skipped(1, file_bytes) progress_tracker.report_progress() + # Update the timestamps on all file download completion + _update_file_time_stamps(downloaded_file_name_timestamps) + # to report progress 100% at the end if progress_tracker: progress_tracker.report_progress() diff --git a/test/unit/deadline_job_attachments/test_download.py b/test/unit/deadline_job_attachments/test_download.py index 4f6372348..8a65087bd 100644 --- a/test/unit/deadline_job_attachments/test_download.py +++ b/test/unit/deadline_job_attachments/test_download.py @@ -2426,11 +2426,11 @@ def test_download_files_from_manifests( def download_file(*args): nonlocal downloaded_files downloaded_files.append(args[0].path) - return (40, Path(args[0].path)) + return (40, Path(args[0].path), 123456) with patch( f"{deadline.__package__}.job_attachments.download.download_file", side_effect=download_file - ): + ), patch(f"{deadline.__package__}.job_attachments.download._update_file_time_stamps"): download_files_from_manifests( s3_bucket="s3_settings.s3BucketName", manifests_by_root={"/test": merged_manifest},