From 95ddfa88a0d238b20b703d63d2ab5f635158bc48 Mon Sep 17 00:00:00 2001 From: William Buchwalter Date: Mon, 11 Sep 2017 11:05:10 -0400 Subject: [PATCH 01/14] implement default PS image --- examples/tf_job.yaml | 4 +++- images/default-ps/Dockerfile | 5 ++++ images/default-ps/build_and_push.sh | 22 ++++++++++++++++++ images/default-ps/main.py | 22 ++++++++++++++++++ pkg/spec/tf_job.go | 36 ++++++++++++++++++++++------- pkg/trainer/replicas.go | 11 ++++----- 6 files changed, 85 insertions(+), 15 deletions(-) create mode 100644 images/default-ps/Dockerfile create mode 100755 images/default-ps/build_and_push.sh create mode 100644 images/default-ps/main.py diff --git a/examples/tf_job.yaml b/examples/tf_job.yaml index 6955973094..30d26c4be4 100644 --- a/examples/tf_job.yaml +++ b/examples/tf_job.yaml @@ -19,4 +19,6 @@ spec: containers: - image: gcr.io/tf-on-k8s-dogfood/tf_sample:dc944ff name: tensorflow - restartPolicy: OnFailure \ No newline at end of file + restartPolicy: OnFailure + - replicas: 2 + tfReplicaType: PS \ No newline at end of file diff --git a/images/default-ps/Dockerfile b/images/default-ps/Dockerfile new file mode 100644 index 0000000000..ce336031d6 --- /dev/null +++ b/images/default-ps/Dockerfile @@ -0,0 +1,5 @@ +ARG BASE_IMAGE=tensorflow/tensorflow +FROM $BASE_IMAGE +RUN mkdir /app +COPY ./main.py /app/main.py +CMD ["python", "/app/main.py"] \ No newline at end of file diff --git a/images/default-ps/build_and_push.sh b/images/default-ps/build_and_push.sh new file mode 100755 index 0000000000..4cb1e662a6 --- /dev/null +++ b/images/default-ps/build_and_push.sh @@ -0,0 +1,22 @@ +#!/bin/bash +set -e + +SRC_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" +ROOT_DIR=${SRC_DIR}/../../ + +#. ${ROOT_DIR}/config.sh +REGISTRY="wbuchwalter" +TAGS=("1.1.0" "1.2.0" "1.2.1" "1.3.0" "latest") + +for i in "${TAGS[@]}" +do + BASEIMAGE="tensorflow/tensorflow:$i" + IMAGE="${REGISTRY}/mlkube-tensorflow-ps:$i" + + DIR=`mktemp -d` + echo Use ${DIR} as context + cp ${SRC_DIR}/Dockerfile ${DIR}/Dockerfile + cp ${SRC_DIR}/main.py ${DIR}/main.py + docker build -t $IMAGE --build-arg BASE_IMAGE=$BASEIMAGE -f ${DIR}/Dockerfile ${DIR} + docker push $IMAGE +done \ No newline at end of file diff --git a/images/default-ps/main.py b/images/default-ps/main.py new file mode 100644 index 0000000000..2ff27ed1de --- /dev/null +++ b/images/default-ps/main.py @@ -0,0 +1,22 @@ +# A very simple parameter server that joins the server defined by the cluster spec passed as environment variable + +import tensorflow as tf +import os +import json + +tf_config_json = os.environ.get("TF_CONFIG", "{}") +tf_config = json.loads(tf_config_json) +task = tf_config.get("task", {}) +cluster_spec = tf_config.get("cluster", {}) +cluster_spec_object = tf.train.ClusterSpec(cluster_spec) +job_name = task["type"] +task_id = task["index"] +server_def = tf.train.ServerDef( + cluster=cluster_spec_object.as_cluster_def(), + protocol="grpc", + job_name=job_name, + task_index=task_id) +server = tf.train.Server(server_def) + +if job_name == 'ps': + server.join() diff --git a/pkg/spec/tf_job.go b/pkg/spec/tf_job.go index 79950f131a..946a165650 100644 --- a/pkg/spec/tf_job.go +++ b/pkg/spec/tf_job.go @@ -6,23 +6,23 @@ import ( "fmt" "time" + "github.com/golang/protobuf/proto" + "github.com/jlewi/mlkube.io/pkg/util" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/pkg/api/v1" - "github.com/jlewi/mlkube.io/pkg/util" - "github.com/golang/protobuf/proto" ) const ( - CRDKind = "TfJob" - CRDKindPlural = "tfjobs" - CRDGroup = "mlkube.io" - CRDVersion = "v1beta1" + CRDKind = "TfJob" + CRDKindPlural = "tfjobs" + CRDGroup = "mlkube.io" + CRDVersion = "v1beta1" // Value of the APP label that gets applied to a lot of entities. AppLabel = "tensorflow-job" // Defaults for the Spec - TfPort = 2222 + TfPort = 2222 Replicas = 1 ) @@ -76,6 +76,7 @@ type ContainerName string const ( TENSORFLOW ContainerName = "tensorflow" + PsDefaultImage = "wbuchwalter/mlkube-tensorflow-ps" ) // TODO(jlewi): We probably want to add a name field. This would allow us to have more than 1 type of each worker. @@ -91,6 +92,8 @@ type TfReplicaSpec struct { // TfPort is the port to use for TF services. TfPort *int32 `json:"tfPort,omitempty" protobuf:"varint,1,opt,name=tfPort"` TfReplicaType `json:"tfReplicaType"` + //TfVersion is only used when TfReplicaType == PS to automatically start a PS server + TfVersion *string `json:"tfVersion"` } type TensorBoardSpec struct { @@ -106,10 +109,14 @@ func (c *TfJobSpec) Validate() error { // Check that each replica has a TensorFlow container. for _, r := range c.ReplicaSpecs { found := false - if r.Template == nil { + if r.Template == nil && r.TfReplicaType != PS { return fmt.Errorf("Replica is missing Template; %v", util.Pformat(r)) } + if r.TfReplicaType == PS && r.Template == nil && r.TfVersion == nil { + return errors.New("PS must either have TfVersion or Template specified.") + } + if r.TfReplicaType == MASTER && *r.Replicas != 1 { return errors.New("The MASTER must have Replicas = 1") } @@ -222,6 +229,19 @@ func (c *TfJobSpec) SetDefaults() error { if r.Replicas == nil { r.Replicas = proto.Int32(Replicas) } + + if r.TfReplicaType == PS && r.Template == nil{ + r.Template = &v1.PodTemplateSpec{ + Spec: &v1.PodSpec{ + Containers: []v1.Container{ + &v1.Container{ + Image: fmt.Sprintf("%s:%s", PsDefaultImage, r.TfVersion) + Name: "tensorflow-ps" + } + } + } + } + } } return nil } diff --git a/pkg/trainer/replicas.go b/pkg/trainer/replicas.go index cec8ac0990..fa3763bb82 100644 --- a/pkg/trainer/replicas.go +++ b/pkg/trainer/replicas.go @@ -11,13 +11,13 @@ import ( log "github.com/golang/glog" "github.com/golang/protobuf/proto" // TOOO(jlewi): Rename to apiErrors + "github.com/jlewi/mlkube.io/pkg/util" k8s_errors "k8s.io/apimachinery/pkg/api/errors" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sErrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/kubernetes" "k8s.io/client-go/pkg/api/v1" - k8sErrors "k8s.io/apimachinery/pkg/util/errors" batch "k8s.io/client-go/pkg/apis/batch/v1" - "github.com/jlewi/mlkube.io/pkg/util" ) // TFReplicaSet is a set of TF processes all acting as the same role (e.g. worker @@ -53,8 +53,8 @@ func NewTFReplicaSet(clientSet kubernetes.Interface, tfReplicaSpec spec.TfReplic return nil, errors.New("tfReplicaSpec.TfPort can't be nil.") } - if tfReplicaSpec.Template == nil { - return nil, errors.New("tfReplicaSpec.Template can't be nil.") + if tfReplicaSpec.Template == nil && tfReplicaSpec.TfReplicaType != spec.PS { + return nil, fmt.Errorf("tfReplicaSpec.Template can't be nil for replica type %v.", tfReplicaSpec.TfReplicaType) } // Make sure the replica type is valid. @@ -71,6 +71,7 @@ func NewTFReplicaSet(clientSet kubernetes.Interface, tfReplicaSpec spec.TfReplic if !isValidReplicaType { return nil, fmt.Errorf("tfReplicaSpec.TfReplicaType is %v but must be one of %v", tfReplicaSpec.TfReplicaType, validReplicaTypes) } + return &TFReplicaSet{ ClientSet: clientSet, Job: job, @@ -277,7 +278,6 @@ func replicaStatusFromPodList(l v1.PodList, name spec.ContainerName) spec.Replic } } - if tfState.Running != nil || tfState.Waiting != nil { return spec.ReplicaStateRunning } @@ -287,7 +287,6 @@ func replicaStatusFromPodList(l v1.PodList, name spec.ContainerName) spec.Replic return spec.ReplicaStateSucceeded } - if isRetryableTerminationState(tfState.Terminated) { // Since its a retryable error just return RUNNING. // We can just let Kubernetes restart the container to retry. From 69ac7cc3a16f2d573ff6df430f5b1d7192cb87d5 Mon Sep 17 00:00:00 2001 From: William Buchwalter Date: Tue, 12 Sep 2017 10:24:41 -0400 Subject: [PATCH 02/14] Add default PS PodSpec --- pkg/spec/tf_job.go | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/pkg/spec/tf_job.go b/pkg/spec/tf_job.go index 946a165650..7cf6f7a576 100644 --- a/pkg/spec/tf_job.go +++ b/pkg/spec/tf_job.go @@ -75,8 +75,8 @@ const ( type ContainerName string const ( - TENSORFLOW ContainerName = "tensorflow" - PsDefaultImage = "wbuchwalter/mlkube-tensorflow-ps" + TENSORFLOW ContainerName = "tensorflow" + PsDefaultImage = "wbuchwalter/mlkube-tensorflow-ps" ) // TODO(jlewi): We probably want to add a name field. This would allow us to have more than 1 type of each worker. @@ -214,8 +214,8 @@ func (c *TfJobSpec) ConfigureAccelerators(accelerators map[string]AcceleratorCon func (c *TfJobSpec) SetDefaults() error { // Check that each replica has a TensorFlow container. for _, r := range c.ReplicaSpecs { - if r.Template == nil { - return fmt.Errorf("Replica is missing Template; %v", util.Pformat(r)) + if r.Template == nil && r.TfReplicaType != PS { + return fmt.Errorf("ReplicaType: %v, Replica is missing Template; %v", r.TfReplicaType, util.Pformat(r)) } if r.TfPort == nil { @@ -230,16 +230,17 @@ func (c *TfJobSpec) SetDefaults() error { r.Replicas = proto.Int32(Replicas) } - if r.TfReplicaType == PS && r.Template == nil{ + if r.Template == nil && r.TfReplicaType == PS { r.Template = &v1.PodTemplateSpec{ - Spec: &v1.PodSpec{ + Spec: v1.PodSpec{ Containers: []v1.Container{ - &v1.Container{ - Image: fmt.Sprintf("%s:%s", PsDefaultImage, r.TfVersion) - Name: "tensorflow-ps" - } - } - } + v1.Container{ + Image: fmt.Sprintf("%s:%s", PsDefaultImage, *r.TfVersion), + Name: "tensorflow", + }, + }, + RestartPolicy: v1.RestartPolicyOnFailure, + }, } } } From c9f2d660d98bbc9350ec9725957970487340e493 Mon Sep 17 00:00:00 2001 From: William Buchwalter Date: Tue, 12 Sep 2017 11:21:14 -0400 Subject: [PATCH 03/14] add tests --- pkg/spec/tf_job.go | 6 +++--- pkg/spec/tf_job_test.go | 39 +++++++++++++++++++++++++++++++++++++-- test/e2e/main.go | 12 +----------- 3 files changed, 41 insertions(+), 16 deletions(-) diff --git a/pkg/spec/tf_job.go b/pkg/spec/tf_job.go index 7cf6f7a576..f1aa44b96c 100644 --- a/pkg/spec/tf_job.go +++ b/pkg/spec/tf_job.go @@ -93,7 +93,7 @@ type TfReplicaSpec struct { TfPort *int32 `json:"tfPort,omitempty" protobuf:"varint,1,opt,name=tfPort"` TfReplicaType `json:"tfReplicaType"` //TfVersion is only used when TfReplicaType == PS to automatically start a PS server - TfVersion *string `json:"tfVersion"` + TfVersion string `json:"tfVersion,omitempty"` } type TensorBoardSpec struct { @@ -113,7 +113,7 @@ func (c *TfJobSpec) Validate() error { return fmt.Errorf("Replica is missing Template; %v", util.Pformat(r)) } - if r.TfReplicaType == PS && r.Template == nil && r.TfVersion == nil { + if r.TfReplicaType == PS && r.Template == nil && r.TfVersion == "" { return errors.New("PS must either have TfVersion or Template specified.") } @@ -235,7 +235,7 @@ func (c *TfJobSpec) SetDefaults() error { Spec: v1.PodSpec{ Containers: []v1.Container{ v1.Container{ - Image: fmt.Sprintf("%s:%s", PsDefaultImage, *r.TfVersion), + Image: fmt.Sprintf("%s:%s", PsDefaultImage, r.TfVersion), Name: "tensorflow", }, }, diff --git a/pkg/spec/tf_job_test.go b/pkg/spec/tf_job_test.go index c8db5566a3..ed3a1cdc21 100644 --- a/pkg/spec/tf_job_test.go +++ b/pkg/spec/tf_job_test.go @@ -1,13 +1,14 @@ package spec import ( + "fmt" "reflect" "testing" "github.com/gogo/protobuf/proto" + "github.com/jlewi/mlkube.io/pkg/util" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/client-go/pkg/api/v1" - "github.com/jlewi/mlkube.io/pkg/util" ) func TestAddAccelertor(t *testing.T) { @@ -238,6 +239,9 @@ func TestSetDefaults(t *testing.T) { expected *TfJobSpec } + defaultPsImage := "wbuchwalter/mlkube-tensorflow-ps" + tag := "1.3.0" + testCases := []testCase{ { in: &TfJobSpec{ @@ -274,6 +278,37 @@ func TestSetDefaults(t *testing.T) { }, }, }, + { + in: &TfJobSpec{ + ReplicaSpecs: []*TfReplicaSpec{ + { + TfReplicaType: PS, + TfVersion: tag, + }, + }, + }, + expected: &TfJobSpec{ + ReplicaSpecs: []*TfReplicaSpec{ + { + Replicas: proto.Int32(1), + TfPort: proto.Int32(2222), + Template: &v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "tensorflow", + Image: fmt.Sprintf("%s:%s", defaultPsImage, tag), + }, + }, + RestartPolicy: v1.RestartPolicyOnFailure, + }, + }, + TfReplicaType: PS, + TfVersion: tag, + }, + }, + }, + }, } for _, c := range testCases { @@ -284,4 +319,4 @@ func TestSetDefaults(t *testing.T) { t.Errorf("Want\n%v; Got\n %v", util.Pformat(c.expected), util.Pformat(c.in)) } } -} \ No newline at end of file +} diff --git a/test/e2e/main.go b/test/e2e/main.go index 65389cd57f..8151b7fd3c 100644 --- a/test/e2e/main.go +++ b/test/e2e/main.go @@ -73,17 +73,7 @@ func run() error { Replicas: proto.Int32(1), TfPort: proto.Int32(2222), TfReplicaType: spec.PS, - Template: &v1.PodTemplateSpec{ - Spec: v1.PodSpec{ - Containers: []v1.Container{ - { - Name: "tensorflow", - Image: *image, - }, - }, - RestartPolicy: v1.RestartPolicyOnFailure, - }, - }, + TfVersion: "1.3.0", }, { Replicas: proto.Int32(1), From de8f8f42ab7d281d99a69610ab041a5d917ca302 Mon Sep 17 00:00:00 2001 From: William Buchwalter Date: Thu, 14 Sep 2017 15:45:34 -0400 Subject: [PATCH 04/14] PS: Use ConfigMap instead of custom image --- .../grpc_tensorflow_server.py | 159 ++++++++++++++++++ grpc_tensorflow_server/start_server.py | 37 ++++ images/default-ps/Dockerfile | 5 - images/default-ps/build_and_push.sh | 22 --- images/default-ps/main.py | 22 --- pkg/controller/controller.go | 89 ++++++++-- pkg/spec/tf_job.go | 36 +++- pkg/spec/tf_job_test.go | 31 +++- 8 files changed, 320 insertions(+), 81 deletions(-) create mode 100644 grpc_tensorflow_server/grpc_tensorflow_server.py create mode 100644 grpc_tensorflow_server/start_server.py delete mode 100644 images/default-ps/Dockerfile delete mode 100755 images/default-ps/build_and_push.sh delete mode 100644 images/default-ps/main.py diff --git a/grpc_tensorflow_server/grpc_tensorflow_server.py b/grpc_tensorflow_server/grpc_tensorflow_server.py new file mode 100644 index 0000000000..dd26c4509d --- /dev/null +++ b/grpc_tensorflow_server/grpc_tensorflow_server.py @@ -0,0 +1,159 @@ +#!/usr/bin/python +# Copyright 2016 The TensorFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +"""Python-based TensorFlow GRPC server. + +Takes input arguments cluster_spec, job_name and task_id, and start a blocking +TensorFlow GRPC server. + +Usage: + grpc_tensorflow_server.py --cluster_spec=SPEC --job_name=NAME --task_id=ID + +Where: + SPEC is (,)* + JOB is |(;)* + NAME is a valid job name ([a-z][0-9a-z]*) + HOST is a hostname or IP address + PORT is a port number +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import argparse +import sys + +from tensorflow.core.protobuf import config_pb2 +from tensorflow.core.protobuf import tensorflow_server_pb2 +from tensorflow.python.platform import app +from tensorflow.python.training import server_lib + + +def parse_cluster_spec(cluster_spec, cluster, verbose=False): + """Parse content of cluster_spec string and inject info into cluster protobuf. + + Args: + cluster_spec: cluster specification string, e.g., + "local|localhost:2222;localhost:2223" + cluster: cluster protobuf. + verbose: If verbose logging is requested. + + Raises: + ValueError: if the cluster_spec string is invalid. + """ + + job_strings = cluster_spec.split(",") + + if not cluster_spec: + raise ValueError("Empty cluster_spec string") + + for job_string in job_strings: + job_def = cluster.job.add() + + if job_string.count("|") != 1: + raise ValueError("Not exactly one instance of '|' in cluster_spec") + + job_name = job_string.split("|")[0] + + if not job_name: + raise ValueError("Empty job_name in cluster_spec") + + job_def.name = job_name + + if verbose: + print("Added job named \"%s\"" % job_name) + + job_tasks = job_string.split("|")[1].split(";") + for i in range(len(job_tasks)): + if not job_tasks[i]: + raise ValueError("Empty task string at position %d" % i) + + job_def.tasks[i] = job_tasks[i] + + if verbose: + print(" Added task \"%s\" to job \"%s\"" % (job_tasks[i], job_name)) + + +def main(unused_args): + # Create Protobuf ServerDef + server_def = tensorflow_server_pb2.ServerDef(protocol="grpc") + + # Cluster info + parse_cluster_spec(FLAGS.cluster_spec, server_def.cluster, FLAGS.verbose) + + # Job name + if not FLAGS.job_name: + raise ValueError("Empty job_name") + server_def.job_name = FLAGS.job_name + + # Task index + if FLAGS.task_id < 0: + raise ValueError("Invalid task_id: %d" % FLAGS.task_id) + server_def.task_index = FLAGS.task_id + + config = config_pb2.ConfigProto(gpu_options=config_pb2.GPUOptions( + per_process_gpu_memory_fraction=FLAGS.gpu_memory_fraction)) + + # Create GRPC Server instance + server = server_lib.Server(server_def, config=config) + + # join() is blocking, unlike start() + server.join() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.register("type", "bool", lambda v: v.lower() == "true") + parser.add_argument( + "--cluster_spec", + type=str, + default="", + help="""\ + Cluster spec: SPEC. SPEC is (,)*," JOB is + |(;)*," NAME is a valid job name + ([a-z][0-9a-z]*)," HOST is a hostname or IP address," PORT is a + port number." E.g., local|localhost:2222;localhost:2223, + ps|ps0:2222;ps1:2222\ + """ + ) + parser.add_argument( + "--job_name", + type=str, + default="", + help="Job name: e.g., local" + ) + parser.add_argument( + "--task_id", + type=int, + default=0, + help="Task index, e.g., 0" + ) + parser.add_argument( + "--gpu_memory_fraction", + type=float, + default=1.0, + help="Fraction of GPU memory allocated",) + parser.add_argument( + "--verbose", + type="bool", + nargs="?", + const=True, + default=False, + help="Verbose mode" + ) + + FLAGS, unparsed = parser.parse_known_args() + app.run(main=main, argv=[sys.argv[0]] + unparsed) \ No newline at end of file diff --git a/grpc_tensorflow_server/start_server.py b/grpc_tensorflow_server/start_server.py new file mode 100644 index 0000000000..8b90f40771 --- /dev/null +++ b/grpc_tensorflow_server/start_server.py @@ -0,0 +1,37 @@ +""" +Transform the ClusterSpec from json to piped and calls grpc_tensorflow_server.py +""" +import os +import json + +def main(): + tf_config = json.loads(os.environ["TF_CONFIG"]) + cluster_spec = tf_config["cluster"] + task_id = tf_config["task"]["index"] + + transformed_cluster_spec = "" + + isFirstJob = True + for job_name in cluster_spec: + if not isFirstJob: + #separator between different job kinds + transformed_cluster_spec += ',' + isFirstJob = False + + #separator between job name and it's element + transformed_cluster_spec += job_name + '|' + isFirstElement = True + for el in cluster_spec[job_name]: + if not isFirstElement: + #separator between different elements with same job type + transformed_cluster_spec += ';' + isFirstElement = False + transformed_cluster_spec += el + + dir_path = os.path.dirname(os.path.realpath(__file__)) + cmd = "python {}/grpc_tensorflow_server.py --cluster_spec '{}' --job_name ps --task_id {}".format(dir_path, transformed_cluster_spec, task_id) + print(cmd) + os.system(cmd) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/images/default-ps/Dockerfile b/images/default-ps/Dockerfile deleted file mode 100644 index ce336031d6..0000000000 --- a/images/default-ps/Dockerfile +++ /dev/null @@ -1,5 +0,0 @@ -ARG BASE_IMAGE=tensorflow/tensorflow -FROM $BASE_IMAGE -RUN mkdir /app -COPY ./main.py /app/main.py -CMD ["python", "/app/main.py"] \ No newline at end of file diff --git a/images/default-ps/build_and_push.sh b/images/default-ps/build_and_push.sh deleted file mode 100755 index 4cb1e662a6..0000000000 --- a/images/default-ps/build_and_push.sh +++ /dev/null @@ -1,22 +0,0 @@ -#!/bin/bash -set -e - -SRC_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" -ROOT_DIR=${SRC_DIR}/../../ - -#. ${ROOT_DIR}/config.sh -REGISTRY="wbuchwalter" -TAGS=("1.1.0" "1.2.0" "1.2.1" "1.3.0" "latest") - -for i in "${TAGS[@]}" -do - BASEIMAGE="tensorflow/tensorflow:$i" - IMAGE="${REGISTRY}/mlkube-tensorflow-ps:$i" - - DIR=`mktemp -d` - echo Use ${DIR} as context - cp ${SRC_DIR}/Dockerfile ${DIR}/Dockerfile - cp ${SRC_DIR}/main.py ${DIR}/main.py - docker build -t $IMAGE --build-arg BASE_IMAGE=$BASEIMAGE -f ${DIR}/Dockerfile ${DIR} - docker push $IMAGE -done \ No newline at end of file diff --git a/images/default-ps/main.py b/images/default-ps/main.py deleted file mode 100644 index 2ff27ed1de..0000000000 --- a/images/default-ps/main.py +++ /dev/null @@ -1,22 +0,0 @@ -# A very simple parameter server that joins the server defined by the cluster spec passed as environment variable - -import tensorflow as tf -import os -import json - -tf_config_json = os.environ.get("TF_CONFIG", "{}") -tf_config = json.loads(tf_config_json) -task = tf_config.get("task", {}) -cluster_spec = tf_config.get("cluster", {}) -cluster_spec_object = tf.train.ClusterSpec(cluster_spec) -job_name = task["type"] -task_id = task["index"] -server_def = tf.train.ServerDef( - cluster=cluster_spec_object.as_cluster_def(), - protocol="grpc", - job_name=job_name, - task_index=task_id) -server = tf.train.Server(server_def) - -if job_name == 'ps': - server.join() diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index cbec67ed1c..1faf386bcd 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -6,24 +6,27 @@ import ( "errors" "fmt" "io" - "k8s.io/client-go/kubernetes" - "github.com/jlewi/mlkube.io/pkg/spec" - "github.com/jlewi/mlkube.io/pkg/trainer" - "github.com/jlewi/mlkube.io/pkg/util/k8sutil" + "io/ioutil" "net/http" "reflect" "sync" "time" - apierrors "k8s.io/apimachinery/pkg/api/errors" - apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" - v1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + "github.com/jlewi/mlkube.io/pkg/spec" + "github.com/jlewi/mlkube.io/pkg/trainer" + "github.com/jlewi/mlkube.io/pkg/util/k8sutil" + "k8s.io/client-go/kubernetes" + log "github.com/golang/glog" + "github.com/jlewi/mlkube.io/pkg/util" + v1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" + apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - kwatch "k8s.io/apimachinery/pkg/watch" k8sErrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/wait" - "github.com/jlewi/mlkube.io/pkg/util" + kwatch "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/pkg/api/v1" ) var ( @@ -66,7 +69,7 @@ func New(kubeCli kubernetes.Interface, apiCli apiextensionsclient.Interface, tfJ return &Controller{ Namespace: ns, KubeCli: kubeCli, - ApiCli: apiCli, + ApiCli: apiCli, TfJobClient: tfJobClient, // TODO(jlewi)): What to do about cluster.Cluster? jobs: make(map[string]*trainer.TrainingJob), @@ -226,25 +229,75 @@ func (c *Controller) initResource() (string, error) { return "", fmt.Errorf("fail to create CRD: %v", err) } } + + err = c.createPSConfigMap() + if err != nil { + log.Errorf("createPSConfigMap() returned error: %v", err) + } + return watchVersion, nil } +//Create a ConfigMap containing the source for a simple grpc server (pkg/controller/grpc_tensorflow_server.py) +//that will be used as default PS +func (c *Controller) createPSConfigMap() error { + //If a ConfigMap with the same name already exists, it was created by an earlier operator + //we delete and recreate it in case the grpc_tensorflow_server.py was updated in the meantime + cm, err := c.KubeCli.CoreV1().ConfigMaps(c.Namespace).Get(spec.PSConfigMapName(), metav1.GetOptions{}) + if err != nil { + if !k8sutil.IsKubernetesResourceNotFoundError(err) { + return err + } + } else { + err = c.KubeCli.CoreV1().ConfigMaps(c.Namespace).Delete(spec.PSConfigMapName(), &metav1.DeleteOptions{}) + if err != nil { + return err + } + } + + cm = &v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: spec.PSConfigMapName(), + }, + Data: make(map[string]string), + } + + //grab server sources from files + filePaths := map[string]string{ + "start_server.py": "./grpc_tensorflow_server/start_server.py", + "grpc_tensorflow_server.py": "./grpc_tensorflow_server/grpc_tensorflow_server.py", + } + for n, fp := range filePaths { + data, err := ioutil.ReadFile(fp) + if err != nil { + return err + } + cm.Data[n] = string(data) + } + + _, err = c.KubeCli.CoreV1().ConfigMaps(c.Namespace).Create(cm) + if err != nil { + return err + } + return nil +} + func (c *Controller) createCRD() error { crd := &v1beta1.CustomResourceDefinition{ ObjectMeta: metav1.ObjectMeta{ Name: spec.CRDName(), }, Spec: v1beta1.CustomResourceDefinitionSpec{ - Group: spec.CRDGroup, + Group: spec.CRDGroup, Version: spec.CRDVersion, - Scope: v1beta1.NamespaceScoped, - Names: v1beta1.CustomResourceDefinitionNames{ - Plural: spec.CRDKindPlural, - // TODO(jlewi): Do we want to set the singular name? - // Kind is the serialized kind of the resource. It is normally CamelCase and singular. - Kind: reflect.TypeOf(spec.TfJob{}).Name(), - }, + Scope: v1beta1.NamespaceScoped, + Names: v1beta1.CustomResourceDefinitionNames{ + Plural: spec.CRDKindPlural, + // TODO(jlewi): Do we want to set the singular name? + // Kind is the serialized kind of the resource. It is normally CamelCase and singular. + Kind: reflect.TypeOf(spec.TfJob{}).Name(), }, + }, } _, err := c.ApiCli.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd) diff --git a/pkg/spec/tf_job.go b/pkg/spec/tf_job.go index f1aa44b96c..951aab4745 100644 --- a/pkg/spec/tf_job.go +++ b/pkg/spec/tf_job.go @@ -30,6 +30,10 @@ func CRDName() string { return fmt.Sprintf("%s.%s", CRDKindPlural, CRDGroup) } +func PSConfigMapName() string { + return fmt.Sprintf("%s-%s", CRDName(), "ps") +} + type TfJob struct { metav1.TypeMeta `json:",inline"` Metadata metav1.ObjectMeta `json:"metadata,omitempty"` @@ -92,8 +96,8 @@ type TfReplicaSpec struct { // TfPort is the port to use for TF services. TfPort *int32 `json:"tfPort,omitempty" protobuf:"varint,1,opt,name=tfPort"` TfReplicaType `json:"tfReplicaType"` - //TfVersion is only used when TfReplicaType == PS to automatically start a PS server - TfVersion string `json:"tfVersion,omitempty"` + //TfImage is only used when TfReplicaType == PS to automatically start a PS server + TfImage string `json:"tfImage,omitempty"` } type TensorBoardSpec struct { @@ -113,8 +117,8 @@ func (c *TfJobSpec) Validate() error { return fmt.Errorf("Replica is missing Template; %v", util.Pformat(r)) } - if r.TfReplicaType == PS && r.Template == nil && r.TfVersion == "" { - return errors.New("PS must either have TfVersion or Template specified.") + if r.TfReplicaType == PS && r.Template == nil && r.TfImage == "" { + return errors.New("PS must either have TfImage or Template specified.") } if r.TfReplicaType == MASTER && *r.Replicas != 1 { @@ -230,13 +234,33 @@ func (c *TfJobSpec) SetDefaults() error { r.Replicas = proto.Int32(Replicas) } - if r.Template == nil && r.TfReplicaType == PS { + //Set the default configuration for a PS server if the user didn't specify a PodTemplateSpec + if r.Template == nil && r.TfReplicaType == PS && r.TfImage != "" { r.Template = &v1.PodTemplateSpec{ Spec: v1.PodSpec{ Containers: []v1.Container{ v1.Container{ - Image: fmt.Sprintf("%s:%s", PsDefaultImage, r.TfVersion), + Image: r.TfImage, Name: "tensorflow", + VolumeMounts: []v1.VolumeMount{ + v1.VolumeMount{ + Name: "ps-config-volume", + MountPath: "/ps-server", + }, + }, + Command: []string{"python", "/ps-server/start_server.py"}, + }, + }, + Volumes: []v1.Volume{ + v1.Volume{ + Name: "ps-config-volume", + VolumeSource: v1.VolumeSource{ + ConfigMap: &v1.ConfigMapVolumeSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: PSConfigMapName(), + }, + }, + }, }, }, RestartPolicy: v1.RestartPolicyOnFailure, diff --git a/pkg/spec/tf_job_test.go b/pkg/spec/tf_job_test.go index ed3a1cdc21..5c039a3271 100644 --- a/pkg/spec/tf_job_test.go +++ b/pkg/spec/tf_job_test.go @@ -1,7 +1,6 @@ package spec import ( - "fmt" "reflect" "testing" @@ -239,9 +238,6 @@ func TestSetDefaults(t *testing.T) { expected *TfJobSpec } - defaultPsImage := "wbuchwalter/mlkube-tensorflow-ps" - tag := "1.3.0" - testCases := []testCase{ { in: &TfJobSpec{ @@ -283,7 +279,7 @@ func TestSetDefaults(t *testing.T) { ReplicaSpecs: []*TfReplicaSpec{ { TfReplicaType: PS, - TfVersion: tag, + TfImage: "tensorflow/tensorflow:1.3.", }, }, }, @@ -295,16 +291,35 @@ func TestSetDefaults(t *testing.T) { Template: &v1.PodTemplateSpec{ Spec: v1.PodSpec{ Containers: []v1.Container{ - { + v1.Container{ + Image: "tensorflow/tensorflow:1.3.", Name: "tensorflow", - Image: fmt.Sprintf("%s:%s", defaultPsImage, tag), + VolumeMounts: []v1.VolumeMount{ + v1.VolumeMount{ + Name: "ps-config-volume", + MountPath: "/ps-server", + }, + }, + Command: []string{"python", "/ps-server/start_server.py"}, + }, + }, + Volumes: []v1.Volume{ + v1.Volume{ + Name: "ps-config-volume", + VolumeSource: v1.VolumeSource{ + ConfigMap: &v1.ConfigMapVolumeSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: PSConfigMapName(), + }, + }, + }, }, }, RestartPolicy: v1.RestartPolicyOnFailure, }, }, TfReplicaType: PS, - TfVersion: tag, + TfImage: "tensorflow/tensorflow:1.3.", }, }, }, From 8082db385e551e9da35de68708c61e56486a127b Mon Sep 17 00:00:00 2001 From: William Buchwalter Date: Tue, 26 Sep 2017 14:00:42 -0400 Subject: [PATCH 05/14] move default PS clusterspec transform in Create --- .../grpc_tensorflow_server.py | 3 + grpc_tensorflow_server/start_server.py | 37 ----------- pkg/controller/controller.go | 1 - pkg/spec/tf_job.go | 66 ++++++++++--------- pkg/spec/tf_job_test.go | 2 +- pkg/trainer/replicas.go | 35 ++++++++++ 6 files changed, 75 insertions(+), 69 deletions(-) delete mode 100644 grpc_tensorflow_server/start_server.py diff --git a/grpc_tensorflow_server/grpc_tensorflow_server.py b/grpc_tensorflow_server/grpc_tensorflow_server.py index dd26c4509d..a046682180 100644 --- a/grpc_tensorflow_server/grpc_tensorflow_server.py +++ b/grpc_tensorflow_server/grpc_tensorflow_server.py @@ -1,3 +1,6 @@ +""" TODO: Once grpc_tensorflow_server.py is included in tensorflow +docker image we should use it instead" """ + #!/usr/bin/python # Copyright 2016 The TensorFlow Authors. All Rights Reserved. # diff --git a/grpc_tensorflow_server/start_server.py b/grpc_tensorflow_server/start_server.py deleted file mode 100644 index 8b90f40771..0000000000 --- a/grpc_tensorflow_server/start_server.py +++ /dev/null @@ -1,37 +0,0 @@ -""" -Transform the ClusterSpec from json to piped and calls grpc_tensorflow_server.py -""" -import os -import json - -def main(): - tf_config = json.loads(os.environ["TF_CONFIG"]) - cluster_spec = tf_config["cluster"] - task_id = tf_config["task"]["index"] - - transformed_cluster_spec = "" - - isFirstJob = True - for job_name in cluster_spec: - if not isFirstJob: - #separator between different job kinds - transformed_cluster_spec += ',' - isFirstJob = False - - #separator between job name and it's element - transformed_cluster_spec += job_name + '|' - isFirstElement = True - for el in cluster_spec[job_name]: - if not isFirstElement: - #separator between different elements with same job type - transformed_cluster_spec += ';' - isFirstElement = False - transformed_cluster_spec += el - - dir_path = os.path.dirname(os.path.realpath(__file__)) - cmd = "python {}/grpc_tensorflow_server.py --cluster_spec '{}' --job_name ps --task_id {}".format(dir_path, transformed_cluster_spec, task_id) - print(cmd) - os.system(cmd) - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 1faf386bcd..2b57e1486d 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -264,7 +264,6 @@ func (c *Controller) createPSConfigMap() error { //grab server sources from files filePaths := map[string]string{ - "start_server.py": "./grpc_tensorflow_server/start_server.py", "grpc_tensorflow_server.py": "./grpc_tensorflow_server/grpc_tensorflow_server.py", } for n, fp := range filePaths { diff --git a/pkg/spec/tf_job.go b/pkg/spec/tf_job.go index 951aab4745..02de861394 100644 --- a/pkg/spec/tf_job.go +++ b/pkg/spec/tf_job.go @@ -98,6 +98,8 @@ type TfReplicaSpec struct { TfReplicaType `json:"tfReplicaType"` //TfImage is only used when TfReplicaType == PS to automatically start a PS server TfImage string `json:"tfImage,omitempty"` + //IsDefaultPS denotes if the parameter server should use the default grpc_tensorflow_server + IsDefaultPS bool } type TensorBoardSpec struct { @@ -236,36 +238,7 @@ func (c *TfJobSpec) SetDefaults() error { //Set the default configuration for a PS server if the user didn't specify a PodTemplateSpec if r.Template == nil && r.TfReplicaType == PS && r.TfImage != "" { - r.Template = &v1.PodTemplateSpec{ - Spec: v1.PodSpec{ - Containers: []v1.Container{ - v1.Container{ - Image: r.TfImage, - Name: "tensorflow", - VolumeMounts: []v1.VolumeMount{ - v1.VolumeMount{ - Name: "ps-config-volume", - MountPath: "/ps-server", - }, - }, - Command: []string{"python", "/ps-server/start_server.py"}, - }, - }, - Volumes: []v1.Volume{ - v1.Volume{ - Name: "ps-config-volume", - VolumeSource: v1.VolumeSource{ - ConfigMap: &v1.ConfigMapVolumeSource{ - LocalObjectReference: v1.LocalObjectReference{ - Name: PSConfigMapName(), - }, - }, - }, - }, - }, - RestartPolicy: v1.RestartPolicyOnFailure, - }, - } + r.setDefaultPSPodTemplateSpec() } } return nil @@ -278,6 +251,39 @@ func (c *TfJobSpec) Cleanup() { // We should have default container images so user doesn't have to provide these. } +func (r *TfReplicaSpec) setDefaultPSPodTemplateSpec() { + r.IsDefaultPS = true + r.Template = &v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + v1.Container{ + Image: r.TfImage, + Name: "tensorflow", + VolumeMounts: []v1.VolumeMount{ + v1.VolumeMount{ + Name: "ps-config-volume", + MountPath: "/ps-server", + }, + }, + }, + }, + Volumes: []v1.Volume{ + v1.Volume{ + Name: "ps-config-volume", + VolumeSource: v1.VolumeSource{ + ConfigMap: &v1.ConfigMapVolumeSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: PSConfigMapName(), + }, + }, + }, + }, + }, + RestartPolicy: v1.RestartPolicyOnFailure, + }, + } +} + type TfJobPhase string const ( diff --git a/pkg/spec/tf_job_test.go b/pkg/spec/tf_job_test.go index 5c039a3271..7caf36f879 100644 --- a/pkg/spec/tf_job_test.go +++ b/pkg/spec/tf_job_test.go @@ -300,7 +300,6 @@ func TestSetDefaults(t *testing.T) { MountPath: "/ps-server", }, }, - Command: []string{"python", "/ps-server/start_server.py"}, }, }, Volumes: []v1.Volume{ @@ -320,6 +319,7 @@ func TestSetDefaults(t *testing.T) { }, TfReplicaType: PS, TfImage: "tensorflow/tensorflow:1.3.", + IsDefaultPS: true, }, }, }, diff --git a/pkg/trainer/replicas.go b/pkg/trainer/replicas.go index fa3763bb82..53fadc4628 100644 --- a/pkg/trainer/replicas.go +++ b/pkg/trainer/replicas.go @@ -1,6 +1,7 @@ package trainer import ( + "bytes" "encoding/json" "errors" "fmt" @@ -89,6 +90,32 @@ func (s *TFReplicaSet) Labels() KubernetesLabels { "runtime_id": s.Job.job.Spec.RuntimeId}) } +// Transforms the tfconfig to work with grpc_tensorflow_server +func transformClusterSpecForDefaultPS(clusterSpec ClusterSpec) string { + var buf bytes.Buffer + isFirstJob := true + for k, v := range clusterSpec { + if !isFirstJob { + //separator between different job kinds + buf.WriteString(",") + } + isFirstJob = false + buf.WriteString(k) + //separator between job name and it's element + buf.WriteString("|") + isFirstElement := true + for _, e := range v { + if !isFirstElement { + //separator between different elements with same job type + buf.WriteString(";") + } + isFirstElement = false + buf.WriteString(e) + } + } + return buf.String() +} + func (s *TFReplicaSet) Create() error { for index := int32(0); index < *s.Spec.Replicas; index++ { taskLabels := s.Labels() @@ -140,6 +167,14 @@ func (s *TFReplicaSet) Create() error { return err } + if s.Spec.IsDefaultPS { + // grpc_tensorflow_server.py requires the clusterSpec to be passed as argument in a specific format. + // We do the appropriate transformations here + cs := transformClusterSpecForDefaultPS(s.Job.ClusterSpec()) + s.Spec.Template.Spec.Containers[0].Command = []string{"python", "/ps-server/grpc_tensorflow_server.py"} + s.Spec.Template.Spec.Containers[0].Args = []string{"--cluster_spec", cs, "--job_name", "ps", "--task_id", fmt.Sprintf("%v", index)} + } + // Make a copy of the template because we will modify it below. // TODO(jlewi): I don't fully understand why this works but setting Template: *s.Spec.Template // leads to TF_CONFIG being added multiples as an environment variable. From 7fa80cb443d8cd401ec1029157fa633c49b9bf0e Mon Sep 17 00:00:00 2001 From: William Buchwalter Date: Fri, 29 Sep 2017 11:27:44 -0400 Subject: [PATCH 06/14] implement reviewer comments --- config.sh | 2 +- .../grpc_tensorflow_server.py | 9 ++- images/tf_operator/build_and_push.sh | 2 +- pkg/controller/controller.go | 50 ------------- pkg/spec/tf_job.go | 42 ++++------- pkg/trainer/replicas.go | 73 ++++++++++++++++++- pkg/trainer/tensorboard.go | 6 +- test/e2e/main.go | 1 - tf-job-operator-chart/values.yaml | 2 +- 9 files changed, 94 insertions(+), 93 deletions(-) diff --git a/config.sh b/config.sh index b6b2c6a6a1..e623baa0c5 100644 --- a/config.sh +++ b/config.sh @@ -2,4 +2,4 @@ # Set REGISTRY to the Docker registry where Docker images will # be pushed. -REGISTRY=gcr.io/tf-on-k8s-dogfood +REGISTRY=wbuchwalter diff --git a/grpc_tensorflow_server/grpc_tensorflow_server.py b/grpc_tensorflow_server/grpc_tensorflow_server.py index a046682180..703b64270a 100644 --- a/grpc_tensorflow_server/grpc_tensorflow_server.py +++ b/grpc_tensorflow_server/grpc_tensorflow_server.py @@ -1,6 +1,3 @@ -""" TODO: Once grpc_tensorflow_server.py is included in tensorflow -docker image we should use it instead" """ - #!/usr/bin/python # Copyright 2016 The TensorFlow Authors. All Rights Reserved. # @@ -16,7 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== -"""Python-based TensorFlow GRPC server. +""" +TODO: Once grpc_tensorflow_server.py is included in tensorflow +docker image we should use it instead" + +Python-based TensorFlow GRPC server. Takes input arguments cluster_spec, job_name and task_id, and start a blocking TensorFlow GRPC server. diff --git a/images/tf_operator/build_and_push.sh b/images/tf_operator/build_and_push.sh index 411e288830..6f180f1988 100755 --- a/images/tf_operator/build_and_push.sh +++ b/images/tf_operator/build_and_push.sh @@ -11,7 +11,7 @@ GITHASH=$(git rev-parse --short HEAD) CHANGES=$(git diff-index --quiet HEAD -- || echo "untracked") if [ -n "$CHANGES" ]; then # Get the hash of the diff. - DIFFHASH=$(git diff | sha256sum) + DIFFHASH=$(git diff | shasum -a 256) DIFFHASH=${DIFFHASH:0:7} GITHASH=${GITHASH}-dirty-${DIFFHASH} fi diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 2b57e1486d..69afaa41c3 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "net/http" "reflect" "sync" @@ -26,7 +25,6 @@ import ( k8sErrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/wait" kwatch "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/pkg/api/v1" ) var ( @@ -230,57 +228,9 @@ func (c *Controller) initResource() (string, error) { } } - err = c.createPSConfigMap() - if err != nil { - log.Errorf("createPSConfigMap() returned error: %v", err) - } - return watchVersion, nil } -//Create a ConfigMap containing the source for a simple grpc server (pkg/controller/grpc_tensorflow_server.py) -//that will be used as default PS -func (c *Controller) createPSConfigMap() error { - //If a ConfigMap with the same name already exists, it was created by an earlier operator - //we delete and recreate it in case the grpc_tensorflow_server.py was updated in the meantime - cm, err := c.KubeCli.CoreV1().ConfigMaps(c.Namespace).Get(spec.PSConfigMapName(), metav1.GetOptions{}) - if err != nil { - if !k8sutil.IsKubernetesResourceNotFoundError(err) { - return err - } - } else { - err = c.KubeCli.CoreV1().ConfigMaps(c.Namespace).Delete(spec.PSConfigMapName(), &metav1.DeleteOptions{}) - if err != nil { - return err - } - } - - cm = &v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: spec.PSConfigMapName(), - }, - Data: make(map[string]string), - } - - //grab server sources from files - filePaths := map[string]string{ - "grpc_tensorflow_server.py": "./grpc_tensorflow_server/grpc_tensorflow_server.py", - } - for n, fp := range filePaths { - data, err := ioutil.ReadFile(fp) - if err != nil { - return err - } - cm.Data[n] = string(data) - } - - _, err = c.KubeCli.CoreV1().ConfigMaps(c.Namespace).Create(cm) - if err != nil { - return err - } - return nil -} - func (c *Controller) createCRD() error { crd := &v1beta1.CustomResourceDefinition{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/spec/tf_job.go b/pkg/spec/tf_job.go index 02de861394..6878cfb065 100644 --- a/pkg/spec/tf_job.go +++ b/pkg/spec/tf_job.go @@ -30,10 +30,6 @@ func CRDName() string { return fmt.Sprintf("%s.%s", CRDKindPlural, CRDGroup) } -func PSConfigMapName() string { - return fmt.Sprintf("%s-%s", CRDName(), "ps") -} - type TfJob struct { metav1.TypeMeta `json:",inline"` Metadata metav1.ObjectMeta `json:"metadata,omitempty"` @@ -64,6 +60,10 @@ type TfJobSpec struct { // ReplicaSpecs specifies the TF replicas to run. ReplicaSpecs []*TfReplicaSpec `json:"replicaSpecs"` + + // TfImage defines the tensorflow docker image that should be used for Tensorboard + // and the default parameter server + TfImage string `json:"tfImage,omitempty"` } // TfReplicaType determines how a set of TF processes are handled. @@ -80,7 +80,7 @@ type ContainerName string const ( TENSORFLOW ContainerName = "tensorflow" - PsDefaultImage = "wbuchwalter/mlkube-tensorflow-ps" + DefaultTFImage = "tensorflow/tensorflow:latest" ) // TODO(jlewi): We probably want to add a name field. This would allow us to have more than 1 type of each worker. @@ -96,9 +96,7 @@ type TfReplicaSpec struct { // TfPort is the port to use for TF services. TfPort *int32 `json:"tfPort,omitempty" protobuf:"varint,1,opt,name=tfPort"` TfReplicaType `json:"tfReplicaType"` - //TfImage is only used when TfReplicaType == PS to automatically start a PS server - TfImage string `json:"tfImage,omitempty"` - //IsDefaultPS denotes if the parameter server should use the default grpc_tensorflow_server + // IsDefaultPS denotes if the parameter server should use the default grpc_tensorflow_server IsDefaultPS bool } @@ -119,10 +117,6 @@ func (c *TfJobSpec) Validate() error { return fmt.Errorf("Replica is missing Template; %v", util.Pformat(r)) } - if r.TfReplicaType == PS && r.Template == nil && r.TfImage == "" { - return errors.New("PS must either have TfImage or Template specified.") - } - if r.TfReplicaType == MASTER && *r.Replicas != 1 { return errors.New("The MASTER must have Replicas = 1") } @@ -218,6 +212,10 @@ func (c *TfJobSpec) ConfigureAccelerators(accelerators map[string]AcceleratorCon // SetDefaults sets any unspecified values to defaults func (c *TfJobSpec) SetDefaults() error { + if c.TfImage == "" { + c.TfImage = DefaultTFImage + } + // Check that each replica has a TensorFlow container. for _, r := range c.ReplicaSpecs { if r.Template == nil && r.TfReplicaType != PS { @@ -237,8 +235,8 @@ func (c *TfJobSpec) SetDefaults() error { } //Set the default configuration for a PS server if the user didn't specify a PodTemplateSpec - if r.Template == nil && r.TfReplicaType == PS && r.TfImage != "" { - r.setDefaultPSPodTemplateSpec() + if r.Template == nil && r.TfReplicaType == PS { + r.setDefaultPSPodTemplateSpec(c.TfImage) } } return nil @@ -251,13 +249,13 @@ func (c *TfJobSpec) Cleanup() { // We should have default container images so user doesn't have to provide these. } -func (r *TfReplicaSpec) setDefaultPSPodTemplateSpec() { +func (r *TfReplicaSpec) setDefaultPSPodTemplateSpec(tfImage string) { r.IsDefaultPS = true r.Template = &v1.PodTemplateSpec{ Spec: v1.PodSpec{ Containers: []v1.Container{ v1.Container{ - Image: r.TfImage, + Image: tfImage, Name: "tensorflow", VolumeMounts: []v1.VolumeMount{ v1.VolumeMount{ @@ -267,18 +265,6 @@ func (r *TfReplicaSpec) setDefaultPSPodTemplateSpec() { }, }, }, - Volumes: []v1.Volume{ - v1.Volume{ - Name: "ps-config-volume", - VolumeSource: v1.VolumeSource{ - ConfigMap: &v1.ConfigMapVolumeSource{ - LocalObjectReference: v1.LocalObjectReference{ - Name: PSConfigMapName(), - }, - }, - }, - }, - }, RestartPolicy: v1.RestartPolicyOnFailure, }, } diff --git a/pkg/trainer/replicas.go b/pkg/trainer/replicas.go index 53fadc4628..8335315bf3 100644 --- a/pkg/trainer/replicas.go +++ b/pkg/trainer/replicas.go @@ -5,8 +5,11 @@ import ( "encoding/json" "errors" "fmt" + "io/ioutil" "strings" + "github.com/jlewi/mlkube.io/pkg/util/k8sutil" + "github.com/jlewi/mlkube.io/pkg/spec" log "github.com/golang/glog" @@ -117,6 +120,32 @@ func transformClusterSpecForDefaultPS(clusterSpec ClusterSpec) string { } func (s *TFReplicaSet) Create() error { + if s.Spec.IsDefaultPS { + // Create the ConfigMap containing the sources for the default Parameter Server + err, cm := s.getDefaultPSConfigMap() + if err != nil { + log.Infof("Error building PS ConfigMap: %v, %v", cm.ObjectMeta.Name, err) + return err + } + _, err = s.ClientSet.CoreV1().ConfigMaps(NAMESPACE).Create(cm) + if err != nil { + log.Infof("Error creating PS ConfigMap: %v, %v", cm.ObjectMeta.Name, err) + return err + } + + // Update Volumes to include the ConfigMap containing grpc_tensorflow_server.py + s.Spec.Template.Spec.Volumes = append(s.Spec.Template.Spec.Volumes, v1.Volume{ + Name: "ps-config-volume", + VolumeSource: v1.VolumeSource{ + ConfigMap: &v1.ConfigMapVolumeSource{ + LocalObjectReference: v1.LocalObjectReference{ + Name: s.defaultPSConfigMapName(), + }, + }, + }, + }) + } + for index := int32(0); index < *s.Spec.Replicas; index++ { taskLabels := s.Labels() taskLabels["task_index"] = fmt.Sprintf("%v", index) @@ -168,11 +197,8 @@ func (s *TFReplicaSet) Create() error { } if s.Spec.IsDefaultPS { - // grpc_tensorflow_server.py requires the clusterSpec to be passed as argument in a specific format. - // We do the appropriate transformations here cs := transformClusterSpecForDefaultPS(s.Job.ClusterSpec()) - s.Spec.Template.Spec.Containers[0].Command = []string{"python", "/ps-server/grpc_tensorflow_server.py"} - s.Spec.Template.Spec.Containers[0].Args = []string{"--cluster_spec", cs, "--job_name", "ps", "--task_id", fmt.Sprintf("%v", index)} + s.Spec.Template.Spec.Containers[0].Command = []string{"python", "/ps-server/grpc_tensorflow_server.py", "--cluster_spec", cs, "--job_name", "ps", "--task_id", fmt.Sprintf("%v", index)} } // Make a copy of the template because we will modify it below. @@ -234,6 +260,31 @@ func (s *TFReplicaSet) Create() error { return nil } +// Create a ConfigMap containing the source for a simple grpc server (pkg/controller/grpc_tensorflow_server.py) +// that will be used as default PS +func (s *TFReplicaSet) getDefaultPSConfigMap() (error, *v1.ConfigMap) { + cm := &v1.ConfigMap{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: s.defaultPSConfigMapName(), + }, + Data: make(map[string]string), + } + + //grab server sources from files + filePaths := map[string]string{ + "grpc_tensorflow_server.py": "./grpc_tensorflow_server/grpc_tensorflow_server.py", + } + for n, fp := range filePaths { + data, err := ioutil.ReadFile(fp) + if err != nil { + return err, nil + } + cm.Data[n] = string(data) + } + + return nil, cm +} + // Delete deletes the replicas func (s *TFReplicaSet) Delete() error { selector, err := s.Labels().ToSelector() @@ -272,6 +323,16 @@ func (s *TFReplicaSet) Delete() error { } } + // If the ConfigMap for the default parameter server exists, we delete it + _, err = s.ClientSet.CoreV1().ConfigMaps(NAMESPACE).Get(s.defaultPSConfigMapName(), meta_v1.GetOptions{}) + if err != nil { + if !k8sutil.IsKubernetesResourceNotFoundError(err) { + log.Errorf("Error deleting ConfigMap %v; %v", s.defaultPSConfigMapName(), err) + } + } else { + s.ClientSet.CoreV1().ConfigMaps(NAMESPACE).Delete(s.defaultPSConfigMapName(), &meta_v1.DeleteOptions{}) + } + if failures { return errors.New("Some of the replicas resources could not be deleted") } @@ -417,3 +478,7 @@ func (s *TFReplicaSet) GetStatus() (spec.TfReplicaStatus, error) { func (s *TFReplicaSet) jobName(index int32) string { return fmt.Sprintf("%v-%v-%v", strings.ToLower(string(s.Spec.TfReplicaType)), s.Job.job.Spec.RuntimeId, index) } + +func (s *TFReplicaSet) defaultPSConfigMapName() string { + return fmt.Sprintf("cm-ps-%v", s.Job.job.Spec.RuntimeId) +} diff --git a/pkg/trainer/tensorboard.go b/pkg/trainer/tensorboard.go index b21e798344..e781c3c521 100644 --- a/pkg/trainer/tensorboard.go +++ b/pkg/trainer/tensorboard.go @@ -88,7 +88,7 @@ func (s *TBReplicaSet) Create() error { MatchLabels: s.Labels(), }, Replicas: proto.Int32(1), - Template: s.getDeploymentSpecTemplate(), + Template: s.getDeploymentSpecTemplate(s.Job.job.Spec.TfImage), }, } @@ -129,11 +129,11 @@ func (s *TBReplicaSet) Delete() error { return nil } -func (s *TBReplicaSet) getDeploymentSpecTemplate() v1.PodTemplateSpec { +func (s *TBReplicaSet) getDeploymentSpecTemplate(image string) v1.PodTemplateSpec { // TODO: make the TensorFlow image a parameter of the job operator. c := &v1.Container{ Name: s.jobName(), - Image: "tensorflow/tensorflow", + Image: image, Command: []string{ "tensorboard", "--logdir", s.Spec.LogDir, "--host", "0.0.0.0", }, diff --git a/test/e2e/main.go b/test/e2e/main.go index 8151b7fd3c..ac0685b535 100644 --- a/test/e2e/main.go +++ b/test/e2e/main.go @@ -73,7 +73,6 @@ func run() error { Replicas: proto.Int32(1), TfPort: proto.Int32(2222), TfReplicaType: spec.PS, - TfVersion: "1.3.0", }, { Replicas: proto.Int32(1), diff --git a/tf-job-operator-chart/values.yaml b/tf-job-operator-chart/values.yaml index d90c7156dd..ebd502ecb6 100644 --- a/tf-job-operator-chart/values.yaml +++ b/tf-job-operator-chart/values.yaml @@ -1,5 +1,5 @@ # Docker image to use. -image: gcr.io/tf-on-k8s-dogfood/tf_operator:4dd012d +image: wbuchwalter/tf_operator:8082db3-dirty-16f460e test_image: gcr.io/tf-on-k8s-dogfood/tf_sample:dc944ff From 8b233ef413df4f6b0750450b53c52dc990a34c09 Mon Sep 17 00:00:00 2001 From: William Buchwalter Date: Fri, 29 Sep 2017 16:52:30 -0400 Subject: [PATCH 07/14] update docker image --- config.sh | 2 +- glide.lock | 2 +- images/tf_operator/Dockerfile | 1 + images/tf_operator/build_and_push.sh | 3 ++- pkg/trainer/replicas.go | 6 +++--- tf-job-operator-chart/values.yaml | 2 +- 6 files changed, 9 insertions(+), 7 deletions(-) diff --git a/config.sh b/config.sh index e623baa0c5..b6b2c6a6a1 100644 --- a/config.sh +++ b/config.sh @@ -2,4 +2,4 @@ # Set REGISTRY to the Docker registry where Docker images will # be pushed. -REGISTRY=wbuchwalter +REGISTRY=gcr.io/tf-on-k8s-dogfood diff --git a/glide.lock b/glide.lock index 71b5f61481..6cb62595ed 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ hash: b9c34817e80d7914b889e41fb456c8b0595f707cf3d2923da3f7b2ac6a6de44a -updated: 2017-08-16T11:49:07.068368434-07:00 +updated: 2017-09-29T11:30:21.243398665-04:00 imports: - name: cloud.google.com/go version: 3b1ae45394a234c385be014e9a488f2bb6eef821 diff --git a/images/tf_operator/Dockerfile b/images/tf_operator/Dockerfile index 50a0048b10..52e10fc14d 100644 --- a/images/tf_operator/Dockerfile +++ b/images/tf_operator/Dockerfile @@ -5,6 +5,7 @@ RUN mkdir -p /opt/mlkube RUN mkdir -p /opt/mlkube/test COPY tf_operator /opt/mlkube COPY e2e /opt/mlkube/test +COPY grpc_tensorflow_server.py /opt/mlkube/grpc_tensorflow_server/grpc_tensorflow_server.py RUN chmod a+x /opt/mlkube/tf_operator RUN chmod a+x /opt/mlkube/test/e2e diff --git a/images/tf_operator/build_and_push.sh b/images/tf_operator/build_and_push.sh index 6f180f1988..df2d76b46c 100755 --- a/images/tf_operator/build_and_push.sh +++ b/images/tf_operator/build_and_push.sh @@ -11,7 +11,7 @@ GITHASH=$(git rev-parse --short HEAD) CHANGES=$(git diff-index --quiet HEAD -- || echo "untracked") if [ -n "$CHANGES" ]; then # Get the hash of the diff. - DIFFHASH=$(git diff | shasum -a 256) + DIFFHASH=$(git diff | sha256sum) DIFFHASH=${DIFFHASH:0:7} GITHASH=${GITHASH}-dirty-${DIFFHASH} fi @@ -24,6 +24,7 @@ go install github.com/jlewi/mlkube.io/test/e2e cp ${GOPATH}/bin/tf_operator ${DIR}/ cp ${GOPATH}/bin/e2e ${DIR}/ cp ${SRC_DIR}/Dockerfile ${DIR}/ +cp ${ROOT_DIR}/grpc_tensorflow_server/grpc_tensorflow_server.py ${DIR}/ docker build -t $IMAGE -f ${DIR}/Dockerfile ${DIR} gcloud docker -- push $IMAGE diff --git a/pkg/trainer/replicas.go b/pkg/trainer/replicas.go index 8335315bf3..74c59c32df 100644 --- a/pkg/trainer/replicas.go +++ b/pkg/trainer/replicas.go @@ -124,12 +124,12 @@ func (s *TFReplicaSet) Create() error { // Create the ConfigMap containing the sources for the default Parameter Server err, cm := s.getDefaultPSConfigMap() if err != nil { - log.Infof("Error building PS ConfigMap: %v, %v", cm.ObjectMeta.Name, err) + log.Errorf("Error building PS ConfigMap: %v", err) return err } _, err = s.ClientSet.CoreV1().ConfigMaps(NAMESPACE).Create(cm) if err != nil { - log.Infof("Error creating PS ConfigMap: %v, %v", cm.ObjectMeta.Name, err) + log.Errorf("Error creating PS ConfigMap: %v, %v", cm.ObjectMeta.Name, err) return err } @@ -272,7 +272,7 @@ func (s *TFReplicaSet) getDefaultPSConfigMap() (error, *v1.ConfigMap) { //grab server sources from files filePaths := map[string]string{ - "grpc_tensorflow_server.py": "./grpc_tensorflow_server/grpc_tensorflow_server.py", + "grpc_tensorflow_server.py": "/opt/mlkube/grpc_tensorflow_server/grpc_tensorflow_server.py", } for n, fp := range filePaths { data, err := ioutil.ReadFile(fp) diff --git a/tf-job-operator-chart/values.yaml b/tf-job-operator-chart/values.yaml index ebd502ecb6..cfc9a85d3c 100644 --- a/tf-job-operator-chart/values.yaml +++ b/tf-job-operator-chart/values.yaml @@ -1,5 +1,5 @@ # Docker image to use. -image: wbuchwalter/tf_operator:8082db3-dirty-16f460e +image: gcr.io/tf-on-k8s-dogfood/tf_operator:10b10fd test_image: gcr.io/tf-on-k8s-dogfood/tf_sample:dc944ff From d5907df5b663947ca411f2136cfdf90c8eff1e34 Mon Sep 17 00:00:00 2001 From: William Buchwalter Date: Fri, 29 Sep 2017 17:57:04 -0400 Subject: [PATCH 08/14] make path to grpc server part of config --- cmd/tf_operator/main.go | 5 +++-- pkg/spec/controller.go | 3 +++ pkg/trainer/replicas.go | 8 ++++---- pkg/trainer/training.go | 10 +++++----- .../templates/tf_job_operator_deployment.yaml | 1 + 5 files changed, 16 insertions(+), 11 deletions(-) diff --git a/cmd/tf_operator/main.go b/cmd/tf_operator/main.go index e7d573b69a..824cebe0c6 100644 --- a/cmd/tf_operator/main.go +++ b/cmd/tf_operator/main.go @@ -22,10 +22,10 @@ import ( "io/ioutil" + "github.com/jlewi/mlkube.io/pkg/spec" "k8s.io/client-go/kubernetes" "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/tools/record" - "github.com/jlewi/mlkube.io/pkg/spec" ) var ( @@ -36,6 +36,7 @@ var ( chaosLevel int controllerConfigFile string printVersion bool + grpcServerFile string ) var ( @@ -52,7 +53,6 @@ func init() { flag.BoolVar(&printVersion, "version", false, "Show version and quit") flag.DurationVar(&gcInterval, "gc-interval", 10*time.Minute, "GC interval") flag.StringVar(&controllerConfigFile, "controller_config_file", "", "Path to file containing the controller config.") - flag.Parse() // Workaround for watching TPR resource. @@ -84,6 +84,7 @@ func init() { } else { log.Info("No controller_config_file provided; using empty config.") } + } func main() { diff --git a/pkg/spec/controller.go b/pkg/spec/controller.go index d08e054a0b..3a6c4412db 100644 --- a/pkg/spec/controller.go +++ b/pkg/spec/controller.go @@ -5,6 +5,9 @@ type ControllerConfig struct { // This should match the value specified as a container limit. // e.g. alpha.kubernetes.io/nvidia-gpu Accelerators map[string]AcceleratorConfig + + // Path to the file containing the grpc server sources + GrpcServerFilePath string } // AcceleratorVolume represents a host path that must be mounted into diff --git a/pkg/trainer/replicas.go b/pkg/trainer/replicas.go index 74c59c32df..e33258215b 100644 --- a/pkg/trainer/replicas.go +++ b/pkg/trainer/replicas.go @@ -119,10 +119,10 @@ func transformClusterSpecForDefaultPS(clusterSpec ClusterSpec) string { return buf.String() } -func (s *TFReplicaSet) Create() error { +func (s *TFReplicaSet) Create(config *spec.ControllerConfig) error { if s.Spec.IsDefaultPS { // Create the ConfigMap containing the sources for the default Parameter Server - err, cm := s.getDefaultPSConfigMap() + err, cm := s.getDefaultPSConfigMap(config) if err != nil { log.Errorf("Error building PS ConfigMap: %v", err) return err @@ -262,7 +262,7 @@ func (s *TFReplicaSet) Create() error { // Create a ConfigMap containing the source for a simple grpc server (pkg/controller/grpc_tensorflow_server.py) // that will be used as default PS -func (s *TFReplicaSet) getDefaultPSConfigMap() (error, *v1.ConfigMap) { +func (s *TFReplicaSet) getDefaultPSConfigMap(config *spec.ControllerConfig) (error, *v1.ConfigMap) { cm := &v1.ConfigMap{ ObjectMeta: meta_v1.ObjectMeta{ Name: s.defaultPSConfigMapName(), @@ -272,7 +272,7 @@ func (s *TFReplicaSet) getDefaultPSConfigMap() (error, *v1.ConfigMap) { //grab server sources from files filePaths := map[string]string{ - "grpc_tensorflow_server.py": "/opt/mlkube/grpc_tensorflow_server/grpc_tensorflow_server.py", + "grpc_tensorflow_server.py": config.GrpcServerFilePath, } for n, fp := range filePaths { data, err := ioutil.ReadFile(fp) diff --git a/pkg/trainer/training.go b/pkg/trainer/training.go index ed33f66ec4..f91c4e90a0 100644 --- a/pkg/trainer/training.go +++ b/pkg/trainer/training.go @@ -123,7 +123,7 @@ func NewJob(kubeCli kubernetes.Interface, tfJobClient k8sutil.TfJobClient, job * } return } - j.run(stopC) + j.run(config, stopC) }() return j, nil @@ -146,9 +146,9 @@ func (j *TrainingJob) ClusterSpec() ClusterSpec { } // createResources creates all the replicas and TensorBoard if requested -func (j *TrainingJob) createResources() error { +func (j *TrainingJob) createResources(config *spec.ControllerConfig) error { for _, r := range j.Replicas { - if err := r.Create(); err != nil { + if err := r.Create(config); err != nil { return err } } @@ -417,7 +417,7 @@ func (j *TrainingJob) updateTPRStatus() error { return nil } -func (j *TrainingJob) run(stopC <-chan struct{}) { +func (j *TrainingJob) run(config *spec.ControllerConfig, stopC <-chan struct{}) { // TODO(jlewi): What does the run function do? clusterFailed := false @@ -477,7 +477,7 @@ func (j *TrainingJob) run(stopC <-chan struct{}) { // now we always call Create. if j.job.Status.Phase == spec.TfJobPhaseRunning { // We call Create to make sure all the resources exist and are running. - if cErr := j.createResources(); cErr != nil { + if cErr := j.createResources(config); cErr != nil { log.Errorf("trainingJobCreateReplicas() error; %v", cErr) } diff --git a/tf-job-operator-chart/templates/tf_job_operator_deployment.yaml b/tf-job-operator-chart/templates/tf_job_operator_deployment.yaml index bed44870fb..1cf3d16f16 100644 --- a/tf-job-operator-chart/templates/tf_job_operator_deployment.yaml +++ b/tf-job-operator-chart/templates/tf_job_operator_deployment.yaml @@ -5,6 +5,7 @@ metadata: namespace: default data: controller_config_file.yaml: | + grpcServerFilePath: /opt/mlkube/grpc_tensorflow_server/grpc_tensorflow_server.py accelerators: alpha.kubernetes.io/nvidia-gpu: volumes: From c104f0e9c7ff3ad3b3559a9d094a6e5883a590c1 Mon Sep 17 00:00:00 2001 From: William Buchwalter Date: Tue, 17 Oct 2017 14:11:57 -0400 Subject: [PATCH 09/14] fix and add unit tests --- pkg/spec/tf_job_test.go | 20 +++++--------------- pkg/trainer/replicas.go | 12 ++++++++++-- pkg/trainer/replicas_test.go | 22 +++++++++++++++++++--- 3 files changed, 34 insertions(+), 20 deletions(-) diff --git a/pkg/spec/tf_job_test.go b/pkg/spec/tf_job_test.go index 7caf36f879..cb563362b4 100644 --- a/pkg/spec/tf_job_test.go +++ b/pkg/spec/tf_job_test.go @@ -254,6 +254,7 @@ func TestSetDefaults(t *testing.T) { }, }, }, + TfImage: "tensorflow/tensorflow:1.3.0", }, expected: &TfJobSpec{ ReplicaSpecs: []*TfReplicaSpec{ @@ -272,6 +273,7 @@ func TestSetDefaults(t *testing.T) { TfReplicaType: MASTER, }, }, + TfImage: "tensorflow/tensorflow:1.3.0", }, }, { @@ -279,9 +281,9 @@ func TestSetDefaults(t *testing.T) { ReplicaSpecs: []*TfReplicaSpec{ { TfReplicaType: PS, - TfImage: "tensorflow/tensorflow:1.3.", }, }, + TfImage: "tensorflow/tensorflow:1.3.0", }, expected: &TfJobSpec{ ReplicaSpecs: []*TfReplicaSpec{ @@ -292,7 +294,7 @@ func TestSetDefaults(t *testing.T) { Spec: v1.PodSpec{ Containers: []v1.Container{ v1.Container{ - Image: "tensorflow/tensorflow:1.3.", + Image: "tensorflow/tensorflow:1.3.0", Name: "tensorflow", VolumeMounts: []v1.VolumeMount{ v1.VolumeMount{ @@ -302,26 +304,14 @@ func TestSetDefaults(t *testing.T) { }, }, }, - Volumes: []v1.Volume{ - v1.Volume{ - Name: "ps-config-volume", - VolumeSource: v1.VolumeSource{ - ConfigMap: &v1.ConfigMapVolumeSource{ - LocalObjectReference: v1.LocalObjectReference{ - Name: PSConfigMapName(), - }, - }, - }, - }, - }, RestartPolicy: v1.RestartPolicyOnFailure, }, }, TfReplicaType: PS, - TfImage: "tensorflow/tensorflow:1.3.", IsDefaultPS: true, }, }, + TfImage: "tensorflow/tensorflow:1.3.0", }, }, } diff --git a/pkg/trainer/replicas.go b/pkg/trainer/replicas.go index d60beb2e8a..44f7955cd9 100644 --- a/pkg/trainer/replicas.go +++ b/pkg/trainer/replicas.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io/ioutil" + "sort" "strings" "github.com/jlewi/mlkube.io/pkg/util/k8sutil" @@ -95,9 +96,16 @@ func (s *TFReplicaSet) Labels() KubernetesLabels { // Transforms the tfconfig to work with grpc_tensorflow_server func transformClusterSpecForDefaultPS(clusterSpec ClusterSpec) string { + + keys := []string{} + for k := range clusterSpec { + keys = append(keys, k) + } + sort.Strings(keys) + var buf bytes.Buffer isFirstJob := true - for k, v := range clusterSpec { + for _, k := range keys { if !isFirstJob { //separator between different job kinds buf.WriteString(",") @@ -107,7 +115,7 @@ func transformClusterSpecForDefaultPS(clusterSpec ClusterSpec) string { //separator between job name and it's element buf.WriteString("|") isFirstElement := true - for _, e := range v { + for _, e := range clusterSpec[k] { if !isFirstElement { //separator between different elements with same job type buf.WriteString(";") diff --git a/pkg/trainer/replicas_test.go b/pkg/trainer/replicas_test.go index e45382e964..9a31646f80 100644 --- a/pkg/trainer/replicas_test.go +++ b/pkg/trainer/replicas_test.go @@ -10,11 +10,11 @@ import ( "sync" "time" + "github.com/jlewi/mlkube.io/pkg/spec" + tfJobFake "github.com/jlewi/mlkube.io/pkg/util/k8sutil/fake" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/pkg/api/v1" - "github.com/jlewi/mlkube.io/pkg/spec" - tfJobFake "github.com/jlewi/mlkube.io/pkg/util/k8sutil/fake" ) func TestTFReplicaSet(t *testing.T) { @@ -49,7 +49,7 @@ func TestTFReplicaSet(t *testing.T) { t.Fatalf("NewTFReplicaSet failed: %v", err) } - if err := replica.Create(); err != nil { + if err := replica.Create(&spec.ControllerConfig{}); err != nil { t.Fatalf("replica.Create() error; %v", err) } @@ -270,3 +270,19 @@ func TestTFReplicaSetStatusFromPodList(t *testing.T) { } } } + +func TestTransformClusterSpecForDefaultPS(t *testing.T) { + + cs := ClusterSpec{ + "master": {"master-0:2222"}, + "worker": {"worker-0:2222", "worker-1:2222"}, + "ps": {"localhost:2222", "ps-1:2222"}, + } + expected := "master|master-0:2222,ps|localhost:2222;ps-1:2222,worker|worker-0:2222;worker-1:2222" + + tx := transformClusterSpecForDefaultPS(cs) + + if tx != expected { + t.Errorf("transformClusterSpecForDefaultPS() expected: %v, received: %v", expected, tx) + } +} From 188e5cdc5da42a7675769a85da8293ca6c963bfd Mon Sep 17 00:00:00 2001 From: William Buchwalter Date: Tue, 17 Oct 2017 14:11:57 -0400 Subject: [PATCH 10/14] fix and add unit tests --- pkg/spec/tf_job_test.go | 20 +++++--------------- pkg/trainer/replicas.go | 13 +++++++++++-- pkg/trainer/replicas_test.go | 22 +++++++++++++++++++--- 3 files changed, 35 insertions(+), 20 deletions(-) diff --git a/pkg/spec/tf_job_test.go b/pkg/spec/tf_job_test.go index 7caf36f879..cb563362b4 100644 --- a/pkg/spec/tf_job_test.go +++ b/pkg/spec/tf_job_test.go @@ -254,6 +254,7 @@ func TestSetDefaults(t *testing.T) { }, }, }, + TfImage: "tensorflow/tensorflow:1.3.0", }, expected: &TfJobSpec{ ReplicaSpecs: []*TfReplicaSpec{ @@ -272,6 +273,7 @@ func TestSetDefaults(t *testing.T) { TfReplicaType: MASTER, }, }, + TfImage: "tensorflow/tensorflow:1.3.0", }, }, { @@ -279,9 +281,9 @@ func TestSetDefaults(t *testing.T) { ReplicaSpecs: []*TfReplicaSpec{ { TfReplicaType: PS, - TfImage: "tensorflow/tensorflow:1.3.", }, }, + TfImage: "tensorflow/tensorflow:1.3.0", }, expected: &TfJobSpec{ ReplicaSpecs: []*TfReplicaSpec{ @@ -292,7 +294,7 @@ func TestSetDefaults(t *testing.T) { Spec: v1.PodSpec{ Containers: []v1.Container{ v1.Container{ - Image: "tensorflow/tensorflow:1.3.", + Image: "tensorflow/tensorflow:1.3.0", Name: "tensorflow", VolumeMounts: []v1.VolumeMount{ v1.VolumeMount{ @@ -302,26 +304,14 @@ func TestSetDefaults(t *testing.T) { }, }, }, - Volumes: []v1.Volume{ - v1.Volume{ - Name: "ps-config-volume", - VolumeSource: v1.VolumeSource{ - ConfigMap: &v1.ConfigMapVolumeSource{ - LocalObjectReference: v1.LocalObjectReference{ - Name: PSConfigMapName(), - }, - }, - }, - }, - }, RestartPolicy: v1.RestartPolicyOnFailure, }, }, TfReplicaType: PS, - TfImage: "tensorflow/tensorflow:1.3.", IsDefaultPS: true, }, }, + TfImage: "tensorflow/tensorflow:1.3.0", }, }, } diff --git a/pkg/trainer/replicas.go b/pkg/trainer/replicas.go index d60beb2e8a..ebdab16d82 100644 --- a/pkg/trainer/replicas.go +++ b/pkg/trainer/replicas.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io/ioutil" + "sort" "strings" "github.com/jlewi/mlkube.io/pkg/util/k8sutil" @@ -95,9 +96,17 @@ func (s *TFReplicaSet) Labels() KubernetesLabels { // Transforms the tfconfig to work with grpc_tensorflow_server func transformClusterSpecForDefaultPS(clusterSpec ClusterSpec) string { + + // sort by keys to make unit testing easier + keys := []string{} + for k := range clusterSpec { + keys = append(keys, k) + } + sort.Strings(keys) + var buf bytes.Buffer isFirstJob := true - for k, v := range clusterSpec { + for _, k := range keys { if !isFirstJob { //separator between different job kinds buf.WriteString(",") @@ -107,7 +116,7 @@ func transformClusterSpecForDefaultPS(clusterSpec ClusterSpec) string { //separator between job name and it's element buf.WriteString("|") isFirstElement := true - for _, e := range v { + for _, e := range clusterSpec[k] { if !isFirstElement { //separator between different elements with same job type buf.WriteString(";") diff --git a/pkg/trainer/replicas_test.go b/pkg/trainer/replicas_test.go index e45382e964..9a31646f80 100644 --- a/pkg/trainer/replicas_test.go +++ b/pkg/trainer/replicas_test.go @@ -10,11 +10,11 @@ import ( "sync" "time" + "github.com/jlewi/mlkube.io/pkg/spec" + tfJobFake "github.com/jlewi/mlkube.io/pkg/util/k8sutil/fake" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/pkg/api/v1" - "github.com/jlewi/mlkube.io/pkg/spec" - tfJobFake "github.com/jlewi/mlkube.io/pkg/util/k8sutil/fake" ) func TestTFReplicaSet(t *testing.T) { @@ -49,7 +49,7 @@ func TestTFReplicaSet(t *testing.T) { t.Fatalf("NewTFReplicaSet failed: %v", err) } - if err := replica.Create(); err != nil { + if err := replica.Create(&spec.ControllerConfig{}); err != nil { t.Fatalf("replica.Create() error; %v", err) } @@ -270,3 +270,19 @@ func TestTFReplicaSetStatusFromPodList(t *testing.T) { } } } + +func TestTransformClusterSpecForDefaultPS(t *testing.T) { + + cs := ClusterSpec{ + "master": {"master-0:2222"}, + "worker": {"worker-0:2222", "worker-1:2222"}, + "ps": {"localhost:2222", "ps-1:2222"}, + } + expected := "master|master-0:2222,ps|localhost:2222;ps-1:2222,worker|worker-0:2222;worker-1:2222" + + tx := transformClusterSpecForDefaultPS(cs) + + if tx != expected { + t.Errorf("transformClusterSpecForDefaultPS() expected: %v, received: %v", expected, tx) + } +} From c6f047d40095e72f307a92b990726875575a3972 Mon Sep 17 00:00:00 2001 From: William Buchwalter Date: Tue, 17 Oct 2017 15:48:23 -0400 Subject: [PATCH 11/14] remove build_and_push.sh --- images/tf_operator/build_and_push.sh | 31 ---------------------------- 1 file changed, 31 deletions(-) delete mode 100755 images/tf_operator/build_and_push.sh diff --git a/images/tf_operator/build_and_push.sh b/images/tf_operator/build_and_push.sh deleted file mode 100755 index df2d76b46c..0000000000 --- a/images/tf_operator/build_and_push.sh +++ /dev/null @@ -1,31 +0,0 @@ -#!/bin/bash -set -e - -SRC_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" -ROOT_DIR=${SRC_DIR}/../../ - -. ${ROOT_DIR}/config.sh - -# The image tag is based on the githash. -GITHASH=$(git rev-parse --short HEAD) -CHANGES=$(git diff-index --quiet HEAD -- || echo "untracked") -if [ -n "$CHANGES" ]; then - # Get the hash of the diff. - DIFFHASH=$(git diff | sha256sum) - DIFFHASH=${DIFFHASH:0:7} - GITHASH=${GITHASH}-dirty-${DIFFHASH} -fi -IMAGE=${REGISTRY}/tf_operator:${GITHASH} - -DIR=`mktemp -d` -echo Use ${DIR} as context -go install github.com/jlewi/mlkube.io/cmd/tf_operator -go install github.com/jlewi/mlkube.io/test/e2e -cp ${GOPATH}/bin/tf_operator ${DIR}/ -cp ${GOPATH}/bin/e2e ${DIR}/ -cp ${SRC_DIR}/Dockerfile ${DIR}/ -cp ${ROOT_DIR}/grpc_tensorflow_server/grpc_tensorflow_server.py ${DIR}/ - -docker build -t $IMAGE -f ${DIR}/Dockerfile ${DIR} -gcloud docker -- push $IMAGE -echo pushed $IMAGE From 7176834f050cc5b58ae71d363b3649f9243e7b95 Mon Sep 17 00:00:00 2001 From: William Buchwalter Date: Tue, 17 Oct 2017 16:32:34 -0400 Subject: [PATCH 12/14] fix build_and_push --- images/tf_operator/build_and_push.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/images/tf_operator/build_and_push.py b/images/tf_operator/build_and_push.py index b20774d4d0..efb83de7d9 100755 --- a/images/tf_operator/build_and_push.py +++ b/images/tf_operator/build_and_push.py @@ -13,8 +13,9 @@ def GetGitHash(): # The image tag is based on the githash. - git_hash = subprocess.check_output(["git", "rev-parse", "--short", "HEAD"]) + git_hash = subprocess.check_output(["git", "rev-parse", "--short", "HEAD"]).decode("utf-8") git_hash=git_hash.strip() + modified_files = subprocess.check_output(["git", "ls-files", "--modified"]) untracked_files = subprocess.check_output( ["git", "ls-files", "--others", "--exclude-standard"]) @@ -23,7 +24,8 @@ def GetGitHash(): sha = hashlib.sha256() sha.update(diff) diffhash = sha.hexdigest()[0:7] - git_hash = "{0}-dirty-{1}".format(git_hash, diffhash) + git_hash = "{0}-dirty-{1}".format(git_hash, diffhash) + return git_hash def run(command, cwd=None): @@ -51,6 +53,8 @@ def run(command, cwd=None): help="Use Google Container Builder to build the image.") parser.add_argument("--no-gcb", dest="use_gcb", action="store_false", help="Use Docker to build the image.") + parser.add_argument("--no-push", dest="should_push", action="store_false", + help="Push the image once build is finished.") parser.set_defaults(feature=False) args = parser.parse_args() @@ -84,6 +88,7 @@ def run(command, cwd=None): "images/tf_operator/Dockerfile", os.path.join(go_path, "bin/tf_operator"), os.path.join(go_path, "bin/e2e"), + "grpc_tensorflow_server/grpc_tensorflow_server.py" ] for s in sources: @@ -106,5 +111,7 @@ def run(command, cwd=None): else: run(["docker", "build", "-t", image, context_dir]) logging.info("Built image: %s", image) + + if args.should_push: run(["gcloud", "docker", "--", "push", image]) logging.info("Pushed image: %s", image) From 9a62ee96cabfeb0a10fba304a634771325501e82 Mon Sep 17 00:00:00 2001 From: William Buchwalter Date: Wed, 18 Oct 2017 17:22:46 -0400 Subject: [PATCH 13/14] implement reviewer comments --- pkg/spec/controller.go | 2 +- pkg/spec/tf_job.go | 2 +- pkg/trainer/replicas.go | 36 ++++++++++++------------------------ pkg/trainer/training.go | 4 ---- 4 files changed, 14 insertions(+), 30 deletions(-) diff --git a/pkg/spec/controller.go b/pkg/spec/controller.go index 3a6c4412db..446a989f85 100644 --- a/pkg/spec/controller.go +++ b/pkg/spec/controller.go @@ -6,7 +6,7 @@ type ControllerConfig struct { // e.g. alpha.kubernetes.io/nvidia-gpu Accelerators map[string]AcceleratorConfig - // Path to the file containing the grpc server sources + // Path to the file containing the grpc server source GrpcServerFilePath string } diff --git a/pkg/spec/tf_job.go b/pkg/spec/tf_job.go index 6878cfb065..63ce7ffd7e 100644 --- a/pkg/spec/tf_job.go +++ b/pkg/spec/tf_job.go @@ -80,7 +80,7 @@ type ContainerName string const ( TENSORFLOW ContainerName = "tensorflow" - DefaultTFImage = "tensorflow/tensorflow:latest" + DefaultTFImage = "tensorflow/tensorflow:1.3.0" ) // TODO(jlewi): We probably want to add a name field. This would allow us to have more than 1 type of each worker. diff --git a/pkg/trainer/replicas.go b/pkg/trainer/replicas.go index ebdab16d82..20b93e4db7 100644 --- a/pkg/trainer/replicas.go +++ b/pkg/trainer/replicas.go @@ -1,7 +1,6 @@ package trainer import ( - "bytes" "encoding/json" "errors" "fmt" @@ -104,28 +103,17 @@ func transformClusterSpecForDefaultPS(clusterSpec ClusterSpec) string { } sort.Strings(keys) - var buf bytes.Buffer - isFirstJob := true - for _, k := range keys { - if !isFirstJob { - //separator between different job kinds - buf.WriteString(",") - } - isFirstJob = false - buf.WriteString(k) - //separator between job name and it's element - buf.WriteString("|") - isFirstElement := true - for _, e := range clusterSpec[k] { - if !isFirstElement { - //separator between different elements with same job type - buf.WriteString(";") - } - isFirstElement = false - buf.WriteString(e) + jobs := []string{} + for _, jobType := range keys { + hosts := []string{} + for _, h := range clusterSpec[jobType] { + hosts = append(hosts, h) } + s := jobType + "|" + strings.Join(hosts, ";") + jobs = append(jobs, s) } - return buf.String() + + return strings.Join(jobs, ",") } func (s *TFReplicaSet) Create(config *spec.ControllerConfig) error { @@ -136,7 +124,7 @@ func (s *TFReplicaSet) Create(config *spec.ControllerConfig) error { log.Errorf("Error building PS ConfigMap: %v", err) return err } - _, err = s.ClientSet.CoreV1().ConfigMaps(NAMESPACE).Create(cm) + _, err = s.ClientSet.CoreV1().ConfigMaps(s.Job.job.Metadata.Namespace).Create(cm) if err != nil { log.Errorf("Error creating PS ConfigMap: %v, %v", cm.ObjectMeta.Name, err) return err @@ -334,13 +322,13 @@ func (s *TFReplicaSet) Delete() error { } // If the ConfigMap for the default parameter server exists, we delete it - _, err = s.ClientSet.CoreV1().ConfigMaps(NAMESPACE).Get(s.defaultPSConfigMapName(), meta_v1.GetOptions{}) + _, err = s.ClientSet.CoreV1().ConfigMaps(s.Job.job.Metadata.Namespace).Get(s.defaultPSConfigMapName(), meta_v1.GetOptions{}) if err != nil { if !k8sutil.IsKubernetesResourceNotFoundError(err) { log.Errorf("Error deleting ConfigMap %v; %v", s.defaultPSConfigMapName(), err) } } else { - s.ClientSet.CoreV1().ConfigMaps(NAMESPACE).Delete(s.defaultPSConfigMapName(), &meta_v1.DeleteOptions{}) + s.ClientSet.CoreV1().ConfigMaps(s.Job.job.Metadata.Namespace).Delete(s.defaultPSConfigMapName(), &meta_v1.DeleteOptions{}) } if failures { diff --git a/pkg/trainer/training.go b/pkg/trainer/training.go index 96e007a4e2..d40009480e 100644 --- a/pkg/trainer/training.go +++ b/pkg/trainer/training.go @@ -24,10 +24,6 @@ import ( "k8s.io/client-go/pkg/api/v1" ) -const ( - NAMESPACE string = "default" -) - var ( reconcileInterval = 8 * time.Second ) From 2c32e0b5f235e35cffd4dfe64f37d7ac7f2f8439 Mon Sep 17 00:00:00 2001 From: William Buchwalter Date: Thu, 19 Oct 2017 09:30:36 -0400 Subject: [PATCH 14/14] Do not push with GCB --- images/tf_operator/build_and_push.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/images/tf_operator/build_and_push.py b/images/tf_operator/build_and_push.py index f560ec179f..449606b321 100755 --- a/images/tf_operator/build_and_push.py +++ b/images/tf_operator/build_and_push.py @@ -117,11 +117,11 @@ def run(command, cwd=None): "--tag=" + image, "--project=" + args.project ]) else: run(["docker", "build", "-t", image, context_dir]) - logging.info("Built image: %s", image) - - if args.should_push: - run(["gcloud", "docker", "--", "push", image]) - logging.info("Pushed image: %s", image) + logging.info("Built image: %s", image) + + if args.should_push: + run(["gcloud", "docker", "--", "push", image]) + logging.info("Pushed image: %s", image) if args.output: logging.info("Writing build information to %s", args.output)