Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(shutdown): stop all running jobs before stopping workflow (#423) #423

Merged
merged 4 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 48 additions & 1 deletion docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,54 @@
},
"summary": "Returns the logs for a given job."
}
},
"/shutdown": {
"delete": {
"consumes": [
"application/json"
],
"description": "All running jobs will be stopped and no more jobs will be scheduled. Kubernetes will call this endpoint before stopping the pod (PreStop hook).",
"operationId": "shutdown",
"produces": [
"application/json"
],
"responses": {
"200": {
"description": "Request successful. All jobs were stopped.",
"examples": {
"application/json": {
"message": "All jobs stopped."
}
},
"schema": {
"properties": {
"message": {
"type": "string"
}
},
"type": "object"
}
},
"500": {
"description": "Request failed. Something went wrong while stopping the jobs.",
"examples": {
"application/json": {
"message": "Could not stop jobs cdcf48b1-c2f3-4693-8230-b066e088444c"
}
},
"schema": {
"properties": {
"message": {
"type": "string"
}
},
"type": "object"
}
}
},
"summary": "Stop reana-job-controller"
}
}
},
"swagger": "2.0"
}
}
15 changes: 13 additions & 2 deletions reana_job_controller/htcondorcern_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,19 @@ def spool_output(backend_job_id):
logging.info("Spooling jobs {} output.".format(backend_job_id))
schedd.retrieve("ClusterId == {}".format(backend_job_id))

def get_logs(backend_job_id, workspace):
"""Return job logs if log files are present."""
@classmethod
def get_logs(cls, backend_job_id, **kwargs):
"""Return job logs if log files are present.

:param backend_job_id: ID of the job in the backend.
:param kwargs: Additional parameters needed to fetch logs.
In the case of HTCondor, the ``workspace`` parameter is needed.
:return: String containing the job logs.
"""
if "workspace" not in kwargs:
raise ValueError("Missing 'workspace' parameter")
workspace = kwargs["workspace"]

stderr_file = os.path.join(
workspace, "reana_job." + str(backend_job_id) + ".0.err"
)
Expand Down
38 changes: 37 additions & 1 deletion reana_job_controller/job_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@

"""REANA-Job-Controller job database."""

import logging
from reana_commons.utils import calculate_hash_of_dir, calculate_job_input_hash
from reana_db.database import Session
from reana_db.models import JobCache
from reana_db.models import Job, JobCache, JobStatus

JOB_DB = {}

Expand Down Expand Up @@ -111,3 +112,38 @@ def retrieve_job_logs(job_id):
:returns: Job's logs.
"""
return JOB_DB[job_id].get("log")


def store_job_logs(job_id, logs):
    """Store job logs in the in-memory job registry and in the database.

    The ``JOB_DB`` entry is updated first. The database write that follows
    is best-effort: any exception raised while updating or committing is
    logged and not propagated to the caller.

    :param job_id: Internal REANA job ID.
    :param logs: Job logs.
    :type job_id: str
    :type logs: str
    :raises KeyError: If ``job_id`` has no entry in ``JOB_DB``.
    """
    logging.info(f"Storing job logs: {job_id}")
    JOB_DB[job_id]["log"] = logs
    try:
        Session.query(Job).filter_by(id_=job_id).update(dict(logs=logs))
        Session.commit()
    except Exception as e:
        logging.exception(f"Exception while saving logs: {e}")
        # Discard the failed transaction; otherwise the session would stay
        # in a failed state and later database operations on it would
        # keep raising until someone rolls it back.
        Session.rollback()


def update_job_status(job_id, status):
    """Update job status in the in-memory job registry and in the database.

    The ``JOB_DB`` entry is updated first. The database update that follows
    is best-effort: any exception raised while resolving ``status`` through
    ``JobStatus``, fetching the job row or committing is logged and not
    propagated to the caller. In that case ``JOB_DB`` keeps the new status
    even though the database row was not changed.

    :param job_id: Internal REANA job ID.
    :param status: One of the possible status for jobs in REANA; it is
        looked up by name in ``JobStatus``.
    :type job_id: str
    :type status: str
    :raises KeyError: If ``job_id`` has no entry in ``JOB_DB``.
    """
    logging.info(f"Updating status of job {job_id} to {status}")
    JOB_DB[job_id]["status"] = status
    try:
        job_in_db = Session.query(Job).filter_by(id_=job_id).one()
        job_in_db.status = JobStatus[status]
        Session.commit()
    except Exception as e:
        logging.exception(f"Exception while updating status: {e}")
        # Discard the failed transaction (and any pending attribute change)
        # so that the session can be used again by later operations.
        Session.rollback()
15 changes: 9 additions & 6 deletions reana_job_controller/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,14 @@ def get_status(self):
"""
raise NotImplementedError

def get_logs(self):
"""Get job log.

:returns: stderr, stdout of a job.
:rtype: dict
@classmethod
def get_logs(cls, backend_job_id, **kwargs):
"""Return job logs if log files are present.

:param backend_job_id: ID of the job in the backend.
:param kwargs: Additional parameters needed to fetch logs.
These depend on the chosen compute backend.
:return: String containing the job logs.
"""
raise NotImplementedError

Expand All @@ -109,7 +112,7 @@ def create_job_in_db(self, backend_job_id):
job_db_entry = JobTable(
backend_job_id=backend_job_id,
workflow_uuid=self.workflow_uuid,
status=JobStatus.created.name,
status=JobStatus.created,
compute_backend=self.compute_backend,
cvmfs_mounts=self.cvmfs_mounts or "",
shared_file_system=self.shared_file_system or False,
Expand Down
Loading
Loading