Skip to content

Commit

Permalink
Add fan-out support for multiple NGINX+ Edge Load-Balancers
Browse files Browse the repository at this point in the history
Settings using Informer instead of Watch
  • Loading branch information
ciroque committed Feb 10, 2023
1 parent 6983e3f commit 92cd76b
Show file tree
Hide file tree
Showing 10 changed files with 338 additions and 82 deletions.
40 changes: 38 additions & 2 deletions cmd/nginx-k8s-edge-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ package main
import (
"context"
"fmt"
"github.com/nginxinc/kubernetes-nginx-ingress/internal/config"
"github.com/nginxinc/kubernetes-nginx-ingress/internal/observation"
"github.com/nginxinc/kubernetes-nginx-ingress/internal/synchronization"
"github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

func main() {
Expand All @@ -23,7 +26,22 @@ func run() error {
ctx := context.Background()
var err error

synchronizer, err := synchronization.NewSynchronizer()
k8sClient, err := buildKubernetesClient()
if err != nil {
return fmt.Errorf(`error building a Kubernetes client: %w`, err)
}

settings, err := config.NewSettings(ctx, k8sClient)
if err != nil {
return fmt.Errorf(`error occurred creating settings: %w`, err)
}

err = settings.Initialize()
if err != nil {
return fmt.Errorf(`error occurred initializing settings: %w`, err)
}

synchronizer, err := synchronization.NewSynchronizer(settings)
if err != nil {
return fmt.Errorf(`error initializing synchronizer: %w`, err)
}
Expand All @@ -36,7 +54,7 @@ func run() error {
handler := observation.NewHandler(synchronizer)
handler.Initialize()

watcher, err := observation.NewWatcher(ctx, handler)
watcher, err := observation.NewWatcher(ctx, handler, k8sClient)
if err != nil {
return fmt.Errorf(`error occurred creating a watcher: %w`, err)
}
Expand All @@ -46,6 +64,7 @@ func run() error {
return fmt.Errorf(`error occurred initializing the watcher: %w`, err)
}

go settings.Run()
go handler.Run(ctx.Done())
go synchronizer.Run(ctx.Done())

Expand All @@ -57,3 +76,20 @@ func run() error {
<-ctx.Done()
return nil
}

func buildKubernetesClient() (*kubernetes.Clientset, error) {
logrus.Debug("Watcher::buildKubernetesClient")
k8sConfig, err := rest.InClusterConfig()
if err == rest.ErrNotInCluster {
return nil, fmt.Errorf(`not running in a Cluster: %w`, err)
} else if err != nil {
return nil, fmt.Errorf(`error occurred getting the Cluster config: %w`, err)
}

client, err := kubernetes.NewForConfig(k8sConfig)
if err != nil {
return nil, fmt.Errorf(`error occurred creating a client: %w`, err)
}

return client, nil
}
8 changes: 8 additions & 0 deletions deployment/nkl-configmap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apiVersion: v1
kind: ConfigMap
data:
nginx-hosts:
"http://10.1.1.4:9000/api,http://10.1.1.5:9000/api"
metadata:
name: nkl-config
namespace: nkl
8 changes: 4 additions & 4 deletions deployment/nkl-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@ kind: Deployment
metadata:
name: nkl-deployment
labels:
app: nec
app: nkl
spec:
replicas: 1
selector:
matchLabels:
app: nec
app: nkl
template:
metadata:
labels:
app: nec
app: nkl
spec:
containers:
- name: nginx-k8s-edge-controller
env:
- name: NGINX_PLUS_HOST
value: "http://192.168.1.109:9000/api"
value: "http://10.1.1.4:9000/api"
image: ciroque/nginx-k8s-edge-controller:latest
imagePullPolicy: Always
serviceAccountName: nginx-k8s-edge-controller
6 changes: 6 additions & 0 deletions deployment/nkl-namespace.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
apiVersion: v1
kind: Namespace
metadata:
name: nkl
labels:
name: nkl
126 changes: 118 additions & 8 deletions internal/config/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,131 @@
package config

import (
"errors"
"os"
"context"
"fmt"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"strings"
)

const (
ConfigMapsNamespace = "nkl"
ResyncPeriod = 0
)

type Settings struct {
NginxPlusHost string
ctx context.Context
NginxPlusHosts []string
k8sClient *kubernetes.Clientset
informer cache.SharedInformer
eventHandlerRegistration cache.ResourceEventHandlerRegistration
}

func NewSettings() (*Settings, error) {
func NewSettings(ctx context.Context, k8sClient *kubernetes.Clientset) (*Settings, error) {
config := new(Settings)

config.NginxPlusHost = os.Getenv("NGINX_PLUS_HOST")
if config.NginxPlusHost == "" {
return nil, errors.New("the NGINX_PLUS_HOST variable is not defined. This is required")
}
config.k8sClient = k8sClient
config.ctx = ctx

return config, nil
}

func (s *Settings) Initialize() error {
logrus.Info("Settings::Initialize")

var err error

informer, err := s.buildInformer()
if err != nil {
return fmt.Errorf(`error occurred building ConfigMap informer: %w`, err)
}

s.informer = informer

err = s.initializeEventListeners()
if err != nil {
return fmt.Errorf(`error occurred initializing event listeners: %w`, err)
}

return nil
}

func (s *Settings) Run() {
logrus.Debug("Settings::Run")

defer utilruntime.HandleCrash()

go s.informer.Run(s.ctx.Done())

<-s.ctx.Done()
}

func (s *Settings) buildInformer() (cache.SharedInformer, error) {
options := informers.WithNamespace(ConfigMapsNamespace)
factory := informers.NewSharedInformerFactoryWithOptions(s.k8sClient, ResyncPeriod, options)
informer := factory.Core().V1().ConfigMaps().Informer()

return informer, nil
}

func (s *Settings) initializeEventListeners() error {
logrus.Debug("Settings::initializeEventListeners")

var err error

handlers := cache.ResourceEventHandlerFuncs{
AddFunc: s.handleAddEvent,
UpdateFunc: s.handleUpdateEvent,
DeleteFunc: s.handleDeleteEvent,
}

s.eventHandlerRegistration, err = s.informer.AddEventHandler(handlers)
if err != nil {
return fmt.Errorf(`error occurred registering event handlers: %w`, err)
}

return nil
}

func (s *Settings) handleAddEvent(obj interface{}) {
logrus.Debug("Settings::handleAddEvent")

s.handleUpdateEvent(obj, nil)
}

func (s *Settings) handleDeleteEvent(_ interface{}) {
logrus.Debug("Settings::handleDeleteEvent")

s.updateHosts([]string{})
}

func (s *Settings) handleUpdateEvent(obj interface{}, _ interface{}) {
logrus.Debug("Settings::handleUpdateEvent")

configMap, ok := obj.(*corev1.ConfigMap)
if !ok {
logrus.Errorf("Settings::handleUpdateEvent: could not convert obj to ConfigMap")
return
}

hosts, found := configMap.Data["nginx-hosts"]
if !found {
logrus.Errorf("Settings::handleUpdateEvent: nginx-hosts key not found in ConfigMap")
return
}

newHosts := s.parseHosts(hosts)
s.updateHosts(newHosts)
}

func (s *Settings) parseHosts(hosts string) []string {
return strings.Split(hosts, ",")
}

func (s *Settings) updateHosts(hosts []string) {
s.NginxPlusHosts = hosts
}
12 changes: 12 additions & 0 deletions internal/core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type Event struct {
}

type ServerUpdateEvent struct {
Id string
NginxHost string
Type EventType
UpstreamName string
Servers []nginxClient.StreamUpstreamServer
Expand All @@ -45,6 +47,16 @@ func NewServerUpdateEvent(eventType EventType, upstreamName string, servers []ng
}
}

func ServerUpdateEventWithIdAndHost(event *ServerUpdateEvent, id string, nginxHost string) *ServerUpdateEvent {
return &ServerUpdateEvent{
Id: id,
NginxHost: nginxHost,
Type: event.Type,
UpstreamName: event.UpstreamName,
Servers: event.Servers,
}
}

func (e *ServerUpdateEvent) TypeName() string {
switch e.Type {
case Created:
Expand Down
29 changes: 4 additions & 25 deletions internal/observation/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"time"
)
Expand All @@ -30,9 +29,10 @@ type Watcher struct {
informer cache.SharedIndexInformer
}

func NewWatcher(ctx context.Context, handler *Handler) (*Watcher, error) {
func NewWatcher(ctx context.Context, handler *Handler, k8sClient *kubernetes.Clientset) (*Watcher, error) {
return &Watcher{
ctx: ctx,
client: k8sClient,
handler: handler,
}, nil
}
Expand All @@ -41,11 +41,6 @@ func (w *Watcher) Initialize() error {
logrus.Debug("Watcher::Initialize")
var err error

w.client, err = w.buildKubernetesClient()
if err != nil {
return fmt.Errorf(`initalization error: %w`, err)
}

w.informer, err = w.buildInformer()
if err != nil {
return fmt.Errorf(`initialization error: %w`, err)
Expand Down Expand Up @@ -129,23 +124,6 @@ func (w *Watcher) buildInformer() (cache.SharedIndexInformer, error) {
return informer, nil
}

func (w *Watcher) buildKubernetesClient() (*kubernetes.Clientset, error) {
logrus.Debug("Watcher::buildKubernetesClient")
k8sConfig, err := rest.InClusterConfig()
if err == rest.ErrNotInCluster {
return nil, fmt.Errorf(`not running in a Cluster: %w`, err)
} else if err != nil {
return nil, fmt.Errorf(`error occurred getting the Cluster config: %w`, err)
}

client, err := kubernetes.NewForConfig(k8sConfig)
if err != nil {
return nil, fmt.Errorf(`error occurred creating a client: %w`, err)
}

return client, nil
}

func (w *Watcher) initializeEventListeners() error {
logrus.Debug("Watcher::initializeEventListeners")
var err error
Expand Down Expand Up @@ -186,7 +164,8 @@ func (w *Watcher) retrieveNodeIps() ([]string, error) {
}
}

logrus.Infof("Watcher::retrieveNodeIps duration: %d", time.Since(started).Nanoseconds())
logrus.Debugf("Watcher::retrieveNodeIps duration: %d", time.Since(started).Nanoseconds())

return nodeIps, nil
}

Expand Down
31 changes: 31 additions & 0 deletions internal/synchronization/rand.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2023 f5 Inc. All rights reserved.
// Use of this source code is governed by the Apache
// license that can be found in the LICENSE file.

package synchronization

import (
"math/rand"
"time"
)

var charset = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
var number = []byte("0123456789")
var alphaNumeric = append(charset, number...)

// RandomString where n is the length of random string we want to generate
func RandomString(n int) string {
b := make([]byte, n)
for i := range b {
// randomly select 1 character from given charset
b[i] = alphaNumeric[rand.Intn(len(alphaNumeric))]
}
return string(b)
}

func RandomMilliseconds(min, max int) time.Duration {
randomizer := rand.New(rand.NewSource(time.Now().UnixNano()))
random := randomizer.Intn(max-min) + min

return time.Millisecond * time.Duration(random)
}
Loading

0 comments on commit 92cd76b

Please sign in to comment.