Skip to content

Commit

Permalink
refactor(monitor): move fetching of logs to job-manager (reanahub#423)
Browse files Browse the repository at this point in the history
  • Loading branch information
mdonadoni committed Feb 1, 2024
1 parent 9d6fc99 commit 1fc117e
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 102 deletions.
15 changes: 13 additions & 2 deletions reana_job_controller/htcondorcern_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,8 +294,19 @@ def spool_output(backend_job_id):
logging.info("Spooling jobs {} output.".format(backend_job_id))
schedd.retrieve("ClusterId == {}".format(backend_job_id))

def get_logs(backend_job_id, workspace):
"""Return job logs if log files are present."""
@classmethod
def get_logs(cls, backend_job_id, **kwargs):
"""Return job logs if log files are present.
:param backend_job_id: ID of the job in the backend.
:param kwargs: Additional parameters needed to fetch logs.
In the case of HTCondor, the ``workspace`` parameter is needed.
:return: String containing the job logs.
"""
if "workspace" not in kwargs:
raise ValueError("Missing 'workspace' parameter")
workspace = kwargs["workspace"]

stderr_file = os.path.join(
workspace, "reana_job." + str(backend_job_id) + ".0.err"
)
Expand Down
13 changes: 8 additions & 5 deletions reana_job_controller/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,14 @@ def get_status(self):
"""
raise NotImplementedError

def get_logs(self):
"""Get job log.
:returns: stderr, stdout of a job.
:rtype: dict
@classmethod
def get_logs(cls, backend_job_id, **kwargs):
    """Return job logs if log files are present.

    Abstract method: each compute-backend job manager must override it.

    :param backend_job_id: ID of the job in the backend.
    :param kwargs: Additional parameters needed to fetch logs.
        These depend on the chosen compute backend.
    :return: String containing the job logs.
    :raises NotImplementedError: Always, since subclasses must provide
        the backend-specific implementation.
    """
    raise NotImplementedError

Expand Down
73 changes: 6 additions & 67 deletions reana_job_controller/job_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
SLURM_SSH_AUTH_TIMEOUT,
)
from reana_job_controller.job_db import JOB_DB, store_job_logs, update_job_status
from reana_job_controller.kubernetes_job_manager import KubernetesJobManager
from reana_job_controller.utils import SSHClient, singleton


Expand Down Expand Up @@ -214,70 +215,6 @@ def get_job_status(self, job_pod) -> Optional[str]:

return status

def _get_containers_logs(self, job_pod) -> Optional[str]:
    """Fetch the logs of all containers of the given job pod.

    :param job_pod: Pod resource coming from the Kubernetes API.
    :return: Concatenated logs of the pod's containers, or ``None`` when
        the Kubernetes API call fails or an unexpected error occurs.
    """
    try:
        pod_logs = ""
        container_statuses = self._get_job_container_statuses(job_pod)

        logging.info(f"Grabbing pod {job_pod.metadata.name} logs ...")
        for container in container_statuses:
            # If we are here, it means that either all the containers have finished
            # running or there has been some sort of failure. For this reason we get
            # the logs of all containers, even if they are still running, as the job
            # will not continue running after this anyway.
            if container.state.terminated or container.state.running:
                container_log = (
                    current_k8s_corev1_api_client.read_namespaced_pod_log(
                        namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
                        name=job_pod.metadata.name,
                        container=container.name,
                    )
                )
                pod_logs += "{}: :\n {}\n".format(container.name, container_log)
                # ``hasattr`` on ``None`` is False, so the termination reason
                # is appended only for containers that actually terminated.
                if hasattr(container.state.terminated, "reason"):
                    pod_logs += "\n{}\n".format(container.state.terminated.reason)
            elif container.state.waiting:
                # No need to fetch logs, as the container has not started yet.
                pod_logs += "Container {} failed, error: {}".format(
                    container.name, container.state.waiting.message
                )

        return pod_logs
    except client.rest.ApiException as e:
        logging.error(f"Error from Kubernetes API while getting job logs: {e}")
        return None
    except Exception as e:
        logging.error(traceback.format_exc())
        logging.error("Unexpected error: {}".format(e))
        return None

def get_job_logs(self, job_pod) -> Optional[str]:
    """Get job logs."""
    logs = self._get_containers_logs(job_pod)

    # Only the active-deadline case needs extra log decoration.
    if job_pod.status.reason != "DeadlineExceeded":
        return logs

    # Make sure the timeout is reported even when no container logs
    # could be fetched.
    if not logs:
        logs = ""

    backend_job_id = self.get_backend_job_id(job_pod)
    message = f"\n{job_pod.status.reason}\nThe job was killed due to exceeding timeout"

    try:
        specified_timeout = job_pod.spec.active_deadline_seconds
        message += f" of {specified_timeout} seconds."
    except AttributeError:
        message += "."
        logging.error(
            f"Kubernetes job id: {backend_job_id}. Could not get job timeout from Job spec."
        )

    logs += message
    logging.info(
        f"Kubernetes job id: {backend_job_id} was killed due to timeout."
    )

    return logs

def watch_jobs(self, job_db, app=None):
"""Open stream connection to k8s apiserver to watch all jobs status.
Expand All @@ -302,15 +239,17 @@ def watch_jobs(self, job_db, app=None):
backend_job_id = self.get_backend_job_id(job_pod)
reana_job_id = self.get_reana_job_id(backend_job_id)

logs = self.get_job_logs(job_pod)
logs = self.job_manager_cls.get_logs(
backend_job_id, job_pod=job_pod
)

store_job_logs(reana_job_id, logs)
update_job_status(reana_job_id, job_status)

if JobStatus.should_cleanup_job(job_status):
self.clean_job(backend_job_id)
except client.rest.ApiException as e:
logging.error(
logging.exception(
f"Error from Kubernetes API while watching jobs pods: {e}"
)
except Exception as e:
Expand Down Expand Up @@ -414,7 +353,7 @@ def watch_jobs(self, job_db, app):
job_logs = app.htcondor_executor.submit(
self.job_manager_cls.get_logs,
job_dict["backend_job_id"],
job_db[job_id]["obj"].workflow_workspace,
workspace=job_db[job_id]["obj"].workflow_workspace,
)
logs = job_logs.result()
store_job_logs(job_id, logs)
Expand Down
98 changes: 97 additions & 1 deletion reana_job_controller/kubernetes_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
validate_kubernetes_memory,
kubernetes_memory_to_bytes,
)
from reana_commons.k8s.api_client import current_k8s_batchv1_api_client
from reana_commons.k8s.api_client import (
current_k8s_batchv1_api_client,
current_k8s_corev1_api_client,
)
from reana_commons.k8s.kerberos import get_kerberos_k8s_config
from reana_commons.k8s.secrets import REANAUserSecretsStore
from reana_commons.k8s.volumes import (
Expand Down Expand Up @@ -243,6 +246,99 @@ def _submit(self):
logging.exception("Unexpected error while submitting a job")
raise

@classmethod
def _get_containers_logs(cls, job_pod) -> Optional[str]:
    """Fetch the logs from all the containers in the given pod.

    :param job_pod: Pod resource coming from Kubernetes.
    :return: Concatenated logs of the pod's containers, or ``None`` when
        the Kubernetes API call fails or an unexpected error occurs.
    """
    try:
        pod_logs = ""
        # Both regular and init containers contribute to the job logs.
        container_statuses = (job_pod.status.container_statuses or []) + (
            job_pod.status.init_container_statuses or []
        )

        logging.info(f"Grabbing pod {job_pod.metadata.name} logs ...")
        for container in container_statuses:
            # If we are here, it means that either all the containers have finished
            # running or there has been some sort of failure. For this reason we get
            # the logs of all containers, even if they are still running, as the job
            # will not continue running after this anyway.
            if container.state.terminated or container.state.running:
                container_log = (
                    current_k8s_corev1_api_client.read_namespaced_pod_log(
                        namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
                        name=job_pod.metadata.name,
                        container=container.name,
                    )
                )
                pod_logs += "{}: :\n {}\n".format(container.name, container_log)
                # ``hasattr`` on ``None`` is False, so the termination reason
                # is appended only for containers that actually terminated.
                if hasattr(container.state.terminated, "reason"):
                    pod_logs += "\n{}\n".format(container.state.terminated.reason)
            elif container.state.waiting:
                # No need to fetch logs, as the container has not started yet.
                pod_logs += "Container {} failed, error: {}".format(
                    container.name, container.state.waiting.message
                )

        return pod_logs
    except client.rest.ApiException as e:
        logging.error(f"Error from Kubernetes API while getting job logs: {e}")
        return None
    except Exception as e:
        # ``logging.exception`` records the message together with the full
        # traceback in a single call, instead of the previous pair of
        # ``logging.error(traceback.format_exc())`` + ``logging.error(...)``.
        logging.exception("Unexpected error: {}".format(e))
        return None

@classmethod
def get_logs(cls, backend_job_id, **kwargs):
    """Return job logs.

    :param backend_job_id: ID of the job in the backend.
    :param kwargs: Additional parameters needed to fetch logs.
        In the case of Kubernetes, the ``job_pod`` parameter can be specified
        to avoid fetching the pod specification from Kubernetes.
    :return: String containing the job logs, or ``None`` when no pod could
        be found for the job or the logs could not be fetched.
    :raises ValueError: If the given ``job_pod`` does not refer to the job
        identified by ``backend_job_id``.
    """
    if "job_pod" in kwargs:
        job_pod = kwargs["job_pod"]
        # Explicit check instead of ``assert``: assertions are stripped when
        # running under ``python -O``, and the sibling backends also signal
        # invalid kwargs with ``ValueError``.
        if job_pod.metadata.labels["job-name"] != backend_job_id:
            raise ValueError("Pod does not refer to correct job.")
    else:
        job_pods = current_k8s_corev1_api_client.list_namespaced_pod(
            namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
            label_selector=f"job-name={backend_job_id}",
        )
        if not job_pods.items:
            logging.error(f"Could not find any pod for job {backend_job_id}")
            return None
        job_pod = job_pods.items[0]

    logs = cls._get_containers_logs(job_pod)

    if job_pod.status.reason == "DeadlineExceeded":
        # Report the timeout even when no container logs could be fetched.
        if not logs:
            logs = ""

        message = f"\n{job_pod.status.reason}\nThe job was killed due to exceeding timeout"

        try:
            specified_timeout = job_pod.spec.active_deadline_seconds
            message += f" of {specified_timeout} seconds."
        except AttributeError:
            message += "."
            logging.error(
                f"Kubernetes job id: {backend_job_id}. Could not get job timeout from Job spec."
            )

        logs += message
        logging.info(
            f"Kubernetes job id: {backend_job_id} was killed due to timeout."
        )

    return logs

def stop(backend_job_id, asynchronous=True):
"""Stop Kubernetes job execution.
Expand Down
15 changes: 13 additions & 2 deletions reana_job_controller/slurmcern_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,19 @@ def _download_dir(sftp, remote_dir, local_dir):
else:
sftp.get(remote_path, local_path)

def get_logs(backend_job_id, workspace):
"""Return job logs if log files are present."""
@classmethod
def get_logs(cls, backend_job_id, **kwargs):
"""Return job logs if log files are present.
:param backend_job_id: ID of the job in the backend.
:param kwargs: Additional parameters needed to fetch logs.
In the case of Slurm, the ``workspace`` parameter is needed.
:return: String containing the job logs.
"""
if "workspace" not in kwargs:
raise ValueError("Missing 'workspace' parameter")
workspace = kwargs["workspace"]

stderr_file = os.path.join(
workspace, "reana_job." + str(backend_job_id) + ".err"
)
Expand Down
25 changes: 25 additions & 0 deletions tests/test_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,28 @@ def cache_job(self):
job_manager = TestJobManger("docker.io/library/busybox", "ls", {})
job_manager.execute()
assert job_manager.order_list == [1, 2, 3, 4]


@pytest.mark.parametrize(
    "k8s_phase,k8s_container_state,k8s_logs,pod_logs",
    [
        ("Pending", "ErrImagePull", "pull access denied", None),
        ("Pending", "InvalidImageName", "couldn't parse image", None),
        ("Succeeded", "Completed", None, "job finished"),
        ("Failed", "Error", None, "job failed"),
    ],
)
def test_kubernetes_get_job_logs(
    k8s_phase, k8s_container_state, k8s_logs, pod_logs, app, kubernetes_job_pod
):
    """Test retrieval of job logs."""
    # Fake CoreV1 API client that serves the canned container logs.
    fake_corev1_api_client = mock.MagicMock()
    fake_corev1_api_client.read_namespaced_pod_log = lambda **_kwargs: pod_logs
    with mock.patch(
        "reana_job_controller.kubernetes_job_manager.current_k8s_corev1_api_client",
        fake_corev1_api_client,
    ):
        job_pod = kubernetes_job_pod(k8s_phase, k8s_container_state)
        retrieved_logs = KubernetesJobManager.get_logs(
            job_pod.metadata.labels["job-name"], job_pod=job_pod
        )
        expected_fragment = k8s_logs or pod_logs
        assert expected_fragment in retrieved_logs
25 changes: 0 additions & 25 deletions tests/test_job_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,31 +39,6 @@ def test_initialisation(app):
JobMonitorSlurmCERN(app=app)


@pytest.mark.parametrize(
    "k8s_phase,k8s_container_state,k8s_logs,pod_logs",
    [
        ("Pending", "ErrImagePull", "pull access denied", None),
        ("Pending", "InvalidImageName", "couldn't parse image", None),
        ("Succeeded", "Completed", None, "job finished"),
        ("Failed", "Error", None, "job failed"),
    ],
)
def test_kubernetes_get_job_logs(
    k8s_phase, k8s_container_state, k8s_logs, pod_logs, app, kubernetes_job_pod
):
    """Test retrieval of job logs."""
    # Fake CoreV1 API client that serves the canned container logs.
    fake_corev1_api_client = mock.MagicMock()
    fake_corev1_api_client.read_namespaced_pod_log = lambda **_kwargs: pod_logs
    with mock.patch.multiple(
        "reana_job_controller.job_monitor",
        current_k8s_corev1_api_client=fake_corev1_api_client,
        threading=mock.DEFAULT,
    ):
        monitor = JobMonitorKubernetes(app=app)
        job_pod = kubernetes_job_pod(k8s_phase, k8s_container_state)
        expected_fragment = k8s_logs or pod_logs
        assert expected_fragment in monitor.get_job_logs(job_pod)


@pytest.mark.parametrize(
"k8s_phase,k8s_container_state,expected_reana_status",
[
Expand Down

0 comments on commit 1fc117e

Please sign in to comment.