Skip to content

Commit

Permalink
*: Move test util to separate package (kubeflow#666)
Browse files Browse the repository at this point in the history
* *: Refactor

Signed-off-by: Ce Gao <[email protected]>

* travis: Remove util from coverage test

Signed-off-by: Ce Gao <[email protected]>

* test: Add copyright holder

Signed-off-by: Ce Gao <[email protected]>

* *: Fix errors

Signed-off-by: Ce Gao <[email protected]>

* generator: Fix test

Signed-off-by: Ce Gao <[email protected]>

* pods: Add error handler

Signed-off-by: Ce Gao <[email protected]>

* testutil: Fix linting errors

Signed-off-by: Ce Gao <[email protected]>
  • Loading branch information
gaocegege authored and k8s-ci-robot committed Jun 15, 2018
1 parent 3b90d7f commit f2bbf41
Show file tree
Hide file tree
Showing 21 changed files with 509 additions and 343 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ script:
# For now though we just run all tests in pkg.
# And we can not use ** because goveralls uses filepath.Match
# to match ignore files and it does not support it.
- goveralls -service=travis-ci -v -package ./pkg/... -ignore "pkg/client/*/*.go,pkg/client/*/*/*.go,pkg/client/*/*/*/*.go,pkg/client/*/*/*/*/*.go,pkg/client/*/*/*/*/*/*.go,pkg/client/*/*/*/*/*/*/*.go,pkg/apis/tensorflow/*/zz_generated.*.go,pkg/apis/tensorflow/*/*_generated.go"
- goveralls -service=travis-ci -v -package ./pkg/... -ignore "pkg/client/*/*.go,pkg/client/*/*/*.go,pkg/client/*/*/*/*.go,pkg/client/*/*/*/*/*.go,pkg/client/*/*/*/*/*/*.go,pkg/client/*/*/*/*/*/*/*.go,pkg/util/testutil/*.go,pkg/apis/tensorflow/*/zz_generated.*.go,pkg/apis/tensorflow/*/*_generated.go"

notifications:
webhooks: https://www.travisbuddy.com/
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller.v2/controller_pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/kubernetes/pkg/controller"

tfv1alpha2 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1alpha2"
"github.com/kubeflow/tf-operator/pkg/generator"
train_util "github.com/kubeflow/tf-operator/pkg/util/train"
)

Expand Down Expand Up @@ -133,17 +134,17 @@ func (tc *TFJobController) createNewPod(tfjob *tfv1alpha2.TFJob, rt, index strin
}

// Create OwnerReference.
controllerRef := genOwnerReference(tfjob)
controllerRef := generator.GenOwnerReference(tfjob)

// Set type and index for the worker.
labels := genLabels(tfjobKey)
labels := generator.GenLabels(tfjobKey)
labels[tfReplicaTypeLabel] = rt
labels[tfReplicaIndexLabel] = index

podTemplate := spec.Template.DeepCopy()

// Set name for the template.
podTemplate.Name = genGeneralName(tfjob.Name, rt, index)
podTemplate.Name = generator.GenGeneralName(tfjob.Name, rt, index)

if podTemplate.Labels == nil {
podTemplate.Labels = make(map[string]string)
Expand Down Expand Up @@ -228,7 +229,7 @@ func (tc *TFJobController) getPodsForTFJob(tfjob *tfv1alpha2.TFJob) ([]*v1.Pod,

// Create selector.
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: genLabels(tfjobKey),
MatchLabels: generator.GenLabels(tfjobKey),
})

if err != nil {
Expand Down
111 changes: 32 additions & 79 deletions pkg/controller.v2/controller_pod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,68 +16,19 @@
package controller

import (
"fmt"
"testing"

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/controller"

tfv1alpha2 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1alpha2"
tfjobclientset "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned"
"github.com/kubeflow/tf-operator/pkg/generator"
"github.com/kubeflow/tf-operator/pkg/util/testutil"
)

func newBasePod(name string, tfJob *tfv1alpha2.TFJob, t *testing.T) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: genLabels(getKey(tfJob, t)),
Namespace: tfJob.Namespace,
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(tfJob, controllerKind)},
},
}
}

func newPod(tfJob *tfv1alpha2.TFJob, typ string, index int, t *testing.T) *v1.Pod {
pod := newBasePod(fmt.Sprintf("%s-%d", typ, index), tfJob, t)
pod.Labels[tfReplicaTypeLabel] = typ
pod.Labels[tfReplicaIndexLabel] = fmt.Sprintf("%d", index)
return pod
}

// create count pods with the given phase for the given tfJob
func newPodList(count int32, status v1.PodPhase, tfJob *tfv1alpha2.TFJob, typ string, start int32, t *testing.T) []*v1.Pod {
pods := []*v1.Pod{}
for i := int32(0); i < count; i++ {
newPod := newPod(tfJob, typ, int(start+i), t)
newPod.Status = v1.PodStatus{Phase: status}
pods = append(pods, newPod)
}
return pods
}

func setPodsStatuses(podIndexer cache.Indexer, tfJob *tfv1alpha2.TFJob, typ string, pendingPods, activePods, succeededPods, failedPods int32, t *testing.T) {
var index int32
for _, pod := range newPodList(pendingPods, v1.PodPending, tfJob, typ, index, t) {
podIndexer.Add(pod)
}
index += pendingPods
for _, pod := range newPodList(activePods, v1.PodRunning, tfJob, typ, index, t) {
podIndexer.Add(pod)
}
index += activePods
for _, pod := range newPodList(succeededPods, v1.PodSucceeded, tfJob, typ, index, t) {
podIndexer.Add(pod)
}
index += succeededPods
for _, pod := range newPodList(failedPods, v1.PodFailed, tfJob, typ, index, t) {
podIndexer.Add(pod)
}
}

func TestAddPod(t *testing.T) {
// Prepare the clientset and controller for the test.
kubeClientSet := kubeclientset.NewForConfigOrDie(&rest.Config{
Expand All @@ -95,14 +46,14 @@ func TestAddPod(t *testing.T) {
}
tfJobClientSet := tfjobclientset.NewForConfigOrDie(config)
ctr, _, _ := newTFJobController(config, kubeClientSet, tfJobClientSet, controller.NoResyncPeriodFunc)
ctr.tfJobInformerSynced = alwaysReady
ctr.podInformerSynced = alwaysReady
ctr.serviceInformerSynced = alwaysReady
ctr.tfJobInformerSynced = testutil.AlwaysReady
ctr.podInformerSynced = testutil.AlwaysReady
ctr.serviceInformerSynced = testutil.AlwaysReady
tfJobIndexer := ctr.tfJobInformer.GetIndexer()

stopCh := make(chan struct{})
run := func(<-chan struct{}) {
ctr.Run(threadCount, stopCh)
ctr.Run(testutil.ThreadCount, stopCh)
}
go run(stopCh)

Expand All @@ -114,21 +65,21 @@ func TestAddPod(t *testing.T) {
return true, nil
}

tfJob := newTFJob(1, 0)
unstructured, err := convertTFJobToUnstructured(tfJob)
tfJob := testutil.NewTFJob(1, 0)
unstructured, err := generator.ConvertTFJobToUnstructured(tfJob)
if err != nil {
t.Errorf("Failed to convert the TFJob to Unstructured: %v", err)
}

if err := tfJobIndexer.Add(unstructured); err != nil {
t.Errorf("Failed to add tfjob to tfJobIndexer: %v", err)
}
pod := newPod(tfJob, labelWorker, 0, t)
pod := testutil.NewPod(tfJob, testutil.LabelWorker, 0, t)
ctr.addPod(pod)

syncChan <- "sync"
if key != getKey(tfJob, t) {
t.Errorf("Failed to enqueue the TFJob %s: expected %s, got %s", tfJob.Name, getKey(tfJob, t), key)
if key != testutil.GetKey(tfJob, t) {
t.Errorf("Failed to enqueue the TFJob %s: expected %s, got %s", tfJob.Name, testutil.GetKey(tfJob, t), key)
}
close(stopCh)
}
Expand All @@ -142,18 +93,18 @@ func TestClusterSpec(t *testing.T) {
}
testCase := []tc{
tc{
tfJob: newTFJob(1, 0),
tfJob: testutil.NewTFJob(1, 0),
rt: "worker",
index: "0",
expectedClusterSpec: `{"cluster":{"worker":["` + testTFJobName +
expectedClusterSpec: `{"cluster":{"worker":["` + testutil.TestTFJobName +
`-worker-0.default.svc.cluster.local:2222"]},"task":{"type":"worker","index":0}}`,
},
tc{
tfJob: newTFJob(1, 1),
tfJob: testutil.NewTFJob(1, 1),
rt: "worker",
index: "0",
expectedClusterSpec: `{"cluster":{"ps":["` + testTFJobName +
`-ps-0.default.svc.cluster.local:2222"],"worker":["` + testTFJobName +
expectedClusterSpec: `{"cluster":{"ps":["` + testutil.TestTFJobName +
`-ps-0.default.svc.cluster.local:2222"],"worker":["` + testutil.TestTFJobName +
`-worker-0.default.svc.cluster.local:2222"]},"task":{"type":"worker","index":0}}`,
},
}
Expand All @@ -177,7 +128,7 @@ func TestRestartPolicy(t *testing.T) {
}
testCase := []tc{
func() tc {
tfJob := newTFJob(1, 0)
tfJob := testutil.NewTFJob(1, 0)
specRestartPolicy := tfv1alpha2.RestartPolicyExitCode
tfJob.Spec.TFReplicaSpecs[tfv1alpha2.TFReplicaTypeWorker].RestartPolicy = specRestartPolicy
return tc{
Expand All @@ -187,7 +138,7 @@ func TestRestartPolicy(t *testing.T) {
}
}(),
func() tc {
tfJob := newTFJob(1, 0)
tfJob := testutil.NewTFJob(1, 0)
specRestartPolicy := tfv1alpha2.RestartPolicyNever
tfJob.Spec.TFReplicaSpecs[tfv1alpha2.TFReplicaTypeWorker].RestartPolicy = specRestartPolicy
return tc{
Expand All @@ -197,7 +148,7 @@ func TestRestartPolicy(t *testing.T) {
}
}(),
func() tc {
tfJob := newTFJob(1, 0)
tfJob := testutil.NewTFJob(1, 0)
specRestartPolicy := tfv1alpha2.RestartPolicyAlways
tfJob.Spec.TFReplicaSpecs[tfv1alpha2.TFReplicaTypeWorker].RestartPolicy = specRestartPolicy
return tc{
Expand All @@ -207,7 +158,7 @@ func TestRestartPolicy(t *testing.T) {
}
}(),
func() tc {
tfJob := newTFJob(1, 0)
tfJob := testutil.NewTFJob(1, 0)
specRestartPolicy := tfv1alpha2.RestartPolicyOnFailure
tfJob.Spec.TFReplicaSpecs[tfv1alpha2.TFReplicaTypeWorker].RestartPolicy = specRestartPolicy
return tc{
Expand All @@ -217,7 +168,7 @@ func TestRestartPolicy(t *testing.T) {
}
}(),
func() tc {
tfJob := newTFJob(1, 0)
tfJob := testutil.NewTFJob(1, 0)
specRestartPolicy := tfv1alpha2.RestartPolicy("")
tfJob.Spec.TFReplicaSpecs[tfv1alpha2.TFReplicaTypeWorker].RestartPolicy = specRestartPolicy
return tc{
Expand Down Expand Up @@ -256,33 +207,33 @@ func TestExitCode(t *testing.T) {
ctr, kubeInformerFactory, _ := newTFJobController(config, kubeClientSet, tfJobClientSet, controller.NoResyncPeriodFunc)
fakePodControl := &controller.FakePodControl{}
ctr.podControl = fakePodControl
ctr.tfJobInformerSynced = alwaysReady
ctr.podInformerSynced = alwaysReady
ctr.serviceInformerSynced = alwaysReady
ctr.tfJobInformerSynced = testutil.AlwaysReady
ctr.podInformerSynced = testutil.AlwaysReady
ctr.serviceInformerSynced = testutil.AlwaysReady
tfJobIndexer := ctr.tfJobInformer.GetIndexer()
podIndexer := kubeInformerFactory.Core().V1().Pods().Informer().GetIndexer()

stopCh := make(chan struct{})
run := func(<-chan struct{}) {
ctr.Run(threadCount, stopCh)
ctr.Run(testutil.ThreadCount, stopCh)
}
go run(stopCh)

ctr.updateStatusHandler = func(tfJob *tfv1alpha2.TFJob) error {
return nil
}

tfJob := newTFJob(1, 0)
tfJob := testutil.NewTFJob(1, 0)
tfJob.Spec.TFReplicaSpecs[tfv1alpha2.TFReplicaTypeWorker].RestartPolicy = tfv1alpha2.RestartPolicyExitCode
unstructured, err := convertTFJobToUnstructured(tfJob)
unstructured, err := generator.ConvertTFJobToUnstructured(tfJob)
if err != nil {
t.Errorf("Failed to convert the TFJob to Unstructured: %v", err)
}

if err := tfJobIndexer.Add(unstructured); err != nil {
t.Errorf("Failed to add tfjob to tfJobIndexer: %v", err)
}
pod := newPod(tfJob, labelWorker, 0, t)
pod := testutil.NewPod(tfJob, testutil.LabelWorker, 0, t)
pod.Status.Phase = v1.PodFailed
pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{})
pod.Status.ContainerStatuses = append(pod.Status.ContainerStatuses, v1.ContainerStatus{
Expand All @@ -294,8 +245,10 @@ func TestExitCode(t *testing.T) {
},
})

podIndexer.Add(pod)
_, err = ctr.syncTFJob(getKey(tfJob, t))
if err := podIndexer.Add(pod); err != nil {
t.Errorf("%s: unexpected error when adding pod %v", tfJob.Name, err)
}
_, err = ctr.syncTFJob(testutil.GetKey(tfJob, t))
if err != nil {
t.Errorf("%s: unexpected error when syncing jobs %v", tfJob.Name, err)
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/controller.v2/controller_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"

tfv1alpha2 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1alpha2"
"github.com/kubeflow/tf-operator/pkg/generator"
)

// reconcileServices checks and updates services for each given TFReplicaSpec.
Expand Down Expand Up @@ -104,14 +105,14 @@ func (tc *TFJobController) createNewService(tfjob *tfv1alpha2.TFJob, rtype tfv1a
}

// Create OwnerReference.
controllerRef := genOwnerReference(tfjob)
controllerRef := generator.GenOwnerReference(tfjob)

// Append tfReplicaTypeLabel and tfReplicaIndexLabel labels.
labels := genLabels(tfjobKey)
labels := generator.GenLabels(tfjobKey)
labels[tfReplicaTypeLabel] = rt
labels[tfReplicaIndexLabel] = index

port, err := getPortFromTFJob(tfjob, rtype)
port, err := generator.GetPortFromTFJob(tfjob, rtype)
if err != nil {
return err
}
Expand All @@ -129,7 +130,7 @@ func (tc *TFJobController) createNewService(tfjob *tfv1alpha2.TFJob, rtype tfv1a
},
}

service.Name = genGeneralName(tfjob.Name, rt, index)
service.Name = generator.GenGeneralName(tfjob.Name, rt, index)
service.Labels = labels

err = tc.serviceControl.CreateServicesWithControllerRef(tfjob.Namespace, service, tfjob, controllerRef)
Expand Down Expand Up @@ -160,7 +161,7 @@ func (tc *TFJobController) getServicesForTFJob(tfjob *tfv1alpha2.TFJob) ([]*v1.S

// Create selector
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: genLabels(tfjobKey),
MatchLabels: generator.GenLabels(tfjobKey),
})

if err != nil {
Expand Down
Loading

0 comments on commit f2bbf41

Please sign in to comment.