Skip to content

Commit

Permalink
feat: add leader election to allow for multiple replicas.
Browse files Browse the repository at this point in the history
  • Loading branch information
Justin Bertrand committed Nov 1, 2023
1 parent 0bb13c2 commit 9e0541f
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 11 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/StatCan/ingress-istio-controller
go 1.18

require (
github.com/google/uuid v1.1.2
istio.io/api v0.0.0-20211015181651-ddbde26ea264
istio.io/client-go v1.10.6
k8s.io/api v0.20.2
Expand Down
101 changes: 94 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
package main

import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/StatCan/ingress-istio-controller/pkg/controller"
"github.com/StatCan/ingress-istio-controller/pkg/signals"
istio "istio.io/client-go/pkg/clientset/versioned"
istioinformers "istio.io/client-go/pkg/informers/externalversions"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/transport"
"k8s.io/klog"
)

Expand All @@ -23,14 +33,14 @@ var (
scopedGateways bool
ingressClass string
defaultWeight int
lockName string
lockNamespace string
)

func main() {
klog.InitFlags(nil)
flag.Parse()

stopCh := signals.SetupSignalHandler()

cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
if err != nil {
klog.Fatalf("error building kubeconfig: %v", err)
Expand Down Expand Up @@ -63,11 +73,86 @@ func main() {
istioInformerFactory.Networking().V1beta1().VirtualServices(),
istioInformerFactory.Networking().V1beta1().Gateways())

kubeInformerFactory.Start(stopCh)
istioInformerFactory.Start(stopCh)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

wait := make(chan os.Signal, 1)
signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-wait
klog.Info("received signal, shutting down")
cancel()
}()

kubeInformerFactory.Start(ctx.Done())
istioInformerFactory.Start(ctx.Done())

runWithLeaderElection(ctlr, cfg, kubeclient, ctx)
}

// runWithLeaderElection enters the client-go leader election loop and runs
// ctlr only while this process holds the lock.
//
// The lock is a Lease named lockName in namespace lockNamespace, accessed
// through kubeclient. The call blocks until the election loop returns.
// A failure to determine the hostname terminates the process via klog.Fatal.
func runWithLeaderElection(ctlr *controller.Controller, cfg *rest.Config, kubeclient *kubernetes.Clientset, ctx context.Context) {
	// Election identity of this replica: the hostname plus a generated UID, so
	// that two processes on the same host can never share an identity and both
	// believe they are the leader.
	hostname, err := os.Hostname()
	if err != nil {
		klog.Fatal(err)
	}
	id := hostname + "_" + string(uuid.NewUUID())
	klog.Infof("generated id: %s", id)

	// Lease-backed lock that all replicas compete for.
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      lockName,
			Namespace: lockNamespace,
		},
		Client:     kubeclient.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: id},
	}

	// Register a transport wrapper on cfg that is bound to ctx.
	// NOTE(review): kubeclient is constructed by the caller before this point;
	// confirm whether this wrapper actually reaches the client used above.
	cfg.Wrap(transport.ContextCanceller(ctx, fmt.Errorf("the leader is shutting down")))

	callbacks := leaderelection.LeaderCallbacks{
		// Invoked once this replica becomes leader; runs the controller with
		// two workers on the context handed in by the election loop.
		OnStartedLeading: func(leaderCtx context.Context) {
			runErr := ctlr.Run(2, leaderCtx)
			if runErr != nil && runErr != context.Canceled {
				klog.Errorf("error running controller: %v", runErr)
			}
		},
		OnStoppedLeading: func() {
			klog.Info("stopped leading")
		},
		// Only announce leaders other than ourselves; our own identity showing
		// up here just means we acquired the lock.
		OnNewLeader: func(identity string) {
			if identity != id {
				klog.Infof("new leader elected: %v", identity)
			}
		},
	}

	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            lock,
		ReleaseOnCancel: true,
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		Callbacks:       callbacks,
	})
}

if err = ctlr.Run(2, stopCh); err != nil {
klog.Fatalf("error running controller: %v", err)
// getEnvOrDefault returns the value of the environment variable named envVar
// when that variable is set, and dflt otherwise.
//
// A variable that is set to the empty string counts as set, so "" is returned
// in that case rather than dflt. Note the argument order: the default value
// comes first, the variable name second.
func getEnvOrDefault(dflt, envVar string) string {
	if val, ok := os.LookupEnv(envVar); ok {
		return val
	}
	return dflt
}

Expand All @@ -79,4 +164,6 @@ func init() {
flag.BoolVar(&scopedGateways, "scoped-gateways", false, "Gateways are scoped to the same namespace they exist within. This will limit the Service search for Load Balancer status. In istiod, this is controlled via the PILOT_SCOPE_GATEWAY_TO_NAMESPACE environment variable.")
flag.StringVar(&ingressClass, "ingress-class", "", "The ingress class annotation to monitor (empty string to skip checking annotation)")
flag.IntVar(&defaultWeight, "virtual-service-weight", 100, "The weight of the Virtual Service destination.")
flag.StringVar(&lockName, "lock-name", getEnvOrDefault("sidecar-terminator", "LOCK_NAME"), "The name of the leader lock.")
flag.StringVar(&lockNamespace, "lock-namespace", getEnvOrDefault("sidecar-terminator-system", "LOCK_NAMESPACE"), "The namespace where the leader lock resides.")
}
9 changes: 5 additions & 4 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controller

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -131,24 +132,24 @@ func NewController(
}

// Run runs the controller.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
func (c *Controller) Run(threadiness int, ctx context.Context) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()

klog.Info("starting controller")

klog.Info("waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.ingressesSynched, c.ingressClassesSynched, c.servicesSynched, c.virtualServicesSynched, c.gatewaysSynched); !ok {
if ok := cache.WaitForCacheSync(ctx.Done(), c.ingressesSynched, c.ingressClassesSynched, c.servicesSynched, c.virtualServicesSynched, c.gatewaysSynched); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

klog.Info("starting workers")
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(c.runWorker, time.Second, ctx.Done())
}

klog.Info("started workers")
<-stopCh
<-ctx.Done()
klog.Info("shutting down workers")

return nil
Expand Down

0 comments on commit 9e0541f

Please sign in to comment.