test: add test that verifies job fails if attachment is accidentally deleted from bucket (#436)



* test: add test that verifies job fails if attachment is accidentally deleted from bucket

Signed-off-by: Yutong Li <[email protected]>
---------
Signed-off-by: Yutong Li <[email protected]>
YutongLi291 authored Oct 10, 2024
1 parent 760118c commit 22934f1
Showing 1 changed file with 210 additions and 0 deletions.
210 changes: 210 additions & 0 deletions test/e2e/test_job_submissions.py
@@ -27,6 +27,7 @@
import tempfile
from e2e.utils import wait_for_job_output, submit_sleep_job, submit_custom_job


LOG = logging.getLogger(__name__)


@@ -2015,6 +2016,215 @@ def session_action_has_expected_progress(session_action_id) -> bool:

assert job.task_run_status == TaskStatus.SUCCEEDED

def test_worker_fails_job_attachment_sync_when_file_does_not_exist_in_bucket(
self,
deadline_resources: DeadlineResources,
session_worker: EC2InstanceWorker,
deadline_client: DeadlineClient,
tmp_path: pathlib.Path,
) -> None:
# Submit a job with input job attachments, delete the input files from the Job Attachments bucket before the job starts, and verify that the syncInputJobAttachments step fails
job_bundle_path: str = os.path.join(
tmp_path,
"job_attachment_bundle",
)
os.mkdir(job_bundle_path)

input_file_name: str = os.path.join(job_bundle_path, str(uuid.uuid4()))
with open(input_file_name, "w+") as file_to_write:
file_to_write.write(str(uuid.uuid4()))

job_parameters: List[Dict[str, str]] = [
{"name": "deadline:targetTaskRunStatus", "value": "SUSPENDED"},
{"name": "DataDir", "value": job_bundle_path},
]
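# The DataDir PATH parameter (dataFlow INOUT in the template below) points at the bundle directory, so create_job_from_job_bundle uploads its contents, including the random input file above, as input job attachments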

queue_to_use = deadline_resources.queue_a
with open(
os.path.join(job_bundle_path, "parameter_values.json"), "w+"
) as parameter_values_file:
# Make sure the job is submitted in SUSPENDED state so we have time to delete an input job attachment in the bucket
parameter_values_file.write(
json.dumps(
{
"parameterValues": [
{"name": "deadline:targetTaskRunStatus", "value": "SUSPENDED"},
]
}
)
)
with open(os.path.join(job_bundle_path, "template.json"), "w+") as template_file:
template_file.write(
json.dumps(
{
"specificationVersion": "jobtemplate-2023-09",
"name": "JobAttachmentThatGetsDeleted",
"parameterDefinitions": [
{
"name": "DataDir",
"type": "PATH",
"dataFlow": "INOUT",
},
],
"steps": [
{
"name": "Step0",
"hostRequirements": {
"attributes": [
{
"name": "attr.worker.os.family",
"allOf": [os.environ["OPERATING_SYSTEM"]],
}
]
},
"script": {
"actions": {"onRun": {"command": "whoami"}},
},
}
],
}
)
)

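# Point the Deadline CLI configuration at the test farm and queue so the job bundle is submitted against them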
config = configparser.ConfigParser()
set_setting("defaults.farm_id", deadline_resources.farm.id, config)
set_setting("defaults.queue_id", queue_to_use.id, config)
job_id: Optional[str] = api.create_job_from_job_bundle(
job_bundle_path,
job_parameters,
priority=99,
config=config,
queue_parameter_definitions=[],
)

assert job_id is not None

job_details = Job.get_job_details(
client=deadline_client,
farm=deadline_resources.farm,
queue=queue_to_use,
job_id=job_id,
)

LOG.info(f"job details: {job_details}")
assert job_details.get("task_run_status") == "SUSPENDED"
attachments: Optional[dict] = job_details.get("attachments")
assert attachments is not None

manifests: list[dict[str, Any]] = attachments["manifests"]

assert manifests is not None
first_manifest = manifests[0]

input_manifest_path = first_manifest["inputManifestPath"]
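# inputManifestPath is the key suffix of the manifest object in S3, which lists each input file's hash and relative path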

# Find the input manifest in the queue's Job Attachments bucket
queue_job_attachment_settings: dict[str, Any] = deadline_client.get_queue(
farmId=deadline_resources.farm.id,
queueId=queue_to_use.id,
)["jobAttachmentSettings"]

job_attachments_bucket_name: str = queue_job_attachment_settings["s3BucketName"]
root_prefix: str = queue_job_attachment_settings["rootPrefix"]

s3_client = boto3.client("s3")

get_manifest_object_result: dict[str, Any] = s3_client.get_object(
Bucket=job_attachments_bucket_name,
Key=root_prefix + "/Manifests/" + input_manifest_path,
)

get_object_result_body: dict[str, Any] = json.loads(
get_manifest_object_result["Body"].read()
)

# Get the input file entries from the manifest; each entry includes the file's content hash
input_file_paths: list[dict[str, Any]] = get_object_result_body["paths"]
first_input_file_hash = input_file_paths[0]["hash"]

# Delete one of the input files from the Job Attachments bucket after confirming that it exists
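# Input data objects are content-addressed: they live under <rootPrefix>/Data/<hash>.xxh128, so deleting this object removes the data the worker needs to sync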

s3_client.get_object(
Bucket=job_attachments_bucket_name,
Key=root_prefix + "/Data/" + first_input_file_hash + ".xxh128",
)
s3_client.delete_object(
Bucket=job_attachments_bucket_name,
Key=root_prefix + "/Data/" + first_input_file_hash + ".xxh128",
)

# Start the job; it should fail since one of the input files is missing from the Job Attachments bucket

deadline_client.update_job(
farmId=deadline_resources.farm.id,
jobId=job_id,
queueId=queue_to_use.id,
targetTaskRunStatus="READY",
)

job: Job = Job(
farm=deadline_resources.farm,
queue=deadline_resources.queue_a,
template={},
**job_details,
)
job.wait_until_complete(client=deadline_client)

# Job should have failed due to not being able to sync attachments
assert job.task_run_status == TaskStatus.FAILED

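# Inspect the session actions to confirm that the syncInputJobAttachments action is the one that failed and the rest were never attempted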
sessions: list[dict[str, Any]] = deadline_client.list_sessions(
farmId=job.farm.id, queueId=job.queue.id, jobId=job.id
).get("sessions")

found_failed_session_action = False
for session in sessions:
session_actions = deadline_client.list_session_actions(
farmId=job.farm.id,
queueId=job.queue.id,
jobId=job.id,
sessionId=session["sessionId"],
).get("sessionActions")

logging.info(f"Session actions: {session_actions}")
for session_action in session_actions:
# The syncInputJobAttachments session action should have failed
if "syncInputJobAttachments" in session_action["definition"]:
assert (
session_action["status"] == "FAILED"
), f"syncInputJobAttachments Session action that should have failed is in {session_action['status']} status. {session_action}"
found_failed_session_action = True
else:
# Every other session action should never have been attempted, since the syncInputJobAttachments action failed
assert (
session_action["status"] == "NEVER_ATTEMPTED"
), f"Session action that should not have failed is in FAILED status. {session_action}"
assert (
found_failed_session_action
), "Was not able to find any syncInputJobAttachments session actions"

# Make sure the worker is still running and has not crashed after this
get_worker_response: dict[str, Any] = deadline_client.get_worker(
farmId=session_worker.configuration.farm_id,
fleetId=session_worker.configuration.fleet.id,
workerId=session_worker.worker_id,
)

assert get_worker_response["status"] in ["STARTED", "RUNNING", "IDLE"]

# Submit another job and verify that the worker still works properly and finishes the job

sleep_job = submit_sleep_job(
"Test Success Sleep Job after syncInputJobAttachments fail",
deadline_client,
deadline_resources.farm,
queue_to_use,
)

sleep_job.wait_until_complete(client=deadline_client)

assert sleep_job.task_run_status == TaskStatus.SUCCEEDED

def test_worker_enters_stopping_state_while_draining(
self,
deadline_resources: DeadlineResources,
