Skip to content

Commit

Permalink
Deprecate IsDefaultPS
Browse files Browse the repository at this point in the history
  • Loading branch information
ScorpioCPH committed Jan 24, 2018
1 parent 74a958b commit deb4fc2
Show file tree
Hide file tree
Showing 12 changed files with 9 additions and 303 deletions.
10 changes: 3 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ metadata:
data:
controller-config-file.yaml: |
accelerators:
grpcServerFilePath: /opt/mlkube/grpc_tensorflow_server/grpc_tensorflow_server.py
alpha.kubernetes.io/nvidia-gpu:
volumes:
- name: <volume-name> # Desired name of the volume, ex: nvidia-libs
Expand Down Expand Up @@ -405,8 +404,7 @@ metadata:
spec:
RuntimeId: 76no
replicaSpecs:
- IsDefaultPS: false
replicas: 1
- replicas: 1
template:
metadata:
creationTimestamp: null
Expand All @@ -418,8 +416,7 @@ spec:
restartPolicy: OnFailure
tfPort: 2222
tfReplicaType: MASTER
- IsDefaultPS: false
replicas: 1
- replicas: 1
template:
metadata:
creationTimestamp: null
Expand All @@ -431,8 +428,7 @@ spec:
restartPolicy: OnFailure
tfPort: 2222
tfReplicaType: WORKER
- IsDefaultPS: true
replicas: 2
- replicas: 2
template:
metadata:
creationTimestamp: null
Expand Down
3 changes: 1 addition & 2 deletions build/images/tf_operator/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ RUN mkdir -p /opt/mlkube/test
RUN mkdir -p /opt/tensorflow_k8s/dashboard/
COPY tf_operator /opt/mlkube
COPY e2e /opt/mlkube/test
COPY grpc_tensorflow_server.py /opt/mlkube/grpc_tensorflow_server/grpc_tensorflow_server.py
COPY backend /opt/tensorflow_k8s/dashboard/
COPY build /opt/tensorflow_k8s/dashboard/frontend/build
RUN chmod a+x /opt/mlkube/tf_operator
RUN chmod a+x /opt/mlkube/test/e2e
RUN chmod a+x /opt/tensorflow_k8s/dashboard/backend

# TODO(jlewi): Reduce log level.
ENTRYPOINT ["/opt/mlkube/tf_operator", "-alsologtostderr"]
ENTRYPOINT ["/opt/mlkube/tf_operator", "-alsologtostderr"]
3 changes: 1 addition & 2 deletions build/images/tf_operator/build_and_push.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,7 @@ def main(): # pylint: disable=too-many-locals, too-many-statements
os.path.join(go_path, "bin/tf_operator"),
os.path.join(go_path, "bin/e2e"),
os.path.join(go_path, "bin/backend"),
"dashboard/frontend/build",
"hack/grpc_tensorflow_server/grpc_tensorflow_server.py"
"dashboard/frontend/build"
]

for s in sources:
Expand Down
12 changes: 1 addition & 11 deletions developer_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,10 @@ export MY_POD_NAME=my-pod
set the corresponding namespace for the resource.
* TODO(jlewi): Do we still need to set MY_POD_NAME? Why?

Make a copy of `grpc_tensorflow_server.py` and create a config file named `controller-config-file.yaml`:

```sh
cp hack/grpc_tensorflow_server/grpc_tensorflow_server.py /tmp/grpc_tensorflow_server.py

cat > /tmp/controller-config-file.yaml << EOL
grpcServerFilePath: /tmp/grpc_tensorflow_server.py
EOL
```

Now we are ready to run operator locally:

```sh
tf_operator -controller_config_file=/tmp/controller-config-file.yaml
tf_operator --logtostderr
```

The command creates a CRD `tfjobs` and then blocks, watching for the creation of resources of that kind. To verify local
Expand Down
160 changes: 0 additions & 160 deletions hack/grpc_tensorflow_server/grpc_tensorflow_server.py

This file was deleted.

1 change: 0 additions & 1 deletion pkg/apis/tensorflow/v1alpha1/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ func SetDefaults_TFJob(obj *TFJob) {
}

func setDefault_PSPodTemplateSpec(r *TFReplicaSpec, tfImage string) {
r.IsDefaultPS = true
r.Template = &v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
Expand Down
1 change: 0 additions & 1 deletion pkg/apis/tensorflow/v1alpha1/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ func TestSetDefaults_TFJob(t *testing.T) {
},
},
TFReplicaType: PS,
IsDefaultPS: true,
},
},
TFImage: "tensorflow/tensorflow:1.3.0",
Expand Down
2 changes: 0 additions & 2 deletions pkg/apis/tensorflow/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ type TFReplicaSpec struct {
// TFPort is the port to use for TF services.
TFPort *int32 `json:"tfPort,omitempty" protobuf:"varint,1,opt,name=tfPort"`
TFReplicaType `json:"tfReplicaType"`
// IsDefaultPS denotes if the parameter server should use the default grpc_tensorflow_server
IsDefaultPS bool
}

type TensorBoardSpec struct {
Expand Down
95 changes: 0 additions & 95 deletions pkg/trainer/replicas.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"sort"
"strings"

"github.com/tensorflow/k8s/pkg/util/k8sutil"
Expand Down Expand Up @@ -106,70 +104,7 @@ func (s *TFReplicaSet) Labels() KubernetesLabels {
"tf_job_name": s.Job.job.ObjectMeta.Name})
}

// transformClusterSpecForDefaultPS flattens clusterSpec into the single-string
// form used with grpc_tensorflow_server.
//
// Every job type is rendered as "<jobType>|<host>;<host>;..." and the rendered
// entries are joined with ",". Job types are emitted in lexicographic order so
// the result is deterministic despite Go's random map iteration order (this
// also keeps unit tests simple).
func transformClusterSpecForDefaultPS(clusterSpec ClusterSpec) string {
	jobTypes := make([]string, 0, len(clusterSpec))
	for jobType := range clusterSpec {
		jobTypes = append(jobTypes, jobType)
	}
	sort.Strings(jobTypes)

	entries := make([]string, len(jobTypes))
	for i, jobType := range jobTypes {
		entries[i] = jobType + "|" + strings.Join(clusterSpec[jobType], ";")
	}

	return strings.Join(entries, ",")
}

func (s *TFReplicaSet) Create(config *tfv1alpha1.ControllerConfig) error {
if s.Spec.IsDefaultPS {
// Create the ConfigMap containing the sources for the default Parameter Server
err, cm := s.getDefaultPSConfigMap(config)
if err != nil {
log.Errorf("Error building PS ConfigMap: %v", err)
return err
}
createdCM, err := s.ClientSet.CoreV1().ConfigMaps(s.Job.job.ObjectMeta.Namespace).Create(cm)
if err != nil {
if k8s_errors.IsAlreadyExists(err) {
log.Infof("%v already exists.", createdCM.Name)
} else {
log.Errorf("Error creating PS ConfigMap: %v, %v", cm.ObjectMeta.Name, err)
return k8sErrors.NewAggregate([]error{fmt.Errorf("Creating PS ConfigMap %v returned error.", createdCM.Name), err})
}
} else {
s.recorder.Eventf(s.Job.job, v1.EventTypeNormal, SuccessfulCreateReason, "Created configmap: %v", createdCM.Name)
}

// Update Volumes to include the ConfigMap containing grpc_tensorflow_server.py
name := "ps-config-volume"
hasVolume := false
for _, v := range s.Spec.Template.Spec.Volumes {
if v.Name == name {
hasVolume = true
break
}
}
if !hasVolume {
s.Spec.Template.Spec.Volumes = append(s.Spec.Template.Spec.Volumes, v1.Volume{
Name: "ps-config-volume",
VolumeSource: v1.VolumeSource{
ConfigMap: &v1.ConfigMapVolumeSource{
LocalObjectReference: v1.LocalObjectReference{
Name: s.defaultPSConfigMapName(),
},
},
},
})
}
}

for index := int32(0); index < *s.Spec.Replicas; index++ {
taskLabels := s.Labels()
taskLabels["task_index"] = fmt.Sprintf("%v", index)
Expand Down Expand Up @@ -226,11 +161,6 @@ func (s *TFReplicaSet) Create(config *tfv1alpha1.ControllerConfig) error {
return err
}

if s.Spec.IsDefaultPS {
cs := transformClusterSpecForDefaultPS(s.Job.ClusterSpec())
s.Spec.Template.Spec.Containers[0].Command = []string{"python", "/ps-server/grpc_tensorflow_server.py", "--cluster_spec", cs, "--job_name", "ps", "--task_id", fmt.Sprintf("%v", index)}
}

// Make a copy of the template because we will modify it below.
newPodSpecTemplate := s.Spec.Template.DeepCopy()

Expand Down Expand Up @@ -294,31 +224,6 @@ func (s *TFReplicaSet) Create(config *tfv1alpha1.ControllerConfig) error {
return nil
}

// getDefaultPSConfigMap builds the ConfigMap that carries the source of the
// simple gRPC server used as the default parameter server.
//
// The ConfigMap is named s.defaultPSConfigMapName() and holds one entry,
// "grpc_tensorflow_server.py", whose value is the content of the file at
// config.GrpcServerFilePath. The object is only constructed here; the caller
// is responsible for submitting it to the cluster.
//
// NOTE: the error is the FIRST return value, contrary to the usual Go
// convention. On a read failure the error from ioutil.ReadFile is returned
// as-is together with a nil ConfigMap.
func (s *TFReplicaSet) getDefaultPSConfigMap(config *tfv1alpha1.ControllerConfig) (error, *v1.ConfigMap) {
	// Resolve the name up front, before any file I/O is attempted.
	cmName := s.defaultPSConfigMapName()

	// ConfigMap key -> path of the local file whose content is embedded.
	sources := map[string]string{
		"grpc_tensorflow_server.py": config.GrpcServerFilePath,
	}

	contents := make(map[string]string)
	for key, path := range sources {
		raw, err := ioutil.ReadFile(path)
		if err != nil {
			return err, nil
		}
		contents[key] = string(raw)
	}

	return nil, &v1.ConfigMap{
		ObjectMeta: meta_v1.ObjectMeta{
			Name: cmName,
		},
		Data: contents,
	}
}

// Delete deletes the replicas
func (s *TFReplicaSet) Delete() error {
selector, err := s.Labels().ToSelector()
Expand Down
Loading

0 comments on commit deb4fc2

Please sign in to comment.