diff --git a/test/e2e/test_job_submissions.py b/test/e2e/test_job_submissions.py
index 5dce004b..cc53d94b 100644
--- a/test/e2e/test_job_submissions.py
+++ b/test/e2e/test_job_submissions.py
@@ -27,6 +27,7 @@ import tempfile
 from e2e.utils import wait_for_job_output, submit_sleep_job, submit_custom_job
 
+
 LOG = logging.getLogger(__name__)
 
@@ -2015,6 +2016,215 @@ def session_action_has_expected_progress(session_action_id) -> bool:
 
         assert job.task_run_status == TaskStatus.SUCCEEDED
 
+    def test_worker_fails_job_attachment_sync_when_file_does_not_exist_in_bucket(
+        self,
+        deadline_resources: DeadlineResources,
+        session_worker: EC2InstanceWorker,
+        deadline_client: DeadlineClient,
+        tmp_path: pathlib.Path,
+    ) -> None:
+        # Submits a job with an input job attachment, deletes the input file from the
+        # Job Attachments bucket before the job starts, and verifies that the job's
+        # syncInputJobAttachments step fails.
+        job_bundle_path: str = os.path.join(
+            tmp_path,
+            "job_attachment_bundle",
+        )
+        os.mkdir(job_bundle_path)
+
+        input_file_name: str = os.path.join(job_bundle_path, str(uuid.uuid4()))
+        with open(input_file_name, "w+") as file_to_write:
+            file_to_write.write(str(uuid.uuid4()))
+
+        job_parameters: List[Dict[str, str]] = [
+            {"name": "deadline:targetTaskRunStatus", "value": "SUSPENDED"},
+            {"name": "DataDir", "value": job_bundle_path},
+        ]
+
+        queue_to_use = deadline_resources.queue_a
+        with open(
+            os.path.join(job_bundle_path, "parameter_values.json"), "w+"
+        ) as parameter_values_file:
+            # Submit the job in the SUSPENDED state so there is time to delete an input
+            # job attachment from the bucket before the job runs.
+            parameter_values_file.write(
+                json.dumps(
+                    {
+                        "parameterValues": [
+                            {"name": "deadline:targetTaskRunStatus", "value": "SUSPENDED"},
+                        ]
+                    }
+                )
+            )
+        with open(os.path.join(job_bundle_path, "template.json"), "w+") as template_file:
+            template_file.write(
+                json.dumps(
+                    {
+                        "specificationVersion": "jobtemplate-2023-09",
+                        "name": "JobAttachmentThatGetsDeleted",
+                        "parameterDefinitions": [
+                            {
+                                "name": "DataDir",
+                                "type": "PATH",
+                                "dataFlow": "INOUT",
+                            },
+                        ],
+                        "steps": [
+                            {
+                                "name": "Step0",
+                                "hostRequirements": {
+                                    "attributes": [
+                                        {
+                                            "name": "attr.worker.os.family",
+                                            "allOf": [os.environ["OPERATING_SYSTEM"]],
+                                        }
+                                    ]
+                                },
+                                "script": {
+                                    "actions": {"onRun": {"command": "whoami"}},
+                                },
+                            }
+                        ],
+                    }
+                )
+            )
+
+        config = configparser.ConfigParser()
+        set_setting("defaults.farm_id", deadline_resources.farm.id, config)
+        set_setting("defaults.queue_id", queue_to_use.id, config)
+        job_id: Optional[str] = api.create_job_from_job_bundle(
+            job_bundle_path,
+            job_parameters,
+            priority=99,
+            config=config,
+            queue_parameter_definitions=[],
+        )
+
+        assert job_id is not None
+
+        job_details = Job.get_job_details(
+            client=deadline_client,
+            farm=deadline_resources.farm,
+            queue=queue_to_use,
+            job_id=job_id,
+        )
+
+        LOG.info(f"job details: {job_details}")
+        assert job_details.get("task_run_status") == "SUSPENDED"
+        attachments: Optional[dict] = job_details.get("attachments")
+        assert attachments is not None
+
+        manifests: list[dict[str, Any]] = attachments["manifests"]
+
+        assert manifests is not None
+        first_manifest = manifests[0]
+
+        input_manifest_path = first_manifest["inputManifestPath"]
+
+        # Look up the queue's Job Attachments settings to locate the input manifest in S3
+        queue_job_attachment_settings: dict[str, Any] = deadline_client.get_queue(
+            farmId=deadline_resources.farm.id,
+            queueId=queue_to_use.id,
+        )["jobAttachmentSettings"]
+
+        job_attachments_bucket_name: str = queue_job_attachment_settings["s3BucketName"]
+        root_prefix: str = queue_job_attachment_settings["rootPrefix"]
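+
+        # NOTE: the S3 keys constructed below follow the Job Attachments bucket layout
+        # that this test relies on (inferred from the key construction itself rather
+        # than from the service contract): manifests are stored at
+        # "<rootPrefix>/Manifests/<inputManifestPath>", and input file content is stored
+        # content-addressed at "<rootPrefix>/Data/<hash>.xxh128", where the suffix names
+        # the xxh128 hash algorithm used for content addressing.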
+
+        s3_client = boto3.client("s3")
+
+        get_manifest_object_result: dict[str, Any] = s3_client.get_object(
+            Bucket=job_attachments_bucket_name,
+            Key=root_prefix + "/Manifests/" + input_manifest_path,
+        )
+
+        get_object_result_body: dict[str, Any] = json.loads(
+            get_manifest_object_result["Body"].read()
+        )
+
+        # Get the Job Attachments bucket file paths of the input files
+        input_file_paths: list[dict[str, Any]] = get_object_result_body["paths"]
+        first_input_file_hash = input_file_paths[0]["hash"]
+
+        # Delete one of the input files from the Job Attachments bucket after confirming
+        # that it exists
+        s3_client.get_object(
+            Bucket=job_attachments_bucket_name,
+            Key=root_prefix + "/Data/" + first_input_file_hash + ".xxh128",
+        )
+        s3_client.delete_object(
+            Bucket=job_attachments_bucket_name,
+            Key=root_prefix + "/Data/" + first_input_file_hash + ".xxh128",
+        )
+
+        # Start the job; it should fail since one of its input files is missing from the
+        # Job Attachments bucket
+        deadline_client.update_job(
+            farmId=deadline_resources.farm.id,
+            jobId=job_id,
+            queueId=queue_to_use.id,
+            targetTaskRunStatus="READY",
+        )
+
+        job: Job = Job(
+            farm=deadline_resources.farm,
+            queue=deadline_resources.queue_a,
+            template={},
+            **job_details,
+        )
+        job.wait_until_complete(client=deadline_client)
+
+        # The job should have failed because the worker could not sync its input attachments
+        assert job.task_run_status == TaskStatus.FAILED
+
+        sessions: list[dict[str, Any]] = deadline_client.list_sessions(
+            farmId=job.farm.id, queueId=job.queue.id, jobId=job.id
+        ).get("sessions")
+
+        found_failed_session_action = False
+        for session in sessions:
+            session_actions = deadline_client.list_session_actions(
+                farmId=job.farm.id,
+                queueId=job.queue.id,
+                jobId=job.id,
+                sessionId=session["sessionId"],
+            ).get("sessionActions")
+
+            LOG.info(f"Session actions: {session_actions}")
+            for session_action in session_actions:
+                # The syncInputJobAttachments session action should have failed
+                if "syncInputJobAttachments" in session_action["definition"]:
+                    assert (
+                        session_action["status"] == "FAILED"
+                    ), f"syncInputJobAttachments session action that should have failed is in {session_action['status']} status. {session_action}"
+                    found_failed_session_action = True
+                else:
+                    # Every other session action should never have been attempted, since
+                    # the syncInputJobAttachments action failed
+                    assert (
+                        session_action["status"] == "NEVER_ATTEMPTED"
+                    ), f"Session action that should never have been attempted is in {session_action['status']} status. {session_action}"
+        assert (
+            found_failed_session_action
+        ), "Was not able to find any syncInputJobAttachments session actions"
+
+        # Make sure the worker is still running and has not crashed after this
+        get_worker_response: dict[str, Any] = deadline_client.get_worker(
+            farmId=session_worker.configuration.farm_id,
+            fleetId=session_worker.configuration.fleet.id,
+            workerId=session_worker.worker_id,
+        )
+
+        assert get_worker_response["status"] in ["STARTED", "RUNNING", "IDLE"]
+
+        # Submit another job and verify that the worker still works properly and finishes it
+        sleep_job = submit_sleep_job(
+            "Test Success Sleep Job after syncInputJobAttachments fail",
+            deadline_client,
+            deadline_resources.farm,
+            queue_to_use,
+        )
+
+        sleep_job.wait_until_complete(client=deadline_client)
+
+        assert sleep_job.task_run_status == TaskStatus.SUCCEEDED
+
     def test_worker_enters_stopping_state_while_draining(
         self,
         deadline_resources: DeadlineResources,