diff --git a/images/tf_operator/Dockerfile b/images/tf_operator/Dockerfile index f6bb84a052..50a0048b10 100644 --- a/images/tf_operator/Dockerfile +++ b/images/tf_operator/Dockerfile @@ -1,8 +1,12 @@ # TODO(jlewi): How can we make it work with golang:1.8.2-alpine FROM golang:1.8.2 -RUN mkdir -p /opt/cmle -COPY tf_operator /opt/cmle -RUN chmod a+x /opt/cmle/tf_operator +RUN mkdir -p /opt/mlkube +RUN mkdir -p /opt/mlkube/test +COPY tf_operator /opt/mlkube +COPY e2e /opt/mlkube/test +RUN chmod a+x /opt/mlkube/tf_operator +RUN chmod a+x /opt/mlkube/test/e2e + # TODO(jlewi): Reduce log level. -ENTRYPOINT ["/opt/cmle/tf_operator", "-alsologtostderr"] \ No newline at end of file +ENTRYPOINT ["/opt/mlkube/tf_operator", "-alsologtostderr"] \ No newline at end of file diff --git a/images/tf_operator/build_and_push.sh b/images/tf_operator/build_and_push.sh index 4ced85e912..f7ad33b4ca 100755 --- a/images/tf_operator/build_and_push.sh +++ b/images/tf_operator/build_and_push.sh @@ -11,7 +11,9 @@ IMAGE=${REGISTRY}/tf_operator:latest DIR=`mktemp -d` echo Use ${DIR} as context go install mlkube.io/cmd/tf_operator +go install mlkube.io/test/e2e cp ${GOPATH}/bin/tf_operator ${DIR}/ +cp ${GOPATH}/bin/e2e ${DIR}/ cp ${SRC_DIR}/Dockerfile ${DIR}/ docker build -t $IMAGE -f ${DIR}/Dockerfile ${DIR} diff --git a/pkg/util/k8sutil/tpr_util.go b/pkg/util/k8sutil/tpr_util.go index ff0932a2cc..890e038caa 100644 --- a/pkg/util/k8sutil/tpr_util.go +++ b/pkg/util/k8sutil/tpr_util.go @@ -120,6 +120,15 @@ func listTfJobsURI(ns string) string { return fmt.Sprintf("/apis/%s/%s/namespaces/%s/%s", spec.TPRGroup, spec.TPRVersion, ns, spec.TPRKindPlural) } +// Create POSTs j to the TfJob collection URI of namespace ns (built by listTfJobsURI) and returns the TfJob decoded from the response body. +func (c *TfJobRestClient) Create(ns string, j *spec.TfJob) (*spec.TfJob, error) { + b, err := c.restcli.Post().RequestURI(listTfJobsURI(ns)).Body(j).DoRaw() + if err != nil { + return nil, err + } + return readOutTfJob(b) +} + +func (c *TfJobRestClient) 
Get(ns, name string) (*spec.TfJob, error) { uri := fmt.Sprintf("/apis/%s/%s/namespaces/%s/%s/%s", spec.TPRGroup, spec.TPRVersion, ns, spec.TPRKindPlural, name) b, err := c.restcli.Get().RequestURI(uri).DoRaw() @@ -138,6 +147,14 @@ func (c *TfJobRestClient) Update(ns string, j *spec.TfJob) (*spec.TfJob, error) return readOutTfJob(b) } + +// Delete issues a DELETE request for the TfJob called name in namespace ns and returns any error from that request. +func (c *TfJobRestClient) Delete(ns, name string) error { + uri := fmt.Sprintf("/apis/%s/%s/namespaces/%s/%s/%s", spec.TPRGroup, spec.TPRVersion, ns, spec.TPRKindPlural, name) + _, err := c.restcli.Delete().RequestURI(uri).DoRaw() + return err +} + func readOutTfJob(b []byte) (*spec.TfJob, error) { cluster := &spec.TfJob{} if err := json.Unmarshal(b, cluster); err != nil { diff --git a/test/e2e/main.go b/test/e2e/main.go new file mode 100644 index 0000000000..e1837c2c24 --- /dev/null +++ b/test/e2e/main.go @@ -0,0 +1,213 @@ +// e2e provides an E2E test for TfJobs. +// +// The test creates TfJobs and runs various checks to ensure various operations work as intended. +// The test is intended to run as a helm test that ensures the TfJob operator is working correctly. +// Thus, the program returns non-zero exit status on error. 
+// +// TODO(jlewi): Do we need to make the test output conform to the TAP(https://testanything.org/) +// protocol so we can fit into the K8s dashboard? +// +package main + +import ( + "mlkube.io/pkg/util/k8sutil" + log "github.com/golang/glog" + "mlkube.io/pkg/spec" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "mlkube.io/pkg/util" + "github.com/gogo/protobuf/proto" + "k8s.io/client-go/pkg/api/v1" + "flag" + "time" + "strings" + "fmt" + k8s_errors "k8s.io/apimachinery/pkg/api/errors" + "os" +) + +const ( + Namespace = "default" +) + +var ( + image = flag.String("image", "gcr.io/tf-on-k8s-dogfood/tf_sample:latest", "The Docker image to use with the TfJob.") +) + +// run creates a TfJob with one MASTER, PS and WORKER replica, waits for it to succeed, checks that a batch Job exists for every replica, then deletes the TfJob and waits for those Jobs to disappear; it returns an error for the first step that fails. +func run() error { + kubeCli := k8sutil.MustNewKubeClient() + tfJobClient, err := k8sutil.NewTfJobClient() + if err != nil { + return err + } + + name := "e2e-test-job-" + util.RandString(4) + + original := &spec.TfJob{ + Metadata: metav1.ObjectMeta{ + Name: name, + Labels: map[string] string { + "test.mlkube.io": "", + }, + }, + Spec: spec.TfJobSpec{ + ReplicaSpecs: []*spec.TfReplicaSpec{ + { + Replicas: proto.Int32(1), + TfPort: proto.Int32(2222), + TfReplicaType: spec.MASTER, + Template: &v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "tensorflow", + Image: *image, + }, + }, + RestartPolicy: v1.RestartPolicyOnFailure, + }, + }, + }, + { + Replicas: proto.Int32(1), + TfPort: proto.Int32(2222), + TfReplicaType: spec.PS, + Template: &v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "tensorflow", + Image: *image, + }, + }, + RestartPolicy: v1.RestartPolicyOnFailure, + }, + }, + }, + { + Replicas: proto.Int32(1), + TfPort: proto.Int32(2222), + TfReplicaType: spec.WORKER, + Template: &v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "tensorflow", + Image: *image, + }, + }, + RestartPolicy: v1.RestartPolicyOnFailure, + }, + }, + }, + }, + }, + } + + if _, err := tfJobClient.Create(Namespace, original); err 
!= nil { + return err + } + + // Wait for the job to complete for up to 2 minutes. + var tfJob *spec.TfJob + for endTime := time.Now().Add(2 * time.Minute); time.Now().Before(endTime); { + tfJob, err = tfJobClient.Get(Namespace, name) + if err != nil { + // tfJob is not usable when Get fails (it may be nil), so log and retry instead of reading its status. + log.Warningf("There was a problem getting TfJob: %v; error %v", name, err) + } else if tfJob.Status.State == spec.StateSucceeded || tfJob.Status.State == spec.StateFailed { + break + } + + log.Infof("Waiting for job %v to finish", name) + time.Sleep(5 * time.Second) + } + + if tfJob == nil { + return fmt.Errorf("Failed to get TfJob %v", name) + } + + if tfJob.Status.State != spec.StateSucceeded { + // TODO(jlewi): Should we clean up the job. + return fmt.Errorf("TfJob %v did not succeed;\n %v", name, util.Pformat(tfJob)) + } + + if tfJob.Spec.RuntimeId == "" { + return fmt.Errorf("TfJob %v doesn't have a RuntimeId", name) + } + + // Loop over each replica and make sure the expected resources were created. + for _, r := range original.Spec.ReplicaSpecs { + baseName := strings.ToLower(string(r.TfReplicaType)) + + for i := 0; i < int(*r.Replicas); i += 1 { + jobName := fmt.Sprintf("%v-%v-%v", baseName, tfJob.Spec.RuntimeId, i) + + _, err := kubeCli.BatchV1().Jobs(Namespace).Get(jobName, metav1.GetOptions{}) + + if err != nil { + return fmt.Errorf("TfJob %v did not create Job %v for ReplicaType %v Index %v; error %v", name, jobName, r.TfReplicaType, i, err) + } + } + } + + // Delete the job and make sure all subresources are properly garbage collected. + if err := tfJobClient.Delete(Namespace, name); err != nil { + return fmt.Errorf("Failed to delete TfJob %v; error %v", name, err) + } + + // Define sets to keep track of Job controllers corresponding to Replicas + // that still exist. + jobs := make(map[string] bool) + + // Loop over each replica and make sure the expected resources are being deleted. 
+ for _, r := range original.Spec.ReplicaSpecs { + baseName := strings.ToLower(string(r.TfReplicaType)) + + for i := 0; i < int(*r.Replicas); i += 1 { + jobName := fmt.Sprintf("%v-%v-%v", baseName, tfJob.Spec.RuntimeId, i) + + jobs[jobName] = true + } + } + + // Wait for all jobs to be deleted. + for endTime := time.Now().Add(5 * time.Minute); time.Now().Before(endTime) && len(jobs) >0; { + for k := range jobs { + _, err := kubeCli.BatchV1().Jobs(Namespace).Get(k, metav1.GetOptions{}) + if k8s_errors.IsNotFound(err) { + // Deleting map entry during loop is safe. + // See: https://stackoverflow.com/questions/23229975/is-it-safe-to-remove-selected-keys-from-golang-map-within-a-range-loop + delete(jobs, k) + } else { + log.Infof("Job %v still exists", k) + } + } + + if len(jobs) > 0 { + time.Sleep(5 * time.Second) + } + } + + if len(jobs) > 0 { + return fmt.Errorf("Not all Job controllers were successfully deleted.") + } + return nil +} + +// main parses the flags, runs the test and reports the result as TAP output, exiting with status 1 when the test fails. +func main() { + flag.Parse() + + err := run() + + // Generate TAP (https://testanything.org/) output + fmt.Println("1..1") + if err == nil { + fmt.Println("ok 1 - Successfully ran TfJob") + } else { + fmt.Printf("not ok 1 - Running TfJob failed %v\n", err) + // Exit with non zero exit code for Helm tests. + os.Exit(1) + } +} diff --git a/tf-job-chart/templates/tests/basic-test-config.yaml b/tf-job-chart/templates/tests/basic-test-config.yaml deleted file mode 100644 index 7a8526b694..0000000000 --- a/tf-job-chart/templates/tests/basic-test-config.yaml +++ /dev/null @@ -1,30 +0,0 @@ -# This ConfigMap is used by the basic-test helm chart to define the python script to use to run the tests. -# -# TODO(jlewi): Is it a common convention to use a ConfigMap to define the tests? I think one advantage of this -# approach is that you don't have to push the test code anywhere. If we pulled down the python file from somewhere -# else (e.g. github or as a Docker image) we'd have to push the code somewhere first. 
-# However, the test however, is already pulling tf_job.yaml from github so already there is some mismatch between -# the code and the test. The helm package also doesn't deploy the TfJob operator. So arguably we already have to -# build and deploy various artifacts in order to run the test and we can probably reuse those mechanisms to deploy -# the actual python test files. -apiVersion: v1 -kind: ConfigMap -metadata: - name: tfjob-tests -data: - run.py: |- - #! /usr/bin/python - from subprocess import call - def test_trivial(): - assert "a" == "a" - - def test_create(): - # TODO(jlewi): This is just an initial hack. The job is deleted in case there is a previous run lying around. - # delete will return an error if the resource doesn't exist. - # A better solution is probably to give job a unique id so that different runs don't interfere. - return_code = call("kubectl delete -f https://raw.githubusercontent.com/jlewi/mlkube.io/master/examples/tf_job.yaml", shell=True) - - return_code = call("kubectl create -f https://raw.githubusercontent.com/jlewi/mlkube.io/master/examples/tf_job.yaml", shell=True) - assert(return_code == 0) - - # more tests here diff --git a/tf-job-chart/templates/tests/basic-test.yaml b/tf-job-chart/templates/tests/basic-test.yaml index 78235a14c3..437cf7a5b3 100644 --- a/tf-job-chart/templates/tests/basic-test.yaml +++ b/tf-job-chart/templates/tests/basic-test.yaml @@ -10,28 +10,9 @@ metadata: spec: containers: - name: basic-test - # TODO(jlewi): Should we use an IMAGE that contains the relevant python test code? The example (i.e. the - # TensorFlow code used by examples/tf_job.yaml) is already pushed to a registry and therefore not the code - # pulled from the source tree. - image: python:latest - command: ["/bin/sh","-c"] - # TODO(jlewi): We download kubectl because the test uses kubectl to submit TfJobs to be used in the tests. - # We should probably use the Python API in the test (or maybe switch to go?) 
and then we don't need to - # download kubectl. - args: ["wget -NP /usr/bin https://storage.googleapis.com/kubernetes-release/release/$(curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt)/bin/linux/amd64/kubectl - && chmod 755 /usr/bin/kubectl - && pip install nose-tap - && nosetests --with-tap /tests/run.py"] - volumeMounts: - - mountPath: /tests - name: tests - readOnly: true - - mountPath: /tools - name: tools - volumes: - - name: tests - configMap: - name: tfjob-tests - - name: tools - emptyDir: {} + image: gcr.io/tf-on-k8s-dogfood/tf_operator:latest + command: + - /opt/mlkube/test/e2e + - -alsologtostderr + - -v=1 restartPolicy: Never diff --git a/tf-job-chart/templates/tf_job_operator_deployment.yaml b/tf-job-chart/templates/tf_job_operator_deployment.yaml index 570c62b729..2bac2429c2 100644 --- a/tf-job-chart/templates/tf_job_operator_deployment.yaml +++ b/tf-job-chart/templates/tf_job_operator_deployment.yaml @@ -13,7 +13,7 @@ spec: - name: tf-job-operator image: gcr.io/tf-on-k8s-dogfood/tf_operator:latest command: - - /opt/cmle/tf_operator + - /opt/mlkube/tf_operator - -alsologtostderr - -v=1 env: