Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(EG K8S Provider): Enable leader election for EG controller #2694

Merged
merged 26 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions api/v1alpha1/envoygateway_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ package v1alpha1

import (
"fmt"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
)

// DefaultEnvoyGateway returns a new EnvoyGateway with default configuration parameters.
Expand Down Expand Up @@ -39,6 +41,11 @@ func (e *EnvoyGateway) SetEnvoyGatewayDefaults() {
if e.Provider == nil {
e.Provider = DefaultEnvoyGatewayProvider()
}
if e.Provider.Kubernetes == nil {
arkodg marked this conversation as resolved.
Show resolved Hide resolved
e.Provider.Kubernetes = &EnvoyGatewayKubernetesProvider{
LeaderElection: DefaultLeaderElection(),
}
}
if e.Gateway == nil {
e.Gateway = DefaultGateway()
}
Expand Down Expand Up @@ -85,6 +92,16 @@ func (e *EnvoyGateway) NamespaceMode() bool {
len(e.Provider.Kubernetes.Watch.Namespaces) > 0
}

// DefaultLeaderElection returns a new LeaderElection with default configuration parameters.
arkodg marked this conversation as resolved.
Show resolved Hide resolved
func DefaultLeaderElection() *LeaderElection {
return &LeaderElection{
RenewDeadline: ptr.To(time.Second * 10),
RetryPeriod: ptr.To(time.Second * 2),
LeaseDuration: ptr.To(time.Second * 15),
Disable: ptr.To(false),
}
}

// DefaultGateway returns a new Gateway with default configuration parameters.
func DefaultGateway() *Gateway {
return &Gateway{
Expand Down Expand Up @@ -195,9 +212,18 @@ func (r *EnvoyGatewayProvider) GetEnvoyGatewayKubeProvider() *EnvoyGatewayKubern

if r.Kubernetes == nil {
r.Kubernetes = DefaultEnvoyGatewayKubeProvider()
if r.Kubernetes.LeaderElection == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about the case where r.Kubernetes is not nil, but r.Kubernetes.LeaderElection is nil

Copy link
Contributor Author

@alexwo alexwo Apr 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

r.Kubernetes.LeaderElection = DefaultLeaderElection()
}
return r.Kubernetes
}

if r.Kubernetes.LeaderElection == nil {
r.Kubernetes.LeaderElection = DefaultLeaderElection()
}

r.Kubernetes.LeaderElection = DefaultLeaderElection()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this needed ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not needed cleaning it up.


if r.Kubernetes.RateLimitDeployment == nil {
r.Kubernetes.RateLimitDeployment = DefaultKubernetesDeployment(DefaultRateLimitImage)
}
Expand Down
22 changes: 22 additions & 0 deletions api/v1alpha1/envoygateway_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package v1alpha1

import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
gwapiv1 "sigs.k8s.io/gateway-api/apis/v1"
)
Expand Down Expand Up @@ -90,6 +92,21 @@ type EnvoyGatewaySpec struct {
ExtensionAPIs *ExtensionAPISettings `json:"extensionApis,omitempty"`
}

// LeaderElection defines the desired leader election settings.
type LeaderElection struct {
// LeaseDuration defines the time non-leader contenders will wait before attempting to claim leadership. It's based on the timestamp of the last acknowledged signal. The default setting is 15 seconds.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you follow the kubebuilder way of definition defaults, and also add optional tags
this may not be a CRD, but it helps with doc generation

// Default is 15 seconds.
LeaseDuration *time.Duration `json:"leaseDuration,omitempty"`
// RenewDeadline represents the time frame within which the current leader will attempt to renew its leadership status before relinquishing its position. The default setting is 10 seconds.
// Default is 10 seconds.
RenewDeadline *time.Duration `json:"renewDeadline,omitempty"`
// RetryPeriod denotes the interval at which LeaderElector clients should perform action retries. The default setting is 2 seconds.
// Default is 2 seconds.
RetryPeriod *time.Duration `json:"retryPeriod,omitempty"`
// Disable allows to disable leader election (enabled by default)
Disable *bool `json:"disable,omitempty"`
}

// EnvoyGatewayTelemetry defines telemetry configurations for envoy gateway control plane.
// Control plane will focus on metrics observability telemetry and tracing telemetry later.
type EnvoyGatewayTelemetry struct {
Expand Down Expand Up @@ -194,6 +211,11 @@ type EnvoyGatewayKubernetesProvider struct {
// OverwriteControlPlaneCerts updates the secrets containing the control plane certs, when set.
// +optional
OverwriteControlPlaneCerts *bool `json:"overwriteControlPlaneCerts,omitempty"`
// LeaderElection defines the settings related to leader election
zhaohuabing marked this conversation as resolved.
Show resolved Hide resolved
// if not configured leader election is enabled with default k8s settings
//
// +optional
LeaderElection *LeaderElection `json:"leaderElection,omitempty"`
}

const (
Expand Down
41 changes: 41 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions charts/gateway-helm/values.tmpl.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ deployment:
targetPort: 19001
replicas: 1
pod:
affinity: {}
annotations:
affinity: { }
annotations:
prometheus.io/scrape: 'true'
prometheus.io/port: '19001'
labels: {}
labels: { }

config:
envoyGateway:
Expand Down
5 changes: 4 additions & 1 deletion internal/envoygateway/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type Server struct {
DNSDomain string
// Logger is the logr implementation used by Envoy Gateway.
Logger logging.Logger
// Elected chan is used to signal what a leader is elected
Elected chan struct{}
}

// New returns a Server with default parameters.
Expand All @@ -45,7 +47,8 @@ func New() (*Server, error) {
Namespace: env.Lookup("ENVOY_GATEWAY_NAMESPACE", DefaultNamespace),
DNSDomain: env.Lookup("KUBERNETES_CLUSTER_DOMAIN", DefaultDNSDomain),
// the default logger
Logger: logging.DefaultLogger(v1alpha1.LogLevelInfo),
Logger: logging.DefaultLogger(v1alpha1.LogLevelInfo),
Elected: make(chan struct{}),
}, nil
}

Expand Down
21 changes: 21 additions & 0 deletions internal/envoygateway/config/decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,27 @@ func TestDecode(t *testing.T) {
in: inPath + "invalid-gateway-version.yaml",
expect: false,
},
{
in: inPath + "gateway-leaderelection.yaml",
out: &v1alpha1.EnvoyGateway{
TypeMeta: metav1.TypeMeta{
Kind: v1alpha1.KindEnvoyGateway,
APIVersion: v1alpha1.GroupVersion.String(),
},
EnvoyGatewaySpec: v1alpha1.EnvoyGatewaySpec{
Gateway: v1alpha1.DefaultGateway(),
Provider: &v1alpha1.EnvoyGatewayProvider{
Type: v1alpha1.ProviderTypeKubernetes,
Kubernetes: &v1alpha1.EnvoyGatewayKubernetesProvider{
LeaderElection: &v1alpha1.LeaderElection{
Disable: ptr.To(true),
},
},
},
},
},
expect: true,
},
}

for _, tc := range testCases {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: gateway.envoyproxy.io/v1alpha1
kind: EnvoyGateway
gateway:
controllerName: gateway.envoyproxy.io/gatewayclass-controller
provider:
type: Kubernetes
kubernetes:
leaderElection:
disable: true

Check failure on line 9 in internal/envoygateway/config/testdata/decoder/in/gateway-leaderelection.yaml

View workflow job for this annotation

GitHub Actions / lint

9:20 [new-line-at-end-of-file] no new line character at the end of file

Check failure on line 9 in internal/envoygateway/config/testdata/decoder/in/gateway-leaderelection.yaml

View workflow job for this annotation

GitHub Actions / lint

9:20 [new-line-at-end-of-file] no new line character at the end of file
29 changes: 24 additions & 5 deletions internal/infrastructure/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ package runner
import (
"context"

"k8s.io/utils/ptr"

"github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/envoyproxy/gateway/internal/envoygateway/config"
"github.com/envoyproxy/gateway/internal/infrastructure"
Expand Down Expand Up @@ -41,14 +43,31 @@ func (r *Runner) Start(ctx context.Context) (err error) {
r.Logger.Error(err, "failed to create new manager")
return err
}
go r.subscribeToProxyInfraIR(ctx)

// Enable global ratelimit if it has been configured.
if r.EnvoyGateway.RateLimit != nil {
go r.enableRateLimitInfra(ctx)
var initInfra = func() {
go r.subscribeToProxyInfraIR(ctx)

// Enable global ratelimit if it has been configured.
if r.EnvoyGateway.RateLimit != nil {
go r.enableRateLimitInfra(ctx)
}
r.Logger.Info("started")
}

r.Logger.Info("started")
// When leader election is active, infrastructure initialization occurs only upon acquiring leadership
// to avoid multiple EG instances processing envoy proxy infra resources.
if !ptr.Deref(r.EnvoyGateway.Provider.Kubernetes.LeaderElection.Disable, false) {
go func() {
select {
case <-ctx.Done():
return
case <-r.Elected:
initInfra()
}
}()
return
}
initInfra()
return
}

Expand Down
9 changes: 9 additions & 0 deletions internal/provider/kubernetes/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,15 @@ func (r *gatewayAPIReconciler) addFinalizer(ctx context.Context, gc *gwapiv1.Gat

// watchResources watches gateway api resources.
func (r *gatewayAPIReconciler) watchResources(ctx context.Context, mgr manager.Manager, c controller.Controller) error {
// Upon leader election, we retrigger the reconciliation process to allow the elected leader to
// process status updates and infrastructure changes. This step is crucial for synchronizing resources
// that may have been altered or introduced while there was no elected leader.
if err := c.Watch(
NewWatchAndReconcileSource(mgr.Elected(), &gwapiv1.GatewayClass{}),
handler.EnqueueRequestsFromMapFunc(r.enqueueClass)); err != nil {
return err
}

if err := c.Watch(
source.Kind(mgr.GetCache(), &gwapiv1.GatewayClass{}),
handler.EnqueueRequestsFromMapFunc(r.enqueueClass),
Expand Down
32 changes: 24 additions & 8 deletions internal/provider/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ import (
"fmt"

"k8s.io/client-go/rest"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/config"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/manager"

"github.com/envoyproxy/gateway/internal/envoygateway"
"github.com/envoyproxy/gateway/internal/envoygateway/config"
ec "github.com/envoyproxy/gateway/internal/envoygateway/config"
"github.com/envoyproxy/gateway/internal/message"
"github.com/envoyproxy/gateway/internal/status"
)
Expand All @@ -31,14 +33,23 @@ type Provider struct {
}

// New creates a new Provider from the provided EnvoyGateway.
func New(cfg *rest.Config, svr *config.Server, resources *message.ProviderResources) (*Provider, error) {
func New(cfg *rest.Config, svr *ec.Server, resources *message.ProviderResources) (*Provider, error) {
// TODO: Decide which mgr opts should be exposed through envoygateway.provider.kubernetes API.

mgrOpts := manager.Options{
Scheme: envoygateway.GetScheme(),
Logger: svr.Logger.Logger,
LeaderElection: false,
HealthProbeBindAddress: ":8081",
LeaderElectionID: "5b9825d2.gateway.envoyproxy.io",
Scheme: envoygateway.GetScheme(),
Logger: svr.Logger.Logger,
HealthProbeBindAddress: ":8081",
LeaderElectionID: "5b9825d2.gateway.envoyproxy.io",
LeaderElectionNamespace: svr.Namespace,
}

if !ptr.Deref(svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.Disable, false) {
mgrOpts.LeaderElection = true
mgrOpts.LeaseDuration = svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.LeaseDuration
mgrOpts.RetryPeriod = svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.RetryPeriod
mgrOpts.RenewDeadline = svr.EnvoyGateway.Provider.Kubernetes.LeaderElection.RenewDeadline
mgrOpts.Controller = config.Controller{NeedLeaderElection: ptr.To(false)}
}

if svr.EnvoyGateway.NamespaceMode() {
Expand All @@ -47,7 +58,6 @@ func New(cfg *rest.Config, svr *config.Server, resources *message.ProviderResour
mgrOpts.Cache.DefaultNamespaces[watchNS] = cache.Config{}
}
}

mgr, err := ctrl.NewManager(cfg, mgrOpts)
if err != nil {
return nil, fmt.Errorf("failed to create manager: %w", err)
Expand All @@ -73,6 +83,12 @@ func New(cfg *rest.Config, svr *config.Server, resources *message.ProviderResour
return nil, fmt.Errorf("unable to set up ready check: %w", err)
}

// Emit elected & continue with deployment of infra resources
arkodg marked this conversation as resolved.
Show resolved Hide resolved
go func() {
<-mgr.Elected()
close(svr.Elected)
}()

return &Provider{
manager: mgr,
client: mgr.GetClient(),
Expand Down
Loading
Loading