From ca240c34adad1128f8edf439c1e5278c98c80e98 Mon Sep 17 00:00:00 2001 From: Diego Rodriguez Date: Fri, 20 Jan 2017 15:06:40 +0100 Subject: [PATCH] api: add CVMFS support * Adds CVMFS field to jobs API. (closes #4) * Switches JSON template file with a Python one. Signed-off-by: Diego Rodriguez --- reana_job_controller/app.py | 17 ++--- reana_job_controller/config_template.json | 92 ----------------------- reana_job_controller/k8s.py | 34 +++++++-- reana_job_controller/volume_templates.py | 87 +++++++++++++++++++++ 4 files changed, 120 insertions(+), 110 deletions(-) delete mode 100644 reana_job_controller/config_template.json create mode 100644 reana_job_controller/volume_templates.py diff --git a/reana_job_controller/app.py b/reana_job_controller/app.py index 4add7742..8f4ef263 100644 --- a/reana_job_controller/app.py +++ b/reana_job_controller/app.py @@ -1,5 +1,4 @@ import copy -import json import logging import threading import uuid @@ -12,11 +11,6 @@ JOB_DB = {} -def get_config(experiment): - with open('config_template.json', 'r') as config: - return json.load(config)[experiment] - - def filter_jobs(job_db): job_db_copy = copy.deepcopy(job_db) for job_name in job_db_copy: @@ -49,18 +43,21 @@ def create_job(): cmd = request.json['cmd'] if 'cmd' in request.json else None env_vars = (request.json['env-vars'] if 'env-vars' in request.json else {}) - experiment_config = get_config(request.json['experiment']) - k8s_volume = experiment_config['k8s_volume'] + if request.json.get('cvmfs_mounts'): + cvmfs_repos = request.json.get('cvmfs_mounts') + else: + cvmfs_repos = [] job_id = str(uuid.uuid4()) job_obj = k8s.create_job(job_id, request.json['docker-img'], cmd, - [(k8s_volume, '/data')], + cvmfs_repos, env_vars, - request.json['experiment']) + request.json['experiment'], + shared_file_system=True) if job_obj: job = copy.deepcopy(request.json) diff --git a/reana_job_controller/config_template.json b/reana_job_controller/config_template.json deleted file mode 100644 index 2ed153b5..00000000 --- a/reana_job_controller/config_template.json +++ /dev/null @@ -1,92 +0,0 @@ -{ - "alice": { - "k8s_volume": { - "name": "pv", - "cephfs": { - "monitors": [ - "128.142.36.227:6790", - "128.142.39.77:6790", - "128.142.39.144:6790" - ], - "path": "/k8s/alice", - "user": "k8s", - "secretRef": { - "name": "ceph-secret", - "readOnly": false - } - } - } - }, - "atlas": { - "k8s_volume": { - "name": "pv", - "cephfs": { - "monitors": [ - "128.142.36.227:6790", - "128.142.39.77:6790", - "128.142.39.144:6790" - ], - "path": "/k8s/atlas", - "user": "k8s", - "secretRef": { - "name": "ceph-secret", - "readOnly": false - } - } - } - }, - "cms": { - "k8s_volume": { - "name": "pv", - "cephfs": { - "monitors": [ - "128.142.36.227:6790", - "128.142.39.77:6790", - "128.142.39.144:6790" - ], - "path": "/k8s/cms", - "user": "k8s", - "secretRef": { - "name": "ceph-secret", - "readOnly": false - } - } - } - }, - "lhcb": { - "k8s_volume": { - "name": "pv", - "cephfs": { - "monitors": [ - "128.142.36.227:6790", - "128.142.39.77:6790", - "128.142.39.144:6790" - ], - "path": "/k8s/lhcb", - "user": "k8s", - "secretRef": { - "name": "ceph-secret", - "readOnly": false - } - } - } - }, - "recast": { - "k8s_volume": { - "name": "pv", - "cephfs": { - "monitors": [ - "128.142.36.227:6790", - "128.142.39.77:6790", - "128.142.39.144:6790" - ], - "path": "/k8s/recast", - "user": "k8s", - "secretRef": { - "name": "ceph-secret", - "readOnly": false - } - } - } - } -} diff --git a/reana_job_controller/k8s.py b/reana_job_controller/k8s.py index c428cc19..0e440e26 100644 --- a/reana_job_controller/k8s.py +++ b/reana_job_controller/k8s.py @@ -2,6 +2,7 @@ import time import pykube from six.moves.urllib.parse import urlencode +import volume_templates api = pykube.HTTPClient(pykube.KubeConfig.from_service_account()) api.session.verify = False @@ -12,7 +13,17 @@ def get_jobs(): filter(namespace=pykube.all)] -def create_job(job_id, docker_img, cmd, volumes, env_vars, namespace): +def add_shared_volume(job, namespace): + volume = volume_templates.get_k8s_cephfs_volume(namespace) + mount_path = volume_templates.CEPHFS_MOUNT_PATH + job['spec']['template']['spec']['containers'][0]['volumeMounts'].append( + {'name': volume['name'], 'mountPath': mount_path} + ) + job['spec']['template']['spec']['volumes'].append(volume) + + +def create_job(job_id, docker_img, cmd, cvmfs_repos, env_vars, namespace, + shared_file_system): job = { 'kind': 'Job', 'apiVersion': 'batch/v1', @@ -30,31 +41,38 @@ def create_job(job_id, docker_img, cmd, volumes, env_vars, namespace): 'containers': [ { 'name': job_id, - 'image': docker_img + 'image': docker_img, + 'env': [], + 'volumeMounts': [] }, ], + 'volumes': [], 'restartPolicy': 'OnFailure' } } } } - import shlex if cmd: + import shlex (job['spec']['template']['spec']['containers'] [0]['command']) = shlex.split(cmd) if env_vars: - job['spec']['template']['spec']['containers'][0]['env'] = [] for var, value in env_vars.items(): job['spec']['template']['spec']['containers'][0]['env'].append( {'name': var, 'value': value} ) - if volumes: - job['spec']['template']['spec']['containers'][0]['volumeMounts'] = [] - job['spec']['template']['spec']['volumes'] = [] - for volume, mount_path in volumes: + if shared_file_system: + add_shared_volume(job, namespace) + + if cvmfs_repos: + for num, repo in enumerate(cvmfs_repos): + volume = volume_templates.get_k8s_cvmfs_volume(namespace, repo) + mount_path = volume_templates.get_cvmfs_mount_point(repo) + + volume['name'] += '-{}'.format(num) (job['spec']['template']['spec']['containers'][0] ['volumeMounts'].append( {'name': volume['name'], 'mountPath': mount_path} diff --git a/reana_job_controller/volume_templates.py b/reana_job_controller/volume_templates.py new file mode 100644 index 00000000..228a5101 --- /dev/null +++ b/reana_job_controller/volume_templates.py @@ -0,0 +1,87 @@ +import json +from string import Template + +CEPH_SECRET_NAME = 'ceph-secret' + +CEPHFS_PATHS = { + 'alice': '/k8s/alice', + 'atlas': '/k8s/atlas', + 'cms': '/k8s/cms', + 'lhcb': '/k8s/lhcb', + 'recast': '/k8s/recast' +} + +CVMFS_REPOSITORIES = { + 'alice': 'alice.cern.ch', + 'alice-ocdb': 'alice-ocdb.cern.ch', + 'atlas': 'atlas.cern.ch', + 'atlas-condb': 'atlas-condb.cern.ch', + 'cms': 'cms.cern.ch', + 'lhcb': 'lhcb.cern.ch', + 'na61': 'na61.cern.ch', + 'boss': 'boss.cern.ch', + 'grid': 'grid.cern.ch', + 'sft': 'sft.cern.ch', + 'geant4': 'geant4.cern.ch' +} + +CEPHFS_MOUNT_PATH = '/data' + +k8s_cephfs_template = Template("""{ + "name": "cephfs-$experiment", + "cephfs": { + "monitors": [ + "128.142.36.227:6790", + "128.142.39.77:6790", + "128.142.39.144:6790" + ], + "path": "$path", + "user": "k8s", + "secretRef": { + "name": "$secret_name", + "readOnly": false + } + } +}""") + +k8s_cvmfs_template = Template("""{ + "name": "cvmfs-$experiment", + "flexVolume": { + "driver": "cern/cvmfs", + "options": { + "repository": "$repository" + } + } +}""") + + +def get_cvmfs_mount_point(repository_name): + return '/cvmfs/{repository}'.format( + repository=CVMFS_REPOSITORIES[repository_name] + ) + + +def get_k8s_cephfs_volume(experiment): + """Render k8s CephFS volume template + + :param experiment: Experiment name. + :returns: k8s CephFS volume spec as a dictionary. + """ + return json.loads( + k8s_cephfs_template.substitute(experiment=experiment, + path=CEPHFS_PATHS[experiment], + secret_name=CEPH_SECRET_NAME) + ) + + +def get_k8s_cvmfs_volume(experiment, repository): + """Render k8s CVMFS volume template + + :param experiment: Experiment name. + :returns: k8s CVMFS volume spec as a dictionary. + """ + if repository in CVMFS_REPOSITORIES: + return json.loads(k8s_cvmfs_template.substitute(experiment=experiment, + repository=repository)) + else: + raise ValueError('The provided repository doesn\'t exist')