diff --git a/examples/knative-builder/main.py b/examples/knative-builder/main.py index 6019e176..427dfa2e 100644 --- a/examples/knative-builder/main.py +++ b/examples/knative-builder/main.py @@ -28,7 +28,7 @@ import fairing from fairing import builders -from fairing.training import native +from fairing.training import kubernetes DOCKER_REPOSITORY_NAME = '' fairing.config.set_builder(builders.KnativeBuilder(repository=DOCKER_REPOSITORY_NAME)) @@ -46,7 +46,7 @@ 'tensorflow/mnist/logs/fully_connected_feed/', os.getenv('HOSTNAME', '')) MODEL_DIR = os.path.join(LOG_DIR, 'model.ckpt') -@native.Training() +@kubernetes.Training() class MyModel(object): def train(self): self.data_sets = input_data.read_data_sets(INPUT_DATA_DIR) diff --git a/examples/simple-training/main.py b/examples/simple-training/main.py index a1caa2f2..73793a68 100644 --- a/examples/simple-training/main.py +++ b/examples/simple-training/main.py @@ -29,7 +29,7 @@ import fairing from fairing import builders -from fairing.training import native +from fairing.training import kubernetes DOCKER_REPOSITORY_NAME = '' fairing.config.set_builder(builders.DockerBuilder(DOCKER_REPOSITORY_NAME)) @@ -47,7 +47,7 @@ 'tensorflow/mnist/logs/fully_connected_feed/', os.getenv('HOSTNAME', '')) MODEL_DIR = os.path.join(LOG_DIR, 'model.ckpt') -@native.Training() +@kubernetes.Training() class MyModel(object): def train(self): self.data_sets = input_data.read_data_sets(INPUT_DATA_DIR) diff --git a/fairing/backend/kubernetes/manager.py b/fairing/backend/kubernetes/manager.py index 9705709f..5dac254e 100644 --- a/fairing/backend/kubernetes/manager.py +++ b/fairing/backend/kubernetes/manager.py @@ -23,6 +23,14 @@ class KubeManager(object): def __init__(self): config.load_kube_config() + + def create_config_map(self, namespace, config_map): + """Creates a V1ConfigMap in the specified namespace""" + api_instance = client.CoreV1Api() + api_instance.create_namespaced_config_map( + namespace, + config_map + ) def create_job(self, namespace, job): """Creates a V1Job in the specified namespace""" diff --git a/fairing/builders/builder.py b/fairing/builders/builder.py index 12029769..2e6c586b 100644 --- a/fairing/builders/builder.py +++ b/fairing/builders/builder.py @@ -12,13 +12,9 @@ class BuilderInterface(object): @abc.abstractmethod - def execute(self): - """Will be called when the build needs to start""" + def execute(self, namespace, job_id, base_image): + """Will be called when the build needs to start, + This method should return a V1PodSpec with the correct image set. + This is also where the builder should set the environment variables + and volume/volumeMounts that it may need to work""" raise NotImplementedError('BuilderInterface.execute') - - @abc.abstractmethod - def generate_pod_spec(self): - """This method should return a V1PodSpec with the correct image set. - This is also where the builder should set the environment variables - and volume/volumeMounts that it may need to work""" - raise NotImplementedError('BuilderInterface.generate_pod_spec') diff --git a/fairing/builders/config_maps.py b/fairing/builders/config_maps.py new file mode 100644 index 00000000..2977cbef --- /dev/null +++ b/fairing/builders/config_maps.py @@ -0,0 +1,61 @@ +import os +import subprocess +import tempfile +import shutil + +from kubernetes import client + +from fairing import notebook_helper +from .builder import BuilderInterface +from fairing.backend import kubernetes + + +class ConfigMapBuilder(BuilderInterface): + + def __init__(self, notebook_name=None): + if notebook_name is None: + self.notebook_name = notebook_helper.get_notebook_name() + else: + self.notebook_name = notebook_name + + def execute(self, namespace, job_id, base_image): + nb_full_path = os.path.join(os.getcwd(), self.notebook_name) + temp_dir = tempfile.mkdtemp() + code_path = os.path.join(temp_dir, 'code.py') + try: + cmd = "jupyter nbconvert --to python {nb_path} --output {output}" + .format( + nb_path=nb_full_path, + output=code_path + ).split() + + subprocess.check_call(cmd) + with open(code_path, 'rb') as f: + code = f.read() + finally: + shutil.rmtree(temp_dir, ignore_errors=True) + + client.V1ConfigMap( + api_version="v1", + data={"code.py": code}, + metadata=client.V1ObjectMeta(name=self.job_id) + ) + k8s = kubernetes.KubeManager() + k8s.create_config_map(namespace, config_map) + return self._generate_pod_spec(job_id, base_image) + + def _generate_pod_spec(self, job_id, base_image): + volume_name = 'code' + return client.V1PodSpec( + containers=[client.V1Container( + name='model', + image=base_image, + command="python /code/code.py".split(), + volume_mounts=[client.V1VolumeMount(name=volume_name, mount_path='/code')] + )], + restart_policy='Never', + volumes=client.V1Volume( + name=volume_name, + config_map=client.V1ConfigMapVolumeSource(name=job_id) + ) + ) diff --git a/fairing/builders/docker_builder.py b/fairing/builders/docker_builder.py index e7abf072..16507716 100644 --- a/fairing/builders/docker_builder.py +++ b/fairing/builders/docker_builder.py @@ -6,11 +6,9 @@ from future import standard_library standard_library.install_aliases() -import shutil import os import json import logging -import sys from docker import APIClient from kubernetes import client @@ -29,12 +27,10 @@ def __init__(self, repository, image_name=DEFAULT_IMAGE_NAME, image_tag=None, - base_image=None, dockerfile_path=None): self.repository = repository self.image_name = image_name - self.base_image = base_image self.dockerfile_path = dockerfile_path if image_tag is None: @@ -48,7 +44,7 @@ def __init__(self, ) self.docker_client = None - def generate_pod_spec(self): + def _generate_pod_spec(self, job_id): """return a V1PodSpec initialized with the proper container""" return client.V1PodSpec( @@ -59,15 +55,17 @@ def generate_pod_spec(self): restart_policy='Never' ) - def execute(self): + def execute(self, namespace, job_id, base_image): write_dockerfile( dockerfile_path=self.dockerfile_path, - base_image=self.base_image) + base_image=base_image + ) self.docker_client = APIClient(version='auto') - self.build() - self.publish() + self._build() + self._publish() + return self._generate_pod_spec(job_id) - def build(self): + def _build(self): logger.warn('Building docker image {}...'.format(self.full_image_name)) bld = self.docker_client.build( path='.', @@ -78,7 +76,7 @@ def build(self): for line in bld: self._process_stream(line) - def publish(self): + def _publish(self): logger.warn('Publishing image {}...'.format(self.full_image_name)) for line in self.docker_client.push(self.full_image_name, stream=True): self._process_stream(line) diff --git a/fairing/builders/dockerfile.py b/fairing/builders/dockerfile.py index 6b9e5d46..7d1e621e 100644 --- a/fairing/builders/dockerfile.py +++ b/fairing/builders/dockerfile.py @@ -44,7 +44,7 @@ def get_default_base_image(): return '{uname}/fairing:dev'.format(uname=uname) return 'library/python:3.6' -def generate_dockerfile( base_image): +def generate_dockerfile(base_image): if base_image is None: base_image = get_default_base_image() @@ -70,11 +70,6 @@ def get_mandatory_steps(): ] return steps -# def get_env_steps(self, env): -# if env: -# return ["ENV {} {}".format(e['name'], e['value']) for e in env] -# return [] - def write_dockerfile(destination='Dockerfile', dockerfile_path=None, base_image=None): if dockerfile_path is not None: shutil.copy(dockerfile_path, destination) diff --git a/fairing/builders/knative/knative.py b/fairing/builders/knative/knative.py index d2aed365..365f1fb9 100644 --- a/fairing/builders/knative/knative.py +++ b/fairing/builders/knative/knative.py @@ -26,14 +26,12 @@ def __init__(self, repository, image_name=DEFAULT_IMAGE_NAME, image_tag=None, - base_image=None, dockerfile_path=None): self.repository = repository self.image_name = image_name - self.base_image = base_image self.dockerfile_path = dockerfile_path - self.namespace = self.get_current_namespace() + self.namespace = self._get_current_namespace() if image_tag is None: self.image_tag = utils.get_unique_tag() @@ -43,16 +41,17 @@ def __init__(self, # Unique build_id to avoid conflicts self._build_id = utils.get_unique_tag() - def execute(self): + def execute(self, namespace, job_id, base_image): dockerfile.write_dockerfile( dockerfile_path=self.dockerfile_path, - base_image=self.base_image + base_image=base_image ) - self.copy_src_to_mount_point() - self.build_and_push() + self._copy_src_to_mount_point() + self._build_and_push() + return self._generate_pod_spec() - def generate_pod_spec(self): + def _generate_pod_spec(self): """return a V1PodSpec initialized with the proper container""" return client.V1PodSpec( containers=[client.V1Container( @@ -66,15 +65,15 @@ def generate_pod_spec(self): restart_policy='Never' ) - def copy_src_to_mount_point(self): + def _copy_src_to_mount_point(self): context_dir = os.getcwdu() - dst = os.path.join(self.get_mount_point(), self._build_id) + dst = os.path.join(self._get_mount_point(), self._build_id) shutil.copytree(context_dir, dst) - def get_mount_point(self): + def _get_mount_point(self): return os.path.join(os.environ['HOME'], '.fairing/build-contexts/') - def build_and_push(self): + def _build_and_push(self): logger.warn( 'Building docker image {repository}/{image}:{tag}...'.format( repository=self.repository, @@ -82,22 +81,22 @@ def build_and_push(self): tag=self.image_tag ) ) - self.authenticate() + self._authenticate() - build_template = self.generate_build_template_resource() + build_template = self._generate_build_template_resource() build_template.maybe_create() - build = self.generate_build_resource() + build = self._generate_build_resource() build.create_sync() # TODO: clean build? - def authenticate(self): + def _authenticate(self): if utils.is_running_in_k8s(): config.load_incluster_config() else: config.load_kube_config() - def get_current_namespace(self): + def _get_current_namespace(self): if not utils.is_running_in_k8s(): logger.debug("""Fairing does not seem to be running inside Kubernetes, cannot infer namespace. Using namespaces 'fairing' @@ -105,7 +104,7 @@ def get_current_namespace(self): return 'fairing' return utils.get_current_k8s_namespace() - def generate_build_template_resource(self): + def _generate_build_template_resource(self): metadata = client.V1ObjectMeta( name='fairing-build', namespace=self.namespace) @@ -133,7 +132,7 @@ def generate_build_template_resource(self): parameters=params, steps=steps, volumes=[volume]) return BuildTemplate(metadata=metadata, spec=spec) - def generate_build_resource(self): + def _generate_build_resource(self): metadata = client.V1ObjectMeta( name='fairing-build-{}'.format(self.image_name), namespace=self.namespace) diff --git a/fairing/notebook_helper.py b/fairing/notebook_helper.py index a2e61515..643465ee 100644 --- a/fairing/notebook_helper.py +++ b/fairing/notebook_helper.py @@ -27,12 +27,10 @@ def get_notebook_name(): if nn['kernel']['id'] == kernel_id: full_path = nn['notebook']['path'] return os.path.basename(full_path) - - return f def is_in_notebook(): try: ipykernel.get_connection_info() except RuntimeError: return False - return True \ No newline at end of file + return True diff --git a/fairing/training/kubeflow/deployment.py b/fairing/training/kubeflow/deployment.py index 47d66f0e..c82d2067 100644 --- a/fairing/training/kubeflow/deployment.py +++ b/fairing/training/kubeflow/deployment.py @@ -1,11 +1,11 @@ from kubernetes import client as k8s_client -from ..native import deployment +from ..kubernetes import deployment -class KubeflowDeployment(deployment.NativeDeployment): +class KubeflowDeployment(deployment.KubernetesDeployment): - def __init__(self, namespace, runs, distribution): - super(KubeflowDeployment, self).__init__(namespace, runs) + def __init__(self, namespace, runs, distribution, base_image=None): + super(KubeflowDeployment, self).__init__(namespace, runs, base_image) self.distribution = distribution def deploy(self): @@ -37,7 +37,7 @@ def generate_job(self, pod_template_spec): tf_job = {} tf_job['kind'] = 'TFJob' tf_job['apiVersion'] = 'kubeflow.org/v1alpha2' - tf_job['metadata'] = k8s_client.V1ObjectMeta(name=self.name) + tf_job['metadata'] = k8s_client.V1ObjectMeta(name=self.job_id) tf_job['spec'] = spec return tf_job @@ -49,4 +49,4 @@ def set_container_name(self, pod_template_spec): def get_logs(self): selector='tf-replica-index=0,tf-replica-type=worker' - self.backend.log(self.name, self.namespace, selector) \ No newline at end of file + self.backend.log(self.job_id, self.namespace, selector) diff --git a/fairing/training/native/__init__.py b/fairing/training/kubernetes/__init__.py similarity index 74% rename from fairing/training/native/__init__.py rename to fairing/training/kubernetes/__init__.py index dba094b4..6bfe181a 100644 --- a/fairing/training/native/__init__.py +++ b/fairing/training/kubernetes/__init__.py @@ -5,6 +5,6 @@ from future import standard_library standard_library.install_aliases() -from .deployment import NativeDeployment -from .runtime import BasicNativeRuntime +from .deployment import KubernetesDeployment +from .runtime import BasicKubernetesRuntime from .decorators import Training diff --git a/fairing/training/native/decorators.py b/fairing/training/kubernetes/decorators.py similarity index 82% rename from fairing/training/native/decorators.py rename to fairing/training/kubernetes/decorators.py index 5bd7f15b..24e915bc 100644 --- a/fairing/training/native/decorators.py +++ b/fairing/training/kubernetes/decorators.py @@ -9,8 +9,8 @@ import logging from fairing.training import base -from .deployment import NativeDeployment -from .runtime import BasicNativeRuntime +from .deployment import KubernetesDeployment +from .runtime import BasicKubernetesRuntime logger = logging.getLogger(__name__) @@ -21,9 +21,10 @@ class Training(base.TrainingDecoratorInterface): namespace {string} -- (optional) here the training should be deployed """ - def __init__(self, namespace=None): + def __init__(self, namespace=None, base_image=None): self.namespace = namespace self.runs = 1 + self.base_image = base_image def _validate(self, user_object): """TODO: Verify that the training conforms to what is expected from @@ -31,11 +32,11 @@ def _validate(self, user_object): pass def _train(self, user_object): - runtime = BasicNativeRuntime() + runtime = BasicKubernetesRuntime() runtime.execute(user_object) def _deploy(self, user_object): - deployment = NativeDeployment(self.namespace, self.runs) + deployment = KubernetesDeployment(self.namespace, self.runs, self.base_image) deployment.execute() diff --git a/fairing/training/native/deployment.py b/fairing/training/kubernetes/deployment.py similarity index 83% rename from fairing/training/native/deployment.py rename to fairing/training/kubernetes/deployment.py index efad3cf8..15b4488e 100644 --- a/fairing/training/native/deployment.py +++ b/fairing/training/kubernetes/deployment.py @@ -16,7 +16,7 @@ logger = logging.getLogger(__name__) DEFAULT_JOB_NAME = 'fairing-job' -class NativeDeployment(object): +class KubernetesDeployment(object): """Handle all the k8s' template building for a training Attributes: namespace: k8s namespace where the training's components @@ -25,22 +25,22 @@ class NativeDeployment(object): will generate multiple jobs. """ - def __init__(self, namespace, runs): + def __init__(self, namespace, runs, base_image): if namespace is None: self.namespace = utils.get_default_target_namespace() else: self.namespace = namespace - # Used as pod and job name - self.name = "{}-{}".format(DEFAULT_JOB_NAME, utils.get_unique_tag()) + self.job_id = "{}-{}".format(DEFAULT_JOB_NAME, utils.get_unique_tag()) self.job_spec = None self.runs = runs + self.base_image = base_image self.builder = config.get_builder() self.backend = kubernetes.KubeManager() def execute(self): - pod_spec = self.builder.generate_pod_spec() + pod_spec = self.build() #self.builder.generate_pod_spec(self.job_id) pod_template_spec = self.generate_pod_template_spec(pod_spec) #TODO: @@ -53,12 +53,13 @@ def execute(self): #TODO: if needed, can be an extra validation step for the final template #self.validate(job_spec) - # Actually build and push the image, or generate ConfigMaps - self.builder.execute() self.deploy() logger.warn("Training(s) launched.") self.get_logs() + def build(self): + return self.builder.execute(self.namespace, self.job_id, self.base_image) + def generate_pod_template_spec(self, pod_spec): """Generate a V1PodTemplateSpec initiazlied with correct metadata and with the provided pod_spec""" @@ -66,9 +67,9 @@ def generate_pod_template_spec(self, pod_spec): raise TypeError('pod_spec must be a V1PodSpec, but got %s' % type(pod_spec)) labels = {} - labels['fairing-job-id'] = self.name + labels['fairing-job-id'] = self.job_id return k8s_client.V1PodTemplateSpec( - metadata=k8s_client.V1ObjectMeta(name=self.name, labels=labels), + metadata=k8s_client.V1ObjectMeta(name=self.job_id, labels=labels), spec=pod_spec) def generate_job(self, pod_template_spec): @@ -85,13 +86,13 @@ def generate_job(self, pod_template_spec): return k8s_client.V1Job( metadata=k8s_client.V1ObjectMeta( - name=self.name + name=self.job_id ), spec=job_spec ) def get_logs(self): - self.backend.log(self.name, self.namespace) + self.backend.log(self.job_id, self.namespace) def deploy(self): """Handles communication with kubeclient to deploy diff --git a/fairing/training/native/runtime.py b/fairing/training/kubernetes/runtime.py similarity index 93% rename from fairing/training/native/runtime.py rename to fairing/training/kubernetes/runtime.py index 013460cf..367c44c3 100644 --- a/fairing/training/native/runtime.py +++ b/fairing/training/kubernetes/runtime.py @@ -5,7 +5,7 @@ from future import standard_library standard_library.install_aliases() -class BasicNativeRuntime(object): +class BasicKubernetesRuntime(object): """BasicNativeRuntime represents the behavior of the code while training during a simple training job"""