Skip to content

Commit

Permalink
fix: Write job attachment manifests locally when submitting
Browse files Browse the repository at this point in the history
Signed-off-by: Caden Marofke <[email protected]>
  • Loading branch information
marofke committed Apr 10, 2024
1 parent 4d40b8c commit 5f41d6c
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ def _update_upload_progress(upload_metadata: ProgressReportMetadata) -> bool:
manifests=manifests,
on_uploading_assets=_update_upload_progress,
s3_check_cache_dir=config_file.get_cache_directory(),
manifest_write_dir=self._job_bundle_dir,
)

logger.info("Finished uploading job attachments files.")
Expand Down
2 changes: 2 additions & 0 deletions src/deadline/job_attachments/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ These snapshots are encapsulated in one or more [`asset_manifests`](asset_manife

When starting work, the worker downloads the manifest associated with your job, and recreates the file structure of your submission locally, either downloading all files at once, or as needed if using the [virtual][vfs] job attachments filesystem type. When a task completes, the worker creates a new manifest for any outputs that were specified in the job submission, and uploads the manifest and the outputs back to your S3 bucket.

When a job is submitted through the GUI, its manifest files are also written locally to a `manifests` directory inside the job bundle that is added to the job history (default: `~/.deadline/job_history`). Each file's path inside the `manifests` directory corresponds to the S3 manifest path recorded in the submitted job's job attachments metadata.

[vfs]: https://docs.aws.amazon.com/deadline-cloud/latest/userguide/storage-virtual.html

## Local Cache Files
Expand Down
31 changes: 22 additions & 9 deletions src/deadline/job_attachments/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ def upload_assets(
file_system_location_name: Optional[str] = None,
progress_tracker: Optional[ProgressTracker] = None,
s3_check_cache_dir: Optional[str] = None,
manifest_write_dir: Optional[str] = None,
) -> tuple[str, str]:
"""
Uploads assets based off of an asset manifest, uploads the asset manifest.
Expand All @@ -160,15 +161,6 @@ def upload_assets(
Returns:
A tuple of (the partial key for the manifest on S3, the hash of input manifest).
"""
# Upload assets
self.upload_input_files(
manifest,
job_attachment_settings.s3BucketName,
source_root,
job_attachment_settings.full_cas_prefix(),
progress_tracker,
s3_check_cache_dir,
)

# Upload asset manifest
hash_alg = manifest.get_default_hash_alg()
Expand All @@ -178,6 +170,15 @@ def upload_assets(
)
manifest_name = f"{manifest_name_prefix}_input"

if manifest_write_dir:
local_manifest_file = os.path.join(
manifest_write_dir, "manifests", partial_manifest_prefix, manifest_name
)
logger.info(f"Creating local manifest file: {local_manifest_file}\n")
os.makedirs(os.path.dirname(local_manifest_file), exist_ok=True)
with open(local_manifest_file, "w") as file:
file.write(manifest.encode())

if partial_manifest_prefix:
partial_manifest_key = _join_s3_paths(partial_manifest_prefix, manifest_name)
else:
Expand All @@ -193,6 +194,16 @@ def upload_assets(
key=full_manifest_key,
)

# Upload assets
self.upload_input_files(
manifest,
job_attachment_settings.s3BucketName,
source_root,
job_attachment_settings.full_cas_prefix(),
progress_tracker,
s3_check_cache_dir,
)

return (partial_manifest_key, hash_data(manifest_bytes, hash_alg))

def upload_input_files(
Expand Down Expand Up @@ -1132,6 +1143,7 @@ def upload_assets(
manifests: list[AssetRootManifest],
on_uploading_assets: Optional[Callable[[Any], bool]] = None,
s3_check_cache_dir: Optional[str] = None,
manifest_write_dir: Optional[str] = None,
) -> tuple[SummaryStatistics, Attachments]:
"""
Uploads all the files for provided manifests and manifests themselves to S3.
Expand Down Expand Up @@ -1182,6 +1194,7 @@ def upload_assets(
file_system_location_name=asset_root_manifest.file_system_location_name,
progress_tracker=progress_tracker,
s3_check_cache_dir=s3_check_cache_dir,
manifest_write_dir=manifest_write_dir,
)
manifest_properties.inputManifestPath = partial_manifest_key
manifest_properties.inputManifestHash = asset_manifest_hash
Expand Down
17 changes: 17 additions & 0 deletions test/unit/deadline_job_attachments/test_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,18 @@ def test_asset_management(
output_dir1 = tmpdir.join("outputs")
output_dir2 = tmpdir.join("outputs").join("textures")

history_dir = tmpdir.join("history")
expected_manifest_file = (
history_dir.join("manifests")
.join(farm_id)
.join(queue_id)
.join("Inputs")
.join("0000")
.join("e_input")
)
assert not os.path.exists(history_dir)
assert not os.path.exists(expected_manifest_file)

expected_total_input_bytes = (
scene_file.size() + texture_file.size() + normal_file.size() + meta_file.size()
)
Expand Down Expand Up @@ -196,6 +208,7 @@ def test_asset_management(
manifests=asset_root_manifests,
on_uploading_assets=mock_on_uploading_assets,
s3_check_cache_dir=str(cache_dir),
manifest_write_dir=str(history_dir),
)

# Then
Expand Down Expand Up @@ -236,6 +249,10 @@ def test_asset_management(

assert f"assetRoot/Manifests/{farm_id}/{queue_id}/Inputs/0000/e_input" in caplog.text

# Ensure we wrote our manifest file locally
assert os.path.exists(expected_manifest_file)
assert os.path.isfile(expected_manifest_file)

assert_progress_report_last_callback(
num_input_files=4,
expected_total_input_bytes=expected_total_input_bytes,
Expand Down

0 comments on commit 5f41d6c

Please sign in to comment.