Skip to content

Commit

Permalink
move snapshot to embedded file and address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Godot Bian <[email protected]>
  • Loading branch information
godobyte committed Dec 4, 2024
1 parent 3d00309 commit b6d9839
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ def __init__(
self._step_details = step_details
self._logger = LoggerAdapter(OPENJD_LOG, extra={"session_id": session_id})

def set_step_script(self, manifests, s3_settings) -> None:
def set_step_script(self, manifests: list[str], s3_settings: JobAttachmentS3Settings) -> None:
"""Sets the step script for the action
Parameters
----------
manifests : list[BaseAssetManifest]
The job attachment manifests
manifests : list[str]
The job attachment manifest paths
s3_settings : JobAttachmentS3Settings
The job attachment S3 settings
"""
Expand Down Expand Up @@ -225,15 +225,15 @@ def start(
# returns root path to PathMappingRule mapping
dynamic_mapping_rules: dict[str, PathMappingRule] = (
session._asset_sync.generate_dynamic_path_mapping(
session_dir=session.openjd_session.working_directory,
session_dir=session.working_directory,
attachments=attachments,
)
)

# Aggregate manifests (with step dependency handling)
merged_manifests_by_root: dict[str, BaseAssetManifest] = (
session._asset_sync._aggregate_asset_root_manifests(
session_dir=session.openjd_session.working_directory,
session_dir=session.working_directory,
s3_settings=s3_settings,
queue_id=session._queue_id,
job_id=session._queue._job_id,
Expand Down Expand Up @@ -276,10 +276,9 @@ def start(

manifest_paths_by_root = session._asset_sync._check_and_write_local_manifests(
merged_manifests_by_root=merged_manifests_by_root,
manifest_write_dir=str(session.openjd_session.working_directory),
manifest_write_dir=str(session.working_directory),
)
# TODO: remove type: ignore for manifest_paths_by_root after deadline-cloud release
session.manifest_paths_by_root = manifest_paths_by_root # type: ignore
session.manifest_paths_by_root = manifest_paths_by_root

self.set_step_script(
manifests=manifest_paths_by_root.values(), # type: ignore
Expand Down Expand Up @@ -332,7 +331,7 @@ def _start_vfs(
assert session._asset_sync is not None
session._asset_sync._launch_vfs(
s3_settings=s3_settings,
session_dir=session.openjd_session.working_directory,
session_dir=session.working_directory,
fs_permission_settings=fs_permission_settings,
merged_manifests_by_root=merged_manifests_by_root,
os_env_vars=dict(os.environ),
Expand Down
41 changes: 16 additions & 25 deletions src/deadline_worker_agent/sessions/actions/run_attachment_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from concurrent.futures import (
Executor,
)
import os
import json
import sys
from logging import LoggerAdapter
from typing import Any, TYPE_CHECKING, Optional
Expand All @@ -21,8 +21,6 @@
StepActions as StepActions_2023_09,
)
from openjd.model import ParameterValue
from deadline.job_attachments.api.manifest import _manifest_snapshot
from deadline.job_attachments.models import ManifestSnapshot

from ...log_messages import SessionActionLogKind
from .openjd_action import OpenjdAction
Expand Down Expand Up @@ -67,17 +65,28 @@ def __init__(

self._logger = LoggerAdapter(OPENJD_LOG, extra={"session_id": session_id})

def set_step_script(self, manifests, s3_settings) -> None:
def set_step_script(
self, manifest_paths_by_root: dict[str, str], s3_settings: JobAttachmentS3Settings
) -> None:
"""Sets the step script for the action
Parameters
----------
manifest_paths_by_root : dict[str, str]
A dictionary mapping root paths to manifest paths
s3_settings : JobAttachmentS3Settings
The S3 settings for the job attachment
"""

args = [
"{{ Task.File.AttachmentUpload }}",
"-pm",
"{{ Session.PathMappingRulesFile }}",
"-s3",
s3_settings.to_s3_root_uri(),
"-m",
"-mm",
json.dumps(manifest_paths_by_root),
]
args.extend(manifests)

executable_path = Path(sys.executable)
python_path = executable_path.parent / executable_path.name.lower().replace(
Expand Down Expand Up @@ -155,27 +164,9 @@ def start(
)

manifest_paths_by_root = session.manifest_paths_by_root
output_path = os.path.join(session.openjd_session.working_directory, "diff")
manifests = list()

for root, path in manifest_paths_by_root.items():
self._logger.info(
f"Snapshooting manifest {path} for local root {root}",
extra={"openjd_log_content": LogContent.PARAMETER_INFO},
)
manifest: Optional[ManifestSnapshot] = _manifest_snapshot(
root=root,
destination=str(output_path),
# `output` is used for job download to discover output manifests
# manifest file name need to contain the hash of root path for attachment CLI path mapping
name=f"output-{os.path.basename(path)}",
diff=path,
)
if manifest:
manifests.append(manifest.manifest)

self.set_step_script(
manifests=manifests,
manifest_paths_by_root=manifest_paths_by_root,
s3_settings=s3_settings,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
import time
import os
import boto3
import json
from typing import Optional

from deadline.job_attachments import api
from deadline.job_attachments.api.manifest import _manifest_snapshot
from deadline.job_attachments.models import ManifestSnapshot

"""
A small script to upload job output using attachment upload.
Expand All @@ -17,8 +21,7 @@
python attachment_upload.py \
-pm /sessions/session-f63c206fb5f04c04aa17821001aa3847fajfm5x4/path_mapping.json \
-s3 s3://test-job-attachment/DeadlineCloud \
-m /sessions/session-f63c206fb5f04c04aa17821001aa3847fajfm5x4/manifests/0bb7eb91fdf8780c4a7e6174de6dfc5e_manifest \
-m /sessions/session-f63c206fb5f04c04aa17821001aa3847fajfm5x4/manifests/0bb7eb91fdf8780c4a7e6174de6dfc5e_manifest
-mm '{"/sessions/session-e0317487a6cd470084b1c6fd85c789e6ank4lmh5/assetroot-a7714e87e776d9f1c179": "/sessions/session-e0317487a6cd470084b1c6fd85c789e6ank4lmh5/manifests/0bb7eb91fdf8780c4a7e6174de6dfc5e_manifest"}'
"""


Expand All @@ -33,18 +36,39 @@ def upload(s3_root_uri: str, path_mapping_rules: str, manifests: list[str]) -> N
)


def snapshot(manifest_paths_by_root: dict[str, str]) -> list[str]:
    """Take a diff snapshot of every asset root and collect the resulting manifests.

    For each ``root -> manifest path`` pair, ``_manifest_snapshot`` is called with
    the root, the ``diff`` directory under the current working directory as the
    destination, and the existing manifest path as the ``diff`` baseline.

    Parameters
    ----------
    manifest_paths_by_root : dict[str, str]
        A dictionary mapping root paths to manifest paths

    Returns
    -------
    list[str]
        The ``manifest`` value of every snapshot that was produced, in the
        iteration order of ``manifest_paths_by_root``. Roots for which
        ``_manifest_snapshot`` returns a falsy result are left out.
    """
    diff_dir = os.path.join(os.getcwd(), "diff")
    snapshot_manifests: list[str] = []

    for asset_root, manifest_path in manifest_paths_by_root.items():
        # The `output` prefix is used by job download to discover output manifests.
        # The manifest file name needs to contain the hash of the root path for the
        # attachment CLI path mapping, so the input manifest's base name is reused.
        snapshot_name = f"output-{os.path.basename(manifest_path)}"

        result: Optional[ManifestSnapshot] = _manifest_snapshot(
            root=asset_root,
            destination=diff_dir,
            name=snapshot_name,
            diff=manifest_path,
        )
        if not result:
            # Nothing was produced for this root, so there is nothing to upload.
            continue
        snapshot_manifests.append(result.manifest)

    return snapshot_manifests


if __name__ == "__main__":
start_time = time.perf_counter()

parser = argparse.ArgumentParser()
parser.add_argument("-pm", "--path-mapping", type=str, help="", required=True)
parser.add_argument("-s3", "--s3-uri", type=str, help="", required=True)
parser.add_argument("-m", "--manifests", nargs="*", type=str, help="", required=True)
parser.add_argument("-mm", "--manifest-map", type=json.loads, required=True)

args = parser.parse_args()
path_mapping = args.path_mapping
s3_uri = args.s3_uri
manifests = args.manifests
manifest_map = args.manifest_map

manifests = snapshot(manifest_paths_by_root=manifest_map)

print("\nStarting upload...")
upload(manifests=manifests, s3_root_uri=s3_uri, path_mapping_rules=path_mapping)
Expand Down
5 changes: 5 additions & 0 deletions src/deadline_worker_agent/sessions/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,11 @@ def openjd_session(self) -> OPENJDSession:
"""The openjd session for this session"""
return self._session

@property
def working_directory(self) -> Path:
    """The working directory of the underlying openjd session.

    Shortcut for reading ``working_directory`` from the wrapped session
    object held in ``self._session``.
    """
    openjd_session = self._session
    return openjd_session.working_directory

@property
def id(self) -> str:
"""The unique session ID"""
Expand Down
23 changes: 16 additions & 7 deletions test/unit/scheduler/test_session_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ def test_sync_input_job_attachments_actions(

@pytest.mark.skipif(
not ASSET_SYNC_JOB_USER_FEATURE,
reason="This test will always run after releasing the asset sync job user feature",
reason="This test will be run unconditionally after releasing the asset sync job user feature",
)
@pytest.mark.parametrize(
"action, expected",
Expand All @@ -279,7 +279,7 @@ def test_sync_input_job_attachments_actions(
manifests=[],
),
),
id="attachmen download job input",
id="attachment download job input",
),
pytest.param(
AttachmentDownloadActionQueueEntry(
Expand All @@ -299,7 +299,7 @@ def test_sync_input_job_attachments_actions(
step_id="step-1234",
),
),
id="attachmen download step dependency",
id="attachment download step dependency",
),
pytest.param(
AttachmentUploadActionQueueEntry(
Expand All @@ -321,7 +321,7 @@ def test_sync_input_job_attachments_actions(
),
],
)
def test_attachments_download_actions(
def test_attachments_transfer_actions(
self,
action: AttachmentDownloadActionQueueEntry | AttachmentUploadActionQueueEntry,
expected: AttachmentDownloadAction | AttachmentUploadAction,
Expand All @@ -345,8 +345,17 @@ def test_attachment_upload_insert_dequeue(
session_queue: SessionActionQueue,
) -> None:
# GIVEN
action = EnvironmentQueueEntry(
Mock(), # cancel event
EnvironmentAction(
sessionActionId="id-env", actionType="ENV_ENTER", environmentId="envid"
),
)
session_queue._actions = [action]
session_queue._actions_by_id[action.definition["sessionActionId"]] = action

upload_action = AttachmentUploadActionBoto(
sessionActionId="id",
sessionActionId="id-upload",
actionType="SYNC_OUTPUT_JOB_ATTACHMENTS",
stepId="step-1",
taskId="task-1",
Expand All @@ -356,8 +365,8 @@ def test_attachment_upload_insert_dequeue(
session_queue.insert_front(action=upload_action)

# THEN
assert len(session_queue._actions) == 1
assert "id" in session_queue._actions_by_id
assert len(session_queue._actions) == 2
assert "id-upload" in session_queue._actions_by_id

# WHEN
next_action = session_queue.dequeue()
Expand Down
42 changes: 12 additions & 30 deletions test/unit/sessions/actions/test_run_attachment_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import sys
import tempfile
import json
from typing import TYPE_CHECKING, Generator
from unittest.mock import MagicMock, Mock, patch

Expand All @@ -24,7 +25,6 @@

import deadline_worker_agent.sessions.session as session_mod
from deadline.job_attachments.models import JobAttachmentS3Settings
from deadline.job_attachments.models import ManifestSnapshot

if TYPE_CHECKING:
from deadline_worker_agent.sessions.job_entities import JobAttachmentDetails
Expand Down Expand Up @@ -78,9 +78,11 @@ def action_id() -> str:
@pytest.fixture
def action(
action_id: str,
step_id: str,
task_id: str,
) -> actions_module.AttachmentUploadAction:
return actions_module.AttachmentUploadAction(
id=action_id, session_id="session-1234", step_id="step-1234", task_id="task-1234"
id=action_id, session_id="session-1234", step_id=step_id, task_id=task_id
)


Expand Down Expand Up @@ -114,26 +116,16 @@ def session(

return session

@pytest.fixture(autouse=True)
def mock_manifest_snapshot(self) -> Generator[MagicMock, None, None]:
with patch.object(
actions_module.run_attachment_upload,
"_manifest_snapshot",
return_value=ManifestSnapshot(
manifest=f"{session_dir}/diff/output-hash_manifest-timestamp.manifest"
),
) as mock_snapshot:
yield mock_snapshot

def test_attachment_upload_action_start(
self,
executor: Mock,
session: Mock,
action: actions_module.AttachmentUploadAction,
session_dir: str,
mock_manifest_snapshot: MagicMock,
job_details: JobDetails,
python_path: str,
step_id: str,
task_id: str,
action_id: str,
) -> None:
"""
Tests that AttachmentUploadAction.start() calls AssetSync functions to prepare input
Expand All @@ -152,15 +144,6 @@ def test_attachment_upload_action_start(
# WHEN
action.start(session=session, executor=executor)

# THEN
for root, path in session.manifest_paths_by_root.items():
mock_manifest_snapshot.assert_any_call(
root=root,
destination=str(os.path.join(session_dir, "diff")),
name=f"output-{os.path.basename(path)}",
diff=path,
)

with open(
Path(os.path.dirname(actions_module.__file__)) / "scripts" / "attachment_upload.py",
"r",
Expand All @@ -175,9 +158,8 @@ def test_attachment_upload_action_start(
"{{ Session.PathMappingRulesFile }}",
"-s3",
s3_settings.to_s3_root_uri(),
"-m",
mock_manifest_snapshot.return_value.manifest,
mock_manifest_snapshot.return_value.manifest,
"-mm",
json.dumps(session.manifest_paths_by_root),
],
)
),
Expand All @@ -195,8 +177,8 @@ def test_attachment_upload_action_start(
step_script=action._step_script,
task_parameter_values=dict[str, ParameterValue](),
os_env_vars={
"DEADLINE_SESSIONACTION_ID": "sessionaction-abc123",
"DEADLINE_STEP_ID": "step-1234",
"DEADLINE_TASK_ID": "task-1234",
"DEADLINE_SESSIONACTION_ID": action_id,
"DEADLINE_STEP_ID": step_id,
"DEADLINE_TASK_ID": task_id,
},
)
2 changes: 1 addition & 1 deletion test/unit/sessions/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1517,7 +1517,7 @@ def sync_asset_outputs_side_effect(*, current_action: CurrentAction) -> None:

@pytest.mark.skipif(
not ASSET_SYNC_JOB_USER_FEATURE,
reason="This test will be removed after releasing the asset sync job user feature",
reason="This test will be run unconditionally after releasing the asset sync job user feature",
)
def test_success_task_run_attachment_upload(
self,
Expand Down

0 comments on commit b6d9839

Please sign in to comment.