Skip to content

Commit

Permalink
Add leader election (#110)
Browse files Browse the repository at this point in the history
* add leader elector

* run dep ensure
  • Loading branch information
fisherxu authored and k8s-ci-robot committed May 30, 2019
1 parent 4a52f2a commit b619d48
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 37 deletions.
73 changes: 73 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

189 changes: 152 additions & 37 deletions cmd/mpi-operator.v1alpha2/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,34 +15,60 @@
package app

import (
"context"
"fmt"
"os"
"time"

"github.com/golang/glog"
controllersv1alpha2 "github.com/kubeflow/mpi-operator/pkg/controllers/v1alpha2"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/uuid"
kubeinformers "k8s.io/client-go/informers"
policyinformers "k8s.io/client-go/informers/policy/v1beta1"
"k8s.io/client-go/kubernetes"
kubeclientset "k8s.io/client-go/kubernetes"
clientgokubescheme "k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
restclientset "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
election "k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"
"k8s.io/sample-controller/pkg/signals"

"github.com/kubeflow/mpi-operator/cmd/mpi-operator.v1alpha2/app/options"
clientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
"github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v1alpha2"
mpijobclientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions"
"github.com/kubeflow/mpi-operator/pkg/version"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
apiVersion = "v1alpha2"
RecommendedKubeConfigPathEnv = "KUBECONFIG"
)

var (
// leader election config
leaseDuration = 15 * time.Second
renewDuration = 5 * time.Second
retryPeriod = 3 * time.Second
)

func Run(opt *options.ServerOption) error {
// Check if the -version flag was passed and, if so, print the version and exit.
if opt.PrintVersion {
version.PrintVersionAndExit(apiVersion)
}

namespace := os.Getenv(v1alpha2.EnvKubeflowNamespace)
if len(namespace) == 0 {
glog.Infof("EnvKubeflowNamespace not set, use default namespace")
namespace = metav1.NamespaceDefault
}

if opt.Namespace == corev1.NamespaceAll {
glog.Info("Using cluster scoped operator")
} else {
Expand All @@ -67,49 +93,138 @@ func Run(opt *options.ServerOption) error {
glog.Fatalf("Error building kubeConfig: %s", err.Error())
}

kubeClient, err := kubernetes.NewForConfig(cfg)
// Create clients.
kubeClient, leaderElectionClientSet, mpiJobClientSet, err := createClientSets(cfg)
if err != nil {
glog.Fatalf("Error building kubernetes clientset: %s", err.Error())
return err
}
if !checkCRDExists(mpiJobClientSet, opt.Namespace) {
glog.Info("CRD doesn't exist. Exiting")
os.Exit(1)
}

kubeflowClient, err := clientset.NewForConfig(cfg)
// Set leader election start function.
run := func(ctx context.Context) {
var kubeInformerFactory kubeinformers.SharedInformerFactory
var kubeflowInformerFactory informers.SharedInformerFactory
if opt.Namespace == "" {
kubeInformerFactory = kubeinformers.NewSharedInformerFactory(kubeClient, 0)
kubeflowInformerFactory = informers.NewSharedInformerFactory(mpiJobClientSet, 0)
} else {
kubeInformerFactory = kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(opt.Namespace), nil)
kubeflowInformerFactory = informers.NewSharedInformerFactoryWithOptions(mpiJobClientSet, 0, informers.WithNamespace(opt.Namespace), nil)
}

var pdbInformer policyinformers.PodDisruptionBudgetInformer
if opt.EnableGangScheduling {
pdbInformer = kubeInformerFactory.Policy().V1beta1().PodDisruptionBudgets()
}
controller := controllersv1alpha2.NewMPIJobController(
kubeClient,
mpiJobClientSet,
kubeInformerFactory.Core().V1().ConfigMaps(),
kubeInformerFactory.Core().V1().ServiceAccounts(),
kubeInformerFactory.Rbac().V1().Roles(),
kubeInformerFactory.Rbac().V1().RoleBindings(),
kubeInformerFactory.Apps().V1().StatefulSets(),
kubeInformerFactory.Batch().V1().Jobs(),
pdbInformer,
kubeflowInformerFactory.Kubeflow().V1alpha2().MPIJobs(),
opt.KubectlDeliveryImage,
opt.EnableGangScheduling)

go kubeInformerFactory.Start(ctx.Done())
go kubeflowInformerFactory.Start(ctx.Done())

if err = controller.Run(opt.Threadiness, stopCh); err != nil {
glog.Fatalf("Error running controller: %s", err.Error())
}
}

id, err := os.Hostname()
if err != nil {
glog.Fatalf("Error building kubeflow clientset: %s", err.Error())
return fmt.Errorf("failed to get hostname: %v", err)
}
// add a uniquifier so that two processes on the same host don't accidentally both become active
id = id + "_" + string(uuid.NewUUID())

// Prepare event clients.
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(clientgokubescheme.Scheme, corev1.EventSource{Component: "mpi-operator"})

rl := &resourcelock.EndpointsLock{
EndpointsMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: "mpi-operator",
},
Client: leaderElectionClientSet.CoreV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: recorder,
},
}

var kubeInformerFactory kubeinformers.SharedInformerFactory
var kubeflowInformerFactory informers.SharedInformerFactory
if opt.Namespace == "" {
kubeInformerFactory = kubeinformers.NewSharedInformerFactory(kubeClient, 0)
kubeflowInformerFactory = informers.NewSharedInformerFactory(kubeflowClient, 0)
} else {
kubeInformerFactory = kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(opt.Namespace), nil)
kubeflowInformerFactory = informers.NewSharedInformerFactoryWithOptions(kubeflowClient, 0, informers.WithNamespace(opt.Namespace), nil)
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()

go func() {
select {
case <-stopCh:
cancel()
case <-ctx.Done():
}
}()

// Start leader election.
election.RunOrDie(ctx, election.LeaderElectionConfig{
Lock: rl,
LeaseDuration: leaseDuration,
RenewDeadline: renewDuration,
RetryPeriod: retryPeriod,
Callbacks: election.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
glog.Fatalf("leader election lost")
},
},
Name: "mpi-operator",
})

return fmt.Errorf("finished without leader elect")
}

func createClientSets(config *restclientset.Config) (kubeclientset.Interface, kubeclientset.Interface, mpijobclientset.Interface, error) {

kubeClientSet, err := kubeclientset.NewForConfig(restclientset.AddUserAgent(config, "mpi-operator"))
if err != nil {
return nil, nil, nil, err
}

leaderElectionClientSet, err := kubeclientset.NewForConfig(restclientset.AddUserAgent(config, "leader-election"))
if err != nil {
return nil, nil, nil, err
}

var pdbInformer policyinformers.PodDisruptionBudgetInformer
if opt.EnableGangScheduling {
pdbInformer = kubeInformerFactory.Policy().V1beta1().PodDisruptionBudgets()
mpiJobClientSet, err := mpijobclientset.NewForConfig(config)
if err != nil {
return nil, nil, nil, err
}
controller := controllersv1alpha2.NewMPIJobController(
kubeClient,
kubeflowClient,
kubeInformerFactory.Core().V1().ConfigMaps(),
kubeInformerFactory.Core().V1().ServiceAccounts(),
kubeInformerFactory.Rbac().V1().Roles(),
kubeInformerFactory.Rbac().V1().RoleBindings(),
kubeInformerFactory.Apps().V1().StatefulSets(),
kubeInformerFactory.Batch().V1().Jobs(),
pdbInformer,
kubeflowInformerFactory.Kubeflow().V1alpha2().MPIJobs(),
opt.KubectlDeliveryImage,
opt.EnableGangScheduling)

go kubeInformerFactory.Start(stopCh)
go kubeflowInformerFactory.Start(stopCh)

if err = controller.Run(opt.Threadiness, stopCh); err != nil {
glog.Fatalf("Error running controller: %s", err.Error())

return kubeClientSet, leaderElectionClientSet, mpiJobClientSet, nil
}

func checkCRDExists(clientset mpijobclientset.Interface, namespace string) bool {
_, err := clientset.KubeflowV1alpha2().MPIJobs(namespace).List(metav1.ListOptions{})

if err != nil {
glog.Error(err)
if _, ok := err.(*errors.StatusError); ok {
if errors.IsNotFound(err) {
return false
}
}
}
return nil
return true
}
20 changes: 20 additions & 0 deletions pkg/apis/kubeflow/v1alpha2/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2019 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package v1alpha2

const (
// EnvKubeflowNamespace is ENV for kubeflow namespace specified by user.
EnvKubeflowNamespace = "KUBEFLOW_NAMESPACE"
)

0 comments on commit b619d48

Please sign in to comment.