Skip to content

Commit

Permalink
Merge pull request #2 from jlewi/testing
Browse files Browse the repository at this point in the history
A more thorough E2E test.
  • Loading branch information
jlewi authored Jul 23, 2017
2 parents d0d6644 + d398958 commit 052e178
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 59 deletions.
12 changes: 8 additions & 4 deletions images/tf_operator/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
# TODO(jlewi): How can we make it work with golang:1.8.2-alpine?
FROM golang:1.8.2

RUN mkdir -p /opt/cmle
COPY tf_operator /opt/cmle
RUN chmod a+x /opt/cmle/tf_operator
RUN mkdir -p /opt/mlkube
RUN mkdir -p /opt/mlkube/test
COPY tf_operator /opt/mlkube
COPY e2e /opt/mlkube/test
RUN chmod a+x /opt/mlkube/tf_operator
RUN chmod a+x /opt/mlkube/test/e2e

# TODO(jlewi): Reduce log level.
ENTRYPOINT ["/opt/cmle/tf_operator", "-alsologtostderr"]
ENTRYPOINT ["/opt/mlkube/tf_operator", "-alsologtostderr"]
2 changes: 2 additions & 0 deletions images/tf_operator/build_and_push.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ IMAGE=${REGISTRY}/tf_operator:latest
DIR=`mktemp -d`
echo Use ${DIR} as context
go install mlkube.io/cmd/tf_operator
go install mlkube.io/test/e2e
cp ${GOPATH}/bin/tf_operator ${DIR}/
cp ${GOPATH}/bin/e2e ${DIR}/
cp ${SRC_DIR}/Dockerfile ${DIR}/

docker build -t $IMAGE -f ${DIR}/Dockerfile ${DIR}
Expand Down
16 changes: 16 additions & 0 deletions pkg/util/k8sutil/tpr_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ func listTfJobsURI(ns string) string {
return fmt.Sprintf("/apis/%s/%s/namespaces/%s/%s", spec.TPRGroup, spec.TPRVersion, ns, spec.TPRKindPlural)
}

// Create POSTs j to the TfJob collection of namespace ns and returns the
// TfJob decoded from the raw response body. A request error is returned
// unchanged with a nil TfJob.
func (c *TfJobRestClient) Create(ns string, j *spec.TfJob) (*spec.TfJob, error) {
	collectionURI := fmt.Sprintf("/apis/%s/%s/namespaces/%s/%s/", spec.TPRGroup, spec.TPRVersion, ns, spec.TPRKindPlural)

	request := c.restcli.Post().RequestURI(collectionURI).Body(j)
	raw, err := request.DoRaw()
	if err == nil {
		return readOutTfJob(raw)
	}
	return nil, err
}

func (c *TfJobRestClient) Get(ns, name string) (*spec.TfJob, error) {
uri := fmt.Sprintf("/apis/%s/%s/namespaces/%s/%s/%s", spec.TPRGroup, spec.TPRVersion, ns, spec.TPRKindPlural, name)
b, err := c.restcli.Get().RequestURI(uri).DoRaw()
Expand All @@ -138,6 +147,13 @@ func (c *TfJobRestClient) Update(ns string, j *spec.TfJob) (*spec.TfJob, error)
return readOutTfJob(b)
}


// Delete issues a DELETE request for the TfJob called name in namespace ns.
// The response body is discarded; only the request error, if any, is returned.
func (c *TfJobRestClient) Delete(ns, name string) error {
	resourceURI := fmt.Sprintf("/apis/%s/%s/namespaces/%s/%s/%s", spec.TPRGroup, spec.TPRVersion, ns, spec.TPRKindPlural, name)

	if _, err := c.restcli.Delete().RequestURI(resourceURI).DoRaw(); err != nil {
		return err
	}
	return nil
}

func readOutTfJob(b []byte) (*spec.TfJob, error) {
cluster := &spec.TfJob{}
if err := json.Unmarshal(b, cluster); err != nil {
Expand Down
213 changes: 213 additions & 0 deletions test/e2e/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// e2e provides an E2E test for TfJobs.
//
// The test creates TfJobs and runs various checks to ensure various operations work as intended.
// The test is intended to run as a helm test that ensures the TfJob operator is working correctly.
// Thus, the program returns non-zero exit status on error.
//
// TODO(jlewi): Do we need to make the test output conform to the TAP (https://testanything.org/)
// protocol so we can fit into the K8s dashboard?
//
package main

import (
"mlkube.io/pkg/util/k8sutil"
log "github.com/golang/glog"
"mlkube.io/pkg/spec"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"mlkube.io/pkg/util"
"github.com/gogo/protobuf/proto"
"k8s.io/client-go/pkg/api/v1"
"flag"
"time"
"strings"
"fmt"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
"os"
)

const (
// Namespace is the Kubernetes namespace passed to every TfJob and Job API
// call made by this test.
Namespace = "default"
)

var (
// image is the Docker image set on the "tensorflow" container of every
// replica in the test TfJob; it can be overridden with the -image flag.
image = flag.String("image", "gcr.io/tf-on-k8s-dogfood/tf_sample:latest", "The Docker image to use with the TfJob.")
)

// run executes the end-to-end TfJob test and returns a non-nil error for the
// first check that fails, so that main can report the outcome.
//
// The test proceeds in four steps:
//  1. Create a TfJob with one MASTER, one PS and one WORKER replica in Namespace.
//  2. Poll for up to 2 minutes until the TfJob reports StateSucceeded or StateFailed.
//  3. Verify that the TfJob succeeded, has a RuntimeId, and that a batch Job
//     exists for every replica.
//  4. Delete the TfJob and wait for up to 5 minutes until every one of those
//     Jobs is reported as not found.
func run() error {
	kubeCli := k8sutil.MustNewKubeClient()
	tfJobClient, err := k8sutil.NewTfJobClient()
	if err != nil {
		return err
	}

	name := "e2e-test-job-" + util.RandString(4)

	// The three replicas differ only in their type, so fill in the shared
	// fields in one place instead of repeating the literal three times.
	replicas := []*spec.TfReplicaSpec{
		{TfReplicaType: spec.MASTER},
		{TfReplicaType: spec.PS},
		{TfReplicaType: spec.WORKER},
	}
	for _, r := range replicas {
		r.Replicas = proto.Int32(1)
		r.TfPort = proto.Int32(2222)
		r.Template = &v1.PodTemplateSpec{
			Spec: v1.PodSpec{
				Containers: []v1.Container{
					{
						Name:  "tensorflow",
						Image: *image,
					},
				},
				RestartPolicy: v1.RestartPolicyOnFailure,
			},
		}
	}

	original := &spec.TfJob{
		Metadata: metav1.ObjectMeta{
			Name: name,
			Labels: map[string]string{
				"test.mlkube.io": "",
			},
		},
		Spec: spec.TfJobSpec{
			ReplicaSpecs: replicas,
		},
	}

	if _, err := tfJobClient.Create(Namespace, original); err != nil {
		return err
	}

	// Wait for the job to complete for up to 2 minutes.
	var (
		tfJob   *spec.TfJob
		lastErr error
	)
	for endTime := time.Now().Add(2 * time.Minute); time.Now().Before(endTime); {
		job, getErr := tfJobClient.Get(Namespace, name)
		if getErr != nil {
			// A failed Get must not be dereferenced; keep the last good
			// result (if any) and retry after the sleep below.
			lastErr = getErr
			log.Warningf("There was a problem getting TfJob: %v; error %v", name, getErr)
		} else {
			tfJob = job
			if tfJob.Status.State == spec.StateSucceeded || tfJob.Status.State == spec.StateFailed {
				break
			}
			log.Infof("Waiting for job %v to finish", name)
		}
		time.Sleep(5 * time.Second)
	}

	if tfJob == nil {
		return fmt.Errorf("Failed to get TfJob %v; last error %v", name, lastErr)
	}

	if tfJob.Status.State != spec.StateSucceeded {
		// TODO(jlewi): Should we clean up the job.
		return fmt.Errorf("TfJob %v did not succeed;\n %v", name, util.Pformat(tfJob))
	}

	if tfJob.Spec.RuntimeId == "" {
		return fmt.Errorf("TfJob %v doesn't have a RuntimeId", name)
	}

	// Loop over each replica and make sure the expected Job was created.
	// The names are also recorded in a set so that, after the TfJob is
	// deleted, we can track which Job controllers still exist.
	jobs := make(map[string]bool)
	for _, r := range original.Spec.ReplicaSpecs {
		baseName := strings.ToLower(string(r.TfReplicaType))

		for i := 0; i < int(*r.Replicas); i++ {
			jobName := fmt.Sprintf("%v-%v-%v", baseName, tfJob.Spec.RuntimeId, i)

			if _, err := kubeCli.BatchV1().Jobs(Namespace).Get(jobName, metav1.GetOptions{}); err != nil {
				return fmt.Errorf("TfJob %v did not create Job %v for ReplicaType %v Index %v; error %v", name, jobName, r.TfReplicaType, i, err)
			}
			jobs[jobName] = true
		}
	}

	// Delete the job and make sure all subresources are properly garbage collected.
	// Return the error rather than exiting so the caller can still report the result.
	if err := tfJobClient.Delete(Namespace, name); err != nil {
		return fmt.Errorf("Failed to delete TfJob %v; error %v", name, err)
	}

	// Wait for all jobs to be deleted.
	for endTime := time.Now().Add(5 * time.Minute); time.Now().Before(endTime) && len(jobs) > 0; {
		for k := range jobs {
			_, err := kubeCli.BatchV1().Jobs(Namespace).Get(k, metav1.GetOptions{})
			switch {
			case k8s_errors.IsNotFound(err):
				// Deleting map entry during loop is safe.
				// See: https://stackoverflow.com/questions/23229975/is-it-safe-to-remove-selected-keys-from-golang-map-within-a-range-loop
				delete(jobs, k)
			case err != nil:
				// Any other error tells us nothing about whether the Job
				// exists; log it and check again on the next pass.
				log.Warningf("There was a problem getting Job %v; error %v", k, err)
			default:
				log.Infof("Job %v still exists", k)
			}
		}

		if len(jobs) > 0 {
			time.Sleep(5 * time.Second)
		}
	}

	if len(jobs) > 0 {
		return fmt.Errorf("Not all Job controllers were successfully deleted.")
	}
	return nil
}

// main parses the command-line flags, runs the E2E test once and prints the
// result as a single-test TAP (https://testanything.org/) report on stdout.
// The process exits with status 1 when the test fails so that Helm treats the
// test pod as failed.
func main() {
	flag.Parse()

	err := run()

	// Generate TAP (https://testanything.org/) output
	fmt.Println("1..1")
	if err != nil {
		// Printf, not Println: the message contains a formatting verb.
		fmt.Printf("not ok 1 - Running TfJob failed %v\n", err)
		// Exit with non zero exit code for Helm tests.
		os.Exit(1)
	}
	fmt.Println("ok 1 - Successfully ran TfJob")
}
30 changes: 0 additions & 30 deletions tf-job-chart/templates/tests/basic-test-config.yaml

This file was deleted.

29 changes: 5 additions & 24 deletions tf-job-chart/templates/tests/basic-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,9 @@ metadata:
spec:
containers:
- name: basic-test
# TODO(jlewi): Should we use an IMAGE that contains the relevant python test code? The example (i.e. the
# TensorFlow code used by examples/tf_job.yaml) is already pushed to a registry and therefore not the code
# pulled from the source tree.
image: python:latest
command: ["/bin/sh","-c"]
# TODO(jlewi): We download kubectl because the test uses kubectl to submit TfJobs to be used in the tests.
# We should probably use the Python API in the test (or maybe switch to go?) and then we don't need to
# download kubectl.
args: ["wget -NP /usr/bin https://storage.googleapis.com/kubernetes-release/release/$(curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt)/bin/linux/amd64/kubectl
&& chmod 755 /usr/bin/kubectl
&& pip install nose-tap
&& nosetests --with-tap /tests/run.py"]
volumeMounts:
- mountPath: /tests
name: tests
readOnly: true
- mountPath: /tools
name: tools
volumes:
- name: tests
configMap:
name: tfjob-tests
- name: tools
emptyDir: {}
image: gcr.io/tf-on-k8s-dogfood/tf_operator:latest
command:
- /opt/mlkube/test/e2e
- -alsologtostderr
- -v=1
restartPolicy: Never
2 changes: 1 addition & 1 deletion tf-job-chart/templates/tf_job_operator_deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ spec:
- name: tf-job-operator
image: gcr.io/tf-on-k8s-dogfood/tf_operator:latest
command:
- /opt/cmle/tf_operator
- /opt/mlkube/tf_operator
- -alsologtostderr
- -v=1
env:
Expand Down

0 comments on commit 052e178

Please sign in to comment.