Skip to content

Commit

Permalink
feat: move job attachments time stamp to after download. Add a retry …
Browse files Browse the repository at this point in the history
…to ensure timestamp is updated

Signed-off-by: David Leong <[email protected]>
  • Loading branch information
leongdl committed Jul 25, 2024
1 parent 950cd1a commit e0d514a
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 7 deletions.
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ dependencies = [
"jsonschema == 4.17.*",
"pywin32 == 306; sys_platform == 'win32'",
"QtPy == 2.4.*",
"retry >= 0.9.2",
"types-retry >= 0.9.2",
]

[project.urls]
Expand Down
24 changes: 19 additions & 5 deletions src/deadline/job_attachments/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from itertools import chain
from logging import Logger, LoggerAdapter, getLogger
from pathlib import Path
from retry import retry
from tempfile import NamedTemporaryFile
from typing import Any, Callable, DefaultDict, List, Optional, Tuple, Union

Expand Down Expand Up @@ -386,7 +387,7 @@ def download_file(
modified_time_override: Optional[float] = None,
progress_tracker: Optional[ProgressTracker] = None,
file_conflict_resolution: Optional[FileConflictResolution] = FileConflictResolution.CREATE_COPY,
) -> Tuple[int, Optional[Path]]:
) -> Tuple[int, Optional[Path], float]:
"""
Downloads a file from the S3 bucket to the local directory. `modified_time_override` is ignored if the manifest
version used supports timestamps.
Expand Down Expand Up @@ -416,7 +417,7 @@ def download_file(
# If the file name already exists, resolve the conflict based on the file_conflict_resolution
if local_file_name.is_file():
if file_conflict_resolution == FileConflictResolution.SKIP:
return (file_bytes, None)
return (file_bytes, None, 0)
elif file_conflict_resolution == FileConflictResolution.OVERWRITE:
pass
elif file_conflict_resolution == FileConflictResolution.CREATE_COPY:
Expand Down Expand Up @@ -541,9 +542,17 @@ def process_client_error(exc: ClientError, status_code: int):
raise AssetSyncError(e) from e

download_logger.debug(f"Downloaded {file.path} to {str(local_file_name)}")
os.utime(local_file_name, (modified_time_override, modified_time_override)) # type: ignore[arg-type]

return (file_bytes, local_file_name)
return (file_bytes, local_file_name, modified_time_override)


@retry(FileNotFoundError, delay=1, tries=3)
def _update_file_time_stamp(local_file_name: Path, modified_time_override: float) -> None:
    """
    Sets both the access and modified time of a single file to `modified_time_override`.

    Retried (up to 3 attempts, 1 second apart) when the file is not found, because a
    freshly downloaded file may not be visible on the file system yet. The final
    FileNotFoundError is re-raised once the attempts are exhausted.
    """
    os.utime(local_file_name, (modified_time_override, modified_time_override))


def _update_file_time_stamps(files: List[Tuple]):
    """
    Updates the file time stamp for all input files.

    Args:
        files: (local_file_name, modified_time_override) pairs; each file's access
            and modified times are both set to its modified_time_override.

    Raises:
        FileNotFoundError: if a file is still missing after its retries are exhausted.
    """
    # The retry is applied per file rather than around this loop: a transient miss on
    # one file must not restart the whole batch (re-stamping files that already
    # succeeded), nor consume the retry budget of unrelated files.
    for local_file_name, modified_time_override in files:
        _update_file_time_stamp(local_file_name, modified_time_override)


def _download_files_parallel(
Expand All @@ -564,6 +573,7 @@ def _download_files_parallel(
Returns a list of local paths of downloaded files.
"""
downloaded_file_names: list[str] = []
downloaded_file_name_timestamps: list[tuple] = []

with concurrent.futures.ThreadPoolExecutor(max_workers=num_download_workers) as executor:
futures = {
Expand All @@ -584,9 +594,10 @@ def _download_files_parallel(
}
# surfaces any exceptions in the thread
for future in concurrent.futures.as_completed(futures):
(file_bytes, local_file_name) = future.result()
(file_bytes, local_file_name, modified_time_override) = future.result()
if local_file_name:
downloaded_file_names.append(str(local_file_name.resolve()))
downloaded_file_name_timestamps.append((local_file_name, modified_time_override))
if progress_tracker:
progress_tracker.increase_processed(1, 0)
progress_tracker.report_progress()
Expand All @@ -595,6 +606,9 @@ def _download_files_parallel(
progress_tracker.increase_skipped(1, file_bytes)
progress_tracker.report_progress()

# Update the timestamps once all file downloads have completed
_update_file_time_stamps(downloaded_file_name_timestamps)

# to report progress 100% at the end
if progress_tracker:
progress_tracker.report_progress()
Expand Down
4 changes: 2 additions & 2 deletions test/unit/deadline_job_attachments/test_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -2426,11 +2426,11 @@ def test_download_files_from_manifests(
def download_file(*args):
nonlocal downloaded_files
downloaded_files.append(args[0].path)
return (40, Path(args[0].path))
return (40, Path(args[0].path), 123456)

with patch(
f"{deadline.__package__}.job_attachments.download.download_file", side_effect=download_file
):
), patch(f"{deadline.__package__}.job_attachments.download._update_file_time_stamps"):
download_files_from_manifests(
s3_bucket="s3_settings.s3BucketName",
manifests_by_root={"/test": merged_manifest},
Expand Down

0 comments on commit e0d514a

Please sign in to comment.