From 28879f69ff4c306c8bc781c117856488803b92d6 Mon Sep 17 00:00:00 2001 From: Steve Wagner Date: Tue, 11 Apr 2023 15:20:08 -0700 Subject: [PATCH] Update code documentation (#42) --- internal/application/application_constants.go | 19 +++- internal/application/border_client.go | 13 ++- internal/application/doc.go | 22 ++++- internal/application/http_border_client.go | 6 ++ .../application/nginx_client_interface.go | 7 ++ internal/application/null_border_client.go | 5 + internal/application/tcp_border_client.go | 4 + internal/communication/client.go | 7 ++ internal/communication/roundtripper.go | 4 +- internal/configuration/doc.go | 2 +- internal/configuration/settings.go | 99 +++++++++++++++---- internal/core/doc.go | 10 ++ internal/core/event.go | 24 ++++- internal/core/server_update_event.go | 30 +++++- internal/core/upstream_server.go | 6 ++ internal/observation/handler.go | 29 +++++- internal/observation/watcher.go | 31 +++++- internal/probation/check.go | 7 ++ internal/probation/server.go | 32 +++++- internal/synchronization/rand.go | 6 ++ internal/synchronization/synchronizer.go | 29 ++++++ internal/translation/translator.go | 8 +- 22 files changed, 354 insertions(+), 46 deletions(-) create mode 100644 internal/core/doc.go diff --git a/internal/application/application_constants.go b/internal/application/application_constants.go index fa5cd42..5f213d4 100644 --- a/internal/application/application_constants.go +++ b/internal/application/application_constants.go @@ -5,7 +5,24 @@ package application +// These constants are intended for use in the Annotations field of the Service definition. +// They determine which Border Server client will be used. +// To use these values, add the following annotation to the Service definition: +// +// annotations: +// nginxinc.io/nkl-: +// +// where is the name of the upstream in the NGINX Plus configuration and is one of the constants below. +// +// Note, this is an extensibility point. To add a Border Server client... +// 1. Create a module that implements the BorderClient interface; +// 2. Add a new constant to this group that acts as a key for selecting the client; +// 3. Update the NewBorderClient factory method in border_client.go that returns the client; const ( - ClientTypeTcp = "tcp" + + // ClientTypeTcp creates a TcpBorderClient that uses the Stream* methods of the NGINX Plus client. + ClientTypeTcp = "tcp" + + // ClientTypeHttp creates an HttpBorderClient that uses the HTTP* methods of the NGINX Plus client. ClientTypeHttp = "http" ) diff --git a/internal/application/border_client.go b/internal/application/border_client.go index ed971b2..bae7eab 100644 --- a/internal/application/border_client.go +++ b/internal/application/border_client.go @@ -11,23 +11,30 @@ import ( "github.com/sirupsen/logrus" ) +// Interface defines the functions required to implement a Border Client. type Interface interface { Update(*core.ServerUpdateEvent) error Delete(*core.ServerUpdateEvent) error } +// BorderClient defines any state need by the Border Client. type BorderClient struct { } -// NewBorderClient Returns a NullBorderClient if the type is unknown, this avoids panics due to nil pointer dereferences. +// NewBorderClient is the Factory function for creating a Border Client. +// +// Note, this is an extensibility point. To add a Border Server client... +// 1. Create a module that implements the BorderClient interface; +// 2. Add a new constant in application_constants.go that acts as a key for selecting the client; +// 3. Update the NewBorderClient factory method in border_client.go that returns the client; func NewBorderClient(clientType string, borderClient interface{}) (Interface, error) { logrus.Debugf(`NewBorderClient for type: %s`, clientType) switch clientType { - case "tcp": + case ClientTypeTcp: return NewTcpBorderClient(borderClient) - case "http": + case ClientTypeHttp: return NewHttpBorderClient(borderClient) default: diff --git a/internal/application/doc.go b/internal/application/doc.go index 3254255..0395d6b 100644 --- a/internal/application/doc.go +++ b/internal/application/doc.go @@ -6,14 +6,28 @@ /* Package application includes support for applying updates to the Border servers. -"Border TcpServers" are the servers that are exposed to the outside world and direct traffic into the cluster. -At this time the only supported Border TcpServers are NGINX Plus servers. The BorderClient module defines -an interface that can be implemented to support other Border Server types. +"Border Servers" are servers that are exposed to the outside world and direct traffic into the cluster. +The BorderClient module defines an interface that can be implemented to support other Border Server types. +To add a Border Server client... +1. Create a module that implements the BorderClient interface; +2. Add a new constant in application_constants.go that acts as a key for selecting the client; +3. Update the NewBorderClient factory method in border_client.go that returns the client; + +At this time the only supported Border Servers are NGINX Plus servers. + +The two Border Server clients for NGINX Plus are: - HttpBorderClient: updates NGINX Plus servers using HTTP Upstream methods on the NGINX Plus API. - TcpBorderClient: updates NGINX Plus servers using Stream Upstream methods on the NGINX Plus API. -Selection of the appropriate client is based on the Annotations present on the NodePort Service definition. +Both of these implementations use the NGINX Plus client module to communicate with the NGINX Plus server. + +Selection of the appropriate client is based on the Annotations present on the Service definition, e.g.: + + annotations: + nginxinc.io/nkl-: + +where is the name of the upstream in the NGINX Plus configuration and is one of the constants in application_constants.go. */ package application diff --git a/internal/application/http_border_client.go b/internal/application/http_border_client.go index dc7d00a..a306141 100644 --- a/internal/application/http_border_client.go +++ b/internal/application/http_border_client.go @@ -11,11 +11,13 @@ import ( nginxClient "github.com/nginxinc/nginx-plus-go-client/client" ) +// HttpBorderClient implements the BorderClient interface for HTTP upstreams. type HttpBorderClient struct { BorderClient nginxClient NginxClientInterface } +// NewHttpBorderClient is the Factory function for creating an HttpBorderClient. func NewHttpBorderClient(client interface{}) (Interface, error) { ngxClient, ok := client.(NginxClientInterface) if !ok { @@ -27,6 +29,7 @@ func NewHttpBorderClient(client interface{}) (Interface, error) { }, nil } +// Update manages the Upstream servers for the Upstream Name given in the ServerUpdateEvent. func (hbc *HttpBorderClient) Update(event *core.ServerUpdateEvent) error { httpUpstreamServers := asNginxHttpUpstreamServers(event.UpstreamServers) _, _, _, err := hbc.nginxClient.UpdateHTTPServers(event.UpstreamName, httpUpstreamServers) @@ -37,6 +40,7 @@ func (hbc *HttpBorderClient) Update(event *core.ServerUpdateEvent) error { return nil } +// Delete deletes the Upstream server for the Upstream Name given in the ServerUpdateEvent. func (hbc *HttpBorderClient) Delete(event *core.ServerUpdateEvent) error { err := hbc.nginxClient.DeleteHTTPServer(event.UpstreamName, event.UpstreamServers[0].Host) if err != nil { @@ -46,12 +50,14 @@ func (hbc *HttpBorderClient) Delete(event *core.ServerUpdateEvent) error { return nil } +// asNginxHttpUpstreamServer converts a core.UpstreamServer to a nginxClient.UpstreamServer. func asNginxHttpUpstreamServer(server *core.UpstreamServer) nginxClient.UpstreamServer { return nginxClient.UpstreamServer{ Server: server.Host, } } +// asNginxHttpUpstreamServers converts a core.UpstreamServers to a []nginxClient.UpstreamServer. func asNginxHttpUpstreamServers(servers core.UpstreamServers) []nginxClient.UpstreamServer { var upstreamServers []nginxClient.UpstreamServer diff --git a/internal/application/nginx_client_interface.go b/internal/application/nginx_client_interface.go index 5ca986d..516d6e1 100644 --- a/internal/application/nginx_client_interface.go +++ b/internal/application/nginx_client_interface.go @@ -7,10 +7,17 @@ package application import nginxClient "github.com/nginxinc/nginx-plus-go-client/client" +// NginxClientInterface defines the functions used on the NGINX Plus client, abstracting away the full details of that client. type NginxClientInterface interface { + // DeleteStreamServer is used by the TcpBorderClient. DeleteStreamServer(upstream string, server string) error + + // UpdateStreamServers is used by the TcpBorderClient. UpdateStreamServers(upstream string, servers []nginxClient.StreamUpstreamServer) ([]nginxClient.StreamUpstreamServer, []nginxClient.StreamUpstreamServer, []nginxClient.StreamUpstreamServer, error) + // DeleteHTTPServer is used by the HttpBorderClient. DeleteHTTPServer(upstream string, server string) error + + // UpdateHTTPServers is used by the HttpBorderClient. UpdateHTTPServers(upstream string, servers []nginxClient.UpstreamServer) ([]nginxClient.UpstreamServer, []nginxClient.UpstreamServer, []nginxClient.UpstreamServer, error) } diff --git a/internal/application/null_border_client.go b/internal/application/null_border_client.go index f118068..8370fe0 100644 --- a/internal/application/null_border_client.go +++ b/internal/application/null_border_client.go @@ -10,18 +10,23 @@ import ( "github.com/sirupsen/logrus" ) +// NullBorderClient is a BorderClient that does nothing. +// / It serves only to prevent a panic if the BorderClient is not set correctly and errors from the factory methods are ignored. type NullBorderClient struct { } +// NewNullBorderClient is the Factory function for creating a NullBorderClient func NewNullBorderClient() (Interface, error) { return &NullBorderClient{}, nil } +// Update logs a Warning. It is, after all, a NullObject Pattern implementation. func (nbc *NullBorderClient) Update(_ *core.ServerUpdateEvent) error { logrus.Warn("NullBorderClient.Update called") return nil } +// Delete logs a Warning. It is, after all, a NullObject Pattern implementation. func (nbc *NullBorderClient) Delete(_ *core.ServerUpdateEvent) error { logrus.Warn("NullBorderClient.Delete called") return nil diff --git a/internal/application/tcp_border_client.go b/internal/application/tcp_border_client.go index bef0bc6..822110b 100644 --- a/internal/application/tcp_border_client.go +++ b/internal/application/tcp_border_client.go @@ -11,11 +11,13 @@ import ( nginxClient "github.com/nginxinc/nginx-plus-go-client/client" ) +// TcpBorderClient implements the BorderClient interface for TCP upstreams. type TcpBorderClient struct { BorderClient nginxClient NginxClientInterface } +// NewTcpBorderClient is the Factory function for creating an TcpBorderClient. func NewTcpBorderClient(client interface{}) (Interface, error) { ngxClient, ok := client.(NginxClientInterface) if !ok { @@ -27,6 +29,7 @@ func NewTcpBorderClient(client interface{}) (Interface, error) { }, nil } +// Update manages the Upstream servers for the Upstream Name given in the ServerUpdateEvent. func (tbc *TcpBorderClient) Update(event *core.ServerUpdateEvent) error { streamUpstreamServers := asNginxStreamUpstreamServers(event.UpstreamServers) _, _, _, err := tbc.nginxClient.UpdateStreamServers(event.UpstreamName, streamUpstreamServers) @@ -37,6 +40,7 @@ func (tbc *TcpBorderClient) Update(event *core.ServerUpdateEvent) error { return nil } +// Delete deletes the Upstream server for the Upstream Name given in the ServerUpdateEvent. func (tbc *TcpBorderClient) Delete(event *core.ServerUpdateEvent) error { err := tbc.nginxClient.DeleteStreamServer(event.UpstreamName, event.UpstreamServers[0].Host) if err != nil { diff --git a/internal/communication/client.go b/internal/communication/client.go index 09788fb..fb7d80d 100644 --- a/internal/communication/client.go +++ b/internal/communication/client.go @@ -11,6 +11,9 @@ import ( "time" ) +// NewHttpClient is a factory method to create a new Http Client with a default configuration. +// RoundTripper is a wrapper around the default net/communication Transport to add additional headers, in this case, +// the Headers are configured for JSON. func NewHttpClient() (*netHttp.Client, error) { headers := NewHeaders() tlsConfig := NewTlsConfig() @@ -25,6 +28,7 @@ func NewHttpClient() (*netHttp.Client, error) { }, nil } +// NewHeaders is a factory method to create a new basic Http Headers slice. func NewHeaders() []string { return []string{ "Content-Type: application/json", @@ -32,10 +36,13 @@ func NewHeaders() []string { } } +// NewTlsConfig is a factory method to create a new basic Tls Config. +// More attention should be given to the use of `InsecureSkipVerify: true`, as it is not recommended for production use. func NewTlsConfig() *tls.Config { return &tls.Config{InsecureSkipVerify: true} } +// NewTransport is a factory method to create a new basic Http Transport. func NewTransport(config *tls.Config) *netHttp.Transport { transport := netHttp.DefaultTransport.(*netHttp.Transport) transport.TLSClientConfig = config diff --git a/internal/communication/roundtripper.go b/internal/communication/roundtripper.go index 60affa3..3781c62 100644 --- a/internal/communication/roundtripper.go +++ b/internal/communication/roundtripper.go @@ -11,11 +11,13 @@ import ( "strings" ) +// RoundTripper is a simple type that wraps the default net/communication RoundTripper to add additional headers. type RoundTripper struct { Headers []string RoundTripper http.RoundTripper } +// NewRoundTripper is a factory method to create a new RoundTripper. func NewRoundTripper(headers []string, transport *netHttp.Transport) *RoundTripper { return &RoundTripper{ Headers: headers, @@ -23,7 +25,7 @@ func NewRoundTripper(headers []string, transport *netHttp.Transport) *RoundTripp } } -// RoundTrip Merge Headers +// RoundTrip This simply adds our default headers to the request before passing it on to the default RoundTripper. func (roundTripper *RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { newRequest := new(http.Request) *newRequest = *req diff --git a/internal/configuration/doc.go b/internal/configuration/doc.go index 5998528..28af35e 100644 --- a/internal/configuration/doc.go +++ b/internal/configuration/doc.go @@ -4,7 +4,7 @@ */ /* -Package config, application configuration. +Package config includes application configuration. */ package configuration diff --git a/internal/configuration/settings.go b/internal/configuration/settings.go index c394232..cb0a36b 100644 --- a/internal/configuration/settings.go +++ b/internal/configuration/settings.go @@ -19,49 +19,111 @@ import ( ) const ( - ConfigMapsNamespace = "nkl" - ResyncPeriod = 0 - NklPrefix = ConfigMapsNamespace + "-" + // ConfigMapsNamespace is the value used to filter the ConfigMaps Resource in the Informer. + ConfigMapsNamespace = "nkl" + + // ResyncPeriod is the value used to set the resync period for the Informer. + ResyncPeriod = 0 + + // NklPrefix is used to determine if a Port definition should be handled and used to update a Border Server. + // The Port name () must start with this prefix, e.g.: + // nkl- + NklPrefix = ConfigMapsNamespace + "-" + + // PortAnnotationPrefix defines the prefix used when looking up a Port in the Service Annotations. + // The value of the annotation determines which BorderServer implementation will be used. + // See the documentation in the `application/application_constants.go` file for details. PortAnnotationPrefix = "nginxinc.io" ) +// WorkQueueSettings contains the configuration values needed by the Work Queues. +// There are two work queues in the application: +// 1. nkl-handler queue, used to move messages between the Watcher and the Handler. +// 2. nkl-synchronizer queue, used to move message between the Handler and the Synchronizer. +// The queues are NamedDelayingQueue objects that use an ItemExponentialFailureRateLimiter as the underlying rate limiter. type WorkQueueSettings struct { - Name string + // Name is the name of the queue. + Name string + + // RateLimiterBase is the value used to calculate the exponential backoff rate limiter. + // The formula is: RateLimiterBase * 2 ^ (num_retries - 1) RateLimiterBase time.Duration - RateLimiterMax time.Duration + + // RateLimiterMax limits the amount of time retries are allowed to be attempted. + RateLimiterMax time.Duration } +// HandlerSettings contains the configuration values needed by the Handler. type HandlerSettings struct { - RetryCount int - Threads int + + // RetryCount is the number of times the Handler will attempt to process a message before giving up. + RetryCount int + + // Threads is the number of threads that will be used to process messages. + Threads int + + // WorkQueueSettings is the configuration for the Handler's queue. WorkQueueSettings WorkQueueSettings } +// WatcherSettings contains the configuration values needed by the Watcher. type WatcherSettings struct { + + // NginxIngressNamespace is the namespace used to filter Services in the Watcher. NginxIngressNamespace string - ResyncPeriod time.Duration + + // ResyncPeriod is the value used to set the resync period for the underlying SharedInformer. + ResyncPeriod time.Duration } +// SynchronizerSettings contains the configuration values needed by the Synchronizer. type SynchronizerSettings struct { + + // MaxMillisecondsJitter is the maximum number of milliseconds that will be applied when adding an event to the queue. MaxMillisecondsJitter int + + // MinMillisecondsJitter is the minimum number of milliseconds that will be applied when adding an event to the queue. MinMillisecondsJitter int - RetryCount int - Threads int - WorkQueueSettings WorkQueueSettings + + // RetryCount is the number of times the Synchronizer will attempt to process a message before giving up. + RetryCount int + + // Threads is the number of threads that will be used to process messages. + Threads int + + // WorkQueueSettings is the configuration for the Synchronizer's queue. + WorkQueueSettings WorkQueueSettings } +// Settings contains the configuration values needed by the application. type Settings struct { - Context context.Context - NginxPlusHosts []string - K8sClient *kubernetes.Clientset - informer cache.SharedInformer + + // Context is the context used to control the application. + Context context.Context + + // NginxPlusHosts is a list of Nginx Plus hosts that will be used to update the Border Servers. + NginxPlusHosts []string + + // K8sClient is the Kubernetes client used to communicate with the Kubernetes API. + K8sClient *kubernetes.Clientset + + // informer is the SharedInformer used to watch for changes to the ConfigMap . + informer cache.SharedInformer + + // eventHandlerRegistration is the object used to track the event handlers with the SharedInformer. eventHandlerRegistration cache.ResourceEventHandlerRegistration - Handler HandlerSettings + // Handler contains the configuration values needed by the Handler. + Handler HandlerSettings + + // Synchronizer contains the configuration values needed by the Synchronizer. Synchronizer SynchronizerSettings - Watcher WatcherSettings + + // Watcher contains the configuration values needed by the Watcher. + Watcher WatcherSettings } +// NewSettings creates a new Settings object with default values. func NewSettings(ctx context.Context, k8sClient *kubernetes.Clientset) (*Settings, error) { settings := &Settings{ Context: ctx, @@ -95,6 +157,8 @@ func NewSettings(ctx context.Context, k8sClient *kubernetes.Clientset) (*Setting return settings, nil } +// Initialize initializes the Settings object. Sets up a SharedInformer to watch for changes to the ConfigMap. +// This method must be called before the Run method. func (s *Settings) Initialize() error { logrus.Info("Settings::Initialize") @@ -115,6 +179,7 @@ func (s *Settings) Initialize() error { return nil } +// Run starts the SharedInformer and waits for the Context to be cancelled. func (s *Settings) Run() { logrus.Debug("Settings::Run") diff --git a/internal/core/doc.go b/internal/core/doc.go new file mode 100644 index 0000000..8fa9967 --- /dev/null +++ b/internal/core/doc.go @@ -0,0 +1,10 @@ +/* + * Copyright (c) 2023 F5 Inc. All rights reserved. + * Use of this source code is governed by the Apache License that can be found in the LICENSE file. + */ + +/* +Package core includes core application structures and logic. +*/ + +package core diff --git a/internal/core/event.go b/internal/core/event.go index b5609b1..09776c9 100644 --- a/internal/core/event.go +++ b/internal/core/event.go @@ -9,19 +9,37 @@ import v1 "k8s.io/api/core/v1" type EventType int +// Event types const ( + + // Created Represents the event type when a service is created Created EventType = iota + + // Updated Represents the event type when a service is updated Updated + + // Deleted Represents the event type when a service is deleted Deleted ) +// Event represents a service event type Event struct { - Type EventType - Service *v1.Service + + // Type represents the event type, one of the constant values defined above. + Type EventType + + // Service represents the service object in its current state + Service *v1.Service + + // PreviousService represents the service object in its previous state PreviousService *v1.Service - NodeIps []string + + // NodeIps represents the list of node IPs in the Cluster. This is populated by the Watcher when an event is created. + // The Node IPs are needed by the BorderClient. + NodeIps []string } +// NewEvent factory method to create a new Event func NewEvent(eventType EventType, service *v1.Service, previousService *v1.Service, nodeIps []string) Event { return Event{ Type: eventType, diff --git a/internal/core/server_update_event.go b/internal/core/server_update_event.go index dd02127..f3961ea 100644 --- a/internal/core/server_update_event.go +++ b/internal/core/server_update_event.go @@ -5,17 +5,35 @@ package core +// ServerUpdateEvent is an internal representation of an event. The Translator produces these events +// from Events received from the Handler. These are then consumed by the Synchronizer and passed along to +// the appropriate BorderClient. type ServerUpdateEvent struct { - ClientType string - Id string - NginxHost string - Type EventType - UpstreamName string + + // ClientType is the type of BorderClient that should handle this event. This is configured via Service Annotations. + // See application_constants.go for the list of supported types. + ClientType string + + // Id is the unique identifier for this event. + Id string + + // NginxHost is the host name of the NGINX Plus instance that should handle this event. + NginxHost string + + // Type is the type of event. See EventType for the list of supported types. + Type EventType + + // UpstreamName is the name of the upstream in the Border Server. + UpstreamName string + + // UpstreamServers is the list of servers in the Upstream. UpstreamServers UpstreamServers } +// ServerUpdateEvents is a list of ServerUpdateEvent. type ServerUpdateEvents = []*ServerUpdateEvent +// NewServerUpdateEvent creates a new ServerUpdateEvent. func NewServerUpdateEvent(eventType EventType, upstreamName string, clientType string, upstreamServers UpstreamServers) *ServerUpdateEvent { return &ServerUpdateEvent{ ClientType: clientType, @@ -25,6 +43,7 @@ func NewServerUpdateEvent(eventType EventType, upstreamName string, clientType s } } +// ServerUpdateEventWithIdAndHost creates a new ServerUpdateEvent with the specified Id and Host. func ServerUpdateEventWithIdAndHost(event *ServerUpdateEvent, id string, nginxHost string) *ServerUpdateEvent { return &ServerUpdateEvent{ ClientType: event.ClientType, @@ -36,6 +55,7 @@ func ServerUpdateEventWithIdAndHost(event *ServerUpdateEvent, id string, nginxHo } } +// TypeName returns the string representation of the EventType. func (e *ServerUpdateEvent) TypeName() string { switch e.Type { case Created: diff --git a/internal/core/upstream_server.go b/internal/core/upstream_server.go index 210b001..7c89b1e 100644 --- a/internal/core/upstream_server.go +++ b/internal/core/upstream_server.go @@ -5,12 +5,18 @@ package core +// UpstreamServer represents a single upstream server. This is an internal representation used to abstract the definition +// of an upstream server from any specific client. type UpstreamServer struct { + + // Host is the host name or IP address of the upstream server. Host string } +// UpstreamServers is a slice of UpstreamServer. type UpstreamServers = []*UpstreamServer +// NewUpstreamServer creates a new UpstreamServer. func NewUpstreamServer(host string) *UpstreamServer { return &UpstreamServer{ Host: host, diff --git a/internal/observation/handler.go b/internal/observation/handler.go index 16e1b63..2c3fb50 100644 --- a/internal/observation/handler.go +++ b/internal/observation/handler.go @@ -16,18 +16,36 @@ import ( "k8s.io/client-go/util/workqueue" ) +// HandlerInterface is the interface for the event handler type HandlerInterface interface { + + // AddRateLimitedEvent defines the interface for adding an event to the event queue AddRateLimitedEvent(event *core.Event) + + // Run defines the interface used to start the event handler Run(stopCh <-chan struct{}) + + // ShutDown defines the interface used to stop the event handler ShutDown() } +// Handler is responsible for processing events in the "nkl-handler" queue. +// When processing a message the Translation module is used to translate the event into an internal representation. +// The translation process may result in multiple events being generated. This fan-out mainly supports the differences +// in NGINX Plus API calls for creating/updating Upstreams and deleting Upstreams. type Handler struct { - eventQueue workqueue.RateLimitingInterface - settings *configuration.Settings + + // eventQueue is the queue used to store events + eventQueue workqueue.RateLimitingInterface + + // settings is the configuration settings + settings *configuration.Settings + + // synchronizer is the synchronizer used to synchronize the internal representation with a Border Server synchronizer synchronization.Interface } +// NewHandler creates a new event handler func NewHandler(settings *configuration.Settings, synchronizer synchronization.Interface, eventQueue workqueue.RateLimitingInterface) *Handler { return &Handler{ eventQueue: eventQueue, @@ -36,11 +54,13 @@ func NewHandler(settings *configuration.Settings, synchronizer synchronization.I } } +// AddRateLimitedEvent adds an event to the event queue func (h *Handler) AddRateLimitedEvent(event *core.Event) { logrus.Debugf(`Handler::AddRateLimitedEvent: %#v`, event) h.eventQueue.AddRateLimited(event) } +// Run starts the event handler, spins up Goroutines to process events, and waits for a stop signal func (h *Handler) Run(stopCh <-chan struct{}) { logrus.Debug("Handler::Run") @@ -51,11 +71,13 @@ func (h *Handler) Run(stopCh <-chan struct{}) { <-stopCh } +// ShutDown stops the event handler and shuts down the event queue func (h *Handler) ShutDown() { logrus.Debug("Handler::ShutDown") h.eventQueue.ShutDown() } +// handleEvent feeds translated events to the synchronizer func (h *Handler) handleEvent(e *core.Event) error { logrus.Debugf(`Handler::handleEvent: %#v`, e) // TODO: Add Telemetry @@ -70,6 +92,7 @@ func (h *Handler) handleEvent(e *core.Event) error { return nil } +// handleNextEvent pulls an event from the event queue and feeds it to the event handler with retry logic func (h *Handler) handleNextEvent() bool { logrus.Debug("Handler::handleNextEvent") evt, quit := h.eventQueue.Get() @@ -86,12 +109,14 @@ func (h *Handler) handleNextEvent() bool { return true } +// worker is the main message loop func (h *Handler) worker() { for h.handleNextEvent() { // TODO: Add Telemetry } } +// withRetry handles errors from the event handler and requeues events that fail func (h *Handler) withRetry(err error, event *core.Event) { logrus.Debug("Handler::withRetry") if err != nil { diff --git a/internal/observation/watcher.go b/internal/observation/watcher.go index 782a6e4..3ee9d3f 100644 --- a/internal/observation/watcher.go +++ b/internal/observation/watcher.go @@ -19,13 +19,25 @@ import ( "time" ) +// Watcher is responsible for watching for changes to Kubernetes resources. +// Particularly, Services in the namespace defined in the WatcherSettings::NginxIngressNamespace setting. +// When a change is detected, an Event is generated and added to the Handler's queue. type Watcher struct { + + // eventHandlerRegistration is used to track the event handlers eventHandlerRegistration interface{} - handler HandlerInterface - informer cache.SharedIndexInformer - settings *configuration.Settings + + // handler is the event handler + handler HandlerInterface + + // informer is the informer used to watch for changes to Kubernetes resources + informer cache.SharedIndexInformer + + // settings is the configuration settings + settings *configuration.Settings } +// NewWatcher creates a new Watcher func NewWatcher(settings *configuration.Settings, handler HandlerInterface) (*Watcher, error) { return &Watcher{ handler: handler, @@ -33,6 +45,7 @@ func NewWatcher(settings *configuration.Settings, handler HandlerInterface) (*Wa }, nil } +// Initialize initializes the Watcher, must be called before Watch func (w *Watcher) Initialize() error { logrus.Debug("Watcher::Initialize") var err error @@ -50,6 +63,8 @@ func (w *Watcher) Initialize() error { return nil } +// Watch starts the process of watching for changes to Kubernetes resources. +// Initialize must be called before Watch. func (w *Watcher) Watch() error { logrus.Debug("Watcher::Watch") @@ -70,6 +85,7 @@ func (w *Watcher) Watch() error { return nil } +// buildEventHandlerForAdd creates a function that is used as an event handler for the informer when Add events are raised. func (w *Watcher) buildEventHandlerForAdd() func(interface{}) { logrus.Info("Watcher::buildEventHandlerForAdd") return func(obj interface{}) { @@ -85,6 +101,7 @@ func (w *Watcher) buildEventHandlerForAdd() func(interface{}) { } } +// buildEventHandlerForDelete creates a function that is used as an event handler for the informer when Delete events are raised. func (w *Watcher) buildEventHandlerForDelete() func(interface{}) { logrus.Info("Watcher::buildEventHandlerForDelete") return func(obj interface{}) { @@ -100,6 +117,7 @@ func (w *Watcher) buildEventHandlerForDelete() func(interface{}) { } } +// buildEventHandlerForUpdate creates a function that is used as an event handler for the informer when Update events are raised. func (w *Watcher) buildEventHandlerForUpdate() func(interface{}, interface{}) { logrus.Info("Watcher::buildEventHandlerForUpdate") return func(previous, updated interface{}) { @@ -115,6 +133,7 @@ func (w *Watcher) buildEventHandlerForUpdate() func(interface{}, interface{}) { } } +// buildInformer creates the informer used to watch for changes to Kubernetes resources. func (w *Watcher) buildInformer() (cache.SharedIndexInformer, error) { logrus.Debug("Watcher::buildInformer") @@ -125,6 +144,7 @@ func (w *Watcher) buildInformer() (cache.SharedIndexInformer, error) { return informer, nil } +// initializeEventListeners initializes the event listeners for the informer. func (w *Watcher) initializeEventListeners() error { logrus.Debug("Watcher::initializeEventListeners") var err error @@ -143,6 +163,8 @@ func (w *Watcher) initializeEventListeners() error { return nil } +// notMasterNode retrieves the IP Addresses of the nodes in the cluster. Currently, the master node is excluded. This is +// because the master node may or may not be a worker node and thus may not be able to route traffic. func (w *Watcher) retrieveNodeIps() ([]string, error) { started := time.Now() logrus.Debug("Watcher::retrieveNodeIps") @@ -156,6 +178,8 @@ func (w *Watcher) retrieveNodeIps() ([]string, error) { } for _, node := range nodes.Items { + + // this is kind of a broad assumption, should probably make this a configurable option if w.notMasterNode(node) { for _, address := range node.Status.Addresses { if address.Type == v1.NodeInternalIP { @@ -170,6 +194,7 @@ func (w *Watcher) retrieveNodeIps() ([]string, error) { return nodeIps, nil } +// notMasterNode determines if the node is a master node. func (w *Watcher) notMasterNode(node v1.Node) bool { logrus.Debug("Watcher::notMasterNode") diff --git a/internal/probation/check.go b/internal/probation/check.go index 88c3959..b1fb438 100644 --- a/internal/probation/check.go +++ b/internal/probation/check.go @@ -5,27 +5,34 @@ package probation +// Check defines a single method that can be implemented for various health checks. type Check interface { Check() bool } +// LiveCheck is a check that can be used for the k8s "livez" endpoint. type LiveCheck struct { } +// ReadyCheck is a check that can be used for the k8s "readyz" endpoint. type ReadyCheck struct { } +// StartupCheck is a check that can be used for the k8s "startupz" endpoint. type StartupCheck struct { } +// Check implements the Check interface for the LiveCheck type. func (l *LiveCheck) Check() bool { return true } +// Check implements the Check interface for the ReadyCheck type. func (r *ReadyCheck) Check() bool { return true } +// Check implements the Check interface for the StartupCheck type. func (s *StartupCheck) Check() bool { return true } diff --git a/internal/probation/server.go b/internal/probation/server.go index e5ed4a9..12b1699 100644 --- a/internal/probation/server.go +++ b/internal/probation/server.go @@ -12,18 +12,34 @@ import ( ) const ( - Ok = "OK" + + // Ok is the message returned when a check passes. + Ok = "OK" + + // ServiceNotAvailable is the message returned when a check fails. ServiceNotAvailable = "Service Not Available" - ListenPort = 51031 + + // ListenPort is the port on which the health server will listen. + ListenPort = 51031 ) +// HealthServer is a server that spins up endpoints for the various k8s health checks. type HealthServer struct { - httpServer *http.Server - LiveCheck LiveCheck - ReadyCheck ReadyCheck + + // The underlying HTTP server. + httpServer *http.Server + + // Support for the "livez" endpoint. + LiveCheck LiveCheck + + // Support for the "readyz" endpoint. + ReadyCheck ReadyCheck + + // Support for the "startupz" endpoint. StartupCheck StartupCheck } +// NewHealthServer creates a new HealthServer. func NewHealthServer() *HealthServer { return &HealthServer{ LiveCheck: LiveCheck{}, @@ -32,6 +48,7 @@ func NewHealthServer() *HealthServer { } } +// Start spins up the health server. func (hs *HealthServer) Start() { logrus.Debugf("Starting probe listener on port %d", ListenPort) @@ -52,24 +69,29 @@ func (hs *HealthServer) Start() { logrus.Info("Started probe listener on", hs.httpServer.Addr) } +// Stop shuts down the health server. func (hs *HealthServer) Stop() { if err := hs.httpServer.Close(); err != nil { logrus.Errorf("unable to stop probe listener on %s: %v", hs.httpServer.Addr, err) } } +// HandleLive is the handler for the "livez" endpoint. func (hs *HealthServer) HandleLive(writer http.ResponseWriter, request *http.Request) { hs.handleProbe(writer, request, &hs.LiveCheck) } +// HandleReady is the handler for the "readyz" endpoint. func (hs *HealthServer) HandleReady(writer http.ResponseWriter, request *http.Request) { hs.handleProbe(writer, request, &hs.ReadyCheck) } +// HandleStartup is the handler for the "startupz" endpoint. func (hs *HealthServer) HandleStartup(writer http.ResponseWriter, request *http.Request) { hs.handleProbe(writer, request, &hs.StartupCheck) } +// handleProbe handles calling the appropriate Check method and writes the result to the client. func (hs *HealthServer) handleProbe(writer http.ResponseWriter, _ *http.Request, check Check) { if check.Check() { writer.WriteHeader(http.StatusOK) diff --git a/internal/synchronization/rand.go b/internal/synchronization/rand.go index 4d5903a..425b99a 100644 --- a/internal/synchronization/rand.go +++ b/internal/synchronization/rand.go @@ -10,8 +10,13 @@ import ( "time" ) +// charset contains all characters that can be used in random string generation var charset = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + +// number contains all numbers that can be used in random string generation var number = []byte("0123456789") + +// alphaNumeric contains all characters and numbers that can be used in random string generation var alphaNumeric = append(charset, number...) // RandomString where n is the length of random string we want to generate @@ -24,6 +29,7 @@ func RandomString(n int) string { return string(b) } +// RandomMilliseconds returns a random duration between min and max milliseconds func RandomMilliseconds(min, max int) time.Duration { randomizer := rand.New(rand.NewSource(time.Now().UnixNano())) random := randomizer.Intn(max-min) + min diff --git a/internal/synchronization/synchronizer.go b/internal/synchronization/synchronizer.go index 2b78a3d..9eab184 100644 --- a/internal/synchronization/synchronizer.go +++ b/internal/synchronization/synchronizer.go @@ -17,18 +17,31 @@ import ( "k8s.io/client-go/util/workqueue" ) +// Interface defines the interface needed to implement a synchronizer. type Interface interface { + + // AddEvents adds a list of events to the queue. AddEvents(events core.ServerUpdateEvents) + + // AddEvent adds an event to the queue. AddEvent(event *core.ServerUpdateEvent) + + // Run starts the synchronizer. Run(stopCh <-chan struct{}) + + // ShutDown shuts down the synchronizer. ShutDown() } +// Synchronizer is responsible for synchronizing the state of the Border Servers. +// Operating against the "nkl-synchronizer", it handles events by creating a Border Client as specified in the +// Service annotation for the Upstream. see application/border_client.go and application/application_constants.go for details. type Synchronizer struct { eventQueue workqueue.RateLimitingInterface settings *configuration.Settings } +// NewSynchronizer creates a new Synchronizer. func NewSynchronizer(settings *configuration.Settings, eventQueue workqueue.RateLimitingInterface) (*Synchronizer, error) { synchronizer := Synchronizer{ eventQueue: eventQueue, @@ -38,6 +51,8 @@ func NewSynchronizer(settings *configuration.Settings, eventQueue workqueue.Rate return &synchronizer, nil } +// AddEvents adds a list of events to the queue. If no hosts are specified this is a null operation. +// Events will fan out to the number of hosts specified before being added to the queue. func (s *Synchronizer) AddEvents(events core.ServerUpdateEvents) { logrus.Debugf(`Synchronizer::AddEvents adding %d events`, len(events)) @@ -53,6 +68,8 @@ func (s *Synchronizer) AddEvents(events core.ServerUpdateEvents) { } } +// AddEvent adds an event to the queue. If no hosts are specified this is a null operation. +// Events will be added to the queue after a random delay between MinMillisecondsJitter and MaxMillisecondsJitter. func (s *Synchronizer) AddEvent(event *core.ServerUpdateEvent) { logrus.Debugf(`Synchronizer::AddEvent: %#v`, event) @@ -65,6 +82,7 @@ func (s *Synchronizer) AddEvent(event *core.ServerUpdateEvent) { s.eventQueue.AddAfter(event, after) } +// Run starts the Synchronizer, spins up Goroutines to process events, and waits for a stop signal. func (s *Synchronizer) Run(stopCh <-chan struct{}) { logrus.Debug(`Synchronizer::Run`) @@ -75,11 +93,15 @@ func (s *Synchronizer) Run(stopCh <-chan struct{}) { <-stopCh } +// ShutDown stops the Synchronizer and shuts down the event queue func (s *Synchronizer) ShutDown() { logrus.Debugf(`Synchronizer::ShutDown`) s.eventQueue.ShutDownWithDrain() } +// buildBorderClient creates a Border Client for the specified event. +// NOTE: There is an open issue (https://github.com/nginxinc/nginx-k8s-loadbalancer/issues/36) to move creation +// of the underlying Border Server client to the NewBorderClient function. func (s *Synchronizer) buildBorderClient(event *core.ServerUpdateEvent) (application.Interface, error) { logrus.Debugf(`Synchronizer::buildBorderClient`) @@ -98,6 +120,7 @@ func (s *Synchronizer) buildBorderClient(event *core.ServerUpdateEvent) (applica return application.NewBorderClient(event.ClientType, ngxClient) } +// fanOutEventToHosts takes a list of events and returns a list of events, one for each Border Server. func (s *Synchronizer) fanOutEventToHosts(event core.ServerUpdateEvents) core.ServerUpdateEvents { logrus.Debugf(`Synchronizer::fanOutEventToHosts: %#v`, event) @@ -115,6 +138,7 @@ func (s *Synchronizer) fanOutEventToHosts(event core.ServerUpdateEvents) core.Se return events } +// handleEvent dispatches an event to the proper handler function. func (s *Synchronizer) handleEvent(event *core.ServerUpdateEvent) error { logrus.Debugf(`Synchronizer::handleEvent: Id: %s`, event.Id) @@ -141,6 +165,7 @@ func (s *Synchronizer) handleEvent(event *core.ServerUpdateEvent) error { return err } +// handleCreatedUpdatedEvent handles events of type Created or Updated. func (s *Synchronizer) handleCreatedUpdatedEvent(serverUpdateEvent *core.ServerUpdateEvent) error { logrus.Debugf(`Synchronizer::handleCreatedUpdatedEvent: Id: %s`, serverUpdateEvent.Id) @@ -158,6 +183,7 @@ func (s *Synchronizer) handleCreatedUpdatedEvent(serverUpdateEvent *core.ServerU return nil } +// handleDeletedEvent handles events of type Deleted. func (s *Synchronizer) handleDeletedEvent(serverUpdateEvent *core.ServerUpdateEvent) error { logrus.Debugf(`Synchronizer::handleDeletedEvent: Id: %s`, serverUpdateEvent.Id) @@ -175,6 +201,7 @@ func (s *Synchronizer) handleDeletedEvent(serverUpdateEvent *core.ServerUpdateEv return nil } +// handleNextEvent pulls an event from the event queue and feeds it to the event handler with retry logic func (s *Synchronizer) handleNextEvent() bool { logrus.Debug(`Synchronizer::handleNextEvent`) @@ -191,12 +218,14 @@ func (s *Synchronizer) handleNextEvent() bool { return true } +// worker is the main message loop func (s *Synchronizer) worker() { logrus.Debug(`Synchronizer::worker`) for s.handleNextEvent() { } } +// withRetry handles errors from the event handler and requeues events that fail func (s *Synchronizer) withRetry(err error, event *core.ServerUpdateEvent) { logrus.Debug("Synchronizer::withRetry") if err != nil { diff --git a/internal/translation/translator.go b/internal/translation/translator.go index 2169251..55d7c29 100644 --- a/internal/translation/translator.go +++ b/internal/translation/translator.go @@ -7,6 +7,7 @@ package translation import ( "fmt" + "github.com/nginxinc/kubernetes-nginx-ingress/internal/application" "github.com/nginxinc/kubernetes-nginx-ingress/internal/configuration" "github.com/nginxinc/kubernetes-nginx-ingress/internal/core" "github.com/sirupsen/logrus" @@ -14,6 +15,8 @@ import ( "strings" ) +// Translate transforms event data into an intermediate format that can be consumed by the BorderClient implementations +// and used to update the Border Servers. func Translate(event *core.Event) (core.ServerUpdateEvents, error) { logrus.Debug("Translate::Translate") @@ -22,6 +25,7 @@ func Translate(event *core.Event) (core.ServerUpdateEvents, error) { return buildServerUpdateEvents(portsOfInterest, event) } +// filterPorts returns a list of ports that have the NklPrefix in the port name. func filterPorts(ports []v1.ServicePort) []v1.ServicePort { var portsOfInterest []v1.ServicePort @@ -80,10 +84,12 @@ func buildUpstreamServers(nodeIps []string, port v1.ServicePort) (core.UpstreamS return servers, nil } +// fixIngressName removes the NklPrefix from the port name func fixIngressName(name string) string { return name[4:] } +// getClientType returns the client type for the port, defaults to ClientTypeHttp if no Annotation is found. func getClientType(portName string, annotations map[string]string) string { key := fmt.Sprintf("%s/%s", configuration.PortAnnotationPrefix, portName) logrus.Infof("getClientType: key=%s", key) @@ -93,5 +99,5 @@ func getClientType(portName string, annotations map[string]string) string { } } - return "http" + return application.ClientTypeHttp }