A more thorough E2E test. #2

Merged 3 commits on Jul 23, 2017
12 changes: 8 additions & 4 deletions images/tf_operator/Dockerfile
@@ -1,8 +1,12 @@
# TODO(jlewi): How can we make it work with golang:1.8.2-alpine
FROM golang:1.8.2

-RUN mkdir -p /opt/cmle
-COPY tf_operator /opt/cmle
-RUN chmod a+x /opt/cmle/tf_operator
+RUN mkdir -p /opt/mlkube
+RUN mkdir -p /opt/mlkube/test
+COPY tf_operator /opt/mlkube
+COPY e2e /opt/mlkube/test
+RUN chmod a+x /opt/mlkube/tf_operator
+RUN chmod a+x /opt/mlkube/test/e2e

# TODO(jlewi): Reduce log level.
-ENTRYPOINT ["/opt/cmle/tf_operator", "-alsologtostderr"]
+ENTRYPOINT ["/opt/mlkube/tf_operator", "-alsologtostderr"]
2 changes: 2 additions & 0 deletions images/tf_operator/build_and_push.sh
@@ -11,7 +11,9 @@ IMAGE=${REGISTRY}/tf_operator:latest
DIR=`mktemp -d`
echo Use ${DIR} as context
go install mlkube.io/cmd/tf_operator
go install mlkube.io/test/e2e
cp ${GOPATH}/bin/tf_operator ${DIR}/
cp ${GOPATH}/bin/e2e ${DIR}/
cp ${SRC_DIR}/Dockerfile ${DIR}/

docker build -t $IMAGE -f ${DIR}/Dockerfile ${DIR}
16 changes: 16 additions & 0 deletions pkg/util/k8sutil/tpr_util.go
@@ -120,6 +120,15 @@ func listTfJobsURI(ns string) string {
return fmt.Sprintf("/apis/%s/%s/namespaces/%s/%s", spec.TPRGroup, spec.TPRVersion, ns, spec.TPRKindPlural)
}

func (c *TfJobRestClient) Create(ns string, j *spec.TfJob) (*spec.TfJob, error) {
uri := fmt.Sprintf("/apis/%s/%s/namespaces/%s/%s/", spec.TPRGroup, spec.TPRVersion, ns, spec.TPRKindPlural)
b, err := c.restcli.Post().RequestURI(uri).Body(j).DoRaw()
if err != nil {
return nil, err
}
return readOutTfJob(b)
}

func (c *TfJobRestClient) Get(ns, name string) (*spec.TfJob, error) {
uri := fmt.Sprintf("/apis/%s/%s/namespaces/%s/%s/%s", spec.TPRGroup, spec.TPRVersion, ns, spec.TPRKindPlural, name)
b, err := c.restcli.Get().RequestURI(uri).DoRaw()
@@ -138,6 +147,13 @@ func (c *TfJobRestClient) Update(ns string, j *spec.TfJob) (*spec.TfJob, error)
return readOutTfJob(b)
}


func (c *TfJobRestClient) Delete(ns, name string) error {
uri := fmt.Sprintf("/apis/%s/%s/namespaces/%s/%s/%s", spec.TPRGroup, spec.TPRVersion, ns, spec.TPRKindPlural, name)
_, err := c.restcli.Delete().RequestURI(uri).DoRaw()
return err
}

func readOutTfJob(b []byte) (*spec.TfJob, error) {
cluster := &spec.TfJob{}
if err := json.Unmarshal(b, cluster); err != nil {
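Together with the existing Get and Update, the new Create and Delete methods are enough for a caller to drive a TfJob through its whole lifecycle. The following is a minimal sketch of that flow; the function name createAndDelete, the namespace "default", and the job argument are hypothetical stand-ins, and the real end-to-end version is test/e2e/main.go below.

package e2eexample

import (
	log "github.com/golang/glog"

	"mlkube.io/pkg/spec"
	"mlkube.io/pkg/util/k8sutil"
)

// createAndDelete drives a TfJob through create -> get -> delete using the
// client methods added in this PR. The job argument is assumed to be a fully
// populated spec.TfJob (see test/e2e/main.go for a concrete one).
func createAndDelete(job *spec.TfJob) error {
	tfJobClient, err := k8sutil.NewTfJobClient()
	if err != nil {
		return err
	}
	created, err := tfJobClient.Create("default", job) // POSTs to the TPR endpoint added above
	if err != nil {
		return err
	}
	fetched, err := tfJobClient.Get("default", created.Metadata.Name) // read it back by name
	if err != nil {
		return err
	}
	log.Infof("TfJob %v is in state %v", fetched.Metadata.Name, fetched.Status.State)
	return tfJobClient.Delete("default", created.Metadata.Name) // clean up
}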
213 changes: 213 additions & 0 deletions test/e2e/main.go
@@ -0,0 +1,213 @@
// e2e provides an E2E test for TfJobs.
//
// The test creates a TfJob and runs a series of checks to ensure the operator handles it as intended.
// The test is intended to run as a helm test that ensures the TfJob operator is working correctly.
// Thus, the program returns a non-zero exit status on error.
//
// TODO(jlewi): Do we need to make the test output conform to the TAP (https://testanything.org/)
// protocol so we can fit into the K8s dashboard?
//
package main

import (
"mlkube.io/pkg/util/k8sutil"
log "github.com/golang/glog"
"mlkube.io/pkg/spec"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"mlkube.io/pkg/util"
"github.com/gogo/protobuf/proto"
"k8s.io/client-go/pkg/api/v1"
"flag"
"time"
"strings"
"fmt"
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
"os"
)

const (
Namespace = "default"
)

var (
image = flag.String("image", "gcr.io/tf-on-k8s-dogfood/tf_sample:latest", "The Docker image to use with the TfJob.")
)

func run() error {
kubeCli := k8sutil.MustNewKubeClient()
tfJobClient, err := k8sutil.NewTfJobClient()
if err != nil {
return err
}

name := "e2e-test-job-" + util.RandString(4)

original := &spec.TfJob{
Metadata: metav1.ObjectMeta{
Name: name,
Labels: map[string] string {
"test.mlkube.io": "",
},
},
Spec: spec.TfJobSpec{
ReplicaSpecs: []*spec.TfReplicaSpec{
{
Replicas: proto.Int32(1),
TfPort: proto.Int32(2222),
TfReplicaType: spec.MASTER,
Template: &v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "tensorflow",
Image: *image,
},
},
RestartPolicy: v1.RestartPolicyOnFailure,
},
},
},
{
Replicas: proto.Int32(1),
TfPort: proto.Int32(2222),
TfReplicaType: spec.PS,
Template: &v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "tensorflow",
Image: *image,
},
},
RestartPolicy: v1.RestartPolicyOnFailure,
},
},
},
{
Replicas: proto.Int32(1),
TfPort: proto.Int32(2222),
TfReplicaType: spec.WORKER,
Template: &v1.PodTemplateSpec{
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "tensorflow",
Image: *image,
},
},
RestartPolicy: v1.RestartPolicyOnFailure,
},
},
},
},
},
}

_, err = tfJobClient.Create(Namespace, original)

if err != nil {
return err
}

// Wait for the job to complete for up to 2 minutes.
var tfJob *spec.TfJob
for endTime := time.Now().Add(2 * time.Minute); time.Now().Before(endTime); {
tfJob, err = tfJobClient.Get(Namespace, name)
if err != nil {
log.Warningf("There was a problem getting TfJob: %v; error %v", name, err)
// Don't dereference tfJob below if the Get failed; it may be nil.
time.Sleep(5 * time.Second)
continue
}

if tfJob.Status.State == spec.StateSucceeded || tfJob.Status.State == spec.StateFailed {
break
}
log.Infof("Waiting for job %v to finish", name)
time.Sleep(5 * time.Second)
}

if tfJob == nil {
return fmt.Errorf("Failed to get TfJob %v", name)
}

if tfJob.Status.State != spec.StateSucceeded {
// TODO(jlewi): Should we clean up the job?
return fmt.Errorf("TfJob %v did not succeed;\n %v", name, util.Pformat(tfJob))
}

if tfJob.Spec.RuntimeId == "" {
return fmt.Errorf("TfJob %v doesn't have a RuntimeId", name)
}

// Loop over each replica and make sure the expected resources were created.
for _, r := range original.Spec.ReplicaSpecs {
baseName := strings.ToLower(string(r.TfReplicaType))

for i := 0; i < int(*r.Replicas); i += 1 {
jobName := fmt.Sprintf("%v-%v-%v", baseName, tfJob.Spec.RuntimeId, i)

_, err := kubeCli.BatchV1().Jobs(Namespace).Get(jobName, metav1.GetOptions{})

if err != nil {
return fmt.Errorf("Tfob %v did not create Job %v for ReplicaType %v Index %v", name, jobName, r.TfReplicaType, i)
}
}
}

// Delete the job and make sure all subresources are properly garbage collected.
if err := tfJobClient.Delete(Namespace, name); err != nil {
log.Fatalf("Failed to delete TfJob %v; error %v", name, err)
}

// Define sets to keep track of Job controllers corresponding to Replicas
// that still exist.
jobs := make(map[string] bool)

// Loop over each replica and make sure the expected resources are being deleted.
for _, r := range original.Spec.ReplicaSpecs {
baseName := strings.ToLower(string(r.TfReplicaType))

for i := 0; i < int(*r.Replicas); i += 1 {
jobName := fmt.Sprintf("%v-%v-%v", baseName, tfJob.Spec.RuntimeId, i)

jobs[jobName] = true
}
}

// Wait for all jobs to be deleted.
for endTime := time.Now().Add(5 * time.Minute); time.Now().Before(endTime) && len(jobs) > 0; {
for k := range jobs {
_, err := kubeCli.BatchV1().Jobs(Namespace).Get(k, metav1.GetOptions{})
if k8s_errors.IsNotFound(err) {
// Deleting map entry during loop is safe.
// See: https://stackoverflow.com/questions/23229975/is-it-safe-to-remove-selected-keys-from-golang-map-within-a-range-loop
delete(jobs, k)
} else {
log.Infof("Job %v still exists", k)
}
}

if len(jobs) > 0 {
time.Sleep(5 * time.Second)
}
}

if len(jobs) > 0 {
return fmt.Errorf("Not all Job controllers were successfully deleted.")
}
return nil
}

func main() {
flag.Parse()

err := run()

// Generate TAP (https://testanything.org/) output
fmt.Println("1..1")
if err == nil {
fmt.Println("ok 1 - Successfully ran TfJob")
} else {
fmt.Println("not ok 1 - Running TfJob failed %v", err)
// Exit with non zero exit code for Helm tests.
os.Exit(1)
}
}
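For reference, the test emits a single TAP test point. On success the program prints:

1..1
ok 1 - Successfully ran TfJob

On failure it prints a line of the form "not ok 1 - Running TfJob failed <error>" and exits with status 1 so the surrounding helm test is reported as failed.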
30 changes: 0 additions & 30 deletions tf-job-chart/templates/tests/basic-test-config.yaml

This file was deleted.

29 changes: 5 additions & 24 deletions tf-job-chart/templates/tests/basic-test.yaml
@@ -10,28 +10,9 @@ metadata:
spec:
  containers:
  - name: basic-test
-    # TODO(jlewi): Should we use an IMAGE that contains the relevant python test code? The example (i.e. the
-    # TensorFlow code used by examples/tf_job.yaml) is already pushed to a registry and therefore not the code
-    # pulled from the source tree.
-    image: python:latest
-    command: ["/bin/sh","-c"]
-    # TODO(jlewi): We download kubectl because the test uses kubectl to submit TfJobs to be used in the tests.
-    # We should probably use the Python API in the test (or maybe switch to go?) and then we don't need to
-    # download kubectl.
-    args: ["wget -NP /usr/bin https://storage.googleapis.com/kubernetes-release/release/$(curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt)/bin/linux/amd64/kubectl
-      && chmod 755 /usr/bin/kubectl
-      && pip install nose-tap
-      && nosetests --with-tap /tests/run.py"]
-    volumeMounts:
-    - mountPath: /tests
-      name: tests
-      readOnly: true
-    - mountPath: /tools
-      name: tools
-  volumes:
-  - name: tests
-    configMap:
-      name: tfjob-tests
-  - name: tools
-    emptyDir: {}
+    image: gcr.io/tf-on-k8s-dogfood/tf_operator:latest
+    command:
+    - /opt/mlkube/test/e2e
+    - -alsologtostderr
+    - -v=1
  restartPolicy: Never
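With the chart installed, this pod is run by Helm's test command (helm test RELEASE_NAME, with whatever release name the chart was installed under). Helm treats a non-zero exit from the test pod as a failure, which is why the e2e binary exits with status 1 on error.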
2 changes: 1 addition & 1 deletion tf-job-chart/templates/tf_job_operator_deployment.yaml
@@ -13,7 +13,7 @@ spec:
      - name: tf-job-operator
        image: gcr.io/tf-on-k8s-dogfood/tf_operator:latest
        command:
-        - /opt/cmle/tf_operator
+        - /opt/mlkube/tf_operator
        - -alsologtostderr
        - -v=1
        env: