diff --git a/cmd/mpi-operator/main.go b/cmd/mpi-operator.v1alpha1/main.go similarity index 100% rename from cmd/mpi-operator/main.go rename to cmd/mpi-operator.v1alpha1/main.go diff --git a/cmd/mpi-operator.v1alpha2/app/options/options.go b/cmd/mpi-operator.v1alpha2/app/options/options.go new file mode 100644 index 00000000..d86851ec --- /dev/null +++ b/cmd/mpi-operator.v1alpha2/app/options/options.go @@ -0,0 +1,62 @@ +// Copyright 2019 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package options + +import ( + "flag" + + "k8s.io/api/core/v1" +) + +// ServerOption is the main context object for the controller manager. +type ServerOption struct { + Kubeconfig string + MasterURL string + KubectlDeliveryImage string + Threadiness int + PrintVersion bool + EnableGangScheduling bool + Namespace string +} + +// NewServerOption creates a new CMServer with a default config. +func NewServerOption() *ServerOption { + s := ServerOption{} + return &s +} + +// AddFlags adds flags for a specific CMServer to the specified FlagSet. +func (s *ServerOption) AddFlags(fs *flag.FlagSet) { + fs.StringVar(&s.MasterURL, "master", "", + `The url of the Kubernetes API server, + will overrides any value in kubeconfig, only required if out-of-cluster.`) + + fs.StringVar(&s.Kubeconfig, "kubeConfig", "", + "Path to a kubeConfig. Only required if out-of-cluster.") + + fs.StringVar(&s.KubectlDeliveryImage, "kubectl-delivery-image", "", + "The container image used to deliver the kubectl binary.") + + fs.StringVar(&s.Namespace, "namespace", v1.NamespaceAll, + `The namespace to monitor tfjobs. If unset, it monitors all namespaces cluster-wide. + If set, it only monitors tfjobs in the given namespace.`) + + fs.IntVar(&s.Threadiness, "threadiness", 2, + `How many threads to process the main logic`) + + fs.BoolVar(&s.PrintVersion, "version", false, "Show version and quit") + + fs.BoolVar(&s.EnableGangScheduling, "enable-gang-scheduling", false, "Set true to enable gang scheduling by kube-batch.") +} diff --git a/cmd/mpi-operator.v1alpha2/app/server.go b/cmd/mpi-operator.v1alpha2/app/server.go new file mode 100644 index 00000000..ba54414d --- /dev/null +++ b/cmd/mpi-operator.v1alpha2/app/server.go @@ -0,0 +1,115 @@ +// Copyright 2019 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "os" + + "github.com/golang/glog" + controllersv1alpha2 "github.com/kubeflow/mpi-operator/pkg/controllers/v1alpha2" + corev1 "k8s.io/api/core/v1" + kubeinformers "k8s.io/client-go/informers" + policyinformers "k8s.io/client-go/informers/policy/v1beta1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/sample-controller/pkg/signals" + + "github.com/kubeflow/mpi-operator/cmd/mpi-operator.v1alpha2/app/options" + clientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned" + informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions" + "github.com/kubeflow/mpi-operator/pkg/version" +) + +const ( + apiVersion = "v1alpha2" + RecommendedKubeConfigPathEnv = "KUBECONFIG" +) + +func Run(opt *options.ServerOption) error { + // Check if the -version flag was passed and, if so, print the version and exit. + if opt.PrintVersion { + version.PrintVersionAndExit(apiVersion) + } + + if opt.Namespace == corev1.NamespaceAll { + glog.Info("Using cluster scoped operator") + } else { + glog.Infof("Scoping operator to namespace %s", opt.Namespace) + } + + // To help debugging, immediately log version. + glog.Infof("%+v", version.Info(apiVersion)) + + // set up signals so we handle the first shutdown signal gracefully + stopCh := signals.SetupSignalHandler() + + // Note: ENV KUBECONFIG will overwrite user defined Kubeconfig option. + if len(os.Getenv(RecommendedKubeConfigPathEnv)) > 0 { + // use the current context in kubeconfig + // This is very useful for running locally. + opt.Kubeconfig = os.Getenv(RecommendedKubeConfigPathEnv) + } + + cfg, err := clientcmd.BuildConfigFromFlags(opt.MasterURL, opt.Kubeconfig) + if err != nil { + glog.Fatalf("Error building kubeConfig: %s", err.Error()) + } + + kubeClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + glog.Fatalf("Error building kubernetes clientset: %s", err.Error()) + } + + kubeflowClient, err := clientset.NewForConfig(cfg) + if err != nil { + glog.Fatalf("Error building kubeflow clientset: %s", err.Error()) + } + + var kubeInformerFactory kubeinformers.SharedInformerFactory + var kubeflowInformerFactory informers.SharedInformerFactory + if opt.Namespace == "" { + kubeInformerFactory = kubeinformers.NewSharedInformerFactory(kubeClient, 0) + kubeflowInformerFactory = informers.NewSharedInformerFactory(kubeflowClient, 0) + } else { + kubeInformerFactory = kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(opt.Namespace), nil) + kubeflowInformerFactory = informers.NewSharedInformerFactoryWithOptions(kubeflowClient, 0, informers.WithNamespace(opt.Namespace), nil) + } + + var pdbInformer policyinformers.PodDisruptionBudgetInformer + if opt.EnableGangScheduling { + pdbInformer = kubeInformerFactory.Policy().V1beta1().PodDisruptionBudgets() + } + controller := controllersv1alpha2.NewMPIJobController( + kubeClient, + kubeflowClient, + kubeInformerFactory.Core().V1().ConfigMaps(), + kubeInformerFactory.Core().V1().ServiceAccounts(), + kubeInformerFactory.Rbac().V1().Roles(), + kubeInformerFactory.Rbac().V1().RoleBindings(), + kubeInformerFactory.Apps().V1().StatefulSets(), + kubeInformerFactory.Batch().V1().Jobs(), + pdbInformer, + kubeflowInformerFactory.Kubeflow().V1alpha2().MPIJobs(), + opt.KubectlDeliveryImage, + opt.EnableGangScheduling) + + go kubeInformerFactory.Start(stopCh) + go kubeflowInformerFactory.Start(stopCh) + + if err = controller.Run(opt.Threadiness, stopCh); err != nil { + glog.Fatalf("Error running controller: %s", err.Error()) + } + return nil +} diff --git a/cmd/mpi-operator.v1alpha2/main.go b/cmd/mpi-operator.v1alpha2/main.go new file mode 100644 index 00000000..4d4deeca --- /dev/null +++ b/cmd/mpi-operator.v1alpha2/main.go @@ -0,0 +1,35 @@ +// Copyright 2018 The Kubeflow Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "flag" + + "github.com/golang/glog" + + "github.com/kubeflow/mpi-operator/cmd/mpi-operator.v1alpha2/app" + "github.com/kubeflow/mpi-operator/cmd/mpi-operator.v1alpha2/app/options" +) + +func main() { + s := options.NewServerOption() + s.AddFlags(flag.CommandLine) + + flag.Parse() + + if err := app.Run(s); err != nil { + glog.Fatalf("%v\n", err) + } +} diff --git a/deploy/0-crd.yaml b/deploy/crd/crd-v1alpha1.yaml similarity index 100% rename from deploy/0-crd.yaml rename to deploy/crd/crd-v1alpha1.yaml diff --git a/deploy/crd/crd-v1alpha2.yaml b/deploy/crd/crd-v1alpha2.yaml new file mode 100644 index 00000000..428c17a6 --- /dev/null +++ b/deploy/crd/crd-v1alpha2.yaml @@ -0,0 +1,36 @@ +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: mpijobs.kubeflow.org +spec: + group: kubeflow.org + version: v1alpha2 + scope: Namespaced + names: + plural: mpijobs + singular: mpijob + kind: MPIJob + shortNames: + - mj + - mpij + validation: + openAPIV3Schema: + properties: + spec: + properties: + slotsPerWorker: + type: integer + minimum: 1 + mpiReplicaSpecs: + properties: + Launcher: + properties: + replicas: + type: integer + minimum: 1 + maximum: 1 + Worker: + properties: + replicas: + type: integer + minimum: 1 diff --git a/examples/v1alpha2/tensorflow-benchmarks-custom.yaml b/examples/v1alpha2/tensorflow-benchmarks-custom.yaml new file mode 100644 index 00000000..15895ba5 --- /dev/null +++ b/examples/v1alpha2/tensorflow-benchmarks-custom.yaml @@ -0,0 +1,26 @@ +# This file shows how to run multi-node training benchmarks using an MPIJob, +# specifying GPUs explicitly per worker. +apiVersion: kubeflow.org/v1alpha2 +kind: MPIJob +metadata: + name: tensorflow-benchmarks-16-custom +spec: + slotsPerWorker: 4 + mpiReplicaSpecs: + Launcher: + replicas: 1 + template: + spec: + containers: + - image: mpioperator/tensorflow-benchmarks:latest + name: tensorflow-benchmarks + Worker: + replicas: 4 + template: + spec: + containers: + - image: mpioperator/tensorflow-benchmarks:latest + name: tensorflow-benchmarks + resources: + limits: + nvidia.com/gpu: 4 \ No newline at end of file diff --git a/examples/v1alpha2/tensorflow-benchmarks-imagenet.yaml b/examples/v1alpha2/tensorflow-benchmarks-imagenet.yaml new file mode 100644 index 00000000..cac1637f --- /dev/null +++ b/examples/v1alpha2/tensorflow-benchmarks-imagenet.yaml @@ -0,0 +1,55 @@ +# This file shows how to run multi-node training benchmarks using an MPIJob, +# using real ImageNet data stored in Amazon EFS. +# +# It assume the ImageNet tfrecord files are stored in an EFS directory under +# `/imagenet/{train,validation}`. +apiVersion: kubeflow.org/v1alpha2 +kind: MPIJob +metadata: + name: tensorflow-benchmarks-imagenet +spec: + mpiReplicaSpecs: + Launcher: + replicas: 1 + template: + spec: + containers: + - image: mpioperator/tensorflow-benchmarks:latest + name: tensorflow-benchmarks + command: + - mpirun + - python + - scripts/tf_cnn_benchmarks/tf_cnn_benchmarks.py + - --data_format=NCHW + - --batch_size=256 + - --model=resnet50 + - --optimizer=momentum + - --variable_update=horovod + - --nodistortions + - --gradient_repacking=8 + - --num_epochs=90 + - --weight_decay=1e-4 + - --data_dir=/efs/imagenet/train + - --use_fp16 + - --train_dir=/models/resnet50 + Worker: + replicas: 2 + template: + spec: + containers: + - image: mpioperator/tensorflow-benchmarks:latest + name: tensorflow-benchmarks + volumeMounts: + - mountPath: /efs + name: efs + - mountPath: /models + name: models + volumes: + - name: efs + nfs: + server: fs-ab134502.efs.us-west-2.amazonaws.com + path: / + readOnly: true + - name: models + emptyDir: {} + diff --git a/examples/v1alpha2/tensorflow-benchmarks.yaml b/examples/v1alpha2/tensorflow-benchmarks.yaml new file mode 100644 index 00000000..7eb1bc3e --- /dev/null +++ b/examples/v1alpha2/tensorflow-benchmarks.yaml @@ -0,0 +1,22 @@ +apiVersion: kubeflow.org/v1alpha2 +kind: MPIJob +metadata: + name: tensorflow-benchmarks-16 +spec: + slotsPerWorker: 1 + mpiReplicaSpecs: + Launcher: + replicas: 1 + template: + spec: + containers: + - image: mpioperator/tensorflow-benchmarks:latest + name: tensorflow-benchmarks + Worker: + replicas: 2 + template: + spec: + containers: + - image: mpioperator/tensorflow-benchmarks:latest + name: tensorflow-benchmarks + diff --git a/pkg/controllers/v1alpha2/mpi_job_controller.go b/pkg/controllers/v1alpha2/mpi_job_controller.go index b3e88f4f..cbbe6d76 100644 --- a/pkg/controllers/v1alpha2/mpi_job_controller.go +++ b/pkg/controllers/v1alpha2/mpi_job_controller.go @@ -130,12 +130,6 @@ type MPIJobController struct { // recorder is an event recorder for recording Event resources to the // Kubernetes API. recorder record.EventRecorder - // The maximum number of GPUs per node. - gpusPerNode int - // The maximum number of processing units per node. - processingUnitsPerNode int - // The processing resource name, e.g. "nvidia.com/gpu" or "cpu" - processingResourceType string // The container image used to deliver the kubectl binary. kubectlDeliveryImage string // Whether to enable gang scheduling by kube-batch @@ -167,6 +161,13 @@ func NewMPIJobController( eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + var pdbLister policylisters.PodDisruptionBudgetLister + var pdbSynced cache.InformerSynced + if enableGangScheduling { + pdbLister = pdbInformer.Lister() + pdbSynced = pdbInformer.Informer().HasSynced + } + controller := &MPIJobController{ kubeClient: kubeClient, kubeflowClient: kubeflowClient, @@ -182,8 +183,8 @@ func NewMPIJobController( statefulSetSynced: statefulSetInformer.Informer().HasSynced, jobLister: jobInformer.Lister(), jobSynced: jobInformer.Informer().HasSynced, - pdbLister: pdbInformer.Lister(), - pdbSynced: pdbInformer.Informer().HasSynced, + pdbLister: pdbLister, + pdbSynced: pdbSynced, mpiJobLister: mpiJobInformer.Lister(), mpiJobSynced: mpiJobInformer.Informer().HasSynced, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MPIJobs"), @@ -297,22 +298,23 @@ func NewMPIJobController( }, DeleteFunc: controller.handleObject, }) - pdbInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: controller.handleObject, - UpdateFunc: func(old, new interface{}) { - newPolicy := new.(*policyv1beta1.PodDisruptionBudget) - oldPolicy := old.(*policyv1beta1.PodDisruptionBudget) - if newPolicy.ResourceVersion == oldPolicy.ResourceVersion { - // Periodic re-sync will send update events for all known PodDisruptionBudgets. - // Two different versions of the same Job will always have - // different RVs. - return - } - controller.handleObject(new) - }, - DeleteFunc: controller.handleObject, - }) - + if pdbInformer != nil { + pdbInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.handleObject, + UpdateFunc: func(old, new interface{}) { + newPolicy := new.(*policyv1beta1.PodDisruptionBudget) + oldPolicy := old.(*policyv1beta1.PodDisruptionBudget) + if newPolicy.ResourceVersion == oldPolicy.ResourceVersion { + // Periodic re-sync will send update events for all known PodDisruptionBudgets. + // Two different versions of the same Job will always have + // different RVs. + return + } + controller.handleObject(new) + }, + DeleteFunc: controller.handleObject, + }) + } return controller } @@ -688,6 +690,7 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher mpiJobCopy := mpiJob.DeepCopy() if launcher != nil { + initializeMPIJobStatuses(mpiJobCopy, kubeflow.MPIReplicaTypeLauncher) now := metav1.Now() if launcher.Status.Active > 0 { mpiJobCopy.Status.ReplicaStatuses[kubeflow.ReplicaType(kubeflow.MPIReplicaTypeLauncher)].Active += 1 @@ -704,6 +707,7 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher } } if worker != nil { + initializeMPIJobStatuses(mpiJobCopy, kubeflow.MPIReplicaTypeWorker) now := metav1.Now() if worker.Status.ReadyReplicas > 0 { mpiJobCopy.Status.ReplicaStatuses[kubeflow.ReplicaType(kubeflow.MPIReplicaTypeWorker)].Active = worker.Status.ReadyReplicas @@ -721,14 +725,21 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher // workerStatuses.Failed += workerStatuses.Failed // } } - // Until #38113 is merged, we must use Update instead of UpdateStatus to - // update the Status block of the MPIJob resource. UpdateStatus will not - // allow changes to the Spec of the resource, which is ideal for ensuring - // nothing other than resource status has been updated. - _, err := c.kubeflowClient.KubeflowV1alpha2().MPIJobs(mpiJob.Namespace).Update(mpiJobCopy) + + _, err := c.kubeflowClient.KubeflowV1alpha2().MPIJobs(mpiJob.Namespace).UpdateStatus(mpiJobCopy) return err } +// initializeMPIJobStatuses initializes the ReplicaStatuses for MPIJob. +func initializeMPIJobStatuses(mpiJob *kubeflow.MPIJob, mtype kubeflow.MPIReplicaType) { + replicaType := kubeflow.ReplicaType(mtype) + if mpiJob.Status.ReplicaStatuses == nil { + mpiJob.Status.ReplicaStatuses = make(map[kubeflow.ReplicaType]*kubeflow.ReplicaStatus) + } + + mpiJob.Status.ReplicaStatuses[replicaType] = &kubeflow.ReplicaStatus{} +} + // enqueueMPIJob takes a MPIJob resource and converts it into a namespace/name // string which is then put onto the work queue. This method should *not* be // passed resources of any type other than MPIJob. diff --git a/pkg/version/version.go b/pkg/version/version.go new file mode 100644 index 00000000..df4ffa4d --- /dev/null +++ b/pkg/version/version.go @@ -0,0 +1,43 @@ +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package version + +import ( + "fmt" + "os" + "runtime" +) + +var ( + Version = "v0.1.0-alpha" + GitSHA = "Not provided." +) + +// PrintVersionAndExit prints versions from the array returned by Info() and exit +func PrintVersionAndExit(apiVersion string) { + for _, i := range Info(apiVersion) { + fmt.Printf("%v\n", i) + } + os.Exit(0) +} + +// Info returns an array of various service versions +func Info(apiVersion string) []string { + return []string{ + fmt.Sprintf("API Version: %s", apiVersion), + fmt.Sprintf("Version: %s", Version), + fmt.Sprintf("Git SHA: %s", GitSHA), + fmt.Sprintf("Go Version: %s", runtime.Version()), + fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH), + } +}