Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: add test that verifies subsequent tasks are NEVER_ATTEMPTED when previous action is canceled #437

Merged
merged 2 commits into from
Oct 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 227 additions & 9 deletions test/e2e/test_job_submissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def is_expected_session_action_failed(sessions: List[Dict[str, Any]]) -> bool:
sessionId=session["sessionId"],
).get("sessionActions")

logging.info(f"Session actions: {session_actions}")
LOG.info(f"Session actions: {session_actions}")
for session_action in session_actions:
# Session action should be failed IFF it's the expected action to fail
if expected_failed_action in session_action["definition"]:
Expand Down Expand Up @@ -246,7 +246,7 @@ def test_worker_fails_session_action_timeout(
sessionId=session["sessionId"],
).get("sessionActions")

logging.info(f"Session Actions: {session_actions}")
LOG.info(f"Session Actions: {session_actions}")
for session_action in session_actions:
# taskRun session action should be failed
if "taskRun" in session_action["definition"]:
Expand Down Expand Up @@ -428,10 +428,16 @@ def is_expected_session_action_canceled(sessions: List[Dict[str, Any]]) -> bool:
if expected_canceled_action in session_action["definition"]:
if session_action["status"] == "CANCELED":
found_canceled_session_action = True
elif "envExit" in session_action["definition"]:
# envExit should always run no matter what
if session_action["status"] != "SUCCEEDED":
return False
else:
assert (
session_action["status"] != "CANCELED"
) # This should not happen at all, so we fast exit
if expected_canceled_action == "envEnter":
# If we canceled the envEnter, everything else should have been NEVER_ATTEMPTED
assert session_action["status"] == "NEVER_ATTEMPTED"
else:
assert session_action["status"] == "SUCCEEDED"
return found_canceled_session_action

sessions: list[dict[str, Any]] = deadline_client.list_sessions(
Expand Down Expand Up @@ -789,7 +795,7 @@ def sync_input_action_started(current_job: Job) -> bool:
jobId=job.id,
sessionId=session["sessionId"],
).get("sessionActions")
logging.info(f"Session actions: {session_actions}")
LOG.info(f"Session actions: {session_actions}")
for session_action in session_actions:
if "syncInputJobAttachments" in session_action["definition"]:
if session_action["status"] in ["ASSIGNED", "RUNNING"]:
Expand Down Expand Up @@ -823,7 +829,7 @@ def sync_input_actions_are_canceled(sessions: List[Dict[str, Any]]) -> bool:
jobId=job.id,
sessionId=session["sessionId"],
).get("sessionActions")
logging.info(f"Session actions: {session_actions}")
LOG.info(f"Session actions: {session_actions}")
for session_action in session_actions:
# Session action should be canceled if it's the action we expect to be canceled
if "syncInputJobAttachments" in session_action["definition"]:
Expand All @@ -842,6 +848,218 @@ def sync_input_actions_are_canceled(sessions: List[Dict[str, Any]]) -> bool:

assert sync_input_actions_are_canceled(sessions)

def test_worker_reports_never_attempted_tasks_if_task_is_canceled(
self,
deadline_resources: DeadlineResources,
deadline_client: DeadlineClient,
session_worker: EC2InstanceWorker,
) -> None:

# Tests that if a taskRun action is cancelled, all remaining taskRun actions that depend on it will be NEVER_ATTEMPTED

step_one_name = "StepOneSucceeded"
step_two_name = "StepTwoToCancel"
step_three_name = "StepThreeNeverAttempted"
job: Job = Job.submit(
client=deadline_client,
farm=deadline_resources.farm,
queue=deadline_resources.queue_a,
priority=98,
template={
"specificationVersion": "jobtemplate-2023-09",
"name": "TestSecondTaskRunCancelled",
"jobEnvironments": [
{
"name": "WhoAmiJobEnvironment",
"script": {
"actions": {
"onEnter": ({"command": "whoami"}),
"onExit": ({"command": "whoami"}),
},
},
},
],
"steps": [
{
"name": step_one_name,
"hostRequirements": {
"attributes": [
{
"name": "attr.worker.os.family",
"allOf": [os.environ["OPERATING_SYSTEM"]],
}
]
},
"script": {
"actions": {
"onRun": {
"command": (
"/bin/sleep"
if os.environ["OPERATING_SYSTEM"] == "linux"
else "powershell"
),
"args": (
["1"]
if os.environ["OPERATING_SYSTEM"] == "linux"
else ["ping", "localhost", "-n", "1"]
),
},
}
},
},
{
"name": step_two_name,
"hostRequirements": {
"attributes": [
{
"name": "attr.worker.os.family",
"allOf": [os.environ["OPERATING_SYSTEM"]],
}
]
},
"dependencies": [{"dependsOn": step_one_name}],
"script": {
"actions": {
"onRun": {
"command": (
"/bin/sleep"
if os.environ["OPERATING_SYSTEM"] == "linux"
else "powershell"
),
"args": (
["120"]
if os.environ["OPERATING_SYSTEM"] == "linux"
else ["ping", "localhost", "-n", "120"]
),
"cancelation": {
"mode": "NOTIFY_THEN_TERMINATE",
"notifyPeriodInSeconds": 1,
},
},
}
},
},
{
"name": step_three_name,
"hostRequirements": {
"attributes": [
{
"name": "attr.worker.os.family",
"allOf": [os.environ["OPERATING_SYSTEM"]],
}
]
},
"dependencies": [{"dependsOn": step_two_name}],
"script": {
"actions": {
"onRun": {"command": "whoami"},
}
},
},
],
},
)

# Wait for the job to start

YutongLi291 marked this conversation as resolved.
Show resolved Hide resolved
@backoff.on_predicate(
wait_gen=backoff.constant,
max_time=120,
interval=10,
)
def is_job_started_with_sessions(current_job: Job) -> bool:
current_job.refresh_job_info(client=deadline_client)
LOG.info(f"Waiting for job {current_job.id} to be created")
if current_job.lifecycle_status == "CREATE_IN_PROGRESS":
return False
sessions: list[dict[str, Any]] = deadline_client.list_sessions(
farmId=job.farm.id, queueId=job.queue.id, jobId=job.id
).get("sessions")
if sessions and len(sessions) > 0:
return True
return False

assert is_job_started_with_sessions(job)

# Find both the SUCCEEDED and RUNNING session action IDs

@backoff.on_exception(
backoff.constant,
Exception,
max_time=90,
interval=5,
)
def find_succeeded_and_running_actions() -> tuple[str, str]:
found_succeeded_action_id: Optional[str] = None
found_running_action_id: Optional[str] = None

sessions: list[dict[str, Any]] = deadline_client.list_sessions(
farmId=job.farm.id, queueId=job.queue.id, jobId=job.id
).get("sessions")

for session in sessions:
session_actions: list[dict[str, Any]] = deadline_client.list_session_actions(
farmId=job.farm.id,
queueId=job.queue.id,
jobId=job.id,
sessionId=session["sessionId"],
).get("sessionActions")
for session_action in session_actions:
definition: dict[str, Any] = session_action["definition"]
if "taskRun" in definition:
if session_action["status"] == "SUCCEEDED":
found_succeeded_action_id = session_action["sessionActionId"]
elif session_action["status"] == "RUNNING":
found_running_action_id = session_action["sessionActionId"]

assert found_succeeded_action_id is not None
assert found_running_action_id is not None

return found_succeeded_action_id, found_running_action_id

succeeded_action_id, running_action_id = find_succeeded_and_running_actions()
deadline_client.update_job(
farmId=job.farm.id,
queueId=job.queue.id,
jobId=job.id,
targetTaskRunStatus="CANCELED",
)

# Wait for the job to be canceled

job.wait_until_complete(client=deadline_client)

sessions: list[dict[str, Any]] = deadline_client.list_sessions(
farmId=job.farm.id, queueId=job.queue.id, jobId=job.id
).get("sessions")
for session in sessions:
session_actions: list[dict[str, Any]] = deadline_client.list_session_actions(
farmId=job.farm.id,
queueId=job.queue.id,
jobId=job.id,
sessionId=session["sessionId"],
).get("sessionActions")
for session_action in session_actions:
definition: dict[str, Any] = session_action["definition"]
if (
"envEnter" in definition
or "envExit" in definition
or (
"taskRun" in definition
and succeeded_action_id == session_action["sessionActionId"]
)
):
assert session_action["status"] == "SUCCEEDED"
elif (
"taskRun" in definition
and running_action_id == session_action["sessionActionId"]
):
# The action that was running for a long time should now be CANCELED!
assert session_action["status"] == "CANCELED"
else:
# Every other action should be in NEVER_ATTEMPTED status
assert session_action["status"] == "NEVER_ATTEMPTED"

def test_worker_always_runs_env_exit_despite_failure(
self,
deadline_resources: DeadlineResources,
Expand Down Expand Up @@ -929,7 +1147,7 @@ def check_environment_action_statuses_are_expected() -> None:
jobId=job.id,
sessionId=session["sessionId"],
).get("sessionActions")
logging.info(f"Session actions: {session_actions}")
LOG.info(f"Session actions: {session_actions}")
for session_action in session_actions:
definition = session_action["definition"]
if "envEnter" in definition:
Expand Down Expand Up @@ -2191,7 +2409,7 @@ def test_worker_fails_job_attachment_sync_when_file_does_not_exist_in_bucket(
sessionId=session["sessionId"],
).get("sessionActions")

logging.info(f"Session actions: {session_actions}")
LOG.info(f"Session actions: {session_actions}")
for session_action in session_actions:
# Session action should be failed for a syncinputJobAttachments action
if "syncInputJobAttachments" in session_action["definition"]:
Expand Down