diff --git a/test/e2e/test_job_submissions.py b/test/e2e/test_job_submissions.py index cc53d94b..7bd66f46 100644 --- a/test/e2e/test_job_submissions.py +++ b/test/e2e/test_job_submissions.py @@ -158,7 +158,7 @@ def is_expected_session_action_failed(sessions: List[Dict[str, Any]]) -> bool: sessionId=session["sessionId"], ).get("sessionActions") - logging.info(f"Session actions: {session_actions}") + LOG.info(f"Session actions: {session_actions}") for session_action in session_actions: # Session action should be failed IFF it's the expected action to fail if expected_failed_action in session_action["definition"]: @@ -245,7 +245,7 @@ def test_worker_fails_session_action_timeout( sessionId=session["sessionId"], ).get("sessionActions") - logging.info(f"Session Actions: {session_actions}") + LOG.info(f"Session Actions: {session_actions}") for session_action in session_actions: # taskRun session action should be failed if "taskRun" in session_action["definition"]: @@ -405,6 +405,9 @@ def sessions_exist(current_job: Job) -> bool: LOG.info(f"Job result: {job}") + # Wait until the envExit runs as well + time.sleep(10) + @backoff.on_predicate( wait_gen=backoff.constant, max_time=120, @@ -427,10 +430,15 @@ def is_expected_session_action_canceled(sessions: List[Dict[str, Any]]) -> bool: if expected_canceled_action in session_action["definition"]: if session_action["status"] == "CANCELED": found_canceled_session_action = True + elif "envExit" in session_action["definition"]: + # envExit should always run no matter what + assert session_action["status"] == "SUCCEEDED" else: - assert ( - session_action["status"] != "CANCELED" - ) # This should not happen at all, so we fast exit + if expected_canceled_action == "envEnter": + # If we canceled the envEnter, everything else should have been NEVER_ATTEMPTED + assert session_action["status"] == "NEVER_ATTEMPTED" + else: + assert session_action["status"] == "SUCCEEDED" return found_canceled_session_action sessions: list[dict[str, Any]] = deadline_client.list_sessions( @@ -784,7 +792,7 @@ def sync_input_action_started(current_job: Job) -> bool: jobId=job.id, sessionId=session["sessionId"], ).get("sessionActions") - logging.info(f"Session actions: {session_actions}") + LOG.info(f"Session actions: {session_actions}") for session_action in session_actions: if "syncInputJobAttachments" in session_action["definition"]: if session_action["status"] in ["ASSIGNED", "RUNNING"]: @@ -818,7 +826,7 @@ def sync_input_actions_are_canceled(sessions: List[Dict[str, Any]]) -> bool: jobId=job.id, sessionId=session["sessionId"], ).get("sessionActions") - logging.info(f"Session actions: {session_actions}") + LOG.info(f"Session actions: {session_actions}") for session_action in session_actions: # Session action should be canceled if it's the action we expect to be canceled if "syncInputJobAttachments" in session_action["definition"]: @@ -837,6 +845,221 @@ def sync_input_actions_are_canceled(sessions: List[Dict[str, Any]]) -> bool: assert sync_input_actions_are_canceled(sessions) + def test_worker_reports_never_attempted_tasks_if_task_is_canceled( + self, + deadline_resources: DeadlineResources, + deadline_client: DeadlineClient, + session_worker: EC2InstanceWorker, + ) -> None: + + # Tests that if a taskRun action is cancelled, all remaining taskRun actions that depend on it will be NEVER_ATTEMPTED + + step_one_name = "StepOneSucceeded" + step_two_name = "StepTwoToCancel" + step_three_name = "StepThreeNeverAttempted" + job: Job = Job.submit( + client=deadline_client, + farm=deadline_resources.farm, + queue=deadline_resources.queue_a, + priority=98, + template={ + "specificationVersion": "jobtemplate-2023-09", + "name": "TestSecondTaskRunCancelled", + "jobEnvironments": [ + { + "name": "WhoAmiJobEnvironment", + "script": { + "actions": { + "onEnter": ({"command": "whoami"}), + "onExit": ({"command": "whoami"}), + }, + }, + }, + ], + "steps": [ + { + "name": step_one_name, + "hostRequirements": { + "attributes": [ + { + "name": "attr.worker.os.family", + "allOf": [os.environ["OPERATING_SYSTEM"]], + } + ] + }, + "script": { + "actions": { + "onRun": { + "command": ( + "/bin/sleep" + if os.environ["OPERATING_SYSTEM"] == "linux" + else "powershell" + ), + "args": ( + ["1"] + if os.environ["OPERATING_SYSTEM"] == "linux" + else ["ping", "localhost", "-n", "1"] + ), + }, + } + }, + }, + { + "name": step_two_name, + "hostRequirements": { + "attributes": [ + { + "name": "attr.worker.os.family", + "allOf": [os.environ["OPERATING_SYSTEM"]], + } + ] + }, + "dependencies": [{"dependsOn": step_one_name}], + "script": { + "actions": { + "onRun": { + "command": ( + "/bin/sleep" + if os.environ["OPERATING_SYSTEM"] == "linux" + else "powershell" + ), + "args": ( + ["120"] + if os.environ["OPERATING_SYSTEM"] == "linux" + else ["ping", "localhost", "-n", "120"] + ), + "cancelation": { + "mode": "NOTIFY_THEN_TERMINATE", + "notifyPeriodInSeconds": 1, + }, + }, + } + }, + }, + { + "name": step_three_name, + "hostRequirements": { + "attributes": [ + { + "name": "attr.worker.os.family", + "allOf": [os.environ["OPERATING_SYSTEM"]], + } + ] + }, + "dependencies": [{"dependsOn": step_two_name}], + "script": { + "actions": { + "onRun": {"command": "whoami"}, + } + }, + }, + ], + }, + ) + + # Wait for the job to start + + @backoff.on_predicate( + wait_gen=backoff.constant, + max_time=120, + interval=10, + ) + def is_job_started_with_sessions(current_job: Job) -> bool: + current_job.refresh_job_info(client=deadline_client) + LOG.info(f"Waiting for job {current_job.id} to be created") + if current_job.lifecycle_status == "CREATE_IN_PROGRESS": + return False + sessions: list[dict[str, Any]] = deadline_client.list_sessions( + farmId=job.farm.id, queueId=job.queue.id, jobId=job.id + ).get("sessions") + if sessions and len(sessions) > 0: + return True + return False + + assert is_job_started_with_sessions(job) + + # Wait some time for the second step (which sleeps for 2 minutes) to start + time.sleep(20) + + # Find both the SUCCEEDED and RUNNING session action IDs + + @backoff.on_exception( + backoff.constant, + Exception, + max_time=60, + interval=2, + ) + def find_succeeded_and_running_actions() -> tuple[str, str]: + found_succeeded_action_id: Optional[str] = None + found_running_action_id: Optional[str] = None + + sessions: list[dict[str, Any]] = deadline_client.list_sessions( + farmId=job.farm.id, queueId=job.queue.id, jobId=job.id + ).get("sessions") + + for session in sessions: + session_actions: list[dict[str, Any]] = deadline_client.list_session_actions( + farmId=job.farm.id, + queueId=job.queue.id, + jobId=job.id, + sessionId=session["sessionId"], + ).get("sessionActions") + for session_action in session_actions: + definition: dict[str, Any] = session_action["definition"] + if "taskRun" in definition: + if session_action["status"] == "SUCCEEDED": + found_succeeded_action_id = session_action["sessionActionId"] + elif session_action["status"] == "RUNNING": + found_running_action_id = session_action["sessionActionId"] + + assert found_succeeded_action_id is not None + assert found_running_action_id is not None + + return found_succeeded_action_id, found_running_action_id + + succeeded_action_id, running_action_id = find_succeeded_and_running_actions() + deadline_client.update_job( + farmId=job.farm.id, + queueId=job.queue.id, + jobId=job.id, + targetTaskRunStatus="CANCELED", + ) + + # Wait for the job to be canceled + + job.wait_until_complete(client=deadline_client) + + sessions: list[dict[str, Any]] = deadline_client.list_sessions( + farmId=job.farm.id, queueId=job.queue.id, jobId=job.id + ).get("sessions") + for session in sessions: + session_actions: list[dict[str, Any]] = deadline_client.list_session_actions( + farmId=job.farm.id, + queueId=job.queue.id, + jobId=job.id, + sessionId=session["sessionId"], + ).get("sessionActions") + for session_action in session_actions: + definition: dict[str, Any] = session_action["definition"] + if ( + "envEnter" in definition + or "envExit" in definition + or ( + "taskRun" in definition + and succeeded_action_id == session_action["sessionActionId"] + ) + ): + assert session_action["status"] == "SUCCEEDED" + elif ( + "taskRun" in definition + and running_action_id == session_action["sessionActionId"] + ): + # The action that was running for a long time should now be CANCELED! + assert session_action["status"] == "CANCELED" + else: + # Every other action should be in NEVER_ATTEMPTED status + assert session_action["status"] == "NEVER_ATTEMPTED" + def test_worker_always_runs_env_exit_despite_failure( self, deadline_resources: DeadlineResources, @@ -924,7 +1147,7 @@ def check_environment_action_statuses_are_expected() -> None: jobId=job.id, sessionId=session["sessionId"], ).get("sessionActions") - logging.info(f"Session actions: {session_actions}") + LOG.info(f"Session actions: {session_actions}") for session_action in session_actions: definition = session_action["definition"] if "envEnter" in definition: @@ -2186,7 +2409,7 @@ def test_worker_fails_job_attachment_sync_when_file_does_not_exist_in_bucket( sessionId=session["sessionId"], ).get("sessionActions") - logging.info(f"Session actions: {session_actions}") + LOG.info(f"Session actions: {session_actions}") for session_action in session_actions: # Session action should be failed for a syncinputJobAttachments action if "syncInputJobAttachments" in session_action["definition"]: