Skip to content

Commit

Permalink
api: add CVMFS support
Browse files Browse the repository at this point in the history
* Adds CVMFS field to jobs API. (closes reanahub#4)

* Switches JSON template file with a Python one.

Signed-off-by: Diego Rodriguez <[email protected]>
  • Loading branch information
Diego Rodriguez committed Jan 25, 2017
1 parent 930b7bd commit ca240c3
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 110 deletions.
17 changes: 7 additions & 10 deletions reana_job_controller/app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import copy
import json
import logging
import threading
import uuid
Expand All @@ -12,11 +11,6 @@
JOB_DB = {}


def get_config(experiment):
with open('config_template.json', 'r') as config:
return json.load(config)[experiment]


def filter_jobs(job_db):
job_db_copy = copy.deepcopy(job_db)
for job_name in job_db_copy:
Expand Down Expand Up @@ -49,18 +43,21 @@ def create_job():
cmd = request.json['cmd'] if 'cmd' in request.json else None
env_vars = (request.json['env-vars']
if 'env-vars' in request.json else {})
experiment_config = get_config(request.json['experiment'])

k8s_volume = experiment_config['k8s_volume']
if request.json.get('cvmfs_mounts'):
cvmfs_repos = request.json.get('cvmfs_mounts')
else:
cvmfs_repos = []

job_id = str(uuid.uuid4())

job_obj = k8s.create_job(job_id,
request.json['docker-img'],
cmd,
[(k8s_volume, '/data')],
cvmfs_repos,
env_vars,
request.json['experiment'])
request.json['experiment'],
shared_file_system=True)

if job_obj:
job = copy.deepcopy(request.json)
Expand Down
92 changes: 0 additions & 92 deletions reana_job_controller/config_template.json

This file was deleted.

34 changes: 26 additions & 8 deletions reana_job_controller/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import time
import pykube
from six.moves.urllib.parse import urlencode
import volume_templates

api = pykube.HTTPClient(pykube.KubeConfig.from_service_account())
api.session.verify = False
Expand All @@ -12,7 +13,17 @@ def get_jobs():
filter(namespace=pykube.all)]


def create_job(job_id, docker_img, cmd, volumes, env_vars, namespace):
def add_shared_volume(job, namespace):
volume = volume_templates.get_k8s_cephfs_volume(namespace)
mount_path = volume_templates.CEPHFS_MOUNT_PATH
job['spec']['template']['spec']['containers'][0]['volumeMounts'].append(
{'name': volume['name'], 'mountPath': mount_path}
)
job['spec']['template']['spec']['volumes'].append(volume)


def create_job(job_id, docker_img, cmd, cvmfs_repos, env_vars, namespace,
shared_file_system):
job = {
'kind': 'Job',
'apiVersion': 'batch/v1',
Expand All @@ -30,31 +41,38 @@ def create_job(job_id, docker_img, cmd, volumes, env_vars, namespace):
'containers': [
{
'name': job_id,
'image': docker_img
'image': docker_img,
'env': [],
'volumeMounts': []
},
],
'volumes': [],
'restartPolicy': 'OnFailure'
}
}
}
}

import shlex
if cmd:
import shlex
(job['spec']['template']['spec']['containers']
[0]['command']) = shlex.split(cmd)

if env_vars:
job['spec']['template']['spec']['containers'][0]['env'] = []
for var, value in env_vars.items():
job['spec']['template']['spec']['containers'][0]['env'].append(
{'name': var, 'value': value}
)

if volumes:
job['spec']['template']['spec']['containers'][0]['volumeMounts'] = []
job['spec']['template']['spec']['volumes'] = []
for volume, mount_path in volumes:
if shared_file_system:
add_shared_volume(job, namespace)

if cvmfs_repos:
for num, repo in enumerate(cvmfs_repos):
volume = volume_templates.get_k8s_cvmfs_volume(namespace, repo)
mount_path = volume_templates.get_cvmfs_mount_point(repo)

volume['name'] += '-{}'.format(num)
(job['spec']['template']['spec']['containers'][0]
['volumeMounts'].append(
{'name': volume['name'], 'mountPath': mount_path}
Expand Down
87 changes: 87 additions & 0 deletions reana_job_controller/volume_templates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import json
from string import Template

CEPH_SECRET_NAME = 'ceph-secret'

CEPHFS_PATHS = {
'alice': '/k8s/alice',
'atlas': '/k8s/atlas',
'cms': '/k8s/cms',
'lhcb': '/k8s/lhcb',
'recast': '/k8s/recast'
}

CVMFS_REPOSITORIES = {
'alice': 'alice.cern.ch',
'alice-ocdb': 'alice-ocdb.cern.ch',
'atlas': 'atlas.cern.ch',
'atlas-condb': 'atlas-condb.cern.ch',
'cms': 'cms.cern.ch',
'lhcb': 'lhcb.cern.ch',
'na61': 'na61.cern.ch',
'boss': 'boss.cern.ch',
'grid': 'grid.cern.ch',
'sft': 'sft.cern.ch',
'geant4': 'geant4.cern.ch'
}

CEPHFS_MOUNT_PATH = '/data'

k8s_cephfs_template = Template("""{
"name": "cephfs-$experiment",
"cephfs": {
"monitors": [
"128.142.36.227:6790",
"128.142.39.77:6790",
"128.142.39.144:6790"
],
"path": "$path",
"user": "k8s",
"secretRef": {
"name": "$secret_name",
"readOnly": false
}
}
}""")

k8s_cvmfs_template = Template("""{
"name": "cvmfs-$experiment",
"flexVolume": {
"driver": "cern/cvmfs",
"options": {
"repository": "$repository"
}
}
}""")


def get_cvmfs_mount_point(repository_name):
return '/cvmfs/{repository}'.format(
repository=CVMFS_REPOSITORIES[repository_name]
)


def get_k8s_cephfs_volume(experiment):
"""Render k8s CephFS volume template
:param experiment: Experiment name.
:returns: k8s CephFS volume spec as a dictionary.
"""
return json.loads(
k8s_cephfs_template.substitute(experiment=experiment,
path=CEPHFS_PATHS[experiment],
secret_name=CEPH_SECRET_NAME)
)


def get_k8s_cvmfs_volume(experiment, repository):
"""Render k8s CVMFS volume template
:param experiment: Experiment name.
:returns: k8s CVMFS volume spec as a dictionary.
"""
if repository in CVMFS_REPOSITORIES:
return json.loads(k8s_cvmfs_template.substitute(experiment=experiment,
repository=repository))
else:
raise ValueError('The provided repository doesn\'t exist')

0 comments on commit ca240c3

Please sign in to comment.