Skip to content

Commit

Permalink
feat: update to the new job schema
Browse files Browse the repository at this point in the history
Problem:
 The job template schema is being updated for service release. With it,
the implementation library is being updated in a breaking way and so the agent
code needs to be updated to that revision.

 Furthermore, this needs to be a phased update. The agent needs to
support both old and new revisions at the same time. Fortunately, there
is nothing changing from the 2022-09-01 revision that is relevant to
the agent except for the version string.

Solution:
 Updated the dependency of the template implementation library to the
relevant version. Also allow the agent to accept both the new and old
revision version string from the service, but treat both values as being
the new revision's version string.

 Some TODOs have been left in the code as markers for the changes that
need to happen in the second phase of the update.

Signed-off-by: Daniel Neilson <[email protected]>
  • Loading branch information
Daniel Neilson authored and mwiebe committed Sep 6, 2023
1 parent 20c3b72 commit 9aafbbe
Show file tree
Hide file tree
Showing 50 changed files with 292 additions and 241 deletions.
2 changes: 1 addition & 1 deletion DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ This contains an implementation of the Worker Agent's scheduler. This works with

### `src/deadline_worker_agent/sessions`

This contains the logic and APIs for managing the life-cycle of a Worker session. The primary class contained in this package, the `Session` class, is responsible for taking actions from the `SessionActionQueue` and running them within the OpenJobIO session.
This contains the logic and APIs for managing the life-cycle of a Worker session. The primary class contained in this package, the `Session` class, is responsible for taking actions from the `SessionActionQueue` and running them within the Open Job Description session.

### `src/deadline_worker_agent/sessions/actions`

Expand Down
2 changes: 1 addition & 1 deletion docs/worker_api_contract.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ Workflow before proceeding.
that the Worker cannot start the Session Action if either:
* The Session Action is a type that is not understood by the Worker Agent (e.g. a new type
added in the future);
* The Session Action uses an OpenJobIO version that the Worker Agent doesn't understand; or
* The Session Action uses an Open Job Description version that the Worker Agent doesn't understand; or
* Otherwise unable to be run by the Worker Agent (e.g. compatibility reasons, or failures
in a `AssumeQueueRoleForWorker` request).
4. Any other updates as dictated by a Worker-Initiated Drain workflow.
Expand Down
8 changes: 2 additions & 6 deletions hatch_version_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,15 @@ class CustomBuildHook(BuildHookInterface):
"_version.py",
]
destinations = [
"src/openjobio",
"src/openjobio_adaptor_runtime",
"src/openjobio_adaptor_runtime_client",
"src/deadline-cloud-worker-agent",
]
[[tool.hatch.build.hooks.custom.copy_map]]
sources = [
"something_the_tests_need.py",
"something_else_the_tests_need.ini",
]
destinations = [
"test/openjobio",
"test/openjobio_adaptor_runtime",
"test/openjobio_adaptor_runtime_client",
"test/deadline-cloud-worker-agent",
]
```
"""
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ dependencies = [
"requests ~= 2.29",
"boto3 ~= 1.26",
"deadline == 0.18.*",
"openjobio == 0.8.*",
"openjd == 0.10.*",
# tomli became tomllib in standard library in Python 3.11
"tomli >= 1.1.0 ; python_version<'3.11'",
"typing_extensions ~= 4.5",
Expand Down Expand Up @@ -106,7 +106,7 @@ line-length = 100
known-first-party = [
"deadline_worker_agent",
"deadline",
"openjobio",
"openjd",
]

[tool.black]
Expand Down
13 changes: 9 additions & 4 deletions scripts/submit_jobs/asset_example/template.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
{
"specificationVersion": "2022-09-01",
"specificationVersion": "jobtemplate-2023-09",
"name": "AssetsExample",
"parameters": [
"parameterDefinitions": [
{
"name": "DataDir",
"type": "PATH"
"type": "PATH",
"dataFlow": "INOUT",
"userInterface": {
"label": "Input/Output Directory",
"control": "CHOOSE_DIRECTORY"
}
}
],
"steps": [
Expand All @@ -25,7 +30,7 @@
}
]
},
"environments": [
"stepEnvironments": [
{
"name": "myenv",
"script": {
Expand Down
8 changes: 0 additions & 8 deletions scripts/submit_jobs/asset_example/template_uihint.yaml

This file was deleted.

8 changes: 4 additions & 4 deletions scripts/submit_jobs/sleep/sleep_job.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"specificationVersion": "2022-09-01",
"specificationVersion": "jobtemplate-2023-09",
"name": "Longsleep",
"parameters": [
"parameterDefinitions": [
{
"name": "duration",
"type": "INT",
Expand All @@ -25,11 +25,11 @@
"name": "runScript",
"type": "TEXT",
"runnable": true,
"data": "#!/usr/bin/env python3\n\nimport signal\nimport sys\nimport time\n\nprint(sys.argv)\nif len(sys.argv) < 2:\n print(\"ERROR: Expected arg for number of seconds\")\n sys.exit(1)\nelif len(sys.argv) > 2:\n print(\"ERROR: Unexpected number of arguments\")\n sys.exit(1)\n\n\ntry:\n seconds = int(sys.argv[1])\nexcept Exception as e:\n print(f'ERROR: could not parse number from \"{sys.argv[1]}\"')\n sys.exit(1)\n\nif seconds <= 0:\n print(\"ERROR: Invalid \")\n\n\ndef signal_handler(sig_num, frame):\n print(f\"Trapped signal {sig_num}\")\n sys.stdout.flush()\n\n if sig_num in (signal.SIGINT, signal.SIGTERM):\n print(\"CANCELLED\")\n sys.stdout.flush()\n sys.exit(1)\n\nif sys.platform.startswith(\"win\"):\n signal.signal(signal.SIGINT, signal_handler)\nelse:\n signal.signal(signal.SIGTERM, signal_handler)\n\n\nprogress_inc = 100 / float(seconds)\nprogress = 0.0\n\nprint(f\"Waiting {seconds}...\")\n\nfor i in range(seconds):\n time.sleep(1)\n progress += progress_inc\n print(f\"openjobio_progress: {progress}\")\n sys.stdout.flush()\n\nprint(\"done.\")\n"
"data": "#!/usr/bin/env python3\n\nimport signal\nimport sys\nimport time\n\nprint(sys.argv)\nif len(sys.argv) < 2:\n print(\"ERROR: Expected arg for number of seconds\")\n sys.exit(1)\nelif len(sys.argv) > 2:\n print(\"ERROR: Unexpected number of arguments\")\n sys.exit(1)\n\n\ntry:\n seconds = int(sys.argv[1])\nexcept Exception as e:\n print(f'ERROR: could not parse number from \"{sys.argv[1]}\"')\n sys.exit(1)\n\nif seconds <= 0:\n print(\"ERROR: Invalid \")\n\n\ndef signal_handler(sig_num, frame):\n print(f\"Trapped signal {sig_num}\")\n sys.stdout.flush()\n\n if sig_num in (signal.SIGINT, signal.SIGTERM):\n print(\"CANCELLED\")\n sys.stdout.flush()\n sys.exit(1)\n\nif sys.platform.startswith(\"win\"):\n signal.signal(signal.SIGINT, signal_handler)\nelse:\n signal.signal(signal.SIGTERM, signal_handler)\n\n\nprogress_inc = 100 / float(seconds)\nprogress = 0.0\n\nprint(f\"Waiting {seconds}...\")\n\nfor i in range(seconds):\n time.sleep(1)\n progress += progress_inc\n print(f\"openjd_progress: {progress}\")\n sys.stdout.flush()\n\nprint(\"done.\")\n"
}
]
},
"environments": [
"stepEnvironments": [
{
"name": "myenv",
"script": {
Expand Down
6 changes: 3 additions & 3 deletions src/deadline_worker_agent/api_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class StepDetailsIdentifier(TypedDict):

class StepDetailsData(StepDetailsIdentifierFields):
schemaVersion: str
"""The OpenJobIO schema version that corresponds to the template"""
"""The Open Job Description schema version that corresponds to the template"""

template: dict[str, Any]
"""The template of the step"""
Expand Down Expand Up @@ -260,7 +260,7 @@ class JobDetailsData(JobDetailsIdentifierFields):
"""The name of the CloudWatch Log Group containing the Worker session's Log Stream"""

schemaVersion: str
"""The OpenJobIO job template schema version"""
"""The Open Job Description job template schema version"""

parameters: NotRequired[
dict[str, StringParameter | PathParameter | IntParameter | FloatParameter | str]
Expand Down Expand Up @@ -297,7 +297,7 @@ class EnvironmentDetailsIdentifier(TypedDict):

class EnvironmentDetailsData(EnvironmentDetailsIdentifierFields):
schemaVersion: str
"""The OpenJobIO schema version"""
"""The Open Job Description schema version"""
template: dict[str, Any]
"""The template of the environment."""

Expand Down
2 changes: 1 addition & 1 deletion src/deadline_worker_agent/aws_credentials/aws_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from configparser import ConfigParser
from pathlib import Path
from typing import Optional
from openjobio.sessions import PosixSessionUser, SessionUser
from openjd.sessions import PosixSessionUser, SessionUser
from subprocess import run, DEVNULL, PIPE, STDOUT

__all__ = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from threading import Event

from botocore.utils import JSONFileCache
from openjobio.sessions import PosixSessionUser, SessionUser
from openjd.sessions import PosixSessionUser, SessionUser

from ..boto_mock import DeadlineClient

Expand All @@ -31,9 +31,9 @@

class QueueBoto3Session(BaseBoto3Session):
"""A Boto3 Session that contains Queue Role AWS Credentials for use by:
1. Any service Session Action run within an OpenJobIO Session; and
2. The Worker when performing actions on behalf of a service Session Action for an OpenJobIO
Session.
1. Any service Session Action run within an Open Job Description Session; and
2. The Worker when performing actions on behalf of a service Session Action for
an Open Job Description Session.
When created, this Session:
1. Installs an AWS Credentials Process in the ~/.aws of the given os_user, or the current user if
Expand Down
6 changes: 3 additions & 3 deletions src/deadline_worker_agent/boto_mock.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def batch_get_job_entity(
{
"jobDetails": {
"jobId": "job-21432d89b44a46cbaaeb2f1d5254e548",
"schemaVersion": "2022-09-01",
"schemaVersion": "jobtemplate-2023-09",
"jobAttachmentSettings": {
"s3BucketName": "asset-bucket",
"rootPrefix": "my-queue",
Expand Down Expand Up @@ -291,7 +291,7 @@ def batch_get_job_entity(
"stepDetails": {
"jobId": "job-abac",
"stepId": "step-a50bcbf7a86848dabc46480db936b4a7",
"schemaVersion": "2022-09-01",
"schemaVersion": "jobtemplate-2023-09",
"template": {},
"dependencies": [],
},
Expand All @@ -300,7 +300,7 @@ def batch_get_job_entity(
"environmentDetails": {
"environmentId": "env1",
"jobId": "job-21432d89b44a46cbaaeb2f1d5254e548",
"schemaVersion": "2022-09-01",
"schemaVersion": "jobtemplate-2023-09",
"template": {
"name": "foo",
"script": {},
Expand Down
4 changes: 2 additions & 2 deletions src/deadline_worker_agent/installer/worker.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@
#
# <VENDOR>
# An optional namespace that indicates the capability is specific to a particular
# OpenJobIO-compatable render farm management system.
# Open Job Description-compatible render farm management system.
# <NAME>
# A name for the amount capability
# <VALUE>
Expand Down Expand Up @@ -255,7 +255,7 @@
#
# <VENDOR>
# An optional namespace that indicates the capability is specific to a particular
# OpenJobIO-compatable render farm component.
# Open Job Description-compatible render farm component.
# <NAME>
# A name for the attribute capability
# <VALUE>
Expand Down
2 changes: 1 addition & 1 deletion src/deadline_worker_agent/log_sync/loggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@

logger = _getLogger(__name__.rsplit(".", maxsplit=1)[0])

OJIO_ACTION_OUTPUT_LOGGER = _getLogger("openjobio.processing.action_output")
OPENJD_ACTION_OUTPUT_LOGGER = _getLogger("openjd.processing.action_output")
ROOT_LOGGER = _getLogger("root")
12 changes: 6 additions & 6 deletions src/deadline_worker_agent/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import os
import stat

from openjobio.sessions import ActionState, ActionStatus, SessionUser
from openjobio.sessions import LOG as OJIO_SESSION_LOG
from openjobio.sessions import ActionState, ActionStatus
from openjd.sessions import ActionState, ActionStatus, SessionUser
from openjd.sessions import LOG as OPENJD_SESSION_LOG
from openjd.sessions import ActionState, ActionStatus
from deadline.job_attachments.asset_sync import AssetSync
from botocore.exceptions import ClientError

Expand Down Expand Up @@ -88,8 +88,8 @@ class SchedulerSession:
@dataclass(frozen=True)
class QueueAwsCredentials:
"""This holds the AWS Credentials for a particular Queue to use for all actions
performed on behalf of the OpenJobIO Session for Jobs from that Queue. This
includes:
performed on behalf of the Open Job Description Session for Jobs from that Queue.
This includes:
1. all Job Attachments behaviors; and
2. things done by the running SessionActions' subprocesses.
Expand Down Expand Up @@ -656,7 +656,7 @@ def _create_new_sessions(

try:
log_config = LogConfiguration.from_boto(
loggers=[OJIO_SESSION_LOG, JOB_ATTACHMENTS_LOGGER],
loggers=[OPENJD_SESSION_LOG, JOB_ATTACHMENTS_LOGGER],
log_configuration=session_spec["logConfiguration"],
session_log_file=session_log_file,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from datetime import datetime
from typing import TYPE_CHECKING

from openjobio.sessions import ActionStatus
from openjd.sessions import ActionStatus

if TYPE_CHECKING:
from ..api_models import CompletedActionStatus
Expand Down
2 changes: 1 addition & 1 deletion src/deadline_worker_agent/scheduler/session_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import subprocess
from threading import Lock

from openjobio.sessions import SessionUser, PosixSessionUser
from openjd.sessions import SessionUser, PosixSessionUser

from .log import LOGGER
from ..sessions import Session
Expand Down
8 changes: 4 additions & 4 deletions src/deadline_worker_agent/scheduler/session_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from threading import Event
from typing import Any, Callable, Iterable, Generic, Literal, TypeVar, TYPE_CHECKING, cast

from openjobio.model import UnsupportedSchema
from openjobio.sessions import ActionState, ActionStatus
from openjd.model import UnsupportedSchema
from openjd.sessions import ActionState, ActionStatus

from ..api_models import (
EnvironmentAction as EnvironmentActionApiModel,
Expand Down Expand Up @@ -209,7 +209,7 @@ def cancel(
end_time=timestamp,
# TODO: This is semantically incorrect, but status.state is a required field. We
# only need this to communicate the message. In the future, we may want to remove
# the "status" field from OJIO here and hoist the fields we care about up to the
# the "status" field from Open Job Description here and hoist the fields we care about up to the
# SessionActionStatus class.
status=ActionStatus(
state=ActionState.FAILED,
Expand Down Expand Up @@ -313,7 +313,7 @@ def dequeue(self) -> SessionActionDefinition | None:
Raises
------
JobEntityUnsupportedSchemaError:
When the details for an OjioAction have a schema that the Worker Agent
When the details for an OpenjdAction have a schema that the Worker Agent
does not support. Allows the action to gracefully report the failure
to the service.
Expand Down
4 changes: 2 additions & 2 deletions src/deadline_worker_agent/sessions/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
from .action_definition import SessionActionDefinition
from .enter_env import EnterEnvironmentAction
from .exit_env import ExitEnvironmentAction
from .ojio_action import OjioAction
from .openjd_action import OpenjdAction
from .run_step_task import RunStepTaskAction
from .sync_input_job_attachments import SyncInputJobAttachmentsAction

__all__ = [
"EnterEnvironmentAction",
"ExitEnvironmentAction",
"OjioAction",
"OpenjdAction",
"RunStepTaskAction",
"SessionActionDefinition",
"SyncInputJobAttachmentsAction",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def human_readable(self) -> str:
and hyphens.
PARAM_NAME and PARAM_VALUE describe the parameters to the action. The allowable values for these
are inherited from OpenJobIO step parameter spaces.
are inherited from Open Job Description step parameter spaces.
For example:
Expand Down
8 changes: 4 additions & 4 deletions src/deadline_worker_agent/sessions/actions/enter_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,26 @@
from concurrent.futures import Executor
from typing import Any, TYPE_CHECKING

from openjobio.sessions import EnvironmentIdentifier
from openjd.sessions import EnvironmentIdentifier

from ..job_entities import EnvironmentDetails
from .ojio_action import OjioAction
from .openjd_action import OpenjdAction

if TYPE_CHECKING:
from ...api_models import EnvironmentAction
from ..session import Session
from .action_definition import SessionActionDefinition


class EnterEnvironmentAction(OjioAction):
class EnterEnvironmentAction(OpenjdAction):
"""Action to enter an environment within a Worker session
Parameters
----------
id : str
A unique identifier for the session action
job_env_id : str
A unique identifier for the environment within the OpenJobIO job
A unique identifier for the environment within the Open Job Description job
environment_details : EnvironmentDetails
The environment details
"""
Expand Down
4 changes: 2 additions & 2 deletions src/deadline_worker_agent/sessions/actions/exit_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
from concurrent.futures import Executor
from typing import TYPE_CHECKING, Any

from .ojio_action import OjioAction
from .openjd_action import OpenjdAction

if TYPE_CHECKING:
from ..session import Session


class ExitEnvironmentAction(OjioAction):
class ExitEnvironmentAction(OpenjdAction):
"""Action to exit an environment within a Worker session
Parameters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from ..errors import CancelationError


class OjioAction(SessionActionDefinition):
"""Common base class for OpenJobIO session actions"""
class OpenjdAction(SessionActionDefinition):
"""Common base class for Open Job Description session actions"""

_current_action_cancel_sent = False

Expand Down
Loading

0 comments on commit 9aafbbe

Please sign in to comment.