diff --git a/pkg/apis/mpi/v1/constants.go b/pkg/apis/mpi/v1/constants.go index d6cbf7b7e3..09b4b9c730 100644 --- a/pkg/apis/mpi/v1/constants.go +++ b/pkg/apis/mpi/v1/constants.go @@ -17,8 +17,6 @@ package v1 import common "github.com/kubeflow/common/pkg/apis/common/v1" const ( - // EnvKubeflowNamespace is ENV for kubeflow namespace specified by user. - EnvKubeflowNamespace = "KUBEFLOW_NAMESPACE" // DefaultPortName is name of the port used to communicate between Master and Workers. DefaultPortName = "mpi-port" // DefaultContainerName is the name of the MPIJob container. diff --git a/pkg/common/constants.go b/pkg/common/constants.go deleted file mode 100644 index 118f08e7f0..0000000000 --- a/pkg/common/constants.go +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright 2021 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License - -package common - -const ( - // EnvKubeflowNamespace is ENV for kubeflow namespace specified by user. - EnvKubeflowNamespace = "KUBEFLOW_NAMESPACE" -) diff --git a/pkg/common/util/client.go b/pkg/common/util/client.go index 3d36387cb5..eab26645af 100644 --- a/pkg/common/util/client.go +++ b/pkg/common/util/client.go @@ -15,18 +15,9 @@ package util import ( - "os" - "sigs.k8s.io/controller-runtime/pkg/client" ) -func HomeDir() string { - if h := os.Getenv("HOME"); h != "" { - return h - } - return os.Getenv("USERPROFILE") // windows -} - // TODO (Jeffwan@): Find an elegant way to either use delegatingReader or directly use clientss // GetDelegatingClientFromClient try to extract client reader from client, client // reader reads cluster info from api client. diff --git a/pkg/common/util/scheduler.go b/pkg/common/util/scheduler.go index 3385a2b4be..c8863fe714 100644 --- a/pkg/common/util/scheduler.go +++ b/pkg/common/util/scheduler.go @@ -33,3 +33,12 @@ func IsGangSchedulerSet(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, return false } + +func GetSchedulerName(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec) string { + for _, spec := range replicas { + if len(spec.Template.Spec.SchedulerName) > 0 { + return spec.Template.Spec.SchedulerName + } + } + return "" +} diff --git a/pkg/common/util/util.go b/pkg/common/util/util.go index f635f48f4b..28150c5f90 100644 --- a/pkg/common/util/util.go +++ b/pkg/common/util/util.go @@ -50,11 +50,3 @@ func GetReplicaTypes(specs map[commonv1.ReplicaType]*commonv1.ReplicaSpec) []com } return keys } -func GetSchedulerName(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec) string { - for _, spec := range replicas { - if len(spec.Template.Spec.SchedulerName) > 0 { - return spec.Template.Spec.SchedulerName - } - } - return "" -} diff --git a/pkg/common/util/v1/testutil/service.go b/pkg/common/util/v1/testutil/service.go index 327cfd5c9a..0fa81b148d 100644 --- a/pkg/common/util/v1/testutil/service.go +++ b/pkg/common/util/v1/testutil/service.go @@ -69,7 +69,7 @@ func NewServiceList(count int32, job metav1.Object, typ string, refs []metav1.Ow return services } -func SetServicesV2(client client.Client, job metav1.Object, typ string, activeWorkerServices int32, +func SetServices(client client.Client, job metav1.Object, typ string, activeWorkerServices int32, refs []metav1.OwnerReference, basicLabels map[string]string) { ctx := context.Background() for _, svc := range NewServiceList(activeWorkerServices, job, typ, refs) { diff --git a/pkg/common/util/v1/unstructured/informer.go b/pkg/common/util/v1/unstructured/informer.go deleted file mode 100644 index 6c976a1e33..0000000000 --- a/pkg/common/util/v1/unstructured/informer.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2021 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License - -// Package unstructured is the package for unstructured informer, -// which is from https://github.com/argoproj/argo/blob/master/util/unstructured/unstructured.go -// This is a temporary solution for https://github.com/kubeflow/training-operator/issues/561 -package unstructured - -import ( - "context" - "time" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/tools/cache" - - informer "github.com/kubeflow/training-operator/pkg/client/informers/externalversions/tensorflow/v1" - lister "github.com/kubeflow/training-operator/pkg/client/listers/tensorflow/v1" -) - -type UnstructuredInformer struct { - informer cache.SharedIndexInformer -} - -func NewTFJobInformer(resource schema.GroupVersionResource, client dynamic.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) informer.TFJobInformer { - return &UnstructuredInformer{ - informer: newUnstructuredInformer(resource, client, namespace, resyncPeriod, indexers), - } -} - -func (f *UnstructuredInformer) Informer() cache.SharedIndexInformer { - return f.informer -} - -func (f *UnstructuredInformer) Lister() lister.TFJobLister { - return lister.NewTFJobLister(f.Informer().GetIndexer()) -} - -// newUnstructuredInformer constructs a new informer for Unstructured type. -// Always prefer using an informer factory to get a shared informer instead of getting an independent -// one. This reduces memory footprint and number of connections to the server. -func newUnstructuredInformer(resource schema.GroupVersionResource, client dynamic.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { - return newFilteredUnstructuredInformer(resource, client, namespace, resyncPeriod, indexers) -} - -// newFilteredUnstructuredInformer constructs a new informer for Unstructured type. -// Always prefer using an informer factory to get a shared informer instead of getting an independent -// one. This reduces memory footprint and number of connections to the server. -func newFilteredUnstructuredInformer(resource schema.GroupVersionResource, client dynamic.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { - return cache.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - return client.Resource(resource).Namespace(namespace).List(context.TODO(), options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - return client.Resource(resource).Namespace(namespace).Watch(context.TODO(), options) - }, - }, - &unstructured.Unstructured{}, - resyncPeriod, - indexers, - ) -} diff --git a/pkg/config/config.go b/pkg/config/config.go index 18349ec435..5742729bd8 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -1,3 +1,17 @@ +// Copyright 2021 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License + package config // Config is the global configuration for the training operator. diff --git a/pkg/controller.v1/register_controller.go b/pkg/controller.v1/register_controller.go index d70c067350..6ea0af0431 100644 --- a/pkg/controller.v1/register_controller.go +++ b/pkg/controller.v1/register_controller.go @@ -18,6 +18,8 @@ import ( "fmt" "strings" + "sigs.k8s.io/controller-runtime/pkg/manager" + mpiv1 "github.com/kubeflow/training-operator/pkg/apis/mpi/v1" mxnetv1 "github.com/kubeflow/training-operator/pkg/apis/mxnet/v1" pytorchv1 "github.com/kubeflow/training-operator/pkg/apis/pytorch/v1" @@ -28,7 +30,6 @@ import ( pytorchcontroller "github.com/kubeflow/training-operator/pkg/controller.v1/pytorch" tensorflowcontroller "github.com/kubeflow/training-operator/pkg/controller.v1/tensorflow" xgboostcontroller "github.com/kubeflow/training-operator/pkg/controller.v1/xgboost" - "sigs.k8s.io/controller-runtime/pkg/manager" ) const ErrTemplateSchemeNotSupported = "scheme %s is not supported yet" diff --git a/pkg/controller.v1/register_controller_test.go b/pkg/controller.v1/register_controller_test.go new file mode 100644 index 0000000000..1ff547fca9 --- /dev/null +++ b/pkg/controller.v1/register_controller_test.go @@ -0,0 +1,76 @@ +// Copyright 2022 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller_v1 + +import ( + mpiv1 "github.com/kubeflow/training-operator/pkg/apis/mpi/v1" + tfv1 "github.com/kubeflow/training-operator/pkg/apis/tensorflow/v1" + "testing" +) + +func TestEnabledSchemes(t *testing.T) { + testES := EnabledSchemes{} + + if testES.String() != "" { + t.Errorf("empty EnabledSchemes converted no-empty string %s", testES.String()) + } + + if !testES.Empty() { + t.Error("Empty method returned false for empty EnabledSchemes") + } + + if testES.Set("TFJob") != nil { + t.Error("failed to restore TFJob") + } else { + stored := false + for _, kind := range testES { + if kind == tfv1.Kind { + stored = true + } + } + if !stored { + t.Errorf("%s not successfully registered", tfv1.Kind) + } + } + + if testES.Set("mpijob") != nil { + t.Error("failed to restore PyTorchJob(pytorchjob)") + } else { + stored := false + for _, kind := range testES { + if kind == mpiv1.Kind { + stored = true + } + } + if !stored { + t.Errorf("%s not successfully registered", mpiv1.Kind) + } + } + + dummyJob := "dummyjob" + if testES.Set(dummyJob) == nil { + t.Errorf("successfully registerd non-supported job %s", dummyJob) + } + + if testES.Empty() { + t.Error("Empty method returned true for non-empty EnabledSchemes") + } + + es2 := EnabledSchemes{} + es2.FillAll() + if es2.Empty() { + t.Error("Empty method returned true for fully registered EnabledSchemes") + } +} diff --git a/pkg/controller.v1/tensorflow/job_test.go b/pkg/controller.v1/tensorflow/job_test.go index d407fb45fc..8c18eb8e5c 100644 --- a/pkg/controller.v1/tensorflow/job_test.go +++ b/pkg/controller.v1/tensorflow/job_test.go @@ -268,8 +268,8 @@ var _ = Describe("TFJob controller", func() { tc.pendingPSPods, tc.activePSPods, tc.succeededPSPods, tc.failedPSPods, nil, refs, basicLabels) - testutil.SetServicesV2(testK8sClient, tc.tfJob, testutil.LabelWorker, tc.activeWorkerServices, refs, basicLabels) - testutil.SetServicesV2(testK8sClient, tc.tfJob, testutil.LabelPS, tc.activePSServices, refs, basicLabels) + testutil.SetServices(testK8sClient, tc.tfJob, testutil.LabelWorker, tc.activeWorkerServices, refs, basicLabels) + testutil.SetServices(testK8sClient, tc.tfJob, testutil.LabelPS, tc.activePSServices, refs, basicLabels) podList := &corev1.PodList{} Expect(testK8sClient.List(ctx, podList, listOpt)).Should(Succeed()) @@ -385,8 +385,8 @@ var _ = Describe("TFJob controller", func() { tc.pendingPSPods, tc.activePSPods, tc.succeededPSPods, tc.failedPSPods, nil, refs, basicLabels) - testutil.SetServicesV2(testK8sClient, tc.tfJob, testutil.LabelWorker, tc.activeWorkerServices, refs, basicLabels) - testutil.SetServicesV2(testK8sClient, tc.tfJob, testutil.LabelPS, tc.activePSServices, refs, basicLabels) + testutil.SetServices(testK8sClient, tc.tfJob, testutil.LabelWorker, tc.activeWorkerServices, refs, basicLabels) + testutil.SetServices(testK8sClient, tc.tfJob, testutil.LabelPS, tc.activePSServices, refs, basicLabels) podList := &corev1.PodList{} Expect(testK8sClient.List(ctx, podList, listOpt)).Should(Succeed()) @@ -497,8 +497,8 @@ var _ = Describe("TFJob controller", func() { tc.pendingPSPods, tc.activePSPods, tc.succeededPSPods, tc.failedPSPods, tc.restartCounts, refs, basicLabels) - testutil.SetServicesV2(testK8sClient, tc.tfJob, testutil.LabelWorker, tc.activeWorkerServices, refs, basicLabels) - testutil.SetServicesV2(testK8sClient, tc.tfJob, testutil.LabelPS, tc.activePSServices, refs, basicLabels) + testutil.SetServices(testK8sClient, tc.tfJob, testutil.LabelWorker, tc.activeWorkerServices, refs, basicLabels) + testutil.SetServices(testK8sClient, tc.tfJob, testutil.LabelPS, tc.activePSServices, refs, basicLabels) podList := &corev1.PodList{} Expect(testK8sClient.List(ctx, podList, listOpt)).Should(Succeed()) diff --git a/pkg/controller.v1/tensorflow/status_test.go b/pkg/controller.v1/tensorflow/status_test.go index 00963d90b5..cb77e28931 100644 --- a/pkg/controller.v1/tensorflow/status_test.go +++ b/pkg/controller.v1/tensorflow/status_test.go @@ -488,6 +488,8 @@ func setStatusForTest(tfJob *tfv1.TFJob, rtype commonv1.ReplicaType, failed, suc default: fmt.Println("wrong type") } + Expect(typ).ShouldNot(Equal("")) + refs := []metav1.OwnerReference{ *reconciler.GenOwnerReference(tfJob), } @@ -500,9 +502,11 @@ func setStatusForTest(tfJob *tfv1.TFJob, rtype commonv1.ReplicaType, failed, suc pod.Labels[k] = v } po := &corev1.Pod{} - _ = client.Create(ctx, pod) + Expect(client.Create(ctx, pod)).Should(Succeed()) + key := genKeyFromJob(pod) Eventually(func() error { + po = &corev1.Pod{} if err := client.Get(ctx, key, po); err != nil { return err } @@ -511,7 +515,7 @@ func setStatusForTest(tfJob *tfv1.TFJob, rtype commonv1.ReplicaType, failed, suc if worker0Completed == true && rtype == tfv1.TFReplicaTypeWorker && index == 0 { po.Status.ContainerStatuses = []corev1.ContainerStatus{ { - Name: tfv1.DefaultContainerName, + Name: reconciler.GetDefaultContainerName(), State: corev1.ContainerState{ Terminated: &corev1.ContainerStateTerminated{ ExitCode: int32(0), // exit with 0 @@ -528,16 +532,18 @@ func setStatusForTest(tfJob *tfv1.TFJob, rtype commonv1.ReplicaType, failed, suc index++ } + for i = 0; i < failed; i++ { pod := testutil.NewPod(tfJob, typ, index, refs) for k, v := range basicLabels { pod.Labels[k] = v } po := &corev1.Pod{} - _ = client.Create(ctx, pod) + Expect(client.Create(ctx, pod)).Should(Succeed()) + key := genKeyFromJob(pod) Eventually(func() error { - + po = &corev1.Pod{} if err := client.Get(ctx, key, po); err != nil { return err } @@ -547,7 +553,7 @@ func setStatusForTest(tfJob *tfv1.TFJob, rtype commonv1.ReplicaType, failed, suc if po.Status.ContainerStatuses == nil { po.Status.ContainerStatuses = []corev1.ContainerStatus{ { - Name: tfv1.DefaultContainerName, + Name: reconciler.GetDefaultContainerName(), State: corev1.ContainerState{ Terminated: &corev1.ContainerStateTerminated{ ExitCode: int32(130), // 130 is a retryable code @@ -564,6 +570,7 @@ func setStatusForTest(tfJob *tfv1.TFJob, rtype commonv1.ReplicaType, failed, suc updateJobReplicaStatuses(&tfJob.Status, rtype, po) index++ } + for i = 0; i < active; i++ { pod := testutil.NewPod(tfJob, typ, index, refs) for k, v := range basicLabels { @@ -571,8 +578,10 @@ func setStatusForTest(tfJob *tfv1.TFJob, rtype commonv1.ReplicaType, failed, suc } po := &corev1.Pod{} Expect(client.Create(ctx, pod)).Should(Succeed()) + key := genKeyFromJob(pod) Eventually(func() error { + po = &corev1.Pod{} if err := client.Get(ctx, key, po); err != nil { return err } diff --git a/pkg/controller.v1/tensorflow/tfjob_controller_test.go b/pkg/controller.v1/tensorflow/tfjob_controller_test.go index 76d9cd78fa..aeff119386 100644 --- a/pkg/controller.v1/tensorflow/tfjob_controller_test.go +++ b/pkg/controller.v1/tensorflow/tfjob_controller_test.go @@ -187,8 +187,8 @@ var _ = Describe("TFJob controller", func() { testutil.SetPodsStatuses(testK8sClient, tfJob, testutil.LabelWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, nil, refs, basicLabels) testutil.SetPodsStatuses(testK8sClient, tfJob, testutil.LabelPS, tc.pendingPSPods, tc.activePSPods, tc.succeededPSPods, tc.failedPSPods, nil, refs, basicLabels) - testutil.SetServicesV2(testK8sClient, tfJob, testutil.LabelWorker, tc.activeWorkerServices, refs, basicLabels) - testutil.SetServicesV2(testK8sClient, tfJob, testutil.LabelPS, tc.activePSServices, refs, basicLabels) + testutil.SetServices(testK8sClient, tfJob, testutil.LabelWorker, tc.activeWorkerServices, refs, basicLabels) + testutil.SetServices(testK8sClient, tfJob, testutil.LabelPS, tc.activePSServices, refs, basicLabels) totalPodNumber := int(tc.pendingWorkerPods + tc.activeWorkerPods + tc.succeededWorkerPods + tc.failedWorkerPods + tc.pendingPSPods + tc.activePSPods + tc.succeededPSPods + tc.failedPSPods) totalServiceNumber := int(tc.activeWorkerServices + tc.activePSServices) diff --git a/pkg/version/version.go b/pkg/version/version.go deleted file mode 100644 index 1ca124e48b..0000000000 --- a/pkg/version/version.go +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2021 The Kubeflow Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License - -package version - -import ( - "fmt" - "os" - "runtime" -) - -var ( - Version = "v0.1.0-alpha" - GitSHA = "Not provided." -) - -// PrintVersionAndExit prints versions from the array returned by Info() and exit -func PrintVersionAndExit(apiVersion string) { - for _, i := range Info(apiVersion) { - fmt.Printf("%v\n", i) - } - os.Exit(0) -} - -// Info returns an array of various service versions -func Info(apiVersion string) []string { - return []string{ - fmt.Sprintf("API Version: %s", apiVersion), - fmt.Sprintf("Version: %s", Version), - fmt.Sprintf("Git SHA: %s", GitSHA), - fmt.Sprintf("Go Version: %s", runtime.Version()), - fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH), - } -}