Skip to content

Commit

Permalink
test: add test that verifies subsequent tasks are NEVER_ATTEMPTED whe…
Browse files Browse the repository at this point in the history
…n dependent task is canceled

Signed-off-by: Yutong Li <[email protected]>
  • Loading branch information
YutongLi291 committed Oct 28, 2024
1 parent 7b26b38 commit 2e622a4
Showing 1 changed file with 227 additions and 9 deletions.
236 changes: 227 additions & 9 deletions test/e2e/test_job_submissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def is_expected_session_action_failed(sessions: List[Dict[str, Any]]) -> bool:
sessionId=session["sessionId"],
).get("sessionActions")

logging.info(f"Session actions: {session_actions}")
LOG.info(f"Session actions: {session_actions}")
for session_action in session_actions:
# Session action should be failed IFF it's the expected action to fail
if expected_failed_action in session_action["definition"]:
Expand Down Expand Up @@ -246,7 +246,7 @@ def test_worker_fails_session_action_timeout(
sessionId=session["sessionId"],
).get("sessionActions")

logging.info(f"Session Actions: {session_actions}")
LOG.info(f"Session Actions: {session_actions}")
for session_action in session_actions:
# taskRun session action should be failed
if "taskRun" in session_action["definition"]:
Expand Down Expand Up @@ -428,10 +428,16 @@ def is_expected_session_action_canceled(sessions: List[Dict[str, Any]]) -> bool:
if expected_canceled_action in session_action["definition"]:
if session_action["status"] == "CANCELED":
found_canceled_session_action = True
elif "envExit" in session_action["definition"]:
# envExit should always run no matter what
if session_action["status"] != "SUCCEEDED":
return False
else:
assert (
session_action["status"] != "CANCELED"
) # This should not happen at all, so we fast exit
if expected_canceled_action == "envEnter":
# If we canceled the envEnter, everything else should have been NEVER_ATTEMPTED
assert session_action["status"] == "NEVER_ATTEMPTED"
else:
assert session_action["status"] == "SUCCEEDED"
return found_canceled_session_action

sessions: list[dict[str, Any]] = deadline_client.list_sessions(
Expand Down Expand Up @@ -789,7 +795,7 @@ def sync_input_action_started(current_job: Job) -> bool:
jobId=job.id,
sessionId=session["sessionId"],
).get("sessionActions")
logging.info(f"Session actions: {session_actions}")
LOG.info(f"Session actions: {session_actions}")
for session_action in session_actions:
if "syncInputJobAttachments" in session_action["definition"]:
if session_action["status"] in ["ASSIGNED", "RUNNING"]:
Expand Down Expand Up @@ -823,7 +829,7 @@ def sync_input_actions_are_canceled(sessions: List[Dict[str, Any]]) -> bool:
jobId=job.id,
sessionId=session["sessionId"],
).get("sessionActions")
logging.info(f"Session actions: {session_actions}")
LOG.info(f"Session actions: {session_actions}")
for session_action in session_actions:
# Session action should be canceled if it's the action we expect to be canceled
if "syncInputJobAttachments" in session_action["definition"]:
Expand All @@ -842,6 +848,218 @@ def sync_input_actions_are_canceled(sessions: List[Dict[str, Any]]) -> bool:

assert sync_input_actions_are_canceled(sessions)

def test_worker_reports_never_attempted_tasks_if_task_is_canceled(
self,
deadline_resources: DeadlineResources,
deadline_client: DeadlineClient,
session_worker: EC2InstanceWorker,
) -> None:

# Tests that if a taskRun action is cancelled, all remaining taskRun actions that depend on it will be NEVER_ATTEMPTED

step_one_name = "StepOneSucceeded"
step_two_name = "StepTwoToCancel"
step_three_name = "StepThreeNeverAttempted"
job: Job = Job.submit(
client=deadline_client,
farm=deadline_resources.farm,
queue=deadline_resources.queue_a,
priority=98,
template={
"specificationVersion": "jobtemplate-2023-09",
"name": "TestSecondTaskRunCancelled",
"jobEnvironments": [
{
"name": "WhoAmiJobEnvironment",
"script": {
"actions": {
"onEnter": ({"command": "whoami"}),
"onExit": ({"command": "whoami"}),
},
},
},
],
"steps": [
{
"name": step_one_name,
"hostRequirements": {
"attributes": [
{
"name": "attr.worker.os.family",
"allOf": [os.environ["OPERATING_SYSTEM"]],
}
]
},
"script": {
"actions": {
"onRun": {
"command": (
"/bin/sleep"
if os.environ["OPERATING_SYSTEM"] == "linux"
else "powershell"
),
"args": (
["1"]
if os.environ["OPERATING_SYSTEM"] == "linux"
else ["ping", "localhost", "-n", "1"]
),
},
}
},
},
{
"name": step_two_name,
"hostRequirements": {
"attributes": [
{
"name": "attr.worker.os.family",
"allOf": [os.environ["OPERATING_SYSTEM"]],
}
]
},
"dependencies": [{"dependsOn": step_one_name}],
"script": {
"actions": {
"onRun": {
"command": (
"/bin/sleep"
if os.environ["OPERATING_SYSTEM"] == "linux"
else "powershell"
),
"args": (
["120"]
if os.environ["OPERATING_SYSTEM"] == "linux"
else ["ping", "localhost", "-n", "120"]
),
"cancelation": {
"mode": "NOTIFY_THEN_TERMINATE",
"notifyPeriodInSeconds": 1,
},
},
}
},
},
{
"name": step_three_name,
"hostRequirements": {
"attributes": [
{
"name": "attr.worker.os.family",
"allOf": [os.environ["OPERATING_SYSTEM"]],
}
]
},
"dependencies": [{"dependsOn": step_two_name}],
"script": {
"actions": {
"onRun": {"command": "whoami"},
}
},
},
],
},
)

# Wait for the job to start

@backoff.on_predicate(
wait_gen=backoff.constant,
max_time=120,
interval=10,
)
def is_job_started_with_sessions(current_job: Job) -> bool:
current_job.refresh_job_info(client=deadline_client)
LOG.info(f"Waiting for job {current_job.id} to be created")
if current_job.lifecycle_status == "CREATE_IN_PROGRESS":
return False
sessions: list[dict[str, Any]] = deadline_client.list_sessions(
farmId=job.farm.id, queueId=job.queue.id, jobId=job.id
).get("sessions")
if sessions and len(sessions) > 0:
return True
return False

assert is_job_started_with_sessions(job)

# Find both the SUCCEEDED and RUNNING session action IDs

@backoff.on_exception(
backoff.constant,
Exception,
max_time=90,
interval=5,
)
def find_succeeded_and_running_actions() -> tuple[str, str]:
found_succeeded_action_id: Optional[str] = None
found_running_action_id: Optional[str] = None

sessions: list[dict[str, Any]] = deadline_client.list_sessions(
farmId=job.farm.id, queueId=job.queue.id, jobId=job.id
).get("sessions")

for session in sessions:
session_actions: list[dict[str, Any]] = deadline_client.list_session_actions(
farmId=job.farm.id,
queueId=job.queue.id,
jobId=job.id,
sessionId=session["sessionId"],
).get("sessionActions")
for session_action in session_actions:
definition: dict[str, Any] = session_action["definition"]
if "taskRun" in definition:
if session_action["status"] == "SUCCEEDED":
found_succeeded_action_id = session_action["sessionActionId"]
elif session_action["status"] == "RUNNING":
found_running_action_id = session_action["sessionActionId"]

assert found_succeeded_action_id is not None
assert found_running_action_id is not None

return found_succeeded_action_id, found_running_action_id

succeeded_action_id, running_action_id = find_succeeded_and_running_actions()
deadline_client.update_job(
farmId=job.farm.id,
queueId=job.queue.id,
jobId=job.id,
targetTaskRunStatus="CANCELED",
)

# Wait for the job to be canceled

job.wait_until_complete(client=deadline_client)

sessions: list[dict[str, Any]] = deadline_client.list_sessions(
farmId=job.farm.id, queueId=job.queue.id, jobId=job.id
).get("sessions")
for session in sessions:
session_actions: list[dict[str, Any]] = deadline_client.list_session_actions(
farmId=job.farm.id,
queueId=job.queue.id,
jobId=job.id,
sessionId=session["sessionId"],
).get("sessionActions")
for session_action in session_actions:
definition: dict[str, Any] = session_action["definition"]
if (
"envEnter" in definition
or "envExit" in definition
or (
"taskRun" in definition
and succeeded_action_id == session_action["sessionActionId"]
)
):
assert session_action["status"] == "SUCCEEDED"
elif (
"taskRun" in definition
and running_action_id == session_action["sessionActionId"]
):
# The action that was running for a long time should now be CANCELED!
assert session_action["status"] == "CANCELED"
else:
# Every other action should be in NEVER_ATTEMPTED status
assert session_action["status"] == "NEVER_ATTEMPTED"

def test_worker_always_runs_env_exit_despite_failure(
self,
deadline_resources: DeadlineResources,
Expand Down Expand Up @@ -929,7 +1147,7 @@ def check_environment_action_statuses_are_expected() -> None:
jobId=job.id,
sessionId=session["sessionId"],
).get("sessionActions")
logging.info(f"Session actions: {session_actions}")
LOG.info(f"Session actions: {session_actions}")
for session_action in session_actions:
definition = session_action["definition"]
if "envEnter" in definition:
Expand Down Expand Up @@ -2191,7 +2409,7 @@ def test_worker_fails_job_attachment_sync_when_file_does_not_exist_in_bucket(
sessionId=session["sessionId"],
).get("sessionActions")

logging.info(f"Session actions: {session_actions}")
LOG.info(f"Session actions: {session_actions}")
for session_action in session_actions:
# Session action should be failed for a syncinputJobAttachments action
if "syncInputJobAttachments" in session_action["definition"]:
Expand Down

0 comments on commit 2e622a4

Please sign in to comment.