From b619d48e80191e516c5faaf3742891336cf7e827 Mon Sep 17 00:00:00 2001 From: Fei Xu Date: Thu, 30 May 2019 11:20:22 +0800 Subject: [PATCH] Add leader election (#110) * add leader elector * run dep ensure --- Gopkg.lock | 73 +++++++++ cmd/mpi-operator.v1alpha2/app/server.go | 189 +++++++++++++++++++----- pkg/apis/kubeflow/v1alpha2/constants.go | 20 +++ 3 files changed, 245 insertions(+), 37 deletions(-) create mode 100644 pkg/apis/kubeflow/v1alpha2/constants.go diff --git a/Gopkg.lock b/Gopkg.lock index cb7e9d0dd..158cba3b9 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -74,6 +74,14 @@ pruneopts = "" revision = "24818f796faf91cd76ec7bddd72458fbced7a6c1" +[[projects]] + digest = "1:ad92aa49f34cbc3546063c7eb2cabb55ee2278b72842eda80e2a20a8a06a8d73" + name = "github.com/google/uuid" + packages = ["."] + pruneopts = "" + revision = "0cd6bf5da1e1c83f8b45653022c74f71af0538a4" + version = "v1.1.1" + [[projects]] digest = "1:16b2837c8b3cf045fa2cdc82af0cf78b19582701394484ae76b2c3bc3c99ad73" name = "github.com/googleapis/gnostic" @@ -124,6 +132,36 @@ revision = "1624edc4454b8682399def8740d46db5e4362ba4" version = "v1.1.5" +[[projects]] + branch = "master" + digest = "1:5c10d22407df1430d33747ecd8412a3f690ae1c55678c30c259e2a38baffccea" + name = "github.com/kubeflow/mpi-operator" + packages = [ + "cmd/mpi-operator.v1alpha2/app", + "cmd/mpi-operator.v1alpha2/app/options", + "pkg/apis/kubeflow/v1alpha1", + "pkg/apis/kubeflow/v1alpha2", + "pkg/client/clientset/versioned", + "pkg/client/clientset/versioned/fake", + "pkg/client/clientset/versioned/scheme", + "pkg/client/clientset/versioned/typed/kubeflow/v1alpha1", + "pkg/client/clientset/versioned/typed/kubeflow/v1alpha1/fake", + "pkg/client/clientset/versioned/typed/kubeflow/v1alpha2", + "pkg/client/clientset/versioned/typed/kubeflow/v1alpha2/fake", + "pkg/client/informers/externalversions", + "pkg/client/informers/externalversions/internalinterfaces", + "pkg/client/informers/externalversions/kubeflow", + "pkg/client/informers/externalversions/kubeflow/v1alpha1", + "pkg/client/informers/externalversions/kubeflow/v1alpha2", + "pkg/client/listers/kubeflow/v1alpha1", + "pkg/client/listers/kubeflow/v1alpha2", + "pkg/controllers/v1alpha1", + "pkg/controllers/v1alpha2", + "pkg/version", + ] + pruneopts = "" + revision = "4a52f2af73739a5708067cec347be435aca3924e" + [[projects]] digest = "1:0c0ff2a89c1bb0d01887e1dac043ad7efbf3ec77482ef058ac423d13497e16fd" name = "github.com/modern-go/concurrent" @@ -140,6 +178,14 @@ revision = "4b7aa43c6742a2c18fdef89dd197aaae7dac7ccd" version = "1.0.1" +[[projects]] + digest = "1:a5484d4fa43127138ae6e7b2299a6a52ae006c7f803d98d717f60abf3e97192e" + name = "github.com/pborman/uuid" + packages = ["."] + pruneopts = "" + revision = "adf5a7427709b9deb95d29d3fa8a2bf9cfd388f1" + version = "v1.2" + [[projects]] branch = "master" digest = "1:c24598ffeadd2762552269271b3b1510df2d83ee6696c1e543a0ff653af494bc" @@ -387,6 +433,7 @@ "pkg/util/runtime", "pkg/util/sets", "pkg/util/strategicpatch", + "pkg/util/uuid", "pkg/util/validation", "pkg/util/validation/field", "pkg/util/wait", @@ -561,6 +608,8 @@ "tools/clientcmd/api", "tools/clientcmd/api/latest", "tools/clientcmd/api/v1", + "tools/leaderelection", + "tools/leaderelection/resourcelock", "tools/metrics", "tools/pager", "tools/record", @@ -648,6 +697,27 @@ analyzer-version = 1 input-imports = [ "github.com/golang/glog", + "github.com/kubeflow/mpi-operator/cmd/mpi-operator.v1alpha2/app", + "github.com/kubeflow/mpi-operator/cmd/mpi-operator.v1alpha2/app/options", + "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v1alpha1", + "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v1alpha2", + "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned", + "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/fake", + "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/scheme", + "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/typed/kubeflow/v1alpha1", + "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/typed/kubeflow/v1alpha1/fake", + "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/typed/kubeflow/v1alpha2", + "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/typed/kubeflow/v1alpha2/fake", + "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions", + "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions/internalinterfaces", + "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions/kubeflow", + "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions/kubeflow/v1alpha1", + "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions/kubeflow/v1alpha2", + "github.com/kubeflow/mpi-operator/pkg/client/listers/kubeflow/v1alpha1", + "github.com/kubeflow/mpi-operator/pkg/client/listers/kubeflow/v1alpha2", + "github.com/kubeflow/mpi-operator/pkg/controllers/v1alpha1", + "github.com/kubeflow/mpi-operator/pkg/controllers/v1alpha2", + "github.com/kubeflow/mpi-operator/pkg/version", "github.com/stretchr/testify/assert", "k8s.io/api/apps/v1", "k8s.io/api/batch/v1", @@ -665,6 +735,7 @@ "k8s.io/apimachinery/pkg/util/diff", "k8s.io/apimachinery/pkg/util/intstr", "k8s.io/apimachinery/pkg/util/runtime", + "k8s.io/apimachinery/pkg/util/uuid", "k8s.io/apimachinery/pkg/util/wait", "k8s.io/apimachinery/pkg/watch", "k8s.io/client-go/discovery", @@ -688,6 +759,8 @@ "k8s.io/client-go/testing", "k8s.io/client-go/tools/cache", "k8s.io/client-go/tools/clientcmd", + "k8s.io/client-go/tools/leaderelection", + "k8s.io/client-go/tools/leaderelection/resourcelock", "k8s.io/client-go/tools/record", "k8s.io/client-go/util/flowcontrol", "k8s.io/client-go/util/workqueue", diff --git a/cmd/mpi-operator.v1alpha2/app/server.go b/cmd/mpi-operator.v1alpha2/app/server.go index ba54414d9..04cd4c724 100644 --- a/cmd/mpi-operator.v1alpha2/app/server.go +++ b/cmd/mpi-operator.v1alpha2/app/server.go @@ -15,21 +15,34 @@ package app import ( + "context" + "fmt" "os" + "time" "github.com/golang/glog" controllersv1alpha2 "github.com/kubeflow/mpi-operator/pkg/controllers/v1alpha2" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/uuid" kubeinformers "k8s.io/client-go/informers" policyinformers "k8s.io/client-go/informers/policy/v1beta1" - "k8s.io/client-go/kubernetes" + kubeclientset "k8s.io/client-go/kubernetes" + clientgokubescheme "k8s.io/client-go/kubernetes/scheme" + v1core "k8s.io/client-go/kubernetes/typed/core/v1" + restclientset "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + election "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/client-go/tools/record" "k8s.io/sample-controller/pkg/signals" "github.com/kubeflow/mpi-operator/cmd/mpi-operator.v1alpha2/app/options" - clientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned" + "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v1alpha2" + mpijobclientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned" informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions" "github.com/kubeflow/mpi-operator/pkg/version" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ( @@ -37,12 +50,25 @@ const ( RecommendedKubeConfigPathEnv = "KUBECONFIG" ) +var ( + // leader election config + leaseDuration = 15 * time.Second + renewDuration = 5 * time.Second + retryPeriod = 3 * time.Second +) + func Run(opt *options.ServerOption) error { // Check if the -version flag was passed and, if so, print the version and exit. if opt.PrintVersion { version.PrintVersionAndExit(apiVersion) } + namespace := os.Getenv(v1alpha2.EnvKubeflowNamespace) + if len(namespace) == 0 { + glog.Infof("EnvKubeflowNamespace not set, use default namespace") + namespace = metav1.NamespaceDefault + } + if opt.Namespace == corev1.NamespaceAll { glog.Info("Using cluster scoped operator") } else { @@ -67,49 +93,138 @@ func Run(opt *options.ServerOption) error { glog.Fatalf("Error building kubeConfig: %s", err.Error()) } - kubeClient, err := kubernetes.NewForConfig(cfg) + // Create clients. + kubeClient, leaderElectionClientSet, mpiJobClientSet, err := createClientSets(cfg) if err != nil { - glog.Fatalf("Error building kubernetes clientset: %s", err.Error()) + return err + } + if !checkCRDExists(mpiJobClientSet, opt.Namespace) { + glog.Info("CRD doesn't exist. Exiting") + os.Exit(1) } - kubeflowClient, err := clientset.NewForConfig(cfg) + // Set leader election start function. + run := func(ctx context.Context) { + var kubeInformerFactory kubeinformers.SharedInformerFactory + var kubeflowInformerFactory informers.SharedInformerFactory + if opt.Namespace == "" { + kubeInformerFactory = kubeinformers.NewSharedInformerFactory(kubeClient, 0) + kubeflowInformerFactory = informers.NewSharedInformerFactory(mpiJobClientSet, 0) + } else { + kubeInformerFactory = kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(opt.Namespace), nil) + kubeflowInformerFactory = informers.NewSharedInformerFactoryWithOptions(mpiJobClientSet, 0, informers.WithNamespace(opt.Namespace), nil) + } + + var pdbInformer policyinformers.PodDisruptionBudgetInformer + if opt.EnableGangScheduling { + pdbInformer = kubeInformerFactory.Policy().V1beta1().PodDisruptionBudgets() + } + controller := controllersv1alpha2.NewMPIJobController( + kubeClient, + mpiJobClientSet, + kubeInformerFactory.Core().V1().ConfigMaps(), + kubeInformerFactory.Core().V1().ServiceAccounts(), + kubeInformerFactory.Rbac().V1().Roles(), + kubeInformerFactory.Rbac().V1().RoleBindings(), + kubeInformerFactory.Apps().V1().StatefulSets(), + kubeInformerFactory.Batch().V1().Jobs(), + pdbInformer, + kubeflowInformerFactory.Kubeflow().V1alpha2().MPIJobs(), + opt.KubectlDeliveryImage, + opt.EnableGangScheduling) + + go kubeInformerFactory.Start(ctx.Done()) + go kubeflowInformerFactory.Start(ctx.Done()) + + if err = controller.Run(opt.Threadiness, stopCh); err != nil { + glog.Fatalf("Error running controller: %s", err.Error()) + } + } + + id, err := os.Hostname() if err != nil { - glog.Fatalf("Error building kubeflow clientset: %s", err.Error()) + return fmt.Errorf("failed to get hostname: %v", err) + } + // add a uniquifier so that two processes on the same host don't accidentally both become active + id = id + "_" + string(uuid.NewUUID()) + + // Prepare event clients. + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(clientgokubescheme.Scheme, corev1.EventSource{Component: "mpi-operator"}) + + rl := &resourcelock.EndpointsLock{ + EndpointsMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "mpi-operator", + }, + Client: leaderElectionClientSet.CoreV1(), + LockConfig: resourcelock.ResourceLockConfig{ + Identity: id, + EventRecorder: recorder, + }, } - var kubeInformerFactory kubeinformers.SharedInformerFactory - var kubeflowInformerFactory informers.SharedInformerFactory - if opt.Namespace == "" { - kubeInformerFactory = kubeinformers.NewSharedInformerFactory(kubeClient, 0) - kubeflowInformerFactory = informers.NewSharedInformerFactory(kubeflowClient, 0) - } else { - kubeInformerFactory = kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(opt.Namespace), nil) - kubeflowInformerFactory = informers.NewSharedInformerFactoryWithOptions(kubeflowClient, 0, informers.WithNamespace(opt.Namespace), nil) + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + go func() { + select { + case <-stopCh: + cancel() + case <-ctx.Done(): + } + }() + + // Start leader election. + election.RunOrDie(ctx, election.LeaderElectionConfig{ + Lock: rl, + LeaseDuration: leaseDuration, + RenewDeadline: renewDuration, + RetryPeriod: retryPeriod, + Callbacks: election.LeaderCallbacks{ + OnStartedLeading: run, + OnStoppedLeading: func() { + glog.Fatalf("leader election lost") + }, + }, + Name: "mpi-operator", + }) + + return fmt.Errorf("finished without leader elect") +} + +func createClientSets(config *restclientset.Config) (kubeclientset.Interface, kubeclientset.Interface, mpijobclientset.Interface, error) { + + kubeClientSet, err := kubeclientset.NewForConfig(restclientset.AddUserAgent(config, "mpi-operator")) + if err != nil { + return nil, nil, nil, err + } + + leaderElectionClientSet, err := kubeclientset.NewForConfig(restclientset.AddUserAgent(config, "leader-election")) + if err != nil { + return nil, nil, nil, err } - var pdbInformer policyinformers.PodDisruptionBudgetInformer - if opt.EnableGangScheduling { - pdbInformer = kubeInformerFactory.Policy().V1beta1().PodDisruptionBudgets() + mpiJobClientSet, err := mpijobclientset.NewForConfig(config) + if err != nil { + return nil, nil, nil, err } - controller := controllersv1alpha2.NewMPIJobController( - kubeClient, - kubeflowClient, - kubeInformerFactory.Core().V1().ConfigMaps(), - kubeInformerFactory.Core().V1().ServiceAccounts(), - kubeInformerFactory.Rbac().V1().Roles(), - kubeInformerFactory.Rbac().V1().RoleBindings(), - kubeInformerFactory.Apps().V1().StatefulSets(), - kubeInformerFactory.Batch().V1().Jobs(), - pdbInformer, - kubeflowInformerFactory.Kubeflow().V1alpha2().MPIJobs(), - opt.KubectlDeliveryImage, - opt.EnableGangScheduling) - - go kubeInformerFactory.Start(stopCh) - go kubeflowInformerFactory.Start(stopCh) - - if err = controller.Run(opt.Threadiness, stopCh); err != nil { - glog.Fatalf("Error running controller: %s", err.Error()) + + return kubeClientSet, leaderElectionClientSet, mpiJobClientSet, nil +} + +func checkCRDExists(clientset mpijobclientset.Interface, namespace string) bool { + _, err := clientset.KubeflowV1alpha2().MPIJobs(namespace).List(metav1.ListOptions{}) + + if err != nil { + glog.Error(err) + if _, ok := err.(*errors.StatusError); ok { + if errors.IsNotFound(err) { + return false + } + } } - return nil + return true } diff --git a/pkg/apis/kubeflow/v1alpha2/constants.go b/pkg/apis/kubeflow/v1alpha2/constants.go new file mode 100644 index 000000000..76aea39d1 --- /dev/null +++ b/pkg/apis/kubeflow/v1alpha2/constants.go @@ -0,0 +1,20 @@ +// Copyright 2019 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v1alpha2 + +const ( + // EnvKubeflowNamespace is ENV for kubeflow namespace specified by user. + EnvKubeflowNamespace = "KUBEFLOW_NAMESPACE" +)