diff --git a/hatch.toml b/hatch.toml index 0d114924..de700728 100644 --- a/hatch.toml +++ b/hatch.toml @@ -13,6 +13,7 @@ version = "hatch version" metadata = "hatch project metadata {args:}" linux-e2e-test = "pytest --no-cov test/e2e/linux {args}" windows-e2e-test= "pytest --no-cov test/e2e/windows {args:}" +cross-os-e2e-test = "pytest --no-cov test/e2e/cross_os {args}" windows-integ-test = "pytest --no-cov test/integ/installer {args:}" typing = "mypy {args:src test}" style = [ diff --git a/pipeline/e2e.sh b/pipeline/e2e.sh index 5c6329cc..31cc00f1 100755 --- a/pipeline/e2e.sh +++ b/pipeline/e2e.sh @@ -24,4 +24,6 @@ fi if [ "$OPERATING_SYSTEM" = "windows" ] then hatch run windows-e2e-test -fi \ No newline at end of file +fi + +hatch run cross-os-e2e-test \ No newline at end of file diff --git a/requirements-testing.txt b/requirements-testing.txt index ebe05a5d..2b9f3527 100644 --- a/requirements-testing.txt +++ b/requirements-testing.txt @@ -1,3 +1,4 @@ +backoff == 2.2.* coverage[toml] ~= 7.5 coverage-conditional-plugin == 0.9.* deadline-cloud-test-fixtures == 0.12.* diff --git a/test/e2e/cross_os/test_job_submissions.py b/test/e2e/cross_os/test_job_submissions.py new file mode 100644 index 00000000..3bcd10dc --- /dev/null +++ b/test/e2e/cross_os/test_job_submissions.py @@ -0,0 +1,372 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +""" +This test module contains tests that verify the Worker agent's behavior by submitting jobs to the +Deadline Cloud service and checking that the result/output of the jobs is as we expect it. +""" +from typing import Any, Dict, List +import pytest +import logging +from deadline_test_fixtures import Job, DeadlineClient, TaskStatus +from utils import get_operating_system_name +import backoff +import boto3 +import botocore.client +import botocore.config +import botocore.exceptions + +LOG = logging.getLogger(__name__) + + +@pytest.mark.usefixtures("worker") +@pytest.mark.parametrize("operating_system", [get_operating_system_name()], indirect=True) +class TestJobSubmission: + @pytest.mark.parametrize( + "run_actions,environment_actions, expected_failed_action", + [ + ( + { + "onRun": { + "command": "noneexistentcommand", # This will fail + }, + }, + { + "onEnter": { + "command": "whoami", + }, + }, + "taskRun", + ), + ( + { + "onRun": { + "command": "whoami", + }, + }, + { + "onEnter": { + "command": "noneexistentcommand", # This will fail + }, + }, + "envEnter", + ), + ( + { + "onRun": { + "command": "whoami", + }, + }, + { + "onEnter": { + "command": "whoami", + }, + "onExit": { + "command": "noneexistentcommand", # This will fail + }, + }, + "envExit", + ), + ], + ) + def test_job_reports_failed_session_action( + self, + deadline_resources, + deadline_client: DeadlineClient, + run_actions: Dict[str, Any], + environment_actions: Dict[str, Any], + expected_failed_action: str, + ) -> None: + + job = Job.submit( + client=deadline_client, + farm=deadline_resources.farm, + queue=deadline_resources.queue_a, + priority=98, + template={ + "specificationVersion": "jobtemplate-2023-09", + "name": f"jobactionfail-{expected_failed_action}", + "steps": [ + { + "name": "Step0", + "script": {"actions": run_actions}, + }, + ], + "jobEnvironments": [ + {"name": "badenvironment", "script": {"actions": environment_actions}} + ], + }, + ) + # THEN + job.wait_until_complete(client=deadline_client) + + # Retrieve job output and verify that the expected session action has failed + + sessions = deadline_client.list_sessions( + farmId=job.farm.id, queueId=job.queue.id, jobId=job.id + ).get("sessions") + found_failed_session_action: bool = False + for session in sessions: + session_actions = deadline_client.list_session_actions( + farmId=job.farm.id, + queueId=job.queue.id, + jobId=job.id, + sessionId=session["sessionId"], + ).get("sessionActions") + + for session_action in session_actions: + # Session action should be failed IFF it's the expected action to fail + if expected_failed_action in session_action["definition"]: + found_failed_session_action = True + assert session_action["status"] == "FAILED" + else: + assert session_action["status"] != "FAILED" + assert found_failed_session_action + + @pytest.mark.parametrize( + "run_actions,environment_actions,expected_canceled_action", + [ + ( + { + "onRun": { + "command": ( + "/bin/sleep" if get_operating_system_name() == "linux" else "timeout" + ), + "args": ["40"], + "cancelation": { + "mode": "NOTIFY_THEN_TERMINATE", + "notifyPeriodInSeconds": 1, + }, + }, + }, + { + "onEnter": { + "command": "whoami", + }, + }, + "taskRun", + ), + ( + { + "onRun": { + "command": "whoami", + }, + }, + { + "onEnter": { + "command": ( + "/bin/sleep" if get_operating_system_name() == "linux" else "timeout" + ), + "args": ["40"], + "cancelation": { + "mode": "NOTIFY_THEN_TERMINATE", + "notifyPeriodInSeconds": 1, + }, + }, + }, + "envEnter", + ), + ], + ) + def test_job_reports_canceled_session_action( + self, + deadline_resources, + deadline_client: DeadlineClient, + run_actions: Dict[str, Any], + environment_actions: Dict[str, Any], + expected_canceled_action: str, + ) -> None: + job = Job.submit( + client=deadline_client, + farm=deadline_resources.farm, + queue=deadline_resources.queue_a, + priority=98, + template={ + "specificationVersion": "jobtemplate-2023-09", + "name": f"jobactioncancel-{expected_canceled_action}", + "steps": [ + { + "name": "Step0", + "script": { + "actions": run_actions, + }, + }, + ], + "jobEnvironments": [ + { + "name": "environment", + "script": { + "actions": environment_actions, + }, + } + ], + }, + ) + + @backoff.on_predicate( + wait_gen=backoff.constant, + max_time=120, + interval=10, + ) + def is_job_started(current_job: Job) -> bool: + current_job.refresh_job_info(client=deadline_client) + LOG.info(f"Waiting for job {current_job.id} to be created") + return current_job.lifecycle_status != "CREATE_IN_PROGRESS" + + assert is_job_started(job) + + @backoff.on_predicate( + wait_gen=backoff.constant, + max_time=120, + interval=10, + ) + def sessions_exist(current_job: Job) -> bool: + sessions = deadline_client.list_sessions( + farmId=current_job.farm.id, queueId=current_job.queue.id, jobId=current_job.id + ).get("sessions") + + return len(sessions) > 0 + + assert sessions_exist(job) + + deadline_client.update_job( + farmId=job.farm.id, queueId=job.queue.id, jobId=job.id, targetTaskRunStatus="CANCELED" + ) + + # THEN + + # Wait until the job is canceled or completed + job.wait_until_complete(client=deadline_client) + + LOG.info(f"Job result: {job}") + + @backoff.on_predicate( + wait_gen=backoff.constant, + max_time=120, + interval=10, + ) + def is_expected_session_action_canceled(sessions: List[Dict[str, Any]]) -> bool: + found_canceled_session_action: bool = False + for session in sessions: + session_actions = deadline_client.list_session_actions( + farmId=job.farm.id, + queueId=job.queue.id, + jobId=job.id, + sessionId=session["sessionId"], + ).get("sessionActions") + + LOG.info(f"Session Actions: {session_actions}") + for session_action in session_actions: + + # Session action should be canceled if it's the action we expect to be canceled + if expected_canceled_action in session_action["definition"]: + if session_action["status"] == "CANCELED": + found_canceled_session_action = True + else: + assert ( + session_action["status"] != "CANCELED" + ) # This should not happen at all, so we fast exit + return found_canceled_session_action + + sessions = deadline_client.list_sessions( + farmId=job.farm.id, queueId=job.queue.id, jobId=job.id + ).get("sessions") + assert is_expected_session_action_canceled(sessions) + + @pytest.mark.parametrize( + "job_environments", + [ + ([]), + ( + [ + { + "name": "environment_1", + "script": { + "actions": { + "onEnter": {"command": "echo", "args": ["Hello!"]}, + }, + }, + }, + ] + ), + ( + [ + { + "name": "environment_1", + "script": { + "actions": { + "onEnter": {"command": "echo", "args": ["Hello!"]}, + } + }, + }, + { + "name": "environment_2", + "script": { + "actions": { + "onEnter": {"command": "echo", "args": ["Hello!"]}, + } + }, + }, + { + "name": "environment_3", + "script": { + "actions": { + "onEnter": {"command": "echo", "args": ["Hello!"]}, + } + }, + }, + ] + ), + ], + ) + def test_worker_run_with_number_of_environments( + self, + deadline_resources, + deadline_client: DeadlineClient, + job_environments: List[Dict[str, Any]], + ) -> None: + job_template = { + "specificationVersion": "jobtemplate-2023-09", + "name": f"jobWithNumberOfEnvironments-{len(job_environments)}", + "steps": [ + { + "name": "Step0", + "script": { + "actions": { + "onRun": { + "command": "whoami", + }, + }, + }, + }, + ], + } + + if len(job_environments) > 0: + job_template["jobEnvironments"] = job_environments + job = Job.submit( + client=deadline_client, + farm=deadline_resources.farm, + queue=deadline_resources.queue_a, + priority=98, + template=job_template, + ) + + job.wait_until_complete(client=deadline_client) + + # Retrieve job output and verify whoami printed the queue's jobsRunAsUser + job_logs = job.get_logs( + deadline_client=deadline_client, + logs_client=boto3.client( + "logs", + config=botocore.config.Config(retries={"max_attempts": 10, "mode": "adaptive"}), + ), + ) + + full_log = "\n".join( + [le.message for _, log_events in job_logs.logs.items() for le in log_events] + ) + + assert full_log.count("Hello!") == len( + job_environments + ), "Expected number of Hello statements not found in job logs." + + assert job.task_run_status == TaskStatus.SUCCEEDED diff --git a/test/e2e/cross_os/utils.py b/test/e2e/cross_os/utils.py new file mode 100644 index 00000000..4a96e396 --- /dev/null +++ b/test/e2e/cross_os/utils.py @@ -0,0 +1,9 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +import sys + + +def get_operating_system_name() -> str: + if sys.platform == "win32": + return "windows" + else: + return "linux" diff --git a/test/e2e/linux/test_job_submissions.py b/test/e2e/linux/test_job_submissions.py index bc0593cd..65f586dc 100644 --- a/test/e2e/linux/test_job_submissions.py +++ b/test/e2e/linux/test_job_submissions.py @@ -3,8 +3,6 @@ This test module contains tests that verify the Worker agent's behavior by submitting jobs to the Deadline Cloud service and checking that the result/output of the jobs is as we expect it. """ - -from typing import Any, Dict import boto3 import botocore.client import botocore.config @@ -111,108 +109,3 @@ def test_job_run_as_user( f"I am: {job_run_as_user.user}" in full_log ), f"Expected message not found in Job logs. Logs are in CloudWatch log group: {job_logs.log_group_name}" assert job.task_run_status == TaskStatus.SUCCEEDED - - @pytest.mark.parametrize( - "run_actions,environment_actions, expected_failed_action", - [ - ( - { - "onRun": { - "command": "/bin/false", - "args": ["1"], - }, - }, - { - "onEnter": { - "command": "/bin/true", - }, - }, - "taskRun", - ), - ( - { - "onRun": { - "command": "/bin/sleep", - "args": ["1"], - }, - }, - { - "onEnter": { - "command": "/bin/false", - }, - }, - "envEnter", - ), - ( - { - "onRun": { - "command": "/bin/sleep", - "args": ["1"], - }, - }, - { - "onEnter": { - "command": "/bin/true", - }, - "onExit": { - "command": "/bin/false", - }, - }, - "envExit", - ), - ], - ) - def test_job_reports_failed_session_action( - self, - deadline_resources, - deadline_client: DeadlineClient, - run_actions: Dict[str, Any], - environment_actions: Dict[str, Any], - expected_failed_action: str, - ) -> None: - - job = Job.submit( - client=deadline_client, - farm=deadline_resources.farm, - queue=deadline_resources.queue_a, - priority=98, - template={ - "specificationVersion": "jobtemplate-2023-09", - "name": "environmentactionfail", - "steps": [ - { - "name": "Step0", - "script": {"actions": run_actions}, - }, - ], - "jobEnvironments": [ - {"name": "badenvironment", "script": {"actions": environment_actions}} - ], - }, - ) - # THEN - job.wait_until_complete(client=deadline_client) - - # Retrieve job output and verify that the expected session action has failed - - sessions = deadline_client.list_sessions( - farmId=job.farm.id, queueId=job.queue.id, jobId=job.id - ).get("sessions") - found_failed_session_action: bool = False - for session in sessions: - session_actions = deadline_client.list_session_actions( - farmId=job.farm.id, - queueId=job.queue.id, - jobId=job.id, - sessionId=session["sessionId"], - ).get("sessionActions") - - for session_action in session_actions: - - # Session action should be failed IFF it's the expected action to fail - if expected_failed_action in session_action["definition"]: - found_failed_session_action = True - assert session_action["status"] == "FAILED" - else: - assert session_action["status"] != "FAILED" - assert found_failed_session_action