From 1c3a9b54460a33e973305f8d48dfacbf2c237e7e Mon Sep 17 00:00:00 2001 From: Gahyun Suh <132245153+gahyusuh@users.noreply.github.com> Date: Fri, 15 Sep 2023 12:21:26 -0500 Subject: [PATCH] fix: remove the recursive ownership/permissions changes (#26) Signed-off-by: Gahyun Suh <132245153+gahyusuh@users.noreply.github.com> --- pyproject.toml | 2 +- .../job_entities/job_attachment_details.py | 2 +- src/deadline_worker_agent/sessions/session.py | 54 ++++++------- test/unit/conftest.py | 8 +- test/unit/scheduler/test_session_queue.py | 2 +- .../sessions/test_job_attachment_details.py | 2 +- test/unit/sessions/test_job_entities.py | 2 +- test/unit/sessions/test_session.py | 77 ++++++++++--------- 8 files changed, 76 insertions(+), 73 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index e2092d56..9a1ca6f4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,7 @@ dynamic = ["version"] dependencies = [ "requests ~= 2.29", "boto3 ~= 1.26", - "deadline == 0.23.*", + "deadline == 0.25.*", "openjd-sessions == 0.2.*", # tomli became tomllib in standard library in Python 3.11 "tomli >= 1.1.0 ; python_version<'3.11'", diff --git a/src/deadline_worker_agent/sessions/job_entities/job_attachment_details.py b/src/deadline_worker_agent/sessions/job_entities/job_attachment_details.py index 86ac8eac..7b543c77 100644 --- a/src/deadline_worker_agent/sessions/job_entities/job_attachment_details.py +++ b/src/deadline_worker_agent/sessions/job_entities/job_attachment_details.py @@ -5,7 +5,7 @@ from typing import Any, cast from openjd.sessions import Parameter, ParameterType -from deadline.job_attachments._utils import AssetLoadingMethod +from deadline.job_attachments.models import AssetLoadingMethod from ...api_models import ( FloatParameter, diff --git a/src/deadline_worker_agent/sessions/session.py b/src/deadline_worker_agent/sessions/session.py index 683f5472..56742653 100644 --- a/src/deadline_worker_agent/sessions/session.py +++ b/src/deadline_worker_agent/sessions/session.py @@ -2,14 +2,12 @@ from __future__ import annotations -from os import chmod +import os from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, fields from datetime import datetime, timedelta, timezone from functools import partial from logging import getLogger, LoggerAdapter -from pathlib import Path -from shutil import chown from threading import Event, RLock from time import monotonic, sleep from types import TracebackType @@ -53,12 +51,13 @@ from deadline.job_attachments.asset_sync import AssetSync from deadline.job_attachments.asset_sync import logger as ASSET_SYNC_LOGGER from deadline.job_attachments.models import ( - ManifestProperties, - JobAttachmentS3Settings, Attachments, + PosixFileSystemPermissionSettings, + JobAttachmentS3Settings, + ManifestProperties, + OperatingSystemFamily, ) from deadline.job_attachments.progress_tracker import ProgressReportMetadata -from deadline.job_attachments._utils import OperatingSystemFamily from ..scheduler.session_action_status import SessionActionStatus from ..sessions.errors import SessionActionError @@ -764,8 +763,6 @@ def progress_handler(job_upload_status: ProgressReportMetadata) -> bool: return not cancel.is_set() if not (job_attachment_settings := self._job_details.job_attachment_settings): - if self._os_user is not None: - self._recursive_change_fs_group(self._session.working_directory) raise RuntimeError("Job attachment settings were not contained in JOB_DETAILS entity") if job_attachment_details: @@ -792,7 +789,7 @@ def progress_handler(job_upload_status: ProgressReportMetadata) -> bool: ManifestProperties( rootPath=manifest_properties.root_path, fileSystemLocationName=manifest_properties.file_system_location_name, - osType=OperatingSystemFamily.get_os_family(manifest_properties.os_type), + osType=OperatingSystemFamily(manifest_properties.os_type), inputManifestPath=manifest_properties.input_manifest_path, inputManifestHash=manifest_properties.input_manifest_hash, outputRelativeDirectories=manifest_properties.output_relative_directories, @@ -809,6 +806,22 @@ def progress_handler(job_upload_status: ProgressReportMetadata) -> bool: for rule in self._job_details.path_mapping_rules } + fs_permission_settings = None + if self._os_user is not None: + if os.name == "posix": + if not isinstance(self._os_user, PosixSessionUser): + raise ValueError(f"The user must be a posix-user. Got {type(self._os_user)}") + fs_permission_settings = PosixFileSystemPermissionSettings( + os_group=self._os_user.group, + dir_mode=0o20, + file_mode=0o20, + ) + else: + # TODO: Support Windows file system permission settings + raise NotImplementedError( + "File system permission settings for non-posix systems are not currently supported." + ) + # Add path mapping rules for root paths in job attachments ASSET_SYNC_LOGGER.info("Syncing inputs using Job Attachments") (download_summary_statistics, path_mapping_rules) = self._asset_sync.sync_inputs( @@ -817,6 +830,7 @@ def progress_handler(job_upload_status: ProgressReportMetadata) -> bool: queue_id=self._queue_id, # only used for error message job_id=self._queue._job_id, # only used for error message session_dir=self._session.working_directory, + fs_permission_settings=fs_permission_settings, storage_profiles_path_mapping_rules=storage_profiles_path_mapping_rules_dict, step_dependencies=step_dependencies, on_downloading_files=progress_handler, @@ -859,26 +873,6 @@ def progress_handler(job_upload_status: ProgressReportMetadata) -> bool: # sort here since we're modifying that internal list appending to the list. self._session._path_mapping_rules.sort(key=lambda rule: -len(rule.source_path.parts)) - # Open Job Description sessions handles the working directory, - # but JobAttachments adds files afterwards - if self._os_user is not None: - self._recursive_change_fs_group(self._session.working_directory) - - def _recursive_change_fs_group(self, path: Path) -> None: - """ - Recursively changes the group owner of the file-system path to group with the same name as - the step user. This can be used to allow paths written by the Worker Agent to be accessible - by the session actions. - """ - # The user that we're impersonating must be set prior to calling this function. - assert isinstance(self._os_user, PosixSessionUser) - chown(path, group=self._os_user.group) - stat = path.stat() - chmod(path, stat.st_mode | 0o20) - if path.is_dir(): - for child_path in path.iterdir(): - self._recursive_change_fs_group(child_path) - def update_action(self, action_status: ActionStatus) -> None: """Callback called on every Open Job Description status/progress update and the completion/exit of the current action. @@ -1061,7 +1055,7 @@ def _sync_asset_outputs( ManifestProperties( manifest_properties.root_path, manifest_properties.file_system_location_name, - OperatingSystemFamily.get_os_family(manifest_properties.os_type), + OperatingSystemFamily(manifest_properties.os_type), manifest_properties.input_manifest_path, manifest_properties.input_manifest_hash, manifest_properties.output_relative_directories, diff --git a/test/unit/conftest.py b/test/unit/conftest.py index b619ace2..92f6f601 100644 --- a/test/unit/conftest.py +++ b/test/unit/conftest.py @@ -9,8 +9,12 @@ from pytest import FixtureRequest from typing import Generator, Optional -from deadline.job_attachments.models import ManifestProperties, Attachments -from deadline.job_attachments._utils import AssetLoadingMethod, OperatingSystemFamily +from deadline.job_attachments.models import ( + AssetLoadingMethod, + Attachments, + ManifestProperties, + OperatingSystemFamily, +) from openjd.model import SchemaVersion from openjd.sessions import ( Parameter, diff --git a/test/unit/scheduler/test_session_queue.py b/test/unit/scheduler/test_session_queue.py index e3fd8bab..7d920cde 100644 --- a/test/unit/scheduler/test_session_queue.py +++ b/test/unit/scheduler/test_session_queue.py @@ -5,7 +5,7 @@ from unittest.mock import MagicMock, Mock, patch from collections import OrderedDict -from deadline.job_attachments._utils import AssetLoadingMethod +from deadline.job_attachments.models import AssetLoadingMethod from openjd.model import SchemaVersion, UnsupportedSchema from openjd.model.v2023_09 import ( Environment, diff --git a/test/unit/sessions/test_job_attachment_details.py b/test/unit/sessions/test_job_attachment_details.py index d3518bfe..3fdb024f 100644 --- a/test/unit/sessions/test_job_attachment_details.py +++ b/test/unit/sessions/test_job_attachment_details.py @@ -2,7 +2,7 @@ import pytest -from deadline.job_attachments._utils import AssetLoadingMethod +from deadline.job_attachments.models import AssetLoadingMethod from deadline_worker_agent.sessions.job_entities.job_attachment_details import JobAttachmentDetails diff --git a/test/unit/sessions/test_job_entities.py b/test/unit/sessions/test_job_entities.py index 485c42f1..04b37297 100644 --- a/test/unit/sessions/test_job_entities.py +++ b/test/unit/sessions/test_job_entities.py @@ -4,7 +4,7 @@ from typing import Generator from unittest.mock import MagicMock, patch -from deadline.job_attachments._utils import AssetLoadingMethod +from deadline.job_attachments.models import AssetLoadingMethod from openjd.model import SchemaVersion from openjd.model.v2023_09 import ( Action, diff --git a/test/unit/sessions/test_session.py b/test/unit/sessions/test_session.py index 67c1b9a0..e270ce6f 100644 --- a/test/unit/sessions/test_session.py +++ b/test/unit/sessions/test_session.py @@ -43,8 +43,11 @@ JobDetails, StepDetails, ) -from deadline.job_attachments.models import Attachments -from deadline.job_attachments._utils import AssetLoadingMethod +from deadline.job_attachments.models import ( + Attachments, + AssetLoadingMethod, + PosixFileSystemPermissionSettings, +) import deadline_worker_agent.sessions.session as session_mod @@ -494,27 +497,31 @@ def test_asset_loading_method( mock_sync_inputs: MagicMock = mock_asset_sync.sync_inputs mock_sync_inputs.return_value = ({}, {}) cancel = Event() - with (patch.object(session, "_recursive_change_fs_group", return_value=()),): - # WHEN - session.sync_asset_inputs( - cancel=cancel, - job_attachment_details=job_attachment_details, - ) + # WHEN + session.sync_asset_inputs( + cancel=cancel, + job_attachment_details=job_attachment_details, + ) - # THEN - mock_sync_inputs.assert_called_with( - s3_settings=ANY, - queue_id=ANY, - job_id=ANY, - session_dir=ANY, - attachments=Attachments( - manifests=ANY, - assetLoadingMethod=asset_loading_method, - ), - storage_profiles_path_mapping_rules={}, - step_dependencies=None, - on_downloading_files=ANY, - ) + # THEN + mock_sync_inputs.assert_called_with( + s3_settings=ANY, + queue_id=ANY, + job_id=ANY, + session_dir=ANY, + attachments=Attachments( + manifests=ANY, + assetLoadingMethod=asset_loading_method, + ), + fs_permission_settings=PosixFileSystemPermissionSettings( + os_group="some-group", + dir_mode=0o20, + file_mode=0o20, + ), + storage_profiles_path_mapping_rules={}, + step_dependencies=None, + on_downloading_files=ANY, + ) @pytest.mark.parametrize( "sync_asset_inputs_args_sequence, expected_error", @@ -568,17 +575,16 @@ def test_sync_asset_inputs( mock_sync_inputs.return_value = ({}, {}) cancel = Event() - with (patch.object(session, "_recursive_change_fs_group", return_value=()),): - for args in sync_asset_inputs_args_sequence: - if expected_error: - with pytest.raises(RuntimeError) as raise_ctx: - session.sync_asset_inputs(cancel=cancel, **args) # type: ignore[arg-type] - assert ( - raise_ctx.value.args[0] - == "Job attachments must be synchronized before downloading Step dependencies." - ) - else: + for args in sync_asset_inputs_args_sequence: + if expected_error: + with pytest.raises(RuntimeError) as raise_ctx: session.sync_asset_inputs(cancel=cancel, **args) # type: ignore[arg-type] + assert ( + raise_ctx.value.args[0] + == "Job attachments must be synchronized before downloading Step dependencies." + ) + else: + session.sync_asset_inputs(cancel=cancel, **args) # type: ignore[arg-type] def test_job_attachments_path_mapping_rules_compatibility( self, @@ -617,10 +623,9 @@ def test_job_attachments_path_mapping_rules_compatibility( ) } - with (patch.object(session, "_recursive_change_fs_group", return_value=()),): - # WHEN / THEN - session.sync_asset_inputs(cancel=cancel, **sync_asset_inputs_args) # type: ignore[arg-type] - # No errors on generating path mapping rules - success! + # WHEN / THEN + session.sync_asset_inputs(cancel=cancel, **sync_asset_inputs_args) # type: ignore[arg-type] + # No errors on generating path mapping rules - success! class TestSessionInnerRun: