test: add test that verifies job fails if attachment is accidentally deleted from bucket #436

Merged 5 commits on Oct 10, 2024
210 changes: 210 additions & 0 deletions test/e2e/test_job_submissions.py
@@ -27,6 +27,7 @@
import tempfile
from e2e.utils import wait_for_job_output, submit_sleep_job, submit_custom_job


LOG = logging.getLogger(__name__)


@@ -2015,6 +2016,215 @@ def session_action_has_expected_progress(session_action_id) -> bool:

assert job.task_run_status == TaskStatus.SUCCEEDED

def test_worker_fails_job_attachment_sync_when_file_does_not_exist_in_bucket(
self,
deadline_resources: DeadlineResources,
session_worker: EC2InstanceWorker,
deadline_client: DeadlineClient,
tmp_path: pathlib.Path,
) -> None:
# Submits a job with input job attachments, deletes an input file from the Job Attachments bucket before the job starts, and verifies that the job's syncInputJobAttachments step fails
job_bundle_path: str = os.path.join(
tmp_path,
"job_attachment_bundle",
)
os.mkdir(job_bundle_path)

input_file_name: str = os.path.join(job_bundle_path, str(uuid.uuid4()))
with open(input_file_name, "w+") as file_to_write:
file_to_write.write(str(uuid.uuid4()))

job_parameters: List[Dict[str, str]] = [
{"name": "deadline:targetTaskRunStatus", "value": "SUSPENDED"},
{"name": "DataDir", "value": job_bundle_path},
]

queue_to_use = deadline_resources.queue_a
with open(
os.path.join(job_bundle_path, "parameter_values.json"), "w+"
) as parameter_values_file:
# Make sure the job is submitted in SUSPENDED state so we have time to delete an input job attachment in the bucket
parameter_values_file.write(
json.dumps(
{
"parameterValues": [
{"name": "deadline:targetTaskRunStatus", "value": "SUSPENDED"},
]
Comment on lines +2050 to +2052

Contributor:
Is this duplicated above in job_parameters variable?
}
)
)
with open(os.path.join(job_bundle_path, "template.json"), "w+") as template_file:
template_file.write(
json.dumps(
{
"specificationVersion": "jobtemplate-2023-09",
"name": "JobAttachmentThatGetsDeleted",
"parameterDefinitions": [
{
"name": "DataDir",
"type": "PATH",
"dataFlow": "INOUT",
},
],
"steps": [
{
"name": "Step0",
"hostRequirements": {
"attributes": [
{
"name": "attr.worker.os.family",
"allOf": [os.environ["OPERATING_SYSTEM"]],
}
]
},
"script": {
"actions": {"onRun": {"command": "whoami"}},
},
}
],
}
)
)

config = configparser.ConfigParser()
set_setting("defaults.farm_id", deadline_resources.farm.id, config)
set_setting("defaults.queue_id", queue_to_use.id, config)
job_id: Optional[str] = api.create_job_from_job_bundle(
job_bundle_path,
job_parameters,
priority=99,
config=config,
queue_parameter_definitions=[],
)

assert job_id is not None

job_details = Job.get_job_details(
client=deadline_client,
farm=deadline_resources.farm,
queue=queue_to_use,
job_id=job_id,
)

LOG.info(f"job details: {job_details}")
assert job_details.get("task_run_status") == "SUSPENDED"
attachments: Optional[dict] = job_details.get("attachments")
assert attachments is not None

manifests: list[dict[str, Any]] = attachments["manifests"]

assert manifests is not None
first_manifest = manifests[0]

input_manifest_path = first_manifest["inputManifestPath"]

# Find the input manifest
queue_job_attachment_settings: dict[str, Any] = deadline_client.get_queue(
farmId=deadline_resources.farm.id,
queueId=queue_to_use.id,
)["jobAttachmentSettings"]

job_attachments_bucket_name: str = queue_job_attachment_settings["s3BucketName"]
root_prefix: str = queue_job_attachment_settings["rootPrefix"]
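# Bucket layout as exercised below (inferred from this test, not from
# external docs): manifests live under <rootPrefix>/Manifests/... and
# content-addressed file data under <rootPrefix>/Data/<hash>.xxh128.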

s3_client = boto3.client("s3")

get_manifest_object_result: dict[str, Any] = s3_client.get_object(
Bucket=job_attachments_bucket_name,
Key=root_prefix + "/Manifests/" + input_manifest_path,
)

get_object_result_body: dict[str, Any] = json.loads(
get_manifest_object_result["Body"].read()
)
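# Illustrative only (assumed shape of a Job Attachments input manifest, not
# taken from this PR): the decoded body is expected to look roughly like
#   {"hashAlg": "xxh128", "manifestVersion": "2023-03-03",
#    "paths": [{"hash": "<xxh128 digest>", "path": "<relative path>", "size": ...}],
#    "totalSize": ...}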

# Get the Job Attachment bucket file paths of the input files
input_file_paths: list[dict[str, Any]] = get_object_result_body["paths"]
first_input_file_hash = input_file_paths[0]["hash"]

# Delete one of the input files from the Job Attachments bucket after confirming that it exists

s3_client.get_object(
Bucket=job_attachments_bucket_name,
Key=root_prefix + "/Data/" + first_input_file_hash + ".xxh128",
)

Contributor:
HeadObject is a lighter API call if we just want to check for existence, but if the role doesn't have permission to call it already, don't worry about it.
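A minimal sketch of the reviewer's suggestion (an assumed alternative, not applied in this PR; boto3's head_object issues a HeadObject request, which may need its own permission on the role):

# Hypothetical lighter existence check: HeadObject returns only metadata,
# so the object body is never downloaded.
s3_client.head_object(
Bucket=job_attachments_bucket_name,
Key=root_prefix + "/Data/" + first_input_file_hash + ".xxh128",
)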
s3_client.delete_object(
Bucket=job_attachments_bucket_name,
Key=root_prefix + "/Data/" + first_input_file_hash + ".xxh128",
)

# Start the job; it should fail since one of the input files is missing from the Job Attachments bucket

deadline_client.update_job(
farmId=deadline_resources.farm.id,
jobId=job_id,
queueId=queue_to_use.id,
targetTaskRunStatus="READY",
)

job: Job = Job(
farm=deadline_resources.farm,
queue=deadline_resources.queue_a,
template={},
**job_details,
)
job.wait_until_complete(client=deadline_client)

# Job should have failed due to not being able to sync attachments
assert job.task_run_status == TaskStatus.FAILED

sessions: list[dict[str, Any]] = deadline_client.list_sessions(
farmId=job.farm.id, queueId=job.queue.id, jobId=job.id
).get("sessions")

found_failed_session_action = False
for session in sessions:
session_actions = deadline_client.list_session_actions(
farmId=job.farm.id,
queueId=job.queue.id,
jobId=job.id,
sessionId=session["sessionId"],
).get("sessionActions")

logging.info(f"Session actions: {session_actions}")
Contributor:
Suggested change:
- logging.info(f"Session actions: {session_actions}")
+ LOG.info(f"Session actions: {session_actions}")
Should use the same logger as above for consistency; I believe logging.info goes to the root logger.

Contributor Author:
Will address in the next PR.
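For reference, a minimal sketch of the distinction the reviewer is pointing at (illustrative, not part of this diff):

import logging

LOG = logging.getLogger(__name__)  # module-level logger, as defined at the top of this file

logging.info("message")  # dispatched through the root logger (record name is "root")
LOG.info("message")  # attributed to this module (record name == __name__)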

for session_action in session_actions:
# The syncInputJobAttachments session action should have failed
if "syncInputJobAttachments" in session_action["definition"]:
assert (
session_action["status"] == "FAILED"
), f"syncInputJobAttachments Session action that should have failed is in {session_action['status']} status. {session_action}"
found_failed_session_action = True
else:
# Every other session action should never have been attempted, since the syncInputJobAttachments action failed
assert (
session_action["status"] == "NEVER_ATTEMPTED"
), f"Session action that should never have been attempted is in {session_action['status']} status. {session_action}"
assert (
found_failed_session_action
), "Did not find a failed syncInputJobAttachments session action"

# Make sure the worker is still running and not crashed after this
get_worker_response: dict[str, Any] = deadline_client.get_worker(
farmId=session_worker.configuration.farm_id,
fleetId=session_worker.configuration.fleet.id,
workerId=session_worker.worker_id,
)

assert get_worker_response["status"] in ["STARTED", "RUNNING", "IDLE"]

# Submit another job and verify that the worker still works properly and finishes the job

sleep_job = submit_sleep_job(
"Test Success Sleep Job after syncInputJobAttachments fail",
deadline_client,
deadline_resources.farm,
queue_to_use,
)

sleep_job.wait_until_complete(client=deadline_client)

assert sleep_job.task_run_status == TaskStatus.SUCCEEDED

def test_worker_enters_stopping_state_while_draining(
self,
deadline_resources: DeadlineResources,
Expand Down