Skip to content

Commit

Permalink
fix: Write job attachments manifests locally when submitting
Browse files Browse the repository at this point in the history
Signed-off by: Caden Marofke <[email protected]>
  • Loading branch information
marofke committed Apr 8, 2024
1 parent 8742572 commit 7b5752a
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 11 deletions.
12 changes: 10 additions & 2 deletions src/deadline/client/api/_submit_job_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from .. import api
from ..exceptions import DeadlineOperationError, CreateJobWaiterCanceled
from ..config import get_setting, set_setting, config_file
from ..job_bundle import deadline_yaml_dump
from ..job_bundle import create_job_history_bundle_dir, deadline_yaml_dump
from ..job_bundle.loader import (
read_yaml_or_json,
read_yaml_or_json_object,
Expand Down Expand Up @@ -254,7 +254,10 @@ def create_job_from_job_bundle(
)

attachment_settings = _upload_attachments(
asset_manager, asset_manifests, print_function_callback, upload_progress_callback
asset_manager,
asset_manifests,
print_function_callback,
upload_progress_callback,
)
attachment_settings["fileSystem"] = JobAttachmentsFileSystem(
job_attachments_file_system
Expand Down Expand Up @@ -418,10 +421,15 @@ def _default_update_upload_progress(upload_metadata: Dict[str, str]) -> bool:
if not upload_progress_callback:
upload_progress_callback = _default_update_upload_progress

# Create job history dir for the manifest file(s)
manifest_write_dir = os.path.join(
create_job_history_bundle_dir("JobBundle", "CLIJob"), "manifests"
)
upload_summary, attachment_settings = asset_manager.upload_assets(
manifests=manifests,
on_uploading_assets=upload_progress_callback,
s3_check_cache_dir=config_file.get_cache_directory(),
manifest_write_dir=manifest_write_dir,
)
api.get_deadline_cloud_library_telemetry_client(config=config).record_upload_summary(
upload_summary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ def _update_upload_progress(upload_metadata: ProgressReportMetadata) -> bool:
manifests=manifests,
on_uploading_assets=_update_upload_progress,
s3_check_cache_dir=config_file.get_cache_directory(),
manifest_write_dir=os.path.join(self._job_bundle_dir, "manifests"),
)

logger.info("Finished uploading job attachments files.")
Expand Down
2 changes: 2 additions & 0 deletions src/deadline/job_attachments/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ These snapshots are encapsulated in one or more [`asset_manifests`](asset_manife

When starting work, the worker downloads the manifest associated with your job, and recreates the file structure of your submission locally, either downloading all files at once, or as needed if using the [virtual][vfs] job attachments filesystem type. When a task completes, the worker creates a new manifest for any outputs that were specified in the job submission, and uploads the manifest and the outputs back to your S3 bucket.

Manifest files are written locally in the job history directory for debugging purposes (default: `~/.dealine/job_history`).

[vfs]: https://docs.aws.amazon.com/deadline-cloud/latest/userguide/storage-virtual.html

## Local Cache Files
Expand Down
28 changes: 19 additions & 9 deletions src/deadline/job_attachments/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ def upload_assets(
file_system_location_name: Optional[str] = None,
progress_tracker: Optional[ProgressTracker] = None,
s3_check_cache_dir: Optional[str] = None,
manifest_write_dir: Optional[str] = None,
) -> tuple[str, str]:
"""
Uploads assets based off of an asset manifest, uploads the asset manifest.
Expand All @@ -160,15 +161,6 @@ def upload_assets(
Returns:
A tuple of (the partial key for the manifest on S3, the hash of input manifest).
"""
# Upload assets
self.upload_input_files(
manifest,
job_attachment_settings.s3BucketName,
source_root,
job_attachment_settings.full_cas_prefix(),
progress_tracker,
s3_check_cache_dir,
)

# Upload asset manifest
hash_alg = manifest.get_default_hash_alg()
Expand All @@ -178,6 +170,12 @@ def upload_assets(
)
manifest_name = f"{manifest_name_prefix}_input"

if manifest_write_dir:
manifest_dir = os.path.join(manifest_write_dir, manifest_name)
os.makedirs(manifest_write_dir, exist_ok=True)
with open(manifest_dir, "w") as file:
file.write(manifest.encode())

if partial_manifest_prefix:
partial_manifest_key = _join_s3_paths(partial_manifest_prefix, manifest_name)
else:
Expand All @@ -193,6 +191,16 @@ def upload_assets(
key=full_manifest_key,
)

# Upload assets
self.upload_input_files(
manifest,
job_attachment_settings.s3BucketName,
source_root,
job_attachment_settings.full_cas_prefix(),
progress_tracker,
s3_check_cache_dir,
)

return (partial_manifest_key, hash_data(manifest_bytes, hash_alg))

def upload_input_files(
Expand Down Expand Up @@ -1111,6 +1119,7 @@ def upload_assets(
manifests: list[AssetRootManifest],
on_uploading_assets: Optional[Callable[[Any], bool]] = None,
s3_check_cache_dir: Optional[str] = None,
manifest_write_dir: Optional[str] = None,
) -> tuple[SummaryStatistics, Attachments]:
"""
Uploads all the files for provided manifests and manifests themselves to S3.
Expand Down Expand Up @@ -1161,6 +1170,7 @@ def upload_assets(
file_system_location_name=asset_root_manifest.file_system_location_name,
progress_tracker=progress_tracker,
s3_check_cache_dir=s3_check_cache_dir,
manifest_write_dir=manifest_write_dir,
)
manifest_properties.inputManifestPath = partial_manifest_key
manifest_properties.inputManifestHash = asset_manifest_hash
Expand Down
9 changes: 9 additions & 0 deletions test/unit/deadline_job_attachments/test_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ def test_asset_management(
output_dir1 = tmpdir.join("outputs")
output_dir2 = tmpdir.join("outputs").join("textures")

history_dir = tmpdir.join("history")
expected_manifest_dir = history_dir.join("e_input")
assert not os.path.exists(history_dir)
assert not os.path.exists(expected_manifest_dir)

expected_total_input_bytes = (
scene_file.size() + texture_file.size() + normal_file.size() + meta_file.size()
)
Expand Down Expand Up @@ -196,6 +201,7 @@ def test_asset_management(
manifests=asset_root_manifests,
on_uploading_assets=mock_on_uploading_assets,
s3_check_cache_dir=str(cache_dir),
manifest_write_dir=str(history_dir),
)

# Then
Expand Down Expand Up @@ -236,6 +242,9 @@ def test_asset_management(

assert f"assetRoot/Manifests/{farm_id}/{queue_id}/Inputs/0000/e_input" in caplog.text

# Ensure we wrote our manifest file locally
assert os.path.exists(expected_manifest_dir)

assert_progress_report_last_callback(
num_input_files=4,
expected_total_input_bytes=expected_total_input_bytes,
Expand Down

0 comments on commit 7b5752a

Please sign in to comment.