
Parameter Server: Run TF server by default #36

Merged
merged 18 commits into from
Oct 19, 2017


@wbuchwalter wbuchwalter commented Sep 12, 2017

First draft for #16.

  • I did not provide a default value for the new tfVersion. While we could default to latest or something else, I feel it might be difficult to figure out where the issue comes from in case of a version mismatch between the PS and the workers.
  • Currently the script is only building one image for every stable minor version starting from 1.1.0. Do we want to add more releases?

I will need to change the repository where the default PS images are pushed before merging.


@jlewi jlewi left a comment

Thanks for taking this on!

@@ -0,0 +1,5 @@
ARG BASE_IMAGE=tensorflow/tensorflow
Contributor

I didn't realize we'd have to build our own Docker images. I thought grpc_tensorflow_server.py was already included in the standard TensorFlow image. Might be worth filing an FR for that.

@@ -0,0 +1,22 @@
# A very simple parameter server that joins the server defined by the cluster spec passed as environment variable
Contributor

Can we use grpc_tensorflow_server.py (just make a copy)? Ideally this binary is eventually part of the TensorFlow Docker image. So the less divergence from the repo the better.

@@ -0,0 +1,22 @@
#!/bin/bash
Contributor

Can we avoid creating additional Docker images? What if we create a ConfigMap with grpc_tensorflow_server.py (or app.py) and then mount that into the PS container? I think that would allow us to use a stock TensorFlow image.

@@ -91,6 +92,8 @@ type TfReplicaSpec struct {
// TfPort is the port to use for TF services.
TfPort *int32 `json:"tfPort,omitempty" protobuf:"varint,1,opt,name=tfPort"`
TfReplicaType `json:"tfReplicaType"`
//TfVersion is only used when TfReplicaType == PS to automatically start a PS server
TfVersion string `json:"tfVersion,omitempty"`
Contributor

Can a TfJobSpec just have an optional TfImage? This would be a Docker Image URI for a container with default binaries. Right now we have two "tensorboard" and "grpc_std_server"?

Now that I'm looking at the code, I think it's better if we can avoid adding a layer of indirection in the form of a mapping from TF versions to Docker images.

@wbuchwalter
Contributor Author

wbuchwalter commented Sep 13, 2017

@jlewi We can use https://hub.docker.com/r/tensorflow/tf_grpc_server/ instead of building our own images, but there are basically only two versions: 0.8.0 and latest, which was last built a year ago.
Can I assume that a new image is only ever built when a breaking change occurs? Or is it not really maintained?

@jlewi
Contributor

jlewi commented Sep 13, 2017

Can I assume that a new image is only ever built when a breaking change occurs? Or is it not really maintained?

@wbuchwalter I don't think so. I think those images are outdated and unlikely to be compatible with the latest versions of TF. I don't think TF provides a lot of cross version guarantees. So ideally all Docker images in the job are derived from the same TensorFlow docker image so they are using the same version of TF.

@wbuchwalter
Contributor Author

@jlewi I implemented your suggestion and it's now using a ConfigMap to avoid having to maintain images (great idea btw).
Only downside is that the TF_CONFIG passed by tfjob as env variable is in json format, while grpc_tensorflow_server.py expects something like --cluster_spec worker|worker-74om-0:2222;worker-74om-1:2222,ps|ps-74om-0:2222;ps-74om-1:2222,master|master-74om-0:2222.
So I had to add a second python file to the ConfigMap that converts from json to this expected format and calls grpc_tensorflow_server.py with the correct argument.

Let me know if you are okay with this or if you think this is too brittle.
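
For readers following the thread, here is a minimal Python sketch of the kind of conversion described above. It is not the code from this PR. It assumes TF_CONFIG is a JSON object with a top-level "cluster" key that maps job names to lists of host:port strings, and the function name `tf_config_to_cluster_spec` is hypothetical. Job names are sorted for determinism, so the order differs from the example string in the comment.

```python
import json


def tf_config_to_cluster_spec(tf_config_json):
    """Convert a TF_CONFIG-style JSON string into the --cluster_spec string.

    Assumed input layout (an assumption, not taken from this PR):
    {"cluster": {"<job>": ["host:port", ...], ...}, "task": {...}}
    """
    cluster = json.loads(tf_config_json)["cluster"]
    jobs = []
    # Sort job names so the output is deterministic.
    for job_name in sorted(cluster):
        # Hosts of one job are separated by ";", jobs are separated by ",".
        jobs.append(job_name + "|" + ";".join(cluster[job_name]))
    return ",".join(jobs)


example = json.dumps({
    "cluster": {
        "worker": ["worker-74om-0:2222", "worker-74om-1:2222"],
        "ps": ["ps-74om-0:2222", "ps-74om-1:2222"],
        "master": ["master-74om-0:2222"],
    },
    "task": {"type": "ps", "index": 0},
})
print(tf_config_to_cluster_spec(example))
```

In practice the JSON string would be read from the TF_CONFIG environment variable rather than built inline.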

@jlewi jlewi left a comment

@wbuchwalter Instead of doing the json to string conversion in Python can we do it in the controller? e.g. in the call to setDefaults you could just convert the spec to the format expected and pass it as an argument to the Python program.

# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Python-based TensorFlow GRPC server.
Contributor

Add a TODO indicating that we'd eventually like to get rid of this once grpc_tensorflow_server is included in the TF container.

Contributor Author

Done.

@wbuchwalter
Contributor Author

wbuchwalter commented Sep 15, 2017

@jlewi Since the setDefaults happens in the spec package even before the validation we don't know the clusterSpec at this stage (unless I missed something).
Another option would be to add a custom label to the default PS pod to identify it in setDefaults.
I could then modify the Create function in replicas.go to check whether the replica we are creating has this label and, if so, do the appropriate conversions and update the Command of the pod.
The advantage is that if/when TF official image includes grpc_tensorflow_server.py this approach would still work.
What do you think?

@jlewi
Contributor

jlewi commented Sep 15, 2017

@wbuchwalter You're right, thanks for reminding me. It seems like there are two approaches:

  1. Refactor the logic for creating the ClusterSpec so that it's available in setDefaults
  2. Defer specifying the Image and Command to Create in replicas.go

There are a couple things I don't like about doing it in Create.

  1. It creates inconsistencies between the handling of PS and other replicas; i.e. other replicas are fully specified after setDefaults, whereas for PS it's delayed until Create.
  2. I think the actual PodTemplateSpec for each replica should end up in the TfJobSpec (I think this helps debuggability)
  3. We could update the TfJobSpec in Create, but this would violate normal K8s conventions that spec is fixed when resource is created.

So I think my preference would be to figure out how to make cluster_spec available in setDefaults.
I think the issue is that to compute cluster_spec we need to know the port for each process which is one of the defaults that gets filled in.

One solution would be to do two passes over the replicas to fill in setDefaults. First pass would figure out the port to use for each process. Then for the second pass, we can compute cluster_spec and use that to fill out Command.
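
The two-pass idea can be sketched in Python as follows. This is an illustration only and not the controller's Go code: the dictionary layout, the `<type>-<job id>-<index>` host naming, the default port constant, and the function name `set_defaults` are all assumptions chosen to mirror the examples in this thread.

```python
DEFAULT_TF_PORT = 2222  # assumed default, matching the port used in the examples


def set_defaults(replicas, job_id):
    """Fill in default ports, then derive the cluster_spec string.

    `replicas` is a list of dicts with "tfReplicaType", "replicas" and an
    optional "tfPort" (a hypothetical, simplified stand-in for TfReplicaSpec).
    """
    # Pass 1: make sure every replica spec has a port.
    for r in replicas:
        if r.get("tfPort") is None:
            r["tfPort"] = DEFAULT_TF_PORT

    # Pass 2: with all ports known, build the cluster_spec string.
    jobs = []
    for r in sorted(replicas, key=lambda r: r["tfReplicaType"]):
        name = r["tfReplicaType"].lower()
        hosts = ["%s-%s-%d:%d" % (name, job_id, i, r["tfPort"])
                 for i in range(r["replicas"])]
        jobs.append(name + "|" + ";".join(hosts))
    return ",".join(jobs)


replicas = [
    {"tfReplicaType": "WORKER", "replicas": 2},
    {"tfReplicaType": "PS", "replicas": 1, "tfPort": 3333},
]
print(set_defaults(replicas, "74om"))
```

The sketch yields one string per job, which is enough for cluster_spec but not for any per-replica argument.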

@wbuchwalter
Contributor Author

Sorry for the delay responding to this, currently travelling.
After trying different things I can't really find a clean solution to this that wouldn't need a significant refactoring.
The main issue is that grpc_tensorflow_server also needs to be passed the task_id as argument in the Command.
So that means that every replica in the TFReplicaSet needs a different Command which can't really be achieved currently before the TFReplicaSet.Create() is called.

Another option would be to be able to specify a TF_CONFIG transform function in spec that would be called just before setting the environment variable on the replica. But I'm not sure this is really clean nor easily understandable.

Unless you have another feature in mind that could benefit from this mechanism, I think the current approach (doing the transform in Python at runtime) is still cleaner, albeit not ideal.

What do you think? Do you see another way of doing what you said that I missed?

@jlewi
Contributor

jlewi commented Sep 22, 2017

Thanks for the explanation (I should have thought of that).

In that case, I think it's fine to do the transformation in Create and set the command appropriately. I'd prefer not to introduce extra Python code. I'd also prefer not to use PodLabels as a way of signaling to Create that it needs to set the command. Instead, can we set a private variable on either TfReplicaSet or TfJob to indicate that Create needs to set the command appropriately?

@wbuchwalter
Contributor Author

@jlewi Could you give it another look?
Thanks!

@jlewi jlewi left a comment

Thanks. Only one major comment regarding whether we should create a ConfigMap for each job.

return watchVersion, nil
}

//Create a ConfigMap containing the source for a simple grpc server (pkg/controller/grpc_tensorflow_server.py)
Contributor

Space after "//"

func (c *Controller) createPSConfigMap() error {
//If a ConfigMap with the same name already exists, it was created by an earlier operator
//we delete and recreate it in case the grpc_tensorflow_server.py was updated in the meantime
cm, err := c.KubeCli.CoreV1().ConfigMaps(c.Namespace).Get(spec.PSConfigMapName(), metav1.GetOptions{})
Contributor

Will this cause problems? Suppose a job is happily running using the supplied config map. Now suppose the operator gets restarted. The operator would then try to delete and recreate the ConfigMap which seems problematic since some jobs could already be mounting it.

Here are some options I can think of for avoiding this.

  • Create a unique ConfigMap for every job
  • Do not delete the ConfigMap if it already exists; just reuse it.

I favor the first option because the ConfigMap is just a temporary solution until the TensorFlow container has the gRPC server binary built in. I think ConfigMap per job makes it more robust and I think this is better than the slight efficiency we get by reusing ConfigMaps.

Contributor Author

Good catch.
1 ConfigMap per job would also mean specifying the volumes configuration for the default PS in the create phase (probably where I specify the Command) because the name of the ConfigMap won't be known in advance anymore and will rely on the RuntimeId.
Are you ok with that?

Contributor

SGTM.
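
A rough Python sketch of the per-job ConfigMap option agreed on in this thread, expressed as plain manifest dictionaries. The naming scheme `cm-ps-<runtime id>`, the volume name `ps-server`, and both function names are hypothetical. Only the `/ps-server` mount path and the `grpc_tensorflow_server.py` key appear elsewhere in this PR.

```python
def default_ps_configmap(runtime_id, namespace, server_source):
    """Build a ConfigMap manifest that is unique to one job."""
    name = "cm-ps-" + runtime_id  # hypothetical naming scheme
    return {
        "apiVersion": "v1",
        "kind": "ConfigMap",
        "metadata": {"name": name, "namespace": namespace},
        # The key becomes the file name once the ConfigMap is mounted.
        "data": {"grpc_tensorflow_server.py": server_source},
    }


def default_ps_volume(configmap_name):
    """Volume and mount entries that expose the ConfigMap at /ps-server."""
    volume = {"name": "ps-server", "configMap": {"name": configmap_name}}
    mount = {"name": "ps-server", "mountPath": "/ps-server"}
    return volume, mount


cm = default_ps_configmap("74om", "default", "# server source goes here\n")
volume, mount = default_ps_volume(cm["metadata"]["name"])
print(cm["metadata"]["name"], mount["mountPath"])
```

Because the name depends on the runtime id, the volume can only be filled in once the job exists, which is the trade-off raised above.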

@@ -91,6 +96,10 @@ type TfReplicaSpec struct {
// TfPort is the port to use for TF services.
TfPort *int32 `json:"tfPort,omitempty" protobuf:"varint,1,opt,name=tfPort"`
TfReplicaType `json:"tfReplicaType"`
//TfImage is only used when TfReplicaType == PS to automatically start a PS server
Contributor

Nit: Space after "//" here and for other comments please.

@@ -75,7 +79,8 @@ const (
type ContainerName string

const (
TENSORFLOW ContainerName = "tensorflow"
TENSORFLOW ContainerName = "tensorflow"
PsDefaultImage = "wbuchwalter/mlkube-tensorflow-ps"
Contributor

Can we default to a TensorFlow image now? Can we make this a flag to make it easily customizable?

Contributor Author

@wbuchwalter wbuchwalter Sep 28, 2017

Sorry this is something I forgot to remove from my first implementation (with custom images).
Now, if you want to use the default PS, you would declare a template like this:

- replicas: 2
  tfReplicaType: PS
  tfImage: tensorflow/tensorflow:1.3.0

So it's already easily customizable.
I chose not to set any default for tfImage because, in case of a version mismatch between the PS and the other nodes, it might be quite hard for users to understand where the issue comes from.
Let me know if you would rather have a default.

Contributor

So tfImage is specified at the Replica level? Why not make it a property of the job instead? This way we only have to specify it once and we can reuse it for the TensorBoard replica and the PS. If you want to specify a custom image at the Replica level you can already do that by filling out a PodTemplateSpec although it would be more verbose.

@@ -88,6 +90,32 @@ func (s *TFReplicaSet) Labels() KubernetesLabels {
"runtime_id": s.Job.job.Spec.RuntimeId})
}

// Transforms the tfconfig to work with grpc_tensorflow_server
func transformClusterSpecForDefaultPS(clusterSpec ClusterSpec) string {
Contributor

Can you write a unittest for this please?

// We do the appropriate transformations here
cs := transformClusterSpecForDefaultPS(s.Job.ClusterSpec())
s.Spec.Template.Spec.Containers[0].Command = []string{"python", "/ps-server/grpc_tensorflow_server.py"}
s.Spec.Template.Spec.Containers[0].Args = []string{"--cluster_spec", cs, "--job_name", "ps", "--task_id", fmt.Sprintf("%v", index)}
Contributor

Why did you split it into command and args rather than just using command?

Contributor Author

No good reason, I will change this.
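
To illustrate why each PS replica needs its own command, here is a small Python sketch that folds the Command and Args from the diff above into a single list. The function name `default_ps_command` is hypothetical. The script path and flag names are the ones shown in the diff.

```python
def default_ps_command(cluster_spec, index,
                       server_path="/ps-server/grpc_tensorflow_server.py"):
    """Return the full command for the PS replica with the given index."""
    return [
        "python", server_path,
        "--cluster_spec", cluster_spec,
        "--job_name", "ps",
        # task_id differs per replica, so the command cannot be shared.
        "--task_id", str(index),
    ]


spec = "ps|ps-74om-0:2222;ps-74om-1:2222"
for i in range(2):
    print(default_ps_command(spec, i))
```

Only the final element changes between replicas, which is why the command has to be set per pod at creation time.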


//grab server sources from files
filePaths := map[string]string{
"grpc_tensorflow_server.py": "./grpc_tensorflow_server/grpc_tensorflow_server.py",
Contributor

Can we make the path of grpc_tensorflow_server.py a flag that gets plumbed through?

Do you plan on updating the Dockerfile for the controller to include grpc_tensorflow_server.py?

Contributor Author

To make sure I get this right, do you want this to be a flag at the controller level, so users could specify a custom grpc_tensorflow_server.py, or do you want a flag at the job level to customize where this file gets mounted in the pod?

Contributor

I was thinking at the controller level not the job level. Right now the Docker image and controller both have to hardcode the same location of grpc_tensorflow_server.py. That seems brittle.

If you make it a flag in main.go that gets plumbed through, then the Docker image can put grpc_tensorflow_server.py anywhere in the image and we can just specify the location as an argument to main.py.

@jlewi
Contributor

jlewi commented Sep 29, 2017

I think I responded to all your questions but let me know if I missed something.
Thanks for being persistent.

@wbuchwalter
Contributor Author

wbuchwalter commented Sep 29, 2017

Hopefully it's starting to look better this time 😬
I still have to add unit tests and a few other small things, but some questions:

  1. I added the path to grpc_tensorflow_server.py as part of the ControllerConfig and not as a separate flag, is that okay with you?
  2. Since this is part of the config, that means we now have to pass the controllerconfig down quite a few levels, should TrainingJob own a reference to the ControllerConfig instead?

Thanks for your help getting this right.

@jlewi
Contributor

jlewi commented Oct 1, 2017

Sorry for the slow reply.

I added the path to grpc_tensorflow_server.py as part of the ControllerConfig and not as a separate flag, is that okay with you?

Seems reasonable to me.

Since this is part of the config, that means we now have to pass the controllerconfig down quite a few levels, should TrainingJob own a reference to the ControllerConfig instead?

Seems reasonable as well.

@wbuchwalter
Contributor Author

@jlewi Ready for review.

I also made two changes to the build_and_push.py script:

  • fix an issue with encoding (output of the command is bytes not string)
  • add a --no-push arg

@jlewi jlewi left a comment

Thanks.

Please consider filing an issue to add an E2E test to cover the case where we use default PS.

@@ -51,6 +53,8 @@ def run(command, cwd=None):
help="Use Google Container Builder to build the image.")
parser.add_argument("--no-gcb", dest="use_gcb", action="store_false",
help="Use Docker to build the image.")
parser.add_argument("--no-push", dest="should_push", action="store_false",
help="Push the image once build is finished.")
Contributor

Don't you also need to define a "--push" argument with action "store_true", and then do
parser.set_defaults(should_push=True)?

The help string for --no-push also looks incorrect.

Can you also pull in the latest changes that I just committed and resolve the conflict?

Contributor Author

argparse automatically initialises the flag to the opposite of the value the action stores (so in this case True). I can still add --push if you want this to be explicit.
image
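
The argparse behaviour described here can be checked with a few lines of Python. The argument definition mirrors the one in the diff above, except for the help text, which is reworded here as a suggestion and is not the wording that was merged.

```python
import argparse

parser = argparse.ArgumentParser()
# With action="store_false" and no explicit default, argparse sets the
# destination to True unless the flag is passed.
parser.add_argument("--no-push", dest="should_push", action="store_false",
                    help="Do not push the image once the build is finished.")

print(parser.parse_args([]).should_push)             # True
print(parser.parse_args(["--no-push"]).should_push)  # False
```

An explicit `--push` flag with `parser.set_defaults(should_push=True)` would behave the same way and only makes the default more visible.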

@@ -147,7 +148,7 @@ func (c *Controller) handleTfJobEvent(event *Event) error {
//NewJob(kubeCli kubernetes.Interface, job spec.TfJob, stopC <-chan struct{}, wg *sync.WaitGroup)

c.stopChMap[clus.Metadata.Name] = stopC
c.jobs[clus.Metadata.Namespace + "-" + clus.Metadata.Name] = nc
c.jobs[clus.Metadata.Namespace+"-"+clus.Metadata.Name] = nc
Contributor

nit: I think there should be spaces after the plus signs. Maybe run gofmt?

Contributor Author

If I add these spaces back and run gofmt, it will remove them.
I am running go 1.9.1, could you have a different go version with different gofmt rules?

@@ -5,6 +5,9 @@ type ControllerConfig struct {
// This should match the value specified as a container limit.
// e.g. alpha.kubernetes.io/nvidia-gpu
Accelerators map[string]AcceleratorConfig

// Path to the file containing the grpc server sources
Contributor

Nit "sources" -> "source"

@@ -75,7 +79,8 @@ const (
type ContainerName string

const (
TENSORFLOW ContainerName = "tensorflow"
TENSORFLOW ContainerName = "tensorflow"
DefaultTFImage = "tensorflow/tensorflow:latest"
Contributor

Should we pin the default image to a particular version of TF? e.g. 1.2, 1.3 etc...? We can upgrade the default when we release a new TfJob CRD.

My concern is that TensorFlow introduces a lot of breaking changes with each TF version. So if people are relying on the default TF version supplied by the CRD, their jobs could start breaking suddenly when TF releases a new version.


var buf bytes.Buffer
isFirstJob := true
for _, k := range keys {
Contributor

nit: I'd prefer this code to be written as a series of joins so we don't have to have statements dealing with first character

so something like (without the syntax errors)

jobs := []string{}
for _, jobType := range keys {
  hosts := []string{}
  for _, h := range clusterSpec[jobType] {
    hosts = append(hosts, h)
  }
  s := jobType + "|" + strings.Join(hosts, ";")
  jobs = append(jobs, s)
}
return strings.Join(jobs, ",")

The code might not generate the correct result but hopefully it illustrates what I mean by using strings.join

log.Errorf("Error building PS ConfigMap: %v", err)
return err
}
_, err = s.ClientSet.CoreV1().ConfigMaps(NAMESPACE).Create(cm)
Contributor

Should we be using the namespace of the job as opposed to the constant NAMESPACE?
I think #39 added support for creating TfJobs in other namespaces.
It looks like all the other resources use s.Job.job.Metadata.Namespace for the namespace

Can you delete the constant "NAMESPACE" in training.go since it should be unused?

@@ -237,6 +333,16 @@ func (s *TFReplicaSet) Delete() error {
}
}

// If the ConfigMap for the default parameter server exists, we delete it
_, err = s.ClientSet.CoreV1().ConfigMaps(NAMESPACE).Get(s.defaultPSConfigMapName(), meta_v1.GetOptions{})
Contributor

Update NAMESPACE as mentioned above.

@wbuchwalter
Contributor Author

@jlewi Should be good for another round.

@jlewi
Contributor

jlewi commented Oct 18, 2017

/test all

@jlewi
Contributor

jlewi commented Oct 19, 2017

Reviewed 1 of 8 files at r1, 4 of 14 files at r4, 3 of 11 files at r5, 1 of 1 files at r6, 2 of 4 files at r7.
Review status: 11 of 16 files reviewed at latest revision, 6 unresolved discussions, some commit checks failed.


images/tf_operator/build_and_push.py, line 57 at r5 (raw file):

Previously, wbuchwalter (William Buchwalter) wrote…

argparse automatically initialises the flag to the opposite of the value the action stores (so in this case True). I can still add --push if you want this to be explicit.
image

Great thanks.


pkg/controller/controller.go, line 151 at r5 (raw file):

Previously, wbuchwalter (William Buchwalter) wrote…

If I add these spaces back and run gofmt, it will remove them.
I am running go 1.9.1, could you have a different go version with different gofmt rules?

Thanks. I filed a PR to set up lint checks as part of our testing.


Comments from Reviewable

@jlewi
Contributor

jlewi commented Oct 19, 2017

LGTM. I'm going to try to fix the test though before I merge it.

@jlewi
Contributor

jlewi commented Oct 19, 2017

/test all

@jlewi
Contributor

jlewi commented Oct 19, 2017

/test all

@jlewi
Contributor

jlewi commented Oct 19, 2017

/test all

@@ -112,6 +118,8 @@ def run(command, cwd=None):
else:
run(["docker", "build", "-t", image, context_dir])
logging.info("Built image: %s", image)

if args.should_push:
Contributor

The if args.should_push statement should be inside the else block. We only push the image if we aren't using GCB.

Contributor Author

Fixed.
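
The control flow requested in this review comment can be sketched as below. The function `plan_steps` and its step labels are hypothetical placeholders and are not commands from build_and_push.py. The sketch only shows that the push decision sits inside the non-GCB branch.

```python
def plan_steps(use_gcb, should_push):
    """Return the ordered build steps for the given flags."""
    steps = []
    if use_gcb:
        # Placeholder label for the Google Container Builder path.
        steps.append("gcb-build")
    else:
        steps.append("docker-build")
        # Pushing is only considered when Docker did the build.
        if should_push:
            steps.append("docker-push")
    return steps


print(plan_steps(use_gcb=True, should_push=True))
print(plan_steps(use_gcb=False, should_push=True))
print(plan_steps(use_gcb=False, should_push=False))
```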

@wbuchwalter
Contributor Author

/retest

@k8s-ci-robot

@wbuchwalter: you can't request testing unless you are a kubernetes member.

In response to this:

/retest

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.

@kubeflow kubeflow deleted a comment from k8s-ci-robot Oct 19, 2017
@jlewi
Contributor

jlewi commented Oct 19, 2017

/test

@jlewi
Contributor

jlewi commented Oct 19, 2017

/test all

@jlewi
Contributor

jlewi commented Oct 19, 2017

:lgtm:


Review status: 10 of 16 files reviewed at latest revision, 5 unresolved discussions.


Comments from Reviewable

@jlewi jlewi merged commit ca33d3d into kubeflow:master Oct 19, 2017
@jlewi
Contributor

jlewi commented Oct 19, 2017

Woo Hoo!
Thanks @wbuchwalter
