From ff2112b1e4145c1e322c4923899f52eebe6e25b8 Mon Sep 17 00:00:00 2001 From: terrytangyuan Date: Fri, 26 Apr 2019 09:20:23 -0400 Subject: [PATCH 1/7] Add v1alpha2 mpi-operator controller logic --- .../{ => v1alpha1}/mpi_job_controller.go | 0 .../{ => v1alpha1}/mpi_job_controller_test.go | 0 .../v1alpha2/mpi_job_controller.go | 1152 +++++++++++++++++ .../v1alpha2/mpi_job_controller_test.go | 782 +++++++++++ 4 files changed, 1934 insertions(+) rename pkg/controllers/{ => v1alpha1}/mpi_job_controller.go (100%) rename pkg/controllers/{ => v1alpha1}/mpi_job_controller_test.go (100%) create mode 100644 pkg/controllers/v1alpha2/mpi_job_controller.go create mode 100644 pkg/controllers/v1alpha2/mpi_job_controller_test.go diff --git a/pkg/controllers/mpi_job_controller.go b/pkg/controllers/v1alpha1/mpi_job_controller.go similarity index 100% rename from pkg/controllers/mpi_job_controller.go rename to pkg/controllers/v1alpha1/mpi_job_controller.go diff --git a/pkg/controllers/mpi_job_controller_test.go b/pkg/controllers/v1alpha1/mpi_job_controller_test.go similarity index 100% rename from pkg/controllers/mpi_job_controller_test.go rename to pkg/controllers/v1alpha1/mpi_job_controller_test.go diff --git a/pkg/controllers/v1alpha2/mpi_job_controller.go b/pkg/controllers/v1alpha2/mpi_job_controller.go new file mode 100644 index 000000000..0b234d759 --- /dev/null +++ b/pkg/controllers/v1alpha2/mpi_job_controller.go @@ -0,0 +1,1152 @@ +// Copyright 2018 The Kubeflow Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package controllers + +import ( + "bytes" + "fmt" + "time" + + "github.com/golang/glog" + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + policyv1beta1 "k8s.io/api/policy/v1beta1" + rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + appsinformers "k8s.io/client-go/informers/apps/v1" + batchinformers "k8s.io/client-go/informers/batch/v1" + coreinformers "k8s.io/client-go/informers/core/v1" + policyinformers "k8s.io/client-go/informers/policy/v1beta1" + rbacinformers "k8s.io/client-go/informers/rbac/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + appslisters "k8s.io/client-go/listers/apps/v1" + batchlisters "k8s.io/client-go/listers/batch/v1" + corelisters "k8s.io/client-go/listers/core/v1" + policylisters "k8s.io/client-go/listers/policy/v1beta1" + rbaclisters "k8s.io/client-go/listers/rbac/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + + kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v1alpha2" + clientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned" + kubeflowScheme "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/scheme" + informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions/kubeflow/v1alpha2" + listers "github.com/kubeflow/mpi-operator/pkg/client/listers/kubeflow/v1alpha2" +) + +const ( + controllerAgentName = "mpi-job-controller" + configSuffix = "-config" + configVolumeName = "mpi-job-config" + configMountPath = "/etc/mpi" + kubexecScriptName = "kubexec.sh" + hostfileName = "hostfile" + kubectlDeliveryName = "kubectl-delivery" + kubectlTargetDirEnv = "TARGET_DIR" + kubectlVolumeName = "mpi-job-kubectl" + kubectlMountPath = "/opt/kube" + launcher = "launcher" + worker = "worker" + launcherSuffix = "-launcher" + workerSuffix = "-worker" + pdbSuffix = "-pdb" + gpuResourceName = "nvidia.com/gpu" + cpuResourceName = "cpu" + labelGroupName = "group_name" + labelMPIJobName = "mpi_job_name" + labelMPIRoleType = "mpi_role_type" +) + +const ( + // SuccessSynced is used as part of the Event 'reason' when an MPIJob is + // synced. + SuccessSynced = "Synced" + // ErrResourceExists is used as part of the Event 'reason' when an MPIJob + // fails to sync due to dependent resources of the same name already + // existing. + ErrResourceExists = "ErrResourceExists" + + // MessageResourceExists is the message used for Events when a resource + // fails to sync due to dependent resources already existing. + MessageResourceExists = "Resource %q of Kind %q already exists and is not managed by MPIJob" + // MessageResourceSynced is the message used for an Event fired when an + // MPIJob is synced successfully. + MessageResourceSynced = "MPIJob synced successfully" + + // LabelNodeRoleMaster specifies that a node is a master + LabelNodeRoleMaster = "node-role.kubernetes.io/master" +) + +// MPIJobController is the controller implementation for MPIJob resources. +type MPIJobController struct { + // kubeClient is a standard kubernetes clientset. + kubeClient kubernetes.Interface + // kubeflowClient is a clientset for our own API group. 
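The queue comment above describes the standard controller work-queue pattern that Run, runWorker and processNextWorkItem implement further down. A minimal standalone sketch of that produce/consume life cycle (the key value is made up):

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Same constructor the controller uses: a named queue with the default
	// exponential-backoff + token-bucket rate limiter.
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MPIJobs")

	// Producers enqueue namespace/name keys, never full objects.
	queue.AddRateLimited("default/tensorflow-benchmarks")

	// A worker pops a key and must call Done when it has finished with it.
	key, shutdown := queue.Get()
	if shutdown {
		return
	}
	defer queue.Done(key)

	// Forget resets the per-item backoff after a successful sync; on a
	// transient error the item would instead be re-added with AddRateLimited.
	queue.Forget(key)
	fmt.Printf("processed %v\n", key)
}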
+ kubeflowClient clientset.Interface + + configMapLister corelisters.ConfigMapLister + configMapSynced cache.InformerSynced + serviceAccountLister corelisters.ServiceAccountLister + serviceAccountSynced cache.InformerSynced + roleLister rbaclisters.RoleLister + roleSynced cache.InformerSynced + roleBindingLister rbaclisters.RoleBindingLister + roleBindingSynced cache.InformerSynced + statefulSetLister appslisters.StatefulSetLister + statefulSetSynced cache.InformerSynced + jobLister batchlisters.JobLister + jobSynced cache.InformerSynced + pdbLister policylisters.PodDisruptionBudgetLister + pdbSynced cache.InformerSynced + mpiJobLister listers.MPIJobLister + mpiJobSynced cache.InformerSynced + + // queue is a rate limited work queue. This is used to queue work to be + // processed instead of performing it as soon as a change happens. This + // means we can ensure we only process a fixed amount of resources at a + // time, and makes it easy to ensure we are never processing the same item + // simultaneously in two different workers. + queue workqueue.RateLimitingInterface + // recorder is an event recorder for recording Event resources to the + // Kubernetes API. + recorder record.EventRecorder + // The maximum number of GPUs per node. + gpusPerNode int + // The maximum number of processing units per node. + processingUnitsPerNode int + // The processing resource name, e.g. "nvidia.com/gpu" or "cpu" + processingResourceType string + // The container image used to deliver the kubectl binary. + kubectlDeliveryImage string + // Whether to enable gang scheduling by kube-batch + enableGangScheduling bool +} + +// NewMPIJobController returns a new MPIJob controller. +func NewMPIJobController( + kubeClient kubernetes.Interface, + kubeflowClient clientset.Interface, + configMapInformer coreinformers.ConfigMapInformer, + serviceAccountInformer coreinformers.ServiceAccountInformer, + roleInformer rbacinformers.RoleInformer, + roleBindingInformer rbacinformers.RoleBindingInformer, + statefulSetInformer appsinformers.StatefulSetInformer, + jobInformer batchinformers.JobInformer, + pdbInformer policyinformers.PodDisruptionBudgetInformer, + mpiJobInformer informers.MPIJobInformer, + kubectlDeliveryImage string, + enableGangScheduling bool) *MPIJobController { + + // Create event broadcaster. + // Add mpi-job-controller types to the default Kubernetes Scheme so Events + // can be logged for mpi-job-controller types. 
+ kubeflowScheme.AddToScheme(scheme.Scheme) + glog.V(4).Info("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + + controller := &MPIJobController{ + kubeClient: kubeClient, + kubeflowClient: kubeflowClient, + configMapLister: configMapInformer.Lister(), + configMapSynced: configMapInformer.Informer().HasSynced, + serviceAccountLister: serviceAccountInformer.Lister(), + serviceAccountSynced: serviceAccountInformer.Informer().HasSynced, + roleLister: roleInformer.Lister(), + roleSynced: roleInformer.Informer().HasSynced, + roleBindingLister: roleBindingInformer.Lister(), + roleBindingSynced: roleBindingInformer.Informer().HasSynced, + statefulSetLister: statefulSetInformer.Lister(), + statefulSetSynced: statefulSetInformer.Informer().HasSynced, + jobLister: jobInformer.Lister(), + jobSynced: jobInformer.Informer().HasSynced, + pdbLister: pdbInformer.Lister(), + pdbSynced: pdbInformer.Informer().HasSynced, + mpiJobLister: mpiJobInformer.Lister(), + mpiJobSynced: mpiJobInformer.Informer().HasSynced, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MPIJobs"), + recorder: recorder, + kubectlDeliveryImage: kubectlDeliveryImage, + enableGangScheduling: enableGangScheduling, + } + + glog.Info("Setting up event handlers") + // Set up an event handler for when MPIJob resources change. + mpiJobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.enqueueMPIJob, + UpdateFunc: func(old, new interface{}) { + controller.enqueueMPIJob(new) + }, + }) + + // Set up an event handler for when dependent resources change. This + // handler will lookup the owner of the given resource, and if it is + // owned by an MPIJob resource will enqueue that MPIJob resource for + // processing. This way, we don't need to implement custom logic for + // handling dependent resources. More info on this pattern: + // https://github.com/kubernetes/community/blob/8cafef897a22026d42f5e5bb3f104febe7e29830/contributors/devel/controllers.md + configMapInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.handleObject, + UpdateFunc: func(old, new interface{}) { + newConfigMap := new.(*corev1.ConfigMap) + oldConfigMap := old.(*corev1.ConfigMap) + if newConfigMap.ResourceVersion == oldConfigMap.ResourceVersion { + // Periodic re-sync will send update events for all known + // ConfigMaps. Two different versions of the same ConfigMap + // will always have different RVs. + return + } + controller.handleObject(new) + }, + DeleteFunc: controller.handleObject, + }) + serviceAccountInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.handleObject, + UpdateFunc: func(old, new interface{}) { + newServiceAccount := new.(*corev1.ServiceAccount) + oldServiceAccount := old.(*corev1.ServiceAccount) + if newServiceAccount.ResourceVersion == oldServiceAccount.ResourceVersion { + // Periodic re-sync will send update events for all known + // ServiceAccounts. Two different versions of the same ServiceAccount + // will always have different RVs. 
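Every update handler in this constructor applies the same filter: skip events whose ResourceVersion has not changed, because those are just periodic cache re-syncs rather than real changes. A hedged, generic sketch of that check (the helper name is illustrative, not part of the controller):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// resourceChanged reports whether an informer update event carries a real
// change. During a periodic re-sync the informer re-delivers the cached
// object, so old and new share the same ResourceVersion.
func resourceChanged(oldObj, newObj metav1.Object) bool {
	return oldObj.GetResourceVersion() != newObj.GetResourceVersion()
}

func main() {
	a := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "42"}}
	b := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{ResourceVersion: "42"}}
	fmt.Println(resourceChanged(a, b)) // false: a re-sync, so the handler returns early
}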
+ return + } + controller.handleObject(new) + }, + DeleteFunc: controller.handleObject, + }) + roleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.handleObject, + UpdateFunc: func(old, new interface{}) { + newRole := new.(*rbacv1.Role) + oldRole := old.(*rbacv1.Role) + if newRole.ResourceVersion == oldRole.ResourceVersion { + // Periodic re-sync will send update events for all known + // Roles. Two different versions of the same Role + // will always have different RVs. + return + } + controller.handleObject(new) + }, + DeleteFunc: controller.handleObject, + }) + roleBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.handleObject, + UpdateFunc: func(old, new interface{}) { + newRoleBinding := new.(*rbacv1.RoleBinding) + oldRoleBinding := old.(*rbacv1.RoleBinding) + if newRoleBinding.ResourceVersion == oldRoleBinding.ResourceVersion { + // Periodic re-sync will send update events for all known + // RoleBindings. Two different versions of the same RoleBinding + // will always have different RVs. + return + } + controller.handleObject(new) + }, + DeleteFunc: controller.handleObject, + }) + statefulSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.handleObject, + UpdateFunc: func(old, new interface{}) { + newStatefulSet := new.(*appsv1.StatefulSet) + oldStatefulSet := old.(*appsv1.StatefulSet) + if newStatefulSet.ResourceVersion == oldStatefulSet.ResourceVersion { + // Periodic re-sync will send update events for all known + // StatefulSets. Two different versions of the same StatefulSet + // will always have different RVs. + return + } + controller.handleObject(new) + }, + DeleteFunc: controller.handleObject, + }) + jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.handleObject, + UpdateFunc: func(old, new interface{}) { + newJob := new.(*batchv1.Job) + oldJob := old.(*batchv1.Job) + if newJob.ResourceVersion == oldJob.ResourceVersion { + // Periodic re-sync will send update events for all known Jobs. + // Two different versions of the same Job will always have + // different RVs. + return + } + controller.handleObject(new) + }, + DeleteFunc: controller.handleObject, + }) + pdbInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.handleObject, + UpdateFunc: func(old, new interface{}) { + newPolicy := new.(*policyv1beta1.PodDisruptionBudget) + oldPolicy := old.(*policyv1beta1.PodDisruptionBudget) + if newPolicy.ResourceVersion == oldPolicy.ResourceVersion { + // Periodic re-sync will send update events for all known PodDisruptionBudgets. + // Two different versions of the same Job will always have + // different RVs. + return + } + controller.handleObject(new) + }, + DeleteFunc: controller.handleObject, + }) + + return controller +} + +// Run will set up the event handlers for types we are interested in, as well +// as syncing informer caches and starting workers. It will block until stopCh +// is closed, at which point it will shutdown the work queue and wait for +// workers to finish processing their current work items. +func (c *MPIJobController) Run(threadiness int, stopCh <-chan struct{}) error { + defer runtime.HandleCrash() + defer c.queue.ShutDown() + + // Start the informer factories to begin populating the informer caches. + glog.Info("Starting MPIJob controller") + + // Wait for the caches to be synced before starting workers. 
+ glog.Info("Waiting for informer caches to sync") + if ok := cache.WaitForCacheSync(stopCh, c.configMapSynced, c.serviceAccountSynced, c.roleSynced, c.roleBindingSynced, c.statefulSetSynced, c.jobSynced, c.mpiJobSynced); !ok { + return fmt.Errorf("failed to wait for caches to sync") + } + + glog.Info("Starting workers") + // Launch workers to process MPIJob resources. + for i := 0; i < threadiness; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } + + glog.Info("Started workers") + <-stopCh + glog.Info("Shutting down workers") + + return nil +} + +// runWorker is a long-running function that will continually call the +// processNextWorkItem function in order to read and process a message on the +// work queue. +func (c *MPIJobController) runWorker() { + for c.processNextWorkItem() { + } +} + +// processNextWorkItem will read a single work item off the work queue and +// attempt to process it, by calling the syncHandler. +func (c *MPIJobController) processNextWorkItem() bool { + obj, shutdown := c.queue.Get() + + if shutdown { + return false + } + + // We wrap this block in a func so we can defer c.queue.Done. + err := func(obj interface{}) error { + // We call Done here so the work queue knows we have finished + // processing this item. We also must remember to call Forget if we + // do not want this work item being re-queued. For example, we do + // not call Forget if a transient error occurs, instead the item is + // put back on the work queue and attempted again after a back-off + // period. + defer c.queue.Done(obj) + var key string + var ok bool + // We expect strings to come off the work queue. These are of the + // form namespace/name. We do this as the delayed nature of the + // work queue means the items in the informer cache may actually be + // more up to date that when the item was initially put onto the + // work queue. + if key, ok = obj.(string); !ok { + // As the item in the work queue is actually invalid, we call + // Forget here else we'd go into a loop of attempting to + // process a work item that is invalid. + c.queue.Forget(obj) + runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil + } + // Run the syncHandler, passing it the namespace/name string of the + // MPIJob resource to be synced. + if err := c.syncHandler(key); err != nil { + return fmt.Errorf("error syncing '%s': %s", key, err.Error()) + } + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + c.queue.Forget(obj) + glog.Infof("Successfully synced '%s'", key) + return nil + }(obj) + + if err != nil { + runtime.HandleError(err) + return true + } + + return true +} + +// syncHandler compares the actual state with the desired, and attempts to +// converge the two. It then updates the Status block of the MPIJob resource +// with the current status of the resource. +func (c *MPIJobController) syncHandler(key string) error { + // Convert the namespace/name string into a distinct namespace and name. + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + return nil + } + + // Get the MPIJob with this namespace/name. + mpiJob, err := c.mpiJobLister.MPIJobs(namespace).Get(name) + // The MPIJob may no longer exist, in which case we stop processing. 
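processNextWorkItem only ever sees namespace/name strings, and syncHandler turns them back into a lister lookup. A small standalone sketch of that round trip using client-go's key helpers (the pod here is just a stand-in object with made-up names):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "train-launcher", Namespace: "default"}}

	// enqueueMPIJob puts "namespace/name" keys on the queue...
	key, err := cache.MetaNamespaceKeyFunc(pod)
	if err != nil {
		panic(err)
	}
	fmt.Println(key) // default/train-launcher

	// ...and syncHandler splits them back apart before hitting the lister.
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	fmt.Println(namespace, name, err) // default train-launcher <nil>
}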
+ if errors.IsNotFound(err) { + runtime.HandleError(fmt.Errorf("mpi job '%s' in work queue no longer exists", key)) + return nil + } + if err != nil { + return err + } + + // Get the launcher Job for this MPIJob. + launcher, err := c.getLauncherJob(mpiJob) + if err != nil { + return err + } + // We're done if the launcher either succeeded or failed. + done := launcher != nil && (launcher.Status.Succeeded == 1 || launcher.Status.Failed == 1) + + workerSpec := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker] + workerReplicas := int(*workerSpec.Replicas) + + if !done { + // Get the ConfigMap for this MPIJob. + if config, err := c.getOrCreateConfigMap(mpiJob, workerReplicas); config == nil || err != nil { + return err + } + + // Get the launcher ServiceAccount for this MPIJob. + if sa, err := c.getOrCreateLauncherServiceAccount(mpiJob); sa == nil || err != nil { + return err + } + + // Get the launcher Role for this MPIJob. + if r, err := c.getOrCreateLauncherRole(mpiJob, workerReplicas); r == nil || err != nil { + return err + } + + // Get the launcher RoleBinding for this MPIJob. + if rb, err := c.getLauncherRoleBinding(mpiJob); rb == nil || err != nil { + return err + } + + // Get the PDB for this MPIJob + if c.enableGangScheduling { + if pdb, err := c.getOrCreatePDB(mpiJob, workerReplicas); pdb == nil || err != nil { + return err + } + } + } + + worker, err := c.getOrCreateWorkerStatefulSet(mpiJob, workerReplicas) + if err != nil { + return err + } + + // If the worker is ready, start the launcher. + workerReady := workerReplicas == 0 || int(worker.Status.ReadyReplicas) == workerReplicas + if workerReady && launcher == nil { + launcher, err = c.kubeClient.BatchV1().Jobs(namespace).Create(newLauncher(mpiJob, c.kubectlDeliveryImage)) + if err != nil { + return err + } + } + + // Finally, we update the status block of the MPIJob resource to reflect the + // current state of the world. + err = c.updateMPIJobStatus(mpiJob, launcher, worker) + if err != nil { + return err + } + + // TODO(terrytangyuan): update MPIJob conditions + + c.recorder.Event(mpiJob, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced) + return nil +} + +// getLauncherJob gets the launcher Job controlled by this MPIJob. +func (c *MPIJobController) getLauncherJob(mpiJob *kubeflow.MPIJob) (*batchv1.Job, error) { + launcher, err := c.jobLister.Jobs(mpiJob.Namespace).Get(mpiJob.Name + launcherSuffix) + if errors.IsNotFound(err) { + return nil, nil + } + if err != nil { + // If an error occurs during Get, we'll requeue the item so we can + // attempt processing again later. This could have been caused by a + // temporary network failure, or any other transient reason. + return nil, err + } + + // If the launcher is not controlled by this MPIJob resource, we should log + // a warning to the event recorder and return. + if !metav1.IsControlledBy(launcher, mpiJob) { + msg := fmt.Sprintf(MessageResourceExists, launcher.Name, launcher.Kind) + c.recorder.Event(mpiJob, corev1.EventTypeWarning, ErrResourceExists, msg) + return launcher, fmt.Errorf(msg) + } + + return launcher, nil +} + +// getOrCreatePDB will create a PDB for gang scheduling by kube-batch. +func (c *MPIJobController) getOrCreatePDB(mpiJob *kubeflow.MPIJob, minAvailableWorkerReplicas int) (*policyv1beta1.PodDisruptionBudget, error) { + + pdb, err := c.pdbLister.PodDisruptionBudgets(mpiJob.Namespace).Get(mpiJob.Name + pdbSuffix) + // If the PDB doesn't exist, we'll create it. 
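Two conditions drive the flow in syncHandler: the job is treated as finished once the launcher Job reports success or failure, and the launcher is only created once every worker replica is Ready (or no workers were requested). A function-only sketch of those two checks, assuming the same imports as this file (the helper names are illustrative):

// launcherDone mirrors the check in syncHandler: the MPIJob is finished once
// the launcher Job has a succeeded or a failed pod.
func launcherDone(launcher *batchv1.Job) bool {
	return launcher != nil && (launcher.Status.Succeeded == 1 || launcher.Status.Failed == 1)
}

// workersReady mirrors the gate in front of launcher creation: either no
// workers were requested, or every worker replica of the StatefulSet is Ready.
func workersReady(worker *appsv1.StatefulSet, workerReplicas int) bool {
	return workerReplicas == 0 || (worker != nil && int(worker.Status.ReadyReplicas) == workerReplicas)
}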
+ if errors.IsNotFound(err) { + pdb, err = c.kubeClient.PolicyV1beta1().PodDisruptionBudgets(mpiJob.Namespace).Create(newPDB(mpiJob, minAvailableWorkerReplicas)) + } + // If an error occurs during Get/Create, we'll requeue the item so we + // can attempt processing again later. This could have been caused by a + // temporary network failure, or any other transient reason. + if err != nil { + return nil, err + } + // If the PDB is not controlled by this MPIJob resource, we + // should log a warning to the event recorder and return. + if !metav1.IsControlledBy(pdb, mpiJob) { + msg := fmt.Sprintf(MessageResourceExists, pdb.Name, pdb.Kind) + c.recorder.Event(mpiJob, corev1.EventTypeWarning, ErrResourceExists, msg) + return nil, fmt.Errorf(msg) + } + + return pdb, nil +} + +// getOrCreateConfigMap gets the ConfigMap controlled by this MPIJob, or creates +// one if it doesn't exist. +func (c *MPIJobController) getOrCreateConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int) (*corev1.ConfigMap, error) { + cm, err := c.configMapLister.ConfigMaps(mpiJob.Namespace).Get(mpiJob.Name + configSuffix) + // If the ConfigMap doesn't exist, we'll create it. + if errors.IsNotFound(err) { + cm, err = c.kubeClient.CoreV1().ConfigMaps(mpiJob.Namespace).Create(newConfigMap(mpiJob, workerReplicas)) + } + // If an error occurs during Get/Create, we'll requeue the item so we + // can attempt processing again later. This could have been caused by a + // temporary network failure, or any other transient reason. + if err != nil { + return nil, err + } + // If the ConfigMap is not controlled by this MPIJob resource, we + // should log a warning to the event recorder and return. + if !metav1.IsControlledBy(cm, mpiJob) { + msg := fmt.Sprintf(MessageResourceExists, cm.Name, cm.Kind) + c.recorder.Event(mpiJob, corev1.EventTypeWarning, ErrResourceExists, msg) + return nil, fmt.Errorf(msg) + } + + return cm, nil +} + +// getOrCreateLauncherServiceAccount gets the launcher ServiceAccount controlled +// by this MPIJob, or creates one if it doesn't exist. +func (c *MPIJobController) getOrCreateLauncherServiceAccount(mpiJob *kubeflow.MPIJob) (*corev1.ServiceAccount, error) { + sa, err := c.serviceAccountLister.ServiceAccounts(mpiJob.Namespace).Get(mpiJob.Name + launcherSuffix) + // If the ServiceAccount doesn't exist, we'll create it. + if errors.IsNotFound(err) { + sa, err = c.kubeClient.CoreV1().ServiceAccounts(mpiJob.Namespace).Create(newLauncherServiceAccount(mpiJob)) + } + // If an error occurs during Get/Create, we'll requeue the item so we + // can attempt processing again later. This could have been caused by a + // temporary network failure, or any other transient reason. + if err != nil { + return nil, err + } + // If the launcher ServiceAccount is not controlled by this MPIJob resource, we + // should log a warning to the event recorder and return. + if !metav1.IsControlledBy(sa, mpiJob) { + msg := fmt.Sprintf(MessageResourceExists, sa.Name, sa.Kind) + c.recorder.Event(mpiJob, corev1.EventTypeWarning, ErrResourceExists, msg) + return nil, fmt.Errorf(msg) + } + + return sa, nil +} + +// getOrCreateLauncherRole gets the launcher Role controlled by this MPIJob. +func (c *MPIJobController) getOrCreateLauncherRole(mpiJob *kubeflow.MPIJob, workerReplicas int) (*rbacv1.Role, error) { + role, err := c.roleLister.Roles(mpiJob.Namespace).Get(mpiJob.Name + launcherSuffix) + // If the Role doesn't exist, we'll create it. 
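getOrCreatePDB, getOrCreateConfigMap, getOrCreateLauncherServiceAccount and getOrCreateLauncherRole all follow the same three-step idiom. A condensed sketch of that idiom for a ConfigMap, assuming the imports already used in this file (the function itself is illustrative, not part of the controller):

func getOrCreateExample(
	lister corelisters.ConfigMapLister,
	client kubernetes.Interface,
	mpiJob *kubeflow.MPIJob,
	desired *corev1.ConfigMap,
) (*corev1.ConfigMap, error) {
	// 1. Read from the informer cache, not the API server.
	cm, err := lister.ConfigMaps(desired.Namespace).Get(desired.Name)
	// 2. Create through the client only when the object is genuinely missing.
	if errors.IsNotFound(err) {
		cm, err = client.CoreV1().ConfigMaps(desired.Namespace).Create(desired)
	}
	if err != nil {
		return nil, err
	}
	// 3. Refuse to adopt objects that some other owner already controls.
	if !metav1.IsControlledBy(cm, mpiJob) {
		return nil, fmt.Errorf("ConfigMap %q exists but is not controlled by this MPIJob", cm.Name)
	}
	return cm, nil
}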
+ if errors.IsNotFound(err) { + role, err = c.kubeClient.RbacV1().Roles(mpiJob.Namespace).Create(newLauncherRole(mpiJob, workerReplicas)) + } + // If an error occurs during Get/Create, we'll requeue the item so we + // can attempt processing again later. This could have been caused by a + // temporary network failure, or any other transient reason. + if err != nil { + return nil, err + } + // If the launcher Role is not controlled by this MPIJob resource, we + // should log a warning to the event recorder and return. + if !metav1.IsControlledBy(role, mpiJob) { + msg := fmt.Sprintf(MessageResourceExists, role.Name, role.Kind) + c.recorder.Event(mpiJob, corev1.EventTypeWarning, ErrResourceExists, msg) + return nil, fmt.Errorf(msg) + } + + return role, nil +} + +// getLauncherRoleBinding gets the launcher RoleBinding controlled by this +// MPIJob, or creates one if it doesn't exist. +func (c *MPIJobController) getLauncherRoleBinding(mpiJob *kubeflow.MPIJob) (*rbacv1.RoleBinding, error) { + rb, err := c.roleBindingLister.RoleBindings(mpiJob.Namespace).Get(mpiJob.Name + launcherSuffix) + // If the RoleBinding doesn't exist, we'll create it. + if errors.IsNotFound(err) { + rb, err = c.kubeClient.RbacV1().RoleBindings(mpiJob.Namespace).Create(newLauncherRoleBinding(mpiJob)) + } + // If an error occurs during Get/Create, we'll requeue the item so we + // can attempt processing again later. This could have been caused by a + // temporary network failure, or any other transient reason. + if err != nil { + return nil, err + } + // If the launcher RoleBinding is not controlled by this MPIJob resource, we + // should log a warning to the event recorder and return. + if !metav1.IsControlledBy(rb, mpiJob) { + msg := fmt.Sprintf(MessageResourceExists, rb.Name, rb.Kind) + c.recorder.Event(mpiJob, corev1.EventTypeWarning, ErrResourceExists, msg) + return nil, fmt.Errorf(msg) + } + + return rb, nil +} + +// getOrCreateWorkerStatefulSet gets the worker StatefulSet controlled by this +// MPIJob, or creates one if it doesn't exist. +func (c *MPIJobController) getOrCreateWorkerStatefulSet(mpiJob *kubeflow.MPIJob, workerReplicas int) (*appsv1.StatefulSet, error) { + worker, err := c.statefulSetLister.StatefulSets(mpiJob.Namespace).Get(mpiJob.Name + workerSuffix) + // If the StatefulSet doesn't exist, we'll create it. + if errors.IsNotFound(err) && workerReplicas > 0 { + worker, err = c.kubeClient.AppsV1().StatefulSets(mpiJob.Namespace).Create(newWorker(mpiJob, int32(workerReplicas))) + } + // If an error occurs during Get/Create, we'll requeue the item so we + // can attempt processing again later. This could have been caused by a + // temporary network failure, or any other transient reason. + if err != nil && !errors.IsNotFound(err) { + return nil, err + } + + // If the worker is not controlled by this MPIJob resource, we should log + // a warning to the event recorder and return. + if worker != nil && !metav1.IsControlledBy(worker, mpiJob) { + msg := fmt.Sprintf(MessageResourceExists, worker.Name, worker.Kind) + c.recorder.Event(mpiJob, corev1.EventTypeWarning, ErrResourceExists, msg) + return nil, fmt.Errorf(msg) + } + + // If the worker is out of date, update the worker. + if worker != nil && int(*worker.Spec.Replicas) != workerReplicas { + worker, err = c.kubeClient.AppsV1().StatefulSets(mpiJob.Namespace).Update(newWorker(mpiJob, int32(workerReplicas))) + // If an error occurs during Update, we'll requeue the item so we can + // attempt processing again later. 
This could have been caused by a + // temporary network failure, or any other transient reason. + if err != nil { + return nil, err + } + } + + return worker, nil +} + +func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher *batchv1.Job, worker *appsv1.StatefulSet) error { + // NEVER modify objects from the store. It's a read-only, local cache. + // You can use DeepCopy() to make a deep copy of original object and modify this copy + // Or create a copy manually for better performance + mpiJobCopy := mpiJob.DeepCopy() + + if launcher != nil { + now := metav1.Now() + if launcher.Status.Active > 0 { + mpiJobCopy.Status.ReplicaStatuses[kubeflow.ReplicaType(kubeflow.MPIReplicaTypeLauncher)].Active += 1 + if mpiJobCopy.Status.StartTime == nil { + mpiJobCopy.Status.StartTime = &now + } + } else if launcher.Status.Succeeded > 0 { + mpiJobCopy.Status.ReplicaStatuses[kubeflow.ReplicaType(kubeflow.MPIReplicaTypeLauncher)].Succeeded += 1 + if mpiJobCopy.Status.CompletionTime == nil { + mpiJobCopy.Status.CompletionTime = &now + } + } else if launcher.Status.Failed > 0 { + mpiJobCopy.Status.ReplicaStatuses[kubeflow.ReplicaType(kubeflow.MPIReplicaTypeLauncher)].Failed += 1 + } + } + if worker != nil { + now := metav1.Now() + if worker.Status.ReadyReplicas > 0 { + mpiJobCopy.Status.ReplicaStatuses[kubeflow.ReplicaType(kubeflow.MPIReplicaTypeWorker)].Active = worker.Status.ReadyReplicas + if mpiJobCopy.Status.StartTime == nil { + mpiJobCopy.Status.StartTime = &now + } + } + // TODO: Figure out to update the other statuses + // else if worker.Status.Succeeded > 0 { + // workerStatuses.Succeeded += workerStatuses.Succeeded + // if mpiJobCopy.Status.CompletionTime == nil { + // mpiJobCopy.Status.CompletionTime = &now + // } + // } else if worker.Status.Failed > 0 { + // workerStatuses.Failed += workerStatuses.Failed + // } + } + // Until #38113 is merged, we must use Update instead of UpdateStatus to + // update the Status block of the MPIJob resource. UpdateStatus will not + // allow changes to the Spec of the resource, which is ideal for ensuring + // nothing other than resource status has been updated. + _, err := c.kubeflowClient.KubeflowV1alpha2().MPIJobs(mpiJob.Namespace).Update(mpiJobCopy) + return err +} + +// enqueueMPIJob takes a MPIJob resource and converts it into a namespace/name +// string which is then put onto the work queue. This method should *not* be +// passed resources of any type other than MPIJob. +func (c *MPIJobController) enqueueMPIJob(obj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + runtime.HandleError(err) + return + } + c.queue.AddRateLimited(key) +} + +// handleObject will take any resource implementing metav1.Object and attempt +// to find the MPIJob resource that 'owns' it. It does this by looking at the +// objects metadata.ownerReferences field for an appropriate OwnerReference. +// It then enqueues that MPIJob resource to be processed. If the object does not +// have an appropriate OwnerReference, it will simply be skipped. 
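handleObject relies on the controller owner reference to map a dependent object back to its MPIJob. A standalone sketch of that lookup (the pod and all names are made up):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A dependent object whose controller owner reference points at an MPIJob.
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "demo-worker-0",
			OwnerReferences: []metav1.OwnerReference{
				{Kind: "MPIJob", Name: "demo", Controller: boolPtr(true)},
			},
		},
	}
	// handleObject only enqueues when the controlling owner is an MPIJob;
	// everything else is ignored.
	if ref := metav1.GetControllerOf(pod); ref != nil && ref.Kind == "MPIJob" {
		fmt.Println("enqueue MPIJob", ref.Name) // enqueue MPIJob demo
	}
}

func boolPtr(b bool) *bool { return &b }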
+func (c *MPIJobController) handleObject(obj interface{}) { + var object metav1.Object + var ok bool + if object, ok = obj.(metav1.Object); !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + runtime.HandleError(fmt.Errorf("error decoding object, invalid type")) + return + } + object, ok = tombstone.Obj.(metav1.Object) + if !ok { + runtime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type")) + return + } + glog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName()) + } + glog.V(4).Infof("Processing object: %s", object.GetName()) + if ownerRef := metav1.GetControllerOf(object); ownerRef != nil { + // If this object is not owned by a MPIJob, we should not do anything + // more with it. + if ownerRef.Kind != "MPIJob" { + return + } + + mpiJob, err := c.mpiJobLister.MPIJobs(object.GetNamespace()).Get(ownerRef.Name) + if err != nil { + glog.V(4).Infof("ignoring orphaned object '%s' of mpi job '%s'", object.GetSelfLink(), ownerRef.Name) + return + } + + c.enqueueMPIJob(mpiJob) + return + } +} + +// newConfigMap creates a new ConfigMap containing configurations for an MPIJob +// resource. It also sets the appropriate OwnerReferences on the resource so +// handleObject can discover the MPIJob resource that 'owns' it. +func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int) *corev1.ConfigMap { + kubexec := fmt.Sprintf(`#!/bin/sh +set -x +POD_NAME=$1 +shift +%s/kubectl exec ${POD_NAME} -- /bin/sh -c "$*" +`, kubectlMountPath) + + // If no processing unit is specified, default to 1 slot. + slots := 1 + if mpiJob.Spec.SlotsPerWorker != nil { + slots = int(*mpiJob.Spec.SlotsPerWorker) + } + var buffer bytes.Buffer + for i := 0; i < workerReplicas; i++ { + buffer.WriteString(fmt.Sprintf("%s%s-%d slots=%d\n", mpiJob.Name, workerSuffix, i, slots)) + } + + return &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: mpiJob.Name + configSuffix, + Namespace: mpiJob.Namespace, + Labels: map[string]string{ + "app": mpiJob.Name, + }, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(mpiJob, kubeflow.SchemeGroupVersionKind), + }, + }, + Data: map[string]string{ + hostfileName: buffer.String(), + kubexecScriptName: kubexec, + }, + } +} + +// newLauncherServiceAccount creates a new launcher ServiceAccount for an MPIJob +// resource. It also sets the appropriate OwnerReferences on the resource so +// handleObject can discover the MPIJob resource that 'owns' it. +func newLauncherServiceAccount(mpiJob *kubeflow.MPIJob) *corev1.ServiceAccount { + return &corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{ + Name: mpiJob.Name + launcherSuffix, + Namespace: mpiJob.Namespace, + Labels: map[string]string{ + "app": mpiJob.Name, + }, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(mpiJob, kubeflow.SchemeGroupVersionKind), + }, + }, + } +} + +// newLauncherRole creates a new launcher Role for an MPIJob resource. It also +// sets the appropriate OwnerReferences on the resource so handleObject can +// discover the MPIJob resource that 'owns' it. 
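For a concrete picture of what newConfigMap renders: for a hypothetical job named "train" with two workers and four slots per worker, the hostfile lists the StatefulSet pod names with their slot counts. A tiny sketch reproducing that rendering:

package main

import (
	"bytes"
	"fmt"
)

func main() {
	const name, workers, slots = "train", 2, 4 // hypothetical MPIJob settings
	var buf bytes.Buffer
	for i := 0; i < workers; i++ {
		// Same shape as newConfigMap: "<job><workerSuffix>-<ordinal> slots=<n>"
		buf.WriteString(fmt.Sprintf("%s-worker-%d slots=%d\n", name, i, slots))
	}
	fmt.Print(buf.String())
	// train-worker-0 slots=4
	// train-worker-1 slots=4
}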
+func newLauncherRole(mpiJob *kubeflow.MPIJob, workerReplicas int) *rbacv1.Role { + var podNames []string + for i := 0; i < workerReplicas; i++ { + podNames = append(podNames, fmt.Sprintf("%s%s-%d", mpiJob.Name, workerSuffix, i)) + } + return &rbacv1.Role{ + ObjectMeta: metav1.ObjectMeta{ + Name: mpiJob.Name + launcherSuffix, + Namespace: mpiJob.Namespace, + Labels: map[string]string{ + "app": mpiJob.Name, + }, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(mpiJob, kubeflow.SchemeGroupVersionKind), + }, + }, + Rules: []rbacv1.PolicyRule{ + { + Verbs: []string{"get"}, + APIGroups: []string{""}, + Resources: []string{"pods"}, + ResourceNames: podNames, + }, + { + Verbs: []string{"create"}, + APIGroups: []string{""}, + Resources: []string{"pods/exec"}, + ResourceNames: podNames, + }, + }, + } +} + +// newLauncherRoleBinding creates a new launcher RoleBinding for an MPIJob +// resource. It also sets the appropriate OwnerReferences on the resource so +// handleObject can discover the MPIJob resource that 'owns' it. +func newLauncherRoleBinding(mpiJob *kubeflow.MPIJob) *rbacv1.RoleBinding { + launcherName := mpiJob.Name + launcherSuffix + return &rbacv1.RoleBinding{ + ObjectMeta: metav1.ObjectMeta{ + Name: launcherName, + Namespace: mpiJob.Namespace, + Labels: map[string]string{ + "app": mpiJob.Name, + }, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(mpiJob, kubeflow.SchemeGroupVersionKind), + }, + }, + Subjects: []rbacv1.Subject{ + { + Kind: rbacv1.ServiceAccountKind, + Name: launcherName, + Namespace: mpiJob.Namespace, + }, + }, + RoleRef: rbacv1.RoleRef{ + APIGroup: rbacv1.GroupName, + Kind: "Role", + Name: launcherName, + }, + } +} + +// newPDB creates a new launcher PodDisruptionBudget for an MPIJob +// resource. It also sets the appropriate OwnerReferences on the resource so +// handleObject can discover the MPIJob resource that 'owns' it. +func newPDB(mpiJob *kubeflow.MPIJob, minAvailableReplicas int) *policyv1beta1.PodDisruptionBudget { + minAvailable := intstr.FromInt(minAvailableReplicas) + return &policyv1beta1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Name: mpiJob.Name + pdbSuffix, + Namespace: mpiJob.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(mpiJob, kubeflow.SchemeGroupVersionKind), + }, + }, + Spec: policyv1beta1.PodDisruptionBudgetSpec{ + MinAvailable: &minAvailable, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": mpiJob.Name, + }, + }, + }, + } +} + +// newWorker creates a new worker StatefulSet for an MPIJob resource. It also +// sets the appropriate OwnerReferences on the resource so handleObject can +// discover the MPIJob resource that 'owns' it. 
+func newWorker(mpiJob *kubeflow.MPIJob, desiredReplicas int32) *appsv1.StatefulSet { + labels := map[string]string{ + labelGroupName: "kubeflow.org", + labelMPIJobName: mpiJob.Name, + labelMPIRoleType: worker, + } + + podSpec := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Template.DeepCopy() + + // keep the labels which are set in PodTemplate + if len(podSpec.Labels) == 0 { + podSpec.Labels = make(map[string]string) + } + + for key, value := range labels { + podSpec.Labels[key] = value + } + // always set restartPolicy to restartAlways for statefulset + podSpec.Spec.RestartPolicy = corev1.RestartPolicyAlways + + container := podSpec.Spec.Containers[0] + container.Command = []string{"sleep"} + container.Args = []string{"365d"} + + // We need the kubexec.sh script here because Open MPI checks for the path + // in every rank. + container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{ + Name: configVolumeName, + MountPath: configMountPath, + }) + podSpec.Spec.Containers[0] = container + + scriptMode := int32(0555) + podSpec.Spec.Volumes = append(podSpec.Spec.Volumes, corev1.Volume{ + Name: configVolumeName, + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: mpiJob.Name + configSuffix, + }, + Items: []corev1.KeyToPath{ + { + Key: kubexecScriptName, + Path: kubexecScriptName, + Mode: &scriptMode, + }, + }, + }, + }, + }) + + // set default BackoffLimit + if mpiJob.Spec.BackoffLimit == nil { + mpiJob.Spec.BackoffLimit = new(int32) + *mpiJob.Spec.BackoffLimit = 6 + } + + return &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: mpiJob.Name + workerSuffix, + Namespace: mpiJob.Namespace, + Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(mpiJob, kubeflow.SchemeGroupVersionKind), + }, + }, + Spec: appsv1.StatefulSetSpec{ + PodManagementPolicy: appsv1.ParallelPodManagement, + Replicas: &desiredReplicas, + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + ServiceName: mpiJob.Name + workerSuffix, + Template: *podSpec, + }, + } +} + +// newLauncher creates a new launcher Job for an MPIJob resource. It also sets +// the appropriate OwnerReferences on the resource so handleObject can discover +// the MPIJob resource that 'owns' it. 
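The 0555 mode on the projected kubexec.sh matters because the script has to be executable by whatever UID the MPI processes run as; the hostfile, mounted later for the launcher, only needs to be readable (0444). A quick sketch of what those octal modes mean:

package main

import (
	"fmt"
	"os"
)

func main() {
	scriptMode := int32(0555)
	hostfileMode := int32(0444)
	fmt.Println(os.FileMode(scriptMode))   // -r-xr-xr-x
	fmt.Println(os.FileMode(hostfileMode)) // -r--r--r--
}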
+func newLauncher(mpiJob *kubeflow.MPIJob, kubectlDeliveryImage string) *batchv1.Job { + launcherName := mpiJob.Name + launcherSuffix + labels := map[string]string{ + labelGroupName: "kubeflow.org", + labelMPIJobName: mpiJob.Name, + labelMPIRoleType: launcher, + } + + podSpec := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.DeepCopy() + // copy the labels and annotations to pod from PodTemplate + if len(podSpec.Labels) == 0 { + podSpec.Labels = make(map[string]string) + } + for key, value := range labels { + podSpec.Labels[key] = value + } + + podSpec.Spec.ServiceAccountName = launcherName + podSpec.Spec.InitContainers = append(podSpec.Spec.InitContainers, corev1.Container{ + Name: kubectlDeliveryName, + Image: kubectlDeliveryImage, + Env: []corev1.EnvVar{ + { + Name: kubectlTargetDirEnv, + Value: kubectlMountPath, + }, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: kubectlVolumeName, + MountPath: kubectlMountPath, + }, + }, + }) + container := podSpec.Spec.Containers[0] + container.Env = append(container.Env, + corev1.EnvVar{ + Name: "OMPI_MCA_plm_rsh_agent", + Value: fmt.Sprintf("%s/%s", configMountPath, kubexecScriptName), + }, + corev1.EnvVar{ + Name: "OMPI_MCA_orte_default_hostfile", + Value: fmt.Sprintf("%s/%s", configMountPath, hostfileName), + }) + + // determine if run the launcher on the master node + if mpiJob.Spec.LauncherOnMaster { + + // support Tolerate + podSpec.Spec.Tolerations = []corev1.Toleration{ + { + Key: LabelNodeRoleMaster, + Effect: corev1.TaintEffectNoSchedule, + }, + } + // prefer to assign pod to master node + podSpec.Spec.Affinity = &corev1.Affinity{ + NodeAffinity: &corev1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{ + NodeSelectorTerms: []corev1.NodeSelectorTerm{ + { + MatchExpressions: []corev1.NodeSelectorRequirement{ + { + Key: LabelNodeRoleMaster, + Operator: corev1.NodeSelectorOpExists, + }, + }, + }, + }, + }, + }, + } + } + + container.VolumeMounts = append(container.VolumeMounts, + corev1.VolumeMount{ + Name: kubectlVolumeName, + MountPath: kubectlMountPath, + }, + corev1.VolumeMount{ + Name: configVolumeName, + MountPath: configMountPath, + }) + podSpec.Spec.Containers[0] = container + // Only a `RestartPolicy` equal to `Never` or `OnFailure` is allowed for `Job`. 
+ if podSpec.Spec.RestartPolicy != corev1.RestartPolicyNever { + podSpec.Spec.RestartPolicy = corev1.RestartPolicyOnFailure + } + scriptsMode := int32(0555) + hostfileMode := int32(0444) + podSpec.Spec.Volumes = append(podSpec.Spec.Volumes, + corev1.Volume{ + Name: kubectlVolumeName, + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + corev1.Volume{ + Name: configVolumeName, + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: mpiJob.Name + configSuffix, + }, + Items: []corev1.KeyToPath{ + { + Key: kubexecScriptName, + Path: kubexecScriptName, + Mode: &scriptsMode, + }, + { + Key: hostfileName, + Path: hostfileName, + Mode: &hostfileMode, + }, + }, + }, + }, + }) + + return &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: launcherName, + Namespace: mpiJob.Namespace, + Labels: labels, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(mpiJob, kubeflow.SchemeGroupVersionKind), + }, + }, + Spec: batchv1.JobSpec{ + BackoffLimit: mpiJob.Spec.BackoffLimit, + ActiveDeadlineSeconds: mpiJob.Spec.ActiveDeadlineSeconds, + Template: *podSpec, + }, + } +} diff --git a/pkg/controllers/v1alpha2/mpi_job_controller_test.go b/pkg/controllers/v1alpha2/mpi_job_controller_test.go new file mode 100644 index 000000000..b614ec69b --- /dev/null +++ b/pkg/controllers/v1alpha2/mpi_job_controller_test.go @@ -0,0 +1,782 @@ +// Copyright 2018 The Kubeflow Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controllers + +import ( + "reflect" + "testing" + "time" + + appsv1 "k8s.io/api/apps/v1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + policyv1beta1 "k8s.io/api/policy/v1beta1" + rbacv1 "k8s.io/api/rbac/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/diff" + kubeinformers "k8s.io/client-go/informers" + k8sfake "k8s.io/client-go/kubernetes/fake" + core "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + + kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v1alpha2" + "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/fake" + informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions" + "k8s.io/apimachinery/pkg/api/resource" +) + +var ( + alwaysReady = func() bool { return true } + noResyncPeriodFunc = func() time.Duration { return 0 } +) + +type fixture struct { + t *testing.T + + client *fake.Clientset + kubeClient *k8sfake.Clientset + + // Objects to put in the store. + configMapLister []*corev1.ConfigMap + serviceAccountLister []*corev1.ServiceAccount + roleLister []*rbacv1.Role + roleBindingLister []*rbacv1.RoleBinding + statefulSetLister []*appsv1.StatefulSet + jobLister []*batchv1.Job + pdbLister []*policyv1beta1.PodDisruptionBudget + mpiJobLister []*kubeflow.MPIJob + + // Actions expected to happen on the client. 
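The test fixture below is built around client-go's fake clientsets: objects registered on the fixture are pre-loaded into the fakes, and every call the controller makes is recorded as an action the test can assert on. A minimal standalone sketch of that mechanism (names are made up):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	k8sfake "k8s.io/client-go/kubernetes/fake"
)

func main() {
	client := k8sfake.NewSimpleClientset()

	// Any call through the fake clientset is recorded rather than sent to a server.
	_, _ = client.CoreV1().ConfigMaps("default").Create(&corev1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{Name: "demo-config"},
	})

	// Tests compare these recorded actions against the expected ones.
	for _, action := range client.Actions() {
		fmt.Println(action.GetVerb(), action.GetResource().Resource) // create configmaps
	}
}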
+ kubeActions []core.Action + actions []core.Action + + // Objects from here are pre-loaded into NewSimpleFake. + kubeObjects []runtime.Object + objects []runtime.Object +} + +func newFixture(t *testing.T) *fixture { + f := &fixture{} + f.t = t + f.objects = []runtime.Object{} + f.kubeObjects = []runtime.Object{} + return f +} + +func newMPIJobCommon(name string, startTime, completionTime *metav1.Time) *kubeflow.MPIJob { + mpiJob := &kubeflow.MPIJob{ + TypeMeta: metav1.TypeMeta{APIVersion: kubeflow.SchemeGroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: metav1.NamespaceDefault, + }, + Spec: kubeflow.MPIJobSpec{ + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ + kubeflow.MPIReplicaTypeWorker: &kubeflow.ReplicaSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "foo", + Image: "bar", + }, + }, + }, + }, + }, + kubeflow.MPIReplicaTypeLauncher: &kubeflow.ReplicaSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "foo", + Image: "bar", + }, + }, + }, + }, + }, + }, + }, + Status: kubeflow.JobStatus{}, + } + + if startTime != nil { + mpiJob.Status.StartTime = startTime + } + if completionTime != nil { + mpiJob.Status.CompletionTime = completionTime + } + + return mpiJob +} + +func newMPIJob(name string, replicas *int32, pusPerReplica int64, resourceName string, startTime, completionTime *metav1.Time) *kubeflow.MPIJob { + mpiJob := newMPIJobCommon(name, startTime, completionTime) + + mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Replicas = replicas + + workerContainers := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Template.Spec.Containers + for i := range workerContainers { + container := &workerContainers[i] + container.Resources = corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceName(resourceName): *resource.NewQuantity(pusPerReplica, resource.DecimalExponent), + }, + } + } + + return mpiJob +} + +func (f *fixture) newController() (*MPIJobController, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) { + f.client = fake.NewSimpleClientset(f.objects...) + f.kubeClient = k8sfake.NewSimpleClientset(f.kubeObjects...) 
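newMPIJob attaches processing-unit limits to each worker container; resource.NewQuantity is the programmatic counterpart of writing `nvidia.com/gpu: 4` in a pod spec. A standalone sketch (values are arbitrary):

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// gpuResourceName in the controller is the same "nvidia.com/gpu" string.
	limits := corev1.ResourceList{
		corev1.ResourceName("nvidia.com/gpu"): *resource.NewQuantity(4, resource.DecimalExponent),
	}
	q := limits[corev1.ResourceName("nvidia.com/gpu")]
	fmt.Println(q.Value()) // 4
}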
+ + i := informers.NewSharedInformerFactory(f.client, noResyncPeriodFunc()) + k8sI := kubeinformers.NewSharedInformerFactory(f.kubeClient, noResyncPeriodFunc()) + + c := NewMPIJobController( + f.kubeClient, + f.client, + k8sI.Core().V1().ConfigMaps(), + k8sI.Core().V1().ServiceAccounts(), + k8sI.Rbac().V1().Roles(), + k8sI.Rbac().V1().RoleBindings(), + k8sI.Apps().V1().StatefulSets(), + k8sI.Batch().V1().Jobs(), + k8sI.Policy().V1beta1().PodDisruptionBudgets(), + i.Kubeflow().V1alpha2().MPIJobs(), + "kubectl-delivery", + false, + ) + + c.configMapSynced = alwaysReady + c.serviceAccountSynced = alwaysReady + c.roleSynced = alwaysReady + c.roleBindingSynced = alwaysReady + c.statefulSetSynced = alwaysReady + c.jobSynced = alwaysReady + c.pdbSynced = alwaysReady + c.mpiJobSynced = alwaysReady + c.recorder = &record.FakeRecorder{} + + for _, configMap := range f.configMapLister { + k8sI.Core().V1().ConfigMaps().Informer().GetIndexer().Add(configMap) + } + + for _, serviceAccount := range f.serviceAccountLister { + k8sI.Core().V1().ServiceAccounts().Informer().GetIndexer().Add(serviceAccount) + } + + for _, role := range f.roleLister { + k8sI.Rbac().V1().Roles().Informer().GetIndexer().Add(role) + } + + for _, roleBinding := range f.roleBindingLister { + k8sI.Rbac().V1().RoleBindings().Informer().GetIndexer().Add(roleBinding) + } + + for _, statefulSet := range f.statefulSetLister { + k8sI.Apps().V1().StatefulSets().Informer().GetIndexer().Add(statefulSet) + } + + for _, job := range f.jobLister { + k8sI.Batch().V1().Jobs().Informer().GetIndexer().Add(job) + } + + for _, pdb := range f.pdbLister { + k8sI.Policy().V1beta1().PodDisruptionBudgets().Informer().GetIndexer().Add(pdb) + } + + for _, mpiJob := range f.mpiJobLister { + i.Kubeflow().V1alpha2().MPIJobs().Informer().GetIndexer().Add(mpiJob) + } + + return c, i, k8sI +} + +func (f *fixture) run(mpiJobName string) { + f.runController(mpiJobName, true, false) +} + +func (f *fixture) runExpectError(mpiJobName string) { + f.runController(mpiJobName, true, true) +} + +func (f *fixture) runController(mpiJobName string, startInformers bool, expectError bool) { + c, i, k8sI := f.newController() + if startInformers { + stopCh := make(chan struct{}) + defer close(stopCh) + i.Start(stopCh) + k8sI.Start(stopCh) + } + + err := c.syncHandler(mpiJobName) + if !expectError && err != nil { + f.t.Errorf("error syncing mpi job: %v", err) + } else if expectError && err == nil { + f.t.Error("expected error syncing mpi job, got nil") + } + + actions := filterInformerActions(f.client.Actions()) + for i, action := range actions { + if len(f.actions) < i+1 { + f.t.Errorf("%d unexpected actions: %+v", len(actions)-len(f.actions), actions[i:]) + break + } + + expectedAction := f.actions[i] + checkAction(expectedAction, action, f.t) + } + + if len(f.actions) > len(actions) { + f.t.Errorf("%d additional expected actions:%+v", len(f.actions)-len(actions), f.actions[len(actions):]) + } + + k8sActions := filterInformerActions(f.kubeClient.Actions()) + for i, action := range k8sActions { + if len(f.kubeActions) < i+1 { + f.t.Errorf("%d unexpected actions: %+v", len(k8sActions)-len(f.kubeActions), k8sActions[i:]) + break + } + + expectedAction := f.kubeActions[i] + checkAction(expectedAction, action, f.t) + } + + if len(f.kubeActions) > len(k8sActions) { + f.t.Errorf("%d additional expected actions:%+v", len(f.kubeActions)-len(k8sActions), f.kubeActions[len(k8sActions):]) + } +} + +// checkAction verifies that expected and actual actions are equal and both have +// same 
attached resources +func checkAction(expected, actual core.Action, t *testing.T) { + if !(expected.Matches(actual.GetVerb(), actual.GetResource().Resource) && actual.GetSubresource() == expected.GetSubresource()) { + t.Errorf("Expected\n\t%#v\ngot\n\t%#v", expected, actual) + return + } + + if reflect.TypeOf(actual) != reflect.TypeOf(expected) { + t.Errorf("Action has wrong type. Expected: %t. Got: %t", expected, actual) + return + } + + switch a := actual.(type) { + case core.CreateAction: + e, _ := expected.(core.CreateAction) + expObject := e.GetObject() + object := a.GetObject() + + if !reflect.DeepEqual(expObject, object) { + t.Errorf("Action %s %s has wrong object\nDiff:\n %s", + a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintDiff(expObject, object)) + } + case core.UpdateAction: + e, _ := expected.(core.UpdateAction) + expObject := e.GetObject() + object := a.GetObject() + + if !reflect.DeepEqual(expObject, object) { + t.Errorf("Action %s %s has wrong object\nDiff:\n %s", + a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintDiff(expObject, object)) + } + case core.PatchAction: + e, _ := expected.(core.PatchAction) + expPatch := e.GetPatch() + patch := a.GetPatch() + + if !reflect.DeepEqual(expPatch, expPatch) { + t.Errorf("Action %s %s has wrong patch\nDiff:\n %s", + a.GetVerb(), a.GetResource().Resource, diff.ObjectGoPrintDiff(expPatch, patch)) + } + } +} + +// filterInformerActions filters list and watch actions for testing resources. +// Since list and watch don't change resource state we can filter it to lower +// nose level in our tests. +func filterInformerActions(actions []core.Action) []core.Action { + var ret []core.Action + for _, action := range actions { + if len(action.GetNamespace()) == 0 && + (action.Matches("list", "configmaps") || + action.Matches("watch", "configmaps") || + action.Matches("list", "serviceaccounts") || + action.Matches("watch", "serviceaccounts") || + action.Matches("list", "roles") || + action.Matches("watch", "roles") || + action.Matches("list", "rolebindings") || + action.Matches("watch", "rolebindings") || + action.Matches("list", "statefulsets") || + action.Matches("watch", "statefulsets") || + action.Matches("list", "pods") || + action.Matches("watch", "pods") || + action.Matches("list", "jobs") || + action.Matches("watch", "jobs") || + action.Matches("list", "poddisruptionbudgets") || + action.Matches("watch", "poddisruptionbudgets") || + action.Matches("list", "mpijobs") || + action.Matches("watch", "mpijobs")) { + continue + } + ret = append(ret, action) + } + + return ret +} + +func (f *fixture) expectCreateConfigMapAction(d *corev1.ConfigMap) { + f.kubeActions = append(f.kubeActions, core.NewCreateAction(schema.GroupVersionResource{Resource: "configmaps"}, d.Namespace, d)) +} + +func (f *fixture) expectUpdateConfigMapAction(d *corev1.ConfigMap) { + f.kubeActions = append(f.kubeActions, core.NewUpdateAction(schema.GroupVersionResource{Resource: "configmaps"}, d.Namespace, d)) +} + +func (f *fixture) expectCreateServiceAccountAction(d *corev1.ServiceAccount) { + f.kubeActions = append(f.kubeActions, core.NewCreateAction(schema.GroupVersionResource{Resource: "serviceaccounts"}, d.Namespace, d)) +} + +func (f *fixture) expectUpdateServiceAccountAction(d *corev1.ServiceAccount) { + f.kubeActions = append(f.kubeActions, core.NewUpdateAction(schema.GroupVersionResource{Resource: "serviceaccounts"}, d.Namespace, d)) +} + +func (f *fixture) expectCreateRoleAction(d *rbacv1.Role) { + f.kubeActions = append(f.kubeActions, 
core.NewCreateAction(schema.GroupVersionResource{Resource: "roles"}, d.Namespace, d)) +} + +func (f *fixture) expectUpdateRoleAction(d *rbacv1.Role) { + f.kubeActions = append(f.kubeActions, core.NewUpdateAction(schema.GroupVersionResource{Resource: "roles"}, d.Namespace, d)) +} + +func (f *fixture) expectCreateRoleBindingAction(d *rbacv1.RoleBinding) { + f.kubeActions = append(f.kubeActions, core.NewCreateAction(schema.GroupVersionResource{Resource: "rolebindings"}, d.Namespace, d)) +} + +func (f *fixture) expectUpdateRoleBindingAction(d *rbacv1.RoleBinding) { + f.kubeActions = append(f.kubeActions, core.NewUpdateAction(schema.GroupVersionResource{Resource: "rolebindings"}, d.Namespace, d)) +} + +func (f *fixture) expectCreateStatefulSetAction(d *appsv1.StatefulSet) { + f.kubeActions = append(f.kubeActions, core.NewCreateAction(schema.GroupVersionResource{Resource: "statefulsets"}, d.Namespace, d)) +} + +func (f *fixture) expectUpdateStatefulSetAction(d *appsv1.StatefulSet) { + f.kubeActions = append(f.kubeActions, core.NewUpdateAction(schema.GroupVersionResource{Resource: "statefulsets"}, d.Namespace, d)) +} + +func (f *fixture) expectCreateJobAction(d *batchv1.Job) { + f.kubeActions = append(f.kubeActions, core.NewCreateAction(schema.GroupVersionResource{Resource: "jobs"}, d.Namespace, d)) +} + +func (f *fixture) expectUpdateJobAction(d *batchv1.Job) { + f.kubeActions = append(f.kubeActions, core.NewUpdateAction(schema.GroupVersionResource{Resource: "jobs"}, d.Namespace, d)) +} + +func (f *fixture) expectUpdateMPIJobStatusAction(mpiJob *kubeflow.MPIJob) { + action := core.NewUpdateAction(schema.GroupVersionResource{Resource: "mpijobs"}, mpiJob.Namespace, mpiJob) + // TODO: Until #38113 is merged, we can't use Subresource + //action.Subresource = "status" + f.actions = append(f.actions, action) +} + +func (f *fixture) setUpMPIJob(mpiJob *kubeflow.MPIJob) { + f.mpiJobLister = append(f.mpiJobLister, mpiJob) + f.objects = append(f.objects, mpiJob) +} + +func (f *fixture) setUpLauncher(launcher *batchv1.Job) { + f.jobLister = append(f.jobLister, launcher) + f.kubeObjects = append(f.kubeObjects, launcher) +} + +func (f *fixture) setUpWorker(worker *appsv1.StatefulSet) { + f.statefulSetLister = append(f.statefulSetLister, worker) + f.kubeObjects = append(f.kubeObjects, worker) +} + +func (f *fixture) setUpConfigMap(configMap *corev1.ConfigMap) { + f.configMapLister = append(f.configMapLister, configMap) + f.kubeObjects = append(f.kubeObjects, configMap) +} + +func (f *fixture) setUpServiceAccount(serviceAccount *corev1.ServiceAccount) { + f.serviceAccountLister = append(f.serviceAccountLister, serviceAccount) + f.kubeObjects = append(f.kubeObjects, serviceAccount) +} + +func (f *fixture) setUpRole(role *rbacv1.Role) { + f.roleLister = append(f.roleLister, role) + f.kubeObjects = append(f.kubeObjects, role) +} + +func (f *fixture) setUpRoleBinding(roleBinding *rbacv1.RoleBinding) { + f.roleBindingLister = append(f.roleBindingLister, roleBinding) + f.kubeObjects = append(f.kubeObjects, roleBinding) +} + +func (f *fixture) setUpRbac(mpiJob *kubeflow.MPIJob, workerReplicas int) { + serviceAccount := newLauncherServiceAccount(mpiJob) + f.setUpServiceAccount(serviceAccount) + + role := newLauncherRole(mpiJob, workerReplicas) + f.setUpRole(role) + + roleBinding := newLauncherRoleBinding(mpiJob) + f.setUpRoleBinding(roleBinding) +} + +func setUpMPIJobTimestamp(mpiJob *kubeflow.MPIJob, startTime, completionTime *metav1.Time) { + if startTime != nil { + mpiJob.Status.StartTime = startTime + } + + if 
completionTime != nil { + mpiJob.Status.CompletionTime = completionTime + } +} + +func getKey(mpiJob *kubeflow.MPIJob, t *testing.T) string { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(mpiJob) + if err != nil { + t.Errorf("Unexpected error getting key for mpi job %v: %v", mpiJob.Name, err) + return "" + } + return key +} + +func TestDoNothingWithInvalidKey(t *testing.T) { + f := newFixture(t) + f.run("foo/bar/baz") +} + +func TestDoNothingWithNonexistentMPIJob(t *testing.T) { + f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() + mpiJob := newMPIJob("test", int32Ptr(64), 1, gpuResourceName, &startTime, &completionTime) + f.run(getKey(mpiJob, t)) +} + +func TestLauncherNotControlledByUs(t *testing.T) { + f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() + + mpiJob := newMPIJob("test", int32Ptr(64), 1, gpuResourceName, &startTime, &completionTime) + f.setUpMPIJob(mpiJob) + + launcher := newLauncher(mpiJob, "kubectl-delivery") + launcher.OwnerReferences = nil + f.setUpLauncher(launcher) + + f.runExpectError(getKey(mpiJob, t)) +} + +func TestLauncherSucceeded(t *testing.T) { + f := newFixture(t) + + startTime := metav1.Now() + completionTime := metav1.Now() + mpiJob := newMPIJob("test", int32Ptr(64), 1, gpuResourceName, &startTime, &completionTime) + f.setUpMPIJob(mpiJob) + + launcher := newLauncher(mpiJob, "kubectl-delivery") + launcher.Status.Succeeded = 1 + f.setUpLauncher(launcher) + + mpiJobCopy := mpiJob.DeepCopy() + mpiJobCopy.Status.ReplicaStatuses = map[kubeflow.ReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.ReplicaType(kubeflow.MPIReplicaTypeLauncher): &kubeflow.ReplicaStatus{ + Active: 0, + Succeeded: 1, + Failed: 0, + }, + kubeflow.ReplicaType(kubeflow.MPIReplicaTypeWorker): &kubeflow.ReplicaStatus{ + Active: 0, + Succeeded: 0, + Failed: 0, + }, + } + + setUpMPIJobTimestamp(mpiJobCopy, &startTime, &completionTime) + f.expectUpdateMPIJobStatusAction(mpiJobCopy) + + f.run(getKey(mpiJob, t)) +} + +func TestLauncherFailed(t *testing.T) { + f := newFixture(t) + startTime := metav1.Now() + + mpiJob := newMPIJob("test", int32Ptr(64), 1, gpuResourceName, &startTime, nil) + f.setUpMPIJob(mpiJob) + + launcher := newLauncher(mpiJob, "kubectl-delivery") + launcher.Status.Failed = 1 + f.setUpLauncher(launcher) + + mpiJobCopy := mpiJob.DeepCopy() + mpiJobCopy.Status.ReplicaStatuses = map[kubeflow.ReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.ReplicaType(kubeflow.MPIReplicaTypeLauncher): &kubeflow.ReplicaStatus{ + Active: 0, + Succeeded: 0, + Failed: 1, + }, + kubeflow.ReplicaType(kubeflow.MPIReplicaTypeWorker): &kubeflow.ReplicaStatus{ + Active: 0, + Succeeded: 0, + Failed: 0, + }, + } + setUpMPIJobTimestamp(mpiJobCopy, &startTime, nil) + f.expectUpdateMPIJobStatusAction(mpiJobCopy) + + f.run(getKey(mpiJob, t)) +} + +func TestLauncherDoesNotExist(t *testing.T) { + f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() + + mpiJob := newMPIJob("test", int32Ptr(4), 4, gpuResourceName, &startTime, &completionTime) + f.setUpMPIJob(mpiJob) + + expConfigMap := newConfigMap(mpiJob, 4) + f.expectCreateConfigMapAction(expConfigMap) + + expServiceAccount := newLauncherServiceAccount(mpiJob) + f.expectCreateServiceAccountAction(expServiceAccount) + + expRole := newLauncherRole(mpiJob, 4) + f.expectCreateRoleAction(expRole) + + expRoleBinding := newLauncherRoleBinding(mpiJob) + f.expectCreateRoleBindingAction(expRoleBinding) + + expWorker := newWorker(mpiJob, 4) + f.expectCreateStatefulSetAction(expWorker) + 
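+ // No launcher Job exists yet in this test, so the expected status update below only carries a worker replica status.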
+ mpiJobCopy := mpiJob.DeepCopy() + mpiJobCopy.Status.ReplicaStatuses = map[kubeflow.ReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.ReplicaType(kubeflow.MPIReplicaTypeWorker): &kubeflow.ReplicaStatus{ + Active: 0, + Succeeded: 0, + Failed: 0, + }, + } + setUpMPIJobTimestamp(mpiJobCopy, &startTime, &completionTime) + f.expectUpdateMPIJobStatusAction(mpiJobCopy) + + f.run(getKey(mpiJob, t)) +} + +func TestConfigMapNotControlledByUs(t *testing.T) { + f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() + + mpiJob := newMPIJob("test", int32Ptr(64), 1, gpuResourceName, &startTime, &completionTime) + f.setUpMPIJob(mpiJob) + + configMap := newConfigMap(mpiJob, 8) + configMap.OwnerReferences = nil + f.setUpConfigMap(configMap) + + f.runExpectError(getKey(mpiJob, t)) +} + +func TestServiceAccountNotControlledByUs(t *testing.T) { + f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() + + mpiJob := newMPIJob("test", int32Ptr(64), 1, gpuResourceName, &startTime, &completionTime) + f.setUpMPIJob(mpiJob) + + f.setUpConfigMap(newConfigMap(mpiJob, 8)) + + serviceAccount := newLauncherServiceAccount(mpiJob) + serviceAccount.OwnerReferences = nil + f.setUpServiceAccount(serviceAccount) + + f.runExpectError(getKey(mpiJob, t)) +} + +func TestRoleNotControlledByUs(t *testing.T) { + f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() + + mpiJob := newMPIJob("test", int32Ptr(64), 1, gpuResourceName, &startTime, &completionTime) + f.setUpMPIJob(mpiJob) + + f.setUpConfigMap(newConfigMap(mpiJob, 8)) + f.setUpServiceAccount(newLauncherServiceAccount(mpiJob)) + + role := newLauncherRole(mpiJob, 8) + role.OwnerReferences = nil + f.setUpRole(role) + + f.runExpectError(getKey(mpiJob, t)) +} + +func TestRoleBindingNotControlledByUs(t *testing.T) { + f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() + + mpiJob := newMPIJob("test", int32Ptr(64), 1, gpuResourceName, &startTime, &completionTime) + f.setUpMPIJob(mpiJob) + + f.setUpConfigMap(newConfigMap(mpiJob, 8)) + f.setUpServiceAccount(newLauncherServiceAccount(mpiJob)) + f.setUpRole(newLauncherRole(mpiJob, 8)) + + roleBinding := newLauncherRoleBinding(mpiJob) + roleBinding.OwnerReferences = nil + f.setUpRoleBinding(roleBinding) + + f.runExpectError(getKey(mpiJob, t)) +} + +func TestShutdownWorker(t *testing.T) { + f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() + + mpiJob := newMPIJob("test", int32Ptr(64), 1, gpuResourceName, &startTime, &completionTime) + f.setUpMPIJob(mpiJob) + + launcher := newLauncher(mpiJob, "kubectl-delivery") + launcher.Status.Succeeded = 1 + f.setUpLauncher(launcher) + + worker := newWorker(mpiJob, 8) + f.setUpWorker(worker) + + expWorker := newWorker(mpiJob, 0) + f.expectUpdateStatefulSetAction(expWorker) + + mpiJobCopy := mpiJob.DeepCopy() + mpiJobCopy.Status.ReplicaStatuses[kubeflow.ReplicaType(kubeflow.MPIReplicaTypeWorker)].Active = 0 + mpiJobCopy.Status.ReplicaStatuses[kubeflow.ReplicaType(kubeflow.MPIReplicaTypeLauncher)].Succeeded = 1 + setUpMPIJobTimestamp(mpiJobCopy, &startTime, &completionTime) + f.expectUpdateMPIJobStatusAction(mpiJobCopy) + + f.run(getKey(mpiJob, t)) +} + +func TestWorkerNotControlledByUs(t *testing.T) { + f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() + + mpiJob := newMPIJob("test", int32Ptr(64), 1, gpuResourceName, &startTime, &completionTime) + f.setUpMPIJob(mpiJob) + + f.setUpConfigMap(newConfigMap(mpiJob, 8)) + f.setUpRbac(mpiJob, 8) + + 
worker := newWorker(mpiJob, 8) + worker.OwnerReferences = nil + f.setUpWorker(worker) + + f.runExpectError(getKey(mpiJob, t)) +} + +func TestLauncherActive(t *testing.T) { + f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() + + mpiJob := newMPIJob("test", int32Ptr(8), 1, gpuResourceName, &startTime, &completionTime) + + f.setUpMPIJob(mpiJob) + + f.setUpConfigMap(newConfigMap(mpiJob, 1)) + f.setUpRbac(mpiJob, 1) + + launcher := newLauncher(mpiJob, "kubectl-delivery") + launcher.Status.Active = 1 + f.setUpLauncher(launcher) + + worker := newWorker(mpiJob, 1) + f.setUpWorker(worker) + + mpiJobCopy := mpiJob.DeepCopy() + mpiJobCopy.Status.ReplicaStatuses = map[kubeflow.ReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.ReplicaType(kubeflow.MPIReplicaTypeLauncher): &kubeflow.ReplicaStatus{ + Active: 1, + Succeeded: 0, + Failed: 0, + }, + kubeflow.ReplicaType(kubeflow.MPIReplicaTypeWorker): &kubeflow.ReplicaStatus{ + Active: 0, + Succeeded: 0, + Failed: 0, + }, + } + setUpMPIJobTimestamp(mpiJobCopy, &startTime, &completionTime) + f.expectUpdateMPIJobStatusAction(mpiJobCopy) + + f.run(getKey(mpiJob, t)) +} + +func TestWorkerReady(t *testing.T) { + f := newFixture(t) + startTime := metav1.Now() + completionTime := metav1.Now() + + mpiJob := newMPIJob("test", int32Ptr(16), 1, gpuResourceName, &startTime, &completionTime) + f.setUpMPIJob(mpiJob) + + f.setUpConfigMap(newConfigMap(mpiJob, 2)) + f.setUpRbac(mpiJob, 2) + + worker := newWorker(mpiJob, 2) + worker.Status.ReadyReplicas = 2 + f.setUpWorker(worker) + + expLauncher := newLauncher(mpiJob, "kubectl-delivery") + f.expectCreateJobAction(expLauncher) + + mpiJobCopy := mpiJob.DeepCopy() + mpiJobCopy.Status.ReplicaStatuses = map[kubeflow.ReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.ReplicaType(kubeflow.MPIReplicaTypeLauncher): &kubeflow.ReplicaStatus{ + Active: 0, + Succeeded: 0, + Failed: 0, + }, + kubeflow.ReplicaType(kubeflow.MPIReplicaTypeWorker): &kubeflow.ReplicaStatus{ + Active: 2, + Succeeded: 0, + Failed: 0, + }, + } + setUpMPIJobTimestamp(mpiJobCopy, &startTime, &completionTime) + f.expectUpdateMPIJobStatusAction(mpiJobCopy) + + f.run(getKey(mpiJob, t)) +} + +func int32Ptr(i int32) *int32 { return &i } From 5b8dc2b18d95b5140f4b04c5fc8195377480faac Mon Sep 17 00:00:00 2001 From: terrytangyuan Date: Fri, 26 Apr 2019 09:23:36 -0400 Subject: [PATCH 2/7] Update travis config --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index ec39e0e28..7f6cb384f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,4 +12,4 @@ install: - go get -t -d ./... script: -- go test ./... +- go test ./pkg/controllers/v1alpha1/... From cdbd3eaddf5b48253539c58066270c7c146fc660 Mon Sep 17 00:00:00 2001 From: terrytangyuan Date: Fri, 26 Apr 2019 09:27:35 -0400 Subject: [PATCH 3/7] Pull updates from master --- pkg/controllers/v1alpha1/mpi_job_controller.go | 4 ++-- pkg/controllers/v1alpha1/mpi_job_controller_test.go | 4 +++- pkg/controllers/v1alpha2/mpi_job_controller.go | 2 +- pkg/controllers/v1alpha2/mpi_job_controller_test.go | 2 +- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pkg/controllers/v1alpha1/mpi_job_controller.go b/pkg/controllers/v1alpha1/mpi_job_controller.go index 575403069..46ffb617b 100644 --- a/pkg/controllers/v1alpha1/mpi_job_controller.go +++ b/pkg/controllers/v1alpha1/mpi_job_controller.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package controllers +package v1alpha1 import ( "bytes" @@ -1071,7 +1071,7 @@ func newWorker(mpiJob *kubeflow.MPIJob, desiredReplicas int32, processingUnits i }, }) - //add SchedulerName to podSpec + //add SchedulerName to podSpec if enableGangScheduling { if podSpec.Spec.SchedulerName != "" && podSpec.Spec.SchedulerName != gangSchedulerName { errMsg := fmt.Sprintf( diff --git a/pkg/controllers/v1alpha1/mpi_job_controller_test.go b/pkg/controllers/v1alpha1/mpi_job_controller_test.go index 0e37f7482..1a0f2bfe8 100644 --- a/pkg/controllers/v1alpha1/mpi_job_controller_test.go +++ b/pkg/controllers/v1alpha1/mpi_job_controller_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package controllers +package v1alpha1 import ( "github.com/stretchr/testify/assert" @@ -20,6 +20,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" diff --git a/pkg/controllers/v1alpha2/mpi_job_controller.go b/pkg/controllers/v1alpha2/mpi_job_controller.go index 0b234d759..b3e88f4f3 100644 --- a/pkg/controllers/v1alpha2/mpi_job_controller.go +++ b/pkg/controllers/v1alpha2/mpi_job_controller.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package controllers +package v1alpha2 import ( "bytes" diff --git a/pkg/controllers/v1alpha2/mpi_job_controller_test.go b/pkg/controllers/v1alpha2/mpi_job_controller_test.go index b614ec69b..fa7c919ad 100644 --- a/pkg/controllers/v1alpha2/mpi_job_controller_test.go +++ b/pkg/controllers/v1alpha2/mpi_job_controller_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package controllers +package v1alpha2 import ( "reflect" From 7e94b67dd71e0eb086523c0a2a6c0c2c158006fc Mon Sep 17 00:00:00 2001 From: terrytangyuan Date: Fri, 26 Apr 2019 09:36:43 -0400 Subject: [PATCH 4/7] Fix travis build --- .travis.yml | 2 +- pkg/controllers/v1alpha1/mpi_job_controller_test.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 7f6cb384f..41e771b15 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,7 +9,7 @@ before_install: - dep ensure install: -- go get -t -d ./... +- go get -t -d ./pkg/controllers/v1alpha1/... script: - go test ./pkg/controllers/v1alpha1/... diff --git a/pkg/controllers/v1alpha1/mpi_job_controller_test.go b/pkg/controllers/v1alpha1/mpi_job_controller_test.go index 1a0f2bfe8..70143e3fd 100644 --- a/pkg/controllers/v1alpha1/mpi_job_controller_test.go +++ b/pkg/controllers/v1alpha1/mpi_job_controller_test.go @@ -15,7 +15,6 @@ package v1alpha1 import ( - "github.com/stretchr/testify/assert" "reflect" "testing" "time" From 1457e633b7430a15d0182cde463ebe10d5515026 Mon Sep 17 00:00:00 2001 From: terrytangyuan Date: Fri, 26 Apr 2019 21:58:41 -0400 Subject: [PATCH 5/7] Fix imports and travis config --- .travis.yml | 4 ++-- cmd/mpi-operator/main.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index 41e771b15..f33e1ec7d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,7 +9,7 @@ before_install: - dep ensure install: -- go get -t -d ./pkg/controllers/v1alpha1/... +- go get -t -d ./... script: -- go test ./pkg/controllers/v1alpha1/... +- go test ./pkg/controllers/v1alpha1/... ./cmd/... 
diff --git a/cmd/mpi-operator/main.go b/cmd/mpi-operator/main.go index 589adf0a4..3e370e373 100644 --- a/cmd/mpi-operator/main.go +++ b/cmd/mpi-operator/main.go @@ -18,6 +18,7 @@ import ( "flag" "github.com/golang/glog" + controllersv1alpha1 "github.com/kubeflow/mpi-operator/pkg/controllers/v1alpha1" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" @@ -25,7 +26,6 @@ import ( clientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned" informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions" - "github.com/kubeflow/mpi-operator/pkg/controllers" policyinformers "k8s.io/client-go/informers/policy/v1beta1" ) @@ -75,7 +75,7 @@ func main() { if enableGangScheduling { pdbInformer = kubeInformerFactory.Policy().V1beta1().PodDisruptionBudgets() } - controller := controllers.NewMPIJobController( + controller := controllersv1alpha1.NewMPIJobController( kubeClient, kubeflowClient, kubeInformerFactory.Core().V1().ConfigMaps(), From f0278f3128d2f041dbce01aa9c02094e6bfb6b10 Mon Sep 17 00:00:00 2001 From: "Yuan (Terry) Tang" Date: Sat, 27 Apr 2019 08:42:53 -0500 Subject: [PATCH 6/7] Update .travis.yml --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index f33e1ec7d..7f6cb384f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,4 +12,4 @@ install: - go get -t -d ./... script: -- go test ./pkg/controllers/v1alpha1/... ./cmd/... +- go test ./pkg/controllers/v1alpha1/... From 22fdc1c34ba1a5a52c0a5d7de78e99ad0dbb86af Mon Sep 17 00:00:00 2001 From: terrytangyuan Date: Mon, 29 Apr 2019 21:39:36 -0400 Subject: [PATCH 7/7] Explicitly exclude v1alpha2 tests --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 7f6cb384f..5187195e1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,4 +12,4 @@ install: - go get -t -d ./... script: -- go test ./pkg/controllers/v1alpha1/... +- go test $(go list ./... | grep -v v1alpha2)
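A possible follow-up for the v1alpha2 test file added in the first patch: the ReplicaStatuses map literals repeated across its tests could be built by a small helper. This is only a sketch, not part of the patches above; the helper name is illustrative, and it relies solely on identifiers the test file already uses (kubeflow.ReplicaType, kubeflow.ReplicaStatus, kubeflow.MPIReplicaTypeLauncher, kubeflow.MPIReplicaTypeWorker).

func replicaStatuses(launcher, worker kubeflow.ReplicaStatus) map[kubeflow.ReplicaType]*kubeflow.ReplicaStatus {
	// The arguments are passed by value, so each call hands back pointers to fresh copies.
	return map[kubeflow.ReplicaType]*kubeflow.ReplicaStatus{
		kubeflow.ReplicaType(kubeflow.MPIReplicaTypeLauncher): &launcher,
		kubeflow.ReplicaType(kubeflow.MPIReplicaTypeWorker):   &worker,
	}
}

With that in place, a test such as TestLauncherSucceeded could set mpiJobCopy.Status.ReplicaStatuses = replicaStatuses(kubeflow.ReplicaStatus{Succeeded: 1}, kubeflow.ReplicaStatus{}), and the reflect.DeepEqual comparison in checkAction would still match, since map values are compared through their pointers.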