diff --git a/cmd/nginx-k8s-edge-controller/main.go b/cmd/nginx-k8s-edge-controller/main.go index af586b0..0014481 100644 --- a/cmd/nginx-k8s-edge-controller/main.go +++ b/cmd/nginx-k8s-edge-controller/main.go @@ -7,9 +7,12 @@ package main import ( "context" "fmt" + "github.com/nginxinc/kubernetes-nginx-ingress/internal/config" "github.com/nginxinc/kubernetes-nginx-ingress/internal/observation" "github.com/nginxinc/kubernetes-nginx-ingress/internal/synchronization" "github.com/sirupsen/logrus" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" ) func main() { @@ -23,7 +26,22 @@ func run() error { ctx := context.Background() var err error - synchronizer, err := synchronization.NewSynchronizer() + k8sClient, err := buildKubernetesClient() + if err != nil { + return fmt.Errorf(`error building a Kubernetes client: %w`, err) + } + + settings, err := config.NewSettings(ctx, k8sClient) + if err != nil { + return fmt.Errorf(`error occurred creating settings: %w`, err) + } + + err = settings.Initialize() + if err != nil { + return fmt.Errorf(`error occurred initializing settings: %w`, err) + } + + synchronizer, err := synchronization.NewSynchronizer(settings) if err != nil { return fmt.Errorf(`error initializing synchronizer: %w`, err) } @@ -36,7 +54,7 @@ func run() error { handler := observation.NewHandler(synchronizer) handler.Initialize() - watcher, err := observation.NewWatcher(ctx, handler) + watcher, err := observation.NewWatcher(ctx, handler, k8sClient) if err != nil { return fmt.Errorf(`error occurred creating a watcher: %w`, err) } @@ -46,6 +64,7 @@ func run() error { return fmt.Errorf(`error occurred initializing the watcher: %w`, err) } + go settings.Run() go handler.Run(ctx.Done()) go synchronizer.Run(ctx.Done()) @@ -57,3 +76,20 @@ func run() error { <-ctx.Done() return nil } + +func buildKubernetesClient() (*kubernetes.Clientset, error) { + logrus.Debug("Watcher::buildKubernetesClient") + k8sConfig, err := rest.InClusterConfig() + if err == rest.ErrNotInCluster { + return nil, fmt.Errorf(`not running in a Cluster: %w`, err) + } else if err != nil { + return nil, fmt.Errorf(`error occurred getting the Cluster config: %w`, err) + } + + client, err := kubernetes.NewForConfig(k8sConfig) + if err != nil { + return nil, fmt.Errorf(`error occurred creating a client: %w`, err) + } + + return client, nil +} diff --git a/deployment/nkl-configmap.yaml b/deployment/nkl-configmap.yaml new file mode 100644 index 0000000..247885b --- /dev/null +++ b/deployment/nkl-configmap.yaml @@ -0,0 +1,8 @@ +apiVersion: v1 +kind: ConfigMap +data: + nginx-hosts: + "http://10.1.1.4:9000/api,http://10.1.1.5:9000/api" +metadata: + name: nkl-config + namespace: nkl diff --git a/deployment/nkl-deployment.yaml b/deployment/nkl-deployment.yaml index 330ccb4..db00339 100644 --- a/deployment/nkl-deployment.yaml +++ b/deployment/nkl-deployment.yaml @@ -3,22 +3,22 @@ kind: Deployment metadata: name: nkl-deployment labels: - app: nec + app: nkl spec: replicas: 1 selector: matchLabels: - app: nec + app: nkl template: metadata: labels: - app: nec + app: nkl spec: containers: - name: nginx-k8s-edge-controller env: - name: NGINX_PLUS_HOST - value: "http://192.168.1.109:9000/api" + value: "http://10.1.1.4:9000/api" image: ciroque/nginx-k8s-edge-controller:latest imagePullPolicy: Always serviceAccountName: nginx-k8s-edge-controller diff --git a/deployment/nkl-namespace.yaml b/deployment/nkl-namespace.yaml new file mode 100644 index 0000000..5ebfb6f --- /dev/null +++ b/deployment/nkl-namespace.yaml @@ -0,0 +1,6 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: nkl + labels: + name: nkl diff --git a/internal/config/settings.go b/internal/config/settings.go index fde0539..a35c045 100644 --- a/internal/config/settings.go +++ b/internal/config/settings.go @@ -5,21 +5,131 @@ package config import ( - "errors" - "os" + "context" + "fmt" + "github.com/sirupsen/logrus" + corev1 "k8s.io/api/core/v1" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "strings" +) + +const ( + ConfigMapsNamespace = "nkl" + ResyncPeriod = 0 ) type Settings struct { - NginxPlusHost string + ctx context.Context + NginxPlusHosts []string + k8sClient *kubernetes.Clientset + informer cache.SharedInformer + eventHandlerRegistration cache.ResourceEventHandlerRegistration } -func NewSettings() (*Settings, error) { +func NewSettings(ctx context.Context, k8sClient *kubernetes.Clientset) (*Settings, error) { config := new(Settings) - config.NginxPlusHost = os.Getenv("NGINX_PLUS_HOST") - if config.NginxPlusHost == "" { - return nil, errors.New("the NGINX_PLUS_HOST variable is not defined. This is required") - } + config.k8sClient = k8sClient + config.ctx = ctx return config, nil } + +func (s *Settings) Initialize() error { + logrus.Info("Settings::Initialize") + + var err error + + informer, err := s.buildInformer() + if err != nil { + return fmt.Errorf(`error occurred building ConfigMap informer: %w`, err) + } + + s.informer = informer + + err = s.initializeEventListeners() + if err != nil { + return fmt.Errorf(`error occurred initializing event listeners: %w`, err) + } + + return nil +} + +func (s *Settings) Run() { + logrus.Debug("Settings::Run") + + defer utilruntime.HandleCrash() + + go s.informer.Run(s.ctx.Done()) + + <-s.ctx.Done() +} + +func (s *Settings) buildInformer() (cache.SharedInformer, error) { + options := informers.WithNamespace(ConfigMapsNamespace) + factory := informers.NewSharedInformerFactoryWithOptions(s.k8sClient, ResyncPeriod, options) + informer := factory.Core().V1().ConfigMaps().Informer() + + return informer, nil +} + +func (s *Settings) initializeEventListeners() error { + logrus.Debug("Settings::initializeEventListeners") + + var err error + + handlers := cache.ResourceEventHandlerFuncs{ + AddFunc: s.handleAddEvent, + UpdateFunc: s.handleUpdateEvent, + DeleteFunc: s.handleDeleteEvent, + } + + s.eventHandlerRegistration, err = s.informer.AddEventHandler(handlers) + if err != nil { + return fmt.Errorf(`error occurred registering event handlers: %w`, err) + } + + return nil +} + +func (s *Settings) handleAddEvent(obj interface{}) { + logrus.Debug("Settings::handleAddEvent") + + s.handleUpdateEvent(obj, nil) +} + +func (s *Settings) handleDeleteEvent(_ interface{}) { + logrus.Debug("Settings::handleDeleteEvent") + + s.updateHosts([]string{}) +} + +func (s *Settings) handleUpdateEvent(obj interface{}, _ interface{}) { + logrus.Debug("Settings::handleUpdateEvent") + + configMap, ok := obj.(*corev1.ConfigMap) + if !ok { + logrus.Errorf("Settings::handleUpdateEvent: could not convert obj to ConfigMap") + return + } + + hosts, found := configMap.Data["nginx-hosts"] + if !found { + logrus.Errorf("Settings::handleUpdateEvent: nginx-hosts key not found in ConfigMap") + return + } + + newHosts := s.parseHosts(hosts) + s.updateHosts(newHosts) +} + +func (s *Settings) parseHosts(hosts string) []string { + return strings.Split(hosts, ",") +} + +func (s *Settings) updateHosts(hosts []string) { + s.NginxPlusHosts = hosts +} diff --git a/internal/core/events.go b/internal/core/events.go index 0da44e8..02ece51 100644 --- a/internal/core/events.go +++ b/internal/core/events.go @@ -21,6 +21,8 @@ type Event struct { } type ServerUpdateEvent struct { + Id string + NginxHost string Type EventType UpstreamName string Servers []nginxClient.StreamUpstreamServer @@ -45,6 +47,16 @@ func NewServerUpdateEvent(eventType EventType, upstreamName string, servers []ng } } +func ServerUpdateEventWithIdAndHost(event *ServerUpdateEvent, id string, nginxHost string) *ServerUpdateEvent { + return &ServerUpdateEvent{ + Id: id, + NginxHost: nginxHost, + Type: event.Type, + UpstreamName: event.UpstreamName, + Servers: event.Servers, + } +} + func (e *ServerUpdateEvent) TypeName() string { switch e.Type { case Created: diff --git a/internal/observation/watcher.go b/internal/observation/watcher.go index ad682dc..c9019ee 100644 --- a/internal/observation/watcher.go +++ b/internal/observation/watcher.go @@ -14,7 +14,6 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "time" ) @@ -30,9 +29,10 @@ type Watcher struct { informer cache.SharedIndexInformer } -func NewWatcher(ctx context.Context, handler *Handler) (*Watcher, error) { +func NewWatcher(ctx context.Context, handler *Handler, k8sClient *kubernetes.Clientset) (*Watcher, error) { return &Watcher{ ctx: ctx, + client: k8sClient, handler: handler, }, nil } @@ -41,11 +41,6 @@ func (w *Watcher) Initialize() error { logrus.Debug("Watcher::Initialize") var err error - w.client, err = w.buildKubernetesClient() - if err != nil { - return fmt.Errorf(`initalization error: %w`, err) - } - w.informer, err = w.buildInformer() if err != nil { return fmt.Errorf(`initialization error: %w`, err) @@ -129,23 +124,6 @@ func (w *Watcher) buildInformer() (cache.SharedIndexInformer, error) { return informer, nil } -func (w *Watcher) buildKubernetesClient() (*kubernetes.Clientset, error) { - logrus.Debug("Watcher::buildKubernetesClient") - k8sConfig, err := rest.InClusterConfig() - if err == rest.ErrNotInCluster { - return nil, fmt.Errorf(`not running in a Cluster: %w`, err) - } else if err != nil { - return nil, fmt.Errorf(`error occurred getting the Cluster config: %w`, err) - } - - client, err := kubernetes.NewForConfig(k8sConfig) - if err != nil { - return nil, fmt.Errorf(`error occurred creating a client: %w`, err) - } - - return client, nil -} - func (w *Watcher) initializeEventListeners() error { logrus.Debug("Watcher::initializeEventListeners") var err error @@ -186,7 +164,8 @@ func (w *Watcher) retrieveNodeIps() ([]string, error) { } } - logrus.Infof("Watcher::retrieveNodeIps duration: %d", time.Since(started).Nanoseconds()) + logrus.Debugf("Watcher::retrieveNodeIps duration: %d", time.Since(started).Nanoseconds()) + return nodeIps, nil } diff --git a/internal/synchronization/rand.go b/internal/synchronization/rand.go new file mode 100644 index 0000000..3733090 --- /dev/null +++ b/internal/synchronization/rand.go @@ -0,0 +1,31 @@ +// Copyright 2023 f5 Inc. All rights reserved. +// Use of this source code is governed by the Apache +// license that can be found in the LICENSE file. + +package synchronization + +import ( + "math/rand" + "time" +) + +var charset = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") +var number = []byte("0123456789") +var alphaNumeric = append(charset, number...) + +// RandomString where n is the length of random string we want to generate +func RandomString(n int) string { + b := make([]byte, n) + for i := range b { + // randomly select 1 character from given charset + b[i] = alphaNumeric[rand.Intn(len(alphaNumeric))] + } + return string(b) +} + +func RandomMilliseconds(min, max int) time.Duration { + randomizer := rand.New(rand.NewSource(time.Now().UnixNano())) + random := randomizer.Intn(max-min) + min + + return time.Millisecond * time.Duration(random) +} diff --git a/internal/synchronization/synchronizer.go b/internal/synchronization/synchronizer.go index b059ac5..54a6855 100644 --- a/internal/synchronization/synchronizer.go +++ b/internal/synchronization/synchronizer.go @@ -16,53 +16,58 @@ import ( "time" ) -const RateLimiterBase = time.Second * 2 -const RateLimiterMax = time.Second * 60 -const RetryCount = 5 -const Threads = 1 -const SynchronizerQueueName = `nkl-synchronizer` +const ( + // MaxMillisecondsJitter and MinMillisecondsJitter are used to randomize the rate limiter, + // creating headroom for calls to the NGINX edge hosts. + MaxMillisecondsJitter = 750 + MinMillisecondsJitter = 250 + + RateLimiterBase = time.Second * 2 + RateLimiterMax = time.Second * 60 + RetryCount = 5 + Threads = 1 + SynchronizerQueueName = `nkl-synchronizer` +) type Synchronizer struct { - NginxPlusClient *nginxClient.NginxClient - eventQueue workqueue.RateLimitingInterface + eventQueue workqueue.RateLimitingInterface + settings *config.Settings } -func NewSynchronizer() (*Synchronizer, error) { +func NewSynchronizer(settings *config.Settings) (*Synchronizer, error) { synchronizer := Synchronizer{} - + synchronizer.settings = settings return &synchronizer, nil } func (s *Synchronizer) AddEvents(events core.ServerUpdateEvents) { logrus.Debugf(`Synchronizer::AddEvents adding %d events`, len(events)) - // TODO: Add fan-out for multiple NginxClients - for _, event := range events { + if len(s.settings.NginxPlusHosts) == 0 { + logrus.Warnf(`No Nginx Plus hosts were specified. Skipping synchronization.`) + return + } + + updatedEvents := s.fanOutEventToHosts(events) + + for _, event := range updatedEvents { s.AddEvent(event) } } func (s *Synchronizer) AddEvent(event *core.ServerUpdateEvent) { logrus.Debugf(`Synchronizer::AddEvent: %#v`, event) - s.eventQueue.AddRateLimited(event) -} -func (s *Synchronizer) Initialize() error { - var err error - settings, err := config.NewSettings() - if err != nil { - return fmt.Errorf(`error loading configuration: %v`, err) + if event.NginxHost == `` { + logrus.Warnf(`Nginx host was not specified. Skipping synchronization.`) + return } - httpClient, err := communication.NewHttpClient() - if err != nil { - return fmt.Errorf(`error creating HTTP client: %v`, err) - } + s.eventQueue.AddAfter(event, RandomMilliseconds(MinMillisecondsJitter, MaxMillisecondsJitter)) +} - s.NginxPlusClient, err = nginxClient.NewNginxClient(httpClient, settings.NginxPlusHost) - if err != nil { - return fmt.Errorf(`error creating Nginx Plus client: %v`, err) - } +func (s *Synchronizer) Initialize() error { + logrus.Debug(`Synchronizer::Initialize`) rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(RateLimiterBase, RateLimiterMax) s.eventQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, SynchronizerQueueName) @@ -85,28 +90,99 @@ func (s *Synchronizer) ShutDown() { s.eventQueue.ShutDownWithDrain() } -func (s *Synchronizer) handleEvent(serverUpdateEvent *core.ServerUpdateEvent) error { - logrus.Debugf(`Synchronizer::handleEvent: %#v`, serverUpdateEvent) +func (s *Synchronizer) buildNginxPlusClient(nginxHost string) (*nginxClient.NginxClient, error) { + logrus.Debugf(`Synchronizer::buildNginxPlusClient for host: %s`, nginxHost) + + var err error + + httpClient, err := communication.NewHttpClient() + if err != nil { + return nil, fmt.Errorf(`error creating HTTP client: %v`, err) + } + + client, err := nginxClient.NewNginxClient(httpClient, nginxHost) + if err != nil { + return nil, fmt.Errorf(`error creating Nginx Plus client: %v`, err) + } + + return client, nil +} + +func (s *Synchronizer) fanOutEventToHosts(event core.ServerUpdateEvents) core.ServerUpdateEvents { + logrus.Debugf(`Synchronizer::fanOutEventToHosts: %#v`, event) + + var events core.ServerUpdateEvents + + for hidx, host := range s.settings.NginxPlusHosts { + for eidx, event := range event { + id := fmt.Sprintf(`[%d:%d]-[%s]-[%s]-[%s]`, hidx, eidx, RandomString(12), event.UpstreamName, host) + updatedEvent := core.ServerUpdateEventWithIdAndHost(event, id, host) + + events = append(events, updatedEvent) + } + } + + return events +} + +func (s *Synchronizer) handleEvent(event *core.ServerUpdateEvent) error { + logrus.Debugf(`Synchronizer::handleEvent: Id: %s`, event.Id) - switch serverUpdateEvent.Type { + var err error + + switch event.Type { case core.Created: fallthrough + case core.Updated: - _, _, _, err := s.NginxPlusClient.UpdateStreamServers(serverUpdateEvent.UpstreamName, serverUpdateEvent.Servers) - if err != nil { - return fmt.Errorf(`error occurred updating the nginx+ upstream servers: %w`, err) - } + err = s.handleCreatedUpdatedEvent(event) + case core.Deleted: - // NOTE: Deleted events include a single server in the array - err := s.NginxPlusClient.DeleteStreamServer(serverUpdateEvent.UpstreamName, serverUpdateEvent.Servers[0].Server) - if err != nil { - return fmt.Errorf(`error occurred deleting the nginx+ upstream server: %w`, err) - } + err = s.handleDeletedEvent(event) + default: - logrus.Warnf(`Synchronizer::handleEvent: unknown event type: %d`, serverUpdateEvent.Type) + logrus.Warnf(`Synchronizer::handleEvent: unknown event type: %d`, event.Type) + } + + if err == nil { + logrus.Infof(`Synchronizer::handleEvent: successfully %s the nginx+ host(s) for Upstream: %s: Id(%s)`, event.TypeName(), event.UpstreamName, event.Id) + } + + return err +} + +func (s *Synchronizer) handleCreatedUpdatedEvent(serverUpdateEvent *core.ServerUpdateEvent) error { + logrus.Debugf(`Synchronizer::handleCreatedUpdatedEvent: Id: %s`, serverUpdateEvent.Id) + + var err error + + client, err := s.buildNginxPlusClient(serverUpdateEvent.NginxHost) + if err != nil { + return fmt.Errorf(`error occurred building the nginx+ client: %w`, err) } - logrus.Infof(`Synchronizer::handleEvent: successfully %s the nginx+ hosts for Ingress: "%s"`, serverUpdateEvent.TypeName(), serverUpdateEvent.UpstreamName) + _, _, _, err = client.UpdateStreamServers(serverUpdateEvent.UpstreamName, serverUpdateEvent.Servers) + if err != nil { + return fmt.Errorf(`error occurred updating the nginx+ upstream servers: %w`, err) + } + + return nil +} + +func (s *Synchronizer) handleDeletedEvent(serverUpdateEvent *core.ServerUpdateEvent) error { + logrus.Debugf(`Synchronizer::handleDeletedEvent: Id: %s`, serverUpdateEvent.Id) + + var err error + + client, err := s.buildNginxPlusClient(serverUpdateEvent.NginxHost) + if err != nil { + return fmt.Errorf(`error occurred building the nginx+ client: %w`, err) + } + + err = client.DeleteStreamServer(serverUpdateEvent.UpstreamName, serverUpdateEvent.Servers[0].Server) + if err != nil { + return fmt.Errorf(`error occurred deleting the nginx+ upstream server: %w`, err) + } return nil } @@ -130,7 +206,6 @@ func (s *Synchronizer) handleNextEvent() bool { func (s *Synchronizer) worker() { logrus.Debug(`Synchronizer::worker`) for s.handleNextEvent() { - // TODO: Add Telemetry } } @@ -140,10 +215,12 @@ func (s *Synchronizer) withRetry(err error, event *core.ServerUpdateEvent) { // TODO: Add Telemetry if s.eventQueue.NumRequeues(event) < RetryCount { // TODO: Make this configurable s.eventQueue.AddRateLimited(event) - logrus.Infof(`Synchronizer::withRetry: requeued event: %#v; error: %v`, event, err) + logrus.Infof(`Synchronizer::withRetry: requeued event: %s; error: %v`, event.Id, err) } else { s.eventQueue.Forget(event) logrus.Warnf(`Synchronizer::withRetry: event %#v has been dropped due to too many retries`, event) } + } else { + s.eventQueue.Forget(event) } // TODO: Add error logging } diff --git a/k8s/RBAC/ClusterRole.yaml b/k8s/RBAC/ClusterRole.yaml index 6e58070..79036d1 100644 --- a/k8s/RBAC/ClusterRole.yaml +++ b/k8s/RBAC/ClusterRole.yaml @@ -5,5 +5,5 @@ metadata: rules: - apiGroups: - "" - resources: ["services", "nodes"] + resources: ["services", "nodes", "configmaps"] verbs: ["get", "watch", "list"]