Skip to content

Commit

Permalink
perf(secrets): avoid fetching k8s secrets multiple times (reanahub#451)
Browse files Browse the repository at this point in the history
Cache user secrets to avoid fetching them multiple times when creating
many jobs.

Closes reanahub/reana-commons#455
  • Loading branch information
mdonadoni committed May 6, 2024
1 parent f4f955c commit a8d4a77
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 12 deletions.
3 changes: 3 additions & 0 deletions reana_job_controller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,6 @@

# Timeout (seconds) applied while waiting for the SLURM SSH server to
# answer the authentication handshake; overridable via the environment.
SLURM_SSH_AUTH_TIMEOUT = float(os.environ.get("SLURM_SSH_AUTH_TIMEOUT", "60"))
"""Seconds to wait for SLURM SSH authentication response."""

# Injected into the pod environment by the workflow controller; used to
# look up this workflow owner's secrets in Kubernetes. May be unset (None).
REANA_USER_ID = os.environ.get("REANA_USER_ID")
"""User UUID of the owner of the workflow."""
29 changes: 20 additions & 9 deletions reana_job_controller/kubernetes_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
current_k8s_corev1_api_client,
)
from reana_commons.k8s.kerberos import get_kerberos_k8s_config
from reana_commons.k8s.secrets import REANAUserSecretsStore
from reana_commons.k8s.secrets import UserSecretsStore, UserSecrets
from reana_commons.k8s.volumes import (
get_k8s_cvmfs_volumes,
get_reana_shared_volume,
Expand All @@ -51,6 +51,7 @@
from reana_job_controller.config import (
REANA_KUBERNETES_JOBS_MEMORY_LIMIT,
REANA_KUBERNETES_JOBS_MAX_USER_MEMORY_LIMIT,
REANA_USER_ID,
)
from reana_job_controller.errors import ComputingBackendSubmissionError
from reana_job_controller.job_manager import JobManager
Expand Down Expand Up @@ -81,6 +82,7 @@ def __init__(
voms_proxy=False,
rucio=False,
kubernetes_job_timeout: Optional[int] = None,
secrets: Optional[UserSecrets] = None,
**kwargs,
):
"""Instantiate kubernetes job manager.
Expand Down Expand Up @@ -117,6 +119,8 @@ def __init__(
:param rucio: Decides if a rucio environment should be provided
for job.
:type rucio: bool
:param secrets: User secrets, if none they will be fetched from k8s.
:type secrets: Optional[UserSecrets]
"""
super(KubernetesJobManager, self).__init__(
docker_img=docker_img,
Expand All @@ -127,6 +131,7 @@ def __init__(
workflow_workspace=workflow_workspace,
job_name=job_name,
)

self.compute_backend = "Kubernetes"
self.cvmfs_mounts = cvmfs_mounts
self.shared_file_system = shared_file_system
Expand All @@ -137,6 +142,14 @@ def __init__(
self.set_memory_limit(kubernetes_memory_limit)
self.workflow_uuid = workflow_uuid
self.kubernetes_job_timeout = kubernetes_job_timeout
self._secrets: Optional[UserSecrets] = secrets

@property
def secrets(self):
    """Return the user secrets, fetching them from k8s on first access.

    The fetched value is memoized in ``self._secrets`` so that creating
    many jobs does not trigger repeated Kubernetes API calls.
    """
    cached = self._secrets
    if cached is None:
        cached = UserSecretsStore.fetch(REANA_USER_ID)
        self._secrets = cached
    return cached

@JobManager.execution_hook
def execute(self):
Expand Down Expand Up @@ -179,15 +192,13 @@ def execute(self):
},
},
}
user_id = os.getenv("REANA_USER_ID")
secrets_store = REANAUserSecretsStore(user_id)

secret_env_vars = secrets_store.get_env_secrets_as_k8s_spec()
secret_env_vars = self.secrets.get_env_secrets_as_k8s_spec()
job_spec = self.job["spec"]["template"]["spec"]
job_spec["containers"][0]["env"].extend(secret_env_vars)
job_spec["volumes"].append(secrets_store.get_file_secrets_volume_as_k8s_specs())
job_spec["volumes"].append(self.secrets.get_file_secrets_volume_as_k8s_specs())

secrets_volume_mount = secrets_store.get_secrets_volume_mount_as_k8s_spec()
secrets_volume_mount = self.secrets.get_secrets_volume_mount_as_k8s_spec()
job_spec["containers"][0]["volumeMounts"].append(secrets_volume_mount)

if self.env_vars:
Expand Down Expand Up @@ -215,7 +226,7 @@ def execute(self):
)

if self.kerberos:
self._add_krb5_containers(secrets_store)
self._add_krb5_containers(self.secrets)

if self.voms_proxy:
self._add_voms_proxy_init_container(secrets_volume_mount, secret_env_vars)
Expand Down Expand Up @@ -444,10 +455,10 @@ def add_volumes(self, volumes):
].append(volume_mount)
self.job["spec"]["template"]["spec"]["volumes"].append(volume)

def _add_krb5_containers(self, secrets_store):
def _add_krb5_containers(self, secrets):
"""Add krb5 init and renew containers for a job."""
krb5_config = get_kerberos_k8s_config(
secrets_store,
secrets,
kubernetes_uid=self.kubernetes_uid,
)

Expand Down
23 changes: 20 additions & 3 deletions reana_job_controller/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
)

from reana_db.models import JobStatus

from reana_commons.k8s.secrets import UserSecrets, UserSecretsStore

from reana_job_controller.errors import ComputingBackendSubmissionError
from reana_job_controller.job_db import (
Expand All @@ -46,6 +46,18 @@
job_schema = Job()


# Module-level memo for the current user's secrets; populated lazily on
# the first call to get_cached_user_secrets().
_SECRETS_CACHE = None
"""Cache for user secrets."""


def get_cached_user_secrets() -> UserSecrets:
    """Return the user secrets, fetching them from k8s only once.

    Subsequent calls reuse the module-level cache so that submitting many
    jobs in quick succession does not hammer the Kubernetes API.
    """
    global _SECRETS_CACHE
    if _SECRETS_CACHE is not None:
        return _SECRETS_CACHE
    _SECRETS_CACHE = UserSecretsStore.fetch(config.REANA_USER_ID)
    return _SECRETS_CACHE


class JobCreationCondition:
"""Mechanism used to synchronize the creation of jobs.
Expand Down Expand Up @@ -269,15 +281,20 @@ def create_job(): # noqa
logging.error(msg, exc_info=True)
update_workflow_logs(job_request["workflow_uuid"], msg)
return jsonify({"job": msg}), 500

with current_app.app_context():
job_manager_cls = current_app.config["COMPUTE_BACKENDS"][compute_backend]()
try:
job_obj = job_manager_cls(**job_request)
job_obj = job_manager_cls(
**job_request,
# we pass the secrets from the local cache in order to avoid many calls
# to the k8s API when many jobs are executed at the same time
secrets=get_cached_user_secrets(),
)
except REANAKubernetesMemoryLimitExceeded as e:
return jsonify({"message": e.message}), 403
except REANAKubernetesWrongMemoryFormat as e:
return jsonify({"message": e.message}), 400

if not job_creation_condition.start_creation():
return jsonify({"message": "Cannot create new jobs, shutting down"}), 400

Expand Down

0 comments on commit a8d4a77

Please sign in to comment.