diff --git a/README.md b/README.md index e66b2cdf1a..e877e12e59 100644 --- a/README.md +++ b/README.md @@ -147,7 +147,6 @@ metadata: data: controller-config-file.yaml: | accelerators: - grpcServerFilePath: /opt/mlkube/grpc_tensorflow_server/grpc_tensorflow_server.py alpha.kubernetes.io/nvidia-gpu: volumes: - name: # Desired name of the volume, ex: nvidia-libs @@ -405,8 +404,7 @@ metadata: spec: RuntimeId: 76no replicaSpecs: - - IsDefaultPS: false - replicas: 1 + - replicas: 1 template: metadata: creationTimestamp: null @@ -418,8 +416,7 @@ spec: restartPolicy: OnFailure tfPort: 2222 tfReplicaType: MASTER - - IsDefaultPS: false - replicas: 1 + - replicas: 1 template: metadata: creationTimestamp: null @@ -431,8 +428,7 @@ spec: restartPolicy: OnFailure tfPort: 2222 tfReplicaType: WORKER - - IsDefaultPS: true - replicas: 2 + - replicas: 2 template: metadata: creationTimestamp: null diff --git a/build/images/tf_operator/Dockerfile b/build/images/tf_operator/Dockerfile index d9965e1ccd..bc81b24697 100644 --- a/build/images/tf_operator/Dockerfile +++ b/build/images/tf_operator/Dockerfile @@ -6,7 +6,6 @@ RUN mkdir -p /opt/mlkube/test RUN mkdir -p /opt/tensorflow_k8s/dashboard/ COPY tf_operator /opt/mlkube COPY e2e /opt/mlkube/test -COPY grpc_tensorflow_server.py /opt/mlkube/grpc_tensorflow_server/grpc_tensorflow_server.py COPY backend /opt/tensorflow_k8s/dashboard/ COPY build /opt/tensorflow_k8s/dashboard/frontend/build RUN chmod a+x /opt/mlkube/tf_operator @@ -14,4 +13,4 @@ RUN chmod a+x /opt/mlkube/test/e2e RUN chmod a+x /opt/tensorflow_k8s/dashboard/backend # TODO(jlewi): Reduce log level. -ENTRYPOINT ["/opt/mlkube/tf_operator", "-alsologtostderr"] \ No newline at end of file +ENTRYPOINT ["/opt/mlkube/tf_operator", "-alsologtostderr"] diff --git a/build/images/tf_operator/build_and_push.py b/build/images/tf_operator/build_and_push.py index 917bca5043..75f91b02f2 100755 --- a/build/images/tf_operator/build_and_push.py +++ b/build/images/tf_operator/build_and_push.py @@ -123,8 +123,7 @@ def main(): # pylint: disable=too-many-locals, too-many-statements os.path.join(go_path, "bin/tf_operator"), os.path.join(go_path, "bin/e2e"), os.path.join(go_path, "bin/backend"), - "dashboard/frontend/build", - "hack/grpc_tensorflow_server/grpc_tensorflow_server.py" + "dashboard/frontend/build" ] for s in sources: diff --git a/developer_guide.md b/developer_guide.md index 1560f91cc9..f6d7b8c7c3 100644 --- a/developer_guide.md +++ b/developer_guide.md @@ -121,20 +121,10 @@ export MY_POD_NAME=my-pod set the corresponding namespace for the resource. * TODO(jlewi): Do we still need to set MY_POD_NAME? Why? -Make a copy of `grpc_tensorflow_server.py` and create a config file named `controller-config-file.yaml`: - -```sh -cp hack/grpc_tensorflow_server/grpc_tensorflow_server.py /tmp/grpc_tensorflow_server.py - -cat > /tmp/controller-config-file.yaml << EOL -grpcServerFilePath: /tmp/grpc_tensorflow_server.py -EOL -``` - Now we are ready to run operator locally: ```sh -tf_operator -controller_config_file=/tmp/controller_config_file.yaml +tf_operator --logtostderr ``` The command creates a CRD `tfjobs` and block watching for creation of the resource kind. To verify local diff --git a/hack/grpc_tensorflow_server/grpc_tensorflow_server.py b/hack/grpc_tensorflow_server/grpc_tensorflow_server.py deleted file mode 100644 index c9959bc1b3..0000000000 --- a/hack/grpc_tensorflow_server/grpc_tensorflow_server.py +++ /dev/null @@ -1,160 +0,0 @@ -#!/usr/bin/python -# Copyright 2016 The TensorFlow Authors. All Rights Reserved. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ============================================================================== -""" -TODO: Once grpc_tensorflow_server.py is included in tensorflow -docker image we should use it instead. - -Python-based TensorFlow GRPC server. - -Takes input arguments cluster_spec, job_name and task_id, and start a blocking -TensorFlow GRPC server. - -Usage: - grpc_tensorflow_server.py --cluster_spec=SPEC --job_name=NAME --task_id=ID - -Where: - SPEC is (,)* - JOB is |(;)* - NAME is a valid job name ([a-z][0-9a-z]*) - HOST is a hostname or IP address - PORT is a port number -""" - -from __future__ import absolute_import, division, print_function - -import argparse -import sys - -from tensorflow.core.protobuf import config_pb2, tensorflow_server_pb2 # pylint: disable=no-name-in-module -from tensorflow.python.platform import app # pylint: disable=no-name-in-module -from tensorflow.python.training import server_lib # pylint: disable=no-name-in-module - - -def parse_cluster_spec(cluster_spec, cluster, verbose=False): - """Parse content of cluster_spec string and inject info into cluster protobuf. - - Args: - cluster_spec: cluster specification string, e.g., - "local|localhost:2222;localhost:2223" - cluster: cluster protobuf. - verbose: If verbose logging is requested. - - Raises: - ValueError: if the cluster_spec string is invalid. 
- """ - - job_strings = cluster_spec.split(",") - - if not cluster_spec: - raise ValueError("Empty cluster_spec string") - - for job_string in job_strings: - job_def = cluster.job.add() - - if job_string.count("|") != 1: - raise ValueError("Not exactly one instance of '|' in cluster_spec") - - job_name = job_string.split("|")[0] - - if not job_name: - raise ValueError("Empty job_name in cluster_spec") - - job_def.name = job_name - - if verbose: - print("Added job named \"%s\"" % job_name) - - job_tasks = job_string.split("|")[1].split(";") - for i in range(len(job_tasks)): # pylint: disable=consider-using-enumerate - if not job_tasks[i]: - raise ValueError("Empty task string at position %d" % i) - - job_def.tasks[i] = job_tasks[i] - - if verbose: - print(" Added task \"%s\" to job \"%s\"" % (job_tasks[i], job_name)) - - -def main(_): - # Create Protobuf ServerDef - server_def = tensorflow_server_pb2.ServerDef(protocol="grpc") - - # Cluster info - parse_cluster_spec(FLAGS.cluster_spec, server_def.cluster, FLAGS.verbose) - - # Job name - if not FLAGS.job_name: - raise ValueError("Empty job_name") - server_def.job_name = FLAGS.job_name - - # Task index - if FLAGS.task_id < 0: - raise ValueError("Invalid task_id: %d" % FLAGS.task_id) - server_def.task_index = FLAGS.task_id - - config = config_pb2.ConfigProto(gpu_options=config_pb2.GPUOptions( - per_process_gpu_memory_fraction=FLAGS.gpu_memory_fraction)) - - # Create GRPC Server instance - server = server_lib.Server(server_def, config=config) - - # join() is blocking, unlike start() - server.join() - - -if __name__ == "__main__": - parser = argparse.ArgumentParser() - parser.register("type", "bool", lambda v: v.lower() == "true") - parser.add_argument( - "--cluster_spec", - type=str, - default="", - help="""\ - Cluster spec: SPEC. SPEC is (,)*," JOB is - |(;)*," NAME is a valid job name - ([a-z][0-9a-z]*)," HOST is a hostname or IP address," PORT is a - port number." 
E.g., local|localhost:2222;localhost:2223, - ps|ps0:2222;ps1:2222\ - """ - ) - parser.add_argument( - "--job_name", - type=str, - default="", - help="Job name: e.g., local" - ) - parser.add_argument( - "--task_id", - type=int, - default=0, - help="Task index, e.g., 0" - ) - parser.add_argument( - "--gpu_memory_fraction", - type=float, - default=1.0, - help="Fraction of GPU memory allocated",) - parser.add_argument( - "--verbose", - type="bool", - nargs="?", - const=True, - default=False, - help="Verbose mode" - ) - - FLAGS, unparsed = parser.parse_known_args() - app.run(main=main, argv=[sys.argv[0]] + unparsed) diff --git a/pkg/apis/tensorflow/v1alpha1/defaults.go b/pkg/apis/tensorflow/v1alpha1/defaults.go index 2d6dce504f..39eda13a9e 100644 --- a/pkg/apis/tensorflow/v1alpha1/defaults.go +++ b/pkg/apis/tensorflow/v1alpha1/defaults.go @@ -2,7 +2,6 @@ package v1alpha1 import ( "github.com/golang/protobuf/proto" - "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -32,11 +31,6 @@ func SetDefaults_TFJob(obj *TFJob) { if r.Replicas == nil { r.Replicas = proto.Int32(Replicas) } - - //Set the default configuration for a PS server if the user didn't specify a PodTemplateSpec - if r.Template == nil && r.TFReplicaType == PS { - setDefault_PSPodTemplateSpec(r, c.TFImage) - } } if c.TerminationPolicy == nil { c.TerminationPolicy = &TerminationPolicySpec{ @@ -48,24 +42,3 @@ func SetDefaults_TFJob(obj *TFJob) { } } - -func setDefault_PSPodTemplateSpec(r *TFReplicaSpec, tfImage string) { - r.IsDefaultPS = true - r.Template = &v1.PodTemplateSpec{ - Spec: v1.PodSpec{ - Containers: []v1.Container{ - v1.Container{ - Image: tfImage, - Name: "tensorflow", - VolumeMounts: []v1.VolumeMount{ - v1.VolumeMount{ - Name: "ps-config-volume", - MountPath: "/ps-server", - }, - }, - }, - }, - RestartPolicy: v1.RestartPolicyOnFailure, - }, - } -} diff --git a/pkg/apis/tensorflow/v1alpha1/defaults_test.go b/pkg/apis/tensorflow/v1alpha1/defaults_test.go index 6cdc35d54c..fbc39fb5d4 100644 --- a/pkg/apis/tensorflow/v1alpha1/defaults_test.go +++ b/pkg/apis/tensorflow/v1alpha1/defaults_test.go @@ -78,27 +78,9 @@ func TestSetDefaults_TFJob(t *testing.T) { Spec: TFJobSpec{ ReplicaSpecs: []*TFReplicaSpec{ { - Replicas: proto.Int32(1), - TFPort: proto.Int32(2222), - Template: &v1.PodTemplateSpec{ - Spec: v1.PodSpec{ - Containers: []v1.Container{ - v1.Container{ - Image: "tensorflow/tensorflow:1.3.0", - Name: "tensorflow", - VolumeMounts: []v1.VolumeMount{ - v1.VolumeMount{ - Name: "ps-config-volume", - MountPath: "/ps-server", - }, - }, - }, - }, - RestartPolicy: v1.RestartPolicyOnFailure, - }, - }, + Replicas: proto.Int32(1), + TFPort: proto.Int32(2222), TFReplicaType: PS, - IsDefaultPS: true, }, }, TFImage: "tensorflow/tensorflow:1.3.0", diff --git a/pkg/apis/tensorflow/v1alpha1/types.go b/pkg/apis/tensorflow/v1alpha1/types.go index e48b576cd4..968a612fae 100644 --- a/pkg/apis/tensorflow/v1alpha1/types.go +++ b/pkg/apis/tensorflow/v1alpha1/types.go @@ -88,8 +88,6 @@ type TFReplicaSpec struct { // TFPort is the port to use for TF services. 
TFPort *int32 `json:"tfPort,omitempty" protobuf:"varint,1,opt,name=tfPort"` TFReplicaType `json:"tfReplicaType"` - // IsDefaultPS denotes if the parameter server should use the default grpc_tensorflow_server - IsDefaultPS bool } type TensorBoardSpec struct { diff --git a/pkg/apis/tensorflow/validation/validation.go b/pkg/apis/tensorflow/validation/validation.go index b910229d51..2914793a87 100644 --- a/pkg/apis/tensorflow/validation/validation.go +++ b/pkg/apis/tensorflow/validation/validation.go @@ -19,7 +19,7 @@ func ValidateTFJobSpec(c *tfv1.TFJobSpec) error { // Check that each replica has a TensorFlow container and a chief. for _, r := range c.ReplicaSpecs { found := false - if r.Template == nil && r.TFReplicaType != tfv1.PS { + if r.Template == nil { return fmt.Errorf("Replica is missing Template; %v", util.Pformat(r)) } diff --git a/pkg/trainer/replicas.go b/pkg/trainer/replicas.go index a76d66ad91..2942c07efb 100644 --- a/pkg/trainer/replicas.go +++ b/pkg/trainer/replicas.go @@ -4,8 +4,6 @@ import ( "encoding/json" "errors" "fmt" - "io/ioutil" - "sort" "strings" "github.com/tensorflow/k8s/pkg/util/k8sutil" @@ -106,70 +104,7 @@ func (s *TFReplicaSet) Labels() KubernetesLabels { "tf_job_name": s.Job.job.ObjectMeta.Name}) } -// Transforms the tfconfig to work with grpc_tensorflow_server -func transformClusterSpecForDefaultPS(clusterSpec ClusterSpec) string { - - // sort by keys to make unit testing easier - keys := []string{} - for k := range clusterSpec { - keys = append(keys, k) - } - sort.Strings(keys) - - jobs := []string{} - for _, jobType := range keys { - hosts := []string{} - hosts = append(hosts, clusterSpec[jobType]...) - s := jobType + "|" + strings.Join(hosts, ";") - jobs = append(jobs, s) - } - - return strings.Join(jobs, ",") -} - func (s *TFReplicaSet) Create(config *tfv1alpha1.ControllerConfig) error { - if s.Spec.IsDefaultPS { - // Create the ConfigMap containing the sources for the default Parameter Server - err, cm := s.getDefaultPSConfigMap(config) - if err != nil { - log.Errorf("Error building PS ConfigMap: %v", err) - return err - } - createdCM, err := s.ClientSet.CoreV1().ConfigMaps(s.Job.job.ObjectMeta.Namespace).Create(cm) - if err != nil { - if k8s_errors.IsAlreadyExists(err) { - log.Infof("%v already exists.", createdCM.Name) - } else { - log.Errorf("Error creating PS ConfigMap: %v, %v", cm.ObjectMeta.Name, err) - return k8sErrors.NewAggregate([]error{fmt.Errorf("Creating PS ConfigMap %v returned error.", createdCM.Name), err}) - } - } else { - s.recorder.Eventf(s.Job.job, v1.EventTypeNormal, SuccessfulCreateReason, "Created configmap: %v", createdCM.Name) - } - - // Update Volumes to include the ConfigMap containing grpc_tensorflow_server.py - name := "ps-config-volume" - hasVolume := false - for _, v := range s.Spec.Template.Spec.Volumes { - if v.Name == name { - hasVolume = true - break - } - } - if !hasVolume { - s.Spec.Template.Spec.Volumes = append(s.Spec.Template.Spec.Volumes, v1.Volume{ - Name: "ps-config-volume", - VolumeSource: v1.VolumeSource{ - ConfigMap: &v1.ConfigMapVolumeSource{ - LocalObjectReference: v1.LocalObjectReference{ - Name: s.defaultPSConfigMapName(), - }, - }, - }, - }) - } - } - for index := int32(0); index < *s.Spec.Replicas; index++ { taskLabels := s.Labels() taskLabels["task_index"] = fmt.Sprintf("%v", index) @@ -226,11 +161,6 @@ func (s *TFReplicaSet) Create(config *tfv1alpha1.ControllerConfig) error { return err } - if s.Spec.IsDefaultPS { - cs := transformClusterSpecForDefaultPS(s.Job.ClusterSpec()) - 
s.Spec.Template.Spec.Containers[0].Command = []string{"python", "/ps-server/grpc_tensorflow_server.py", "--cluster_spec", cs, "--job_name", "ps", "--task_id", fmt.Sprintf("%v", index)} - } - // Make a copy of the template because we will modify it below. . newPodSpecTemplate := s.Spec.Template.DeepCopy() @@ -294,31 +224,6 @@ func (s *TFReplicaSet) Create(config *tfv1alpha1.ControllerConfig) error { return nil } -// Create a ConfigMap containing the source for a simple grpc server (pkg/controller/grpc_tensorflow_server.py) -// that will be used as default PS -func (s *TFReplicaSet) getDefaultPSConfigMap(config *tfv1alpha1.ControllerConfig) (error, *v1.ConfigMap) { - cm := &v1.ConfigMap{ - ObjectMeta: meta_v1.ObjectMeta{ - Name: s.defaultPSConfigMapName(), - }, - Data: make(map[string]string), - } - - //grab server sources from files - filePaths := map[string]string{ - "grpc_tensorflow_server.py": config.GrpcServerFilePath, - } - for n, fp := range filePaths { - data, err := ioutil.ReadFile(fp) - if err != nil { - return err, nil - } - cm.Data[n] = string(data) - } - - return nil, cm -} - // Delete deletes the replicas func (s *TFReplicaSet) Delete() error { selector, err := s.Labels().ToSelector() diff --git a/pkg/trainer/replicas_test.go b/pkg/trainer/replicas_test.go index 8ce57f7be5..6add6b9f67 100644 --- a/pkg/trainer/replicas_test.go +++ b/pkg/trainer/replicas_test.go @@ -346,19 +346,3 @@ func TestTFReplicaSetStatusFromPodList(t *testing.T) { } } } - -func TestTransformClusterSpecForDefaultPS(t *testing.T) { - - cs := ClusterSpec{ - "master": {"master-0:2222"}, - "worker": {"worker-0:2222", "worker-1:2222"}, - "ps": {"localhost:2222", "ps-1:2222"}, - } - expected := "master|master-0:2222,ps|localhost:2222;ps-1:2222,worker|worker-0:2222;worker-1:2222" - - tx := transformClusterSpecForDefaultPS(cs) - - if tx != expected { - t.Errorf("transformClusterSpecForDefaultPS() expected: %v, received: %v", expected, tx) - } -} diff --git a/py/release.py b/py/release.py index 126dde358a..bd420ca0a6 100755 --- a/py/release.py +++ b/py/release.py @@ -155,8 +155,7 @@ def build_operator_image(root_dir, registry, project=None, should_push=True): os.path.join(go_path, "bin/tf_operator"), os.path.join(go_path, "bin/e2e"), os.path.join(go_path, "bin/backend"), - "dashboard/frontend/build", - "hack/grpc_tensorflow_server/grpc_tensorflow_server.py" + "dashboard/frontend/build" ] for s in sources: diff --git a/tf-job-operator-chart/templates/config.yaml b/tf-job-operator-chart/templates/config.yaml index c7c6c515ce..60a554e55b 100644 --- a/tf-job-operator-chart/templates/config.yaml +++ b/tf-job-operator-chart/templates/config.yaml @@ -8,7 +8,6 @@ metadata: data: controller-config-file.yaml: | tfImage: {{ .Values.tfImage }} - grpcServerFilePath: /opt/mlkube/grpc_tensorflow_server/grpc_tensorflow_server.py accelerators: alpha.kubernetes.io/nvidia-gpu: volumes: @@ -20,7 +19,7 @@ data: hostPath: /usr/lib/nvidia-384/bin - name: libcuda mountPath: /usr/lib/x86_64-linux-gnu/libcuda.so.1 - hostPath: /usr/lib/x86_64-linux-gnu/libcuda.so.1 + hostPath: /usr/lib/x86_64-linux-gnu/libcuda.so.1 {{ else if eq $cloud "gke" }} apiVersion: v1 kind: ConfigMap @@ -29,5 +28,4 @@ metadata: data: controller-config-file.yaml: | tfImage: {{ .Values.tfImage }} - grpcServerFilePath: /opt/mlkube/grpc_tensorflow_server/grpc_tensorflow_server.py -{{ end }} \ No newline at end of file +{{ end }}
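
With `IsDefaultPS`, the `grpcServerFilePath` config key, and the bundled `grpc_tensorflow_server.py` removed by this patch, every `replicaSpecs` entry, including `PS`, must now supply its own `template`; validation now rejects any replica spec without one. Jobs that previously relied on the implicit parameter server need to bake an equivalent entrypoint into their own image. The sketch below is a minimal, hypothetical replacement built on the public `tf.train.Server` API (the same API the removed script wrapped); the `--ps_hosts`/`--worker_hosts` flag names, their defaults, and the script path are illustrative assumptions, not part of this patch.

```python
# Hypothetical stand-in for the removed default PS entrypoint.
# Bake this (or an equivalent) into the image referenced by the PS
# replica's template; flag names and defaults here are illustrative only.
import argparse

import tensorflow as tf


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--ps_hosts", default="ps-0:2222",
                        help="Comma-separated host:port pairs for the ps job")
    parser.add_argument("--worker_hosts", default="worker-0:2222",
                        help="Comma-separated host:port pairs for the worker job")
    parser.add_argument("--task_id", type=int, default=0,
                        help="Index of this task within the ps job")
    args = parser.parse_args()

    # Build the cluster layout from the supplied host lists.
    cluster = tf.train.ClusterSpec({
        "ps": args.ps_hosts.split(","),
        "worker": args.worker_hosts.split(","),
    })

    # Start a blocking in-process TensorFlow gRPC server for this PS task.
    server = tf.train.Server(cluster, job_name="ps", task_index=args.task_id)
    server.join()


if __name__ == "__main__":
    main()
```

In the TFJob manifest, the PS replica's container would then launch such a script explicitly (for example `command: ["python", "/opt/ps/ps_server.py", "--task_id", "0", ...]`, path hypothetical) instead of relying on the operator to mount and run a default server from a ConfigMap.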