From 290d4bac7776f461dce431287925ab86fcd74aa0 Mon Sep 17 00:00:00 2001 From: adel121 Date: Wed, 18 Dec 2024 10:10:38 +0100 Subject: [PATCH] notify leader election subscribers on leadership state change --- .../subcommands/start/command.go | 30 +++++++------ .../controllers/secret/controller.go | 32 +++++++------- .../controllers/secret/controller_test.go | 2 +- .../controllers/webhook/controller_base.go | 16 +++---- .../webhook/controller_base_test.go | 4 +- .../controllers/webhook/controller_v1.go | 4 +- .../controllers/webhook/controller_v1_test.go | 2 +- .../controllers/webhook/controller_v1beta1.go | 4 +- .../webhook/controller_v1beta1_test.go | 2 +- .../admission/patch/file_provider.go | 24 ++++++----- .../admission/patch/file_provider_test.go | 2 +- pkg/clusteragent/admission/patch/provider.go | 6 +-- .../admission/patch/rc_provider.go | 32 +++++++------- .../admission/patch/rc_provider_test.go | 4 +- pkg/clusteragent/admission/patch/start.go | 18 ++++---- pkg/clusteragent/admission/start.go | 27 ++++++------ .../leaderelection/leaderelection.go | 16 ++++--- .../leaderelection/leaderelection_engine.go | 13 ++++-- .../leaderelection/leaderelection_test.go | 42 +++++++++++++++++-- 19 files changed, 169 insertions(+), 111 deletions(-) diff --git a/cmd/cluster-agent/subcommands/start/command.go b/cmd/cluster-agent/subcommands/start/command.go index aea5e4207f57c..80fe4a3b6d383 100644 --- a/cmd/cluster-agent/subcommands/start/command.go +++ b/cmd/cluster-agent/subcommands/start/command.go @@ -455,13 +455,12 @@ func start(log log.Component, if config.GetBool("admission_controller.enabled") { if config.GetBool("admission_controller.auto_instrumentation.patcher.enabled") { patchCtx := admissionpatch.ControllerContext{ - IsLeaderFunc: le.IsLeader, - LeaderSubscribeFunc: le.Subscribe, - K8sClient: apiCl.Cl, - RcClient: rcClient, - ClusterName: clusterName, - ClusterID: clusterID, - StopCh: stopCh, + LeadershipStateSubscribeFunc: le.Subscribe, + K8sClient: apiCl.Cl, + RcClient: rcClient, + ClusterName: clusterName, + ClusterID: clusterID, + StopCh: stopCh, } if err := admissionpatch.StartControllers(patchCtx); err != nil { log.Errorf("Cannot start auto instrumentation patcher: %v", err) @@ -471,15 +470,14 @@ func start(log log.Component, } admissionCtx := admissionpkg.ControllerContext{ - IsLeaderFunc: le.IsLeader, - LeaderSubscribeFunc: le.Subscribe, - SecretInformers: apiCl.CertificateSecretInformerFactory, - ValidatingInformers: apiCl.WebhookConfigInformerFactory, - MutatingInformers: apiCl.WebhookConfigInformerFactory, - Client: apiCl.Cl, - StopCh: stopCh, - ValidatingStopCh: validatingStopCh, - Demultiplexer: demultiplexer, + LeadershipStateSubscribeFunc: le.Subscribe, + SecretInformers: apiCl.CertificateSecretInformerFactory, + ValidatingInformers: apiCl.WebhookConfigInformerFactory, + MutatingInformers: apiCl.WebhookConfigInformerFactory, + Client: apiCl.Cl, + StopCh: stopCh, + ValidatingStopCh: validatingStopCh, + Demultiplexer: demultiplexer, } webhooks, err := admissionpkg.StartControllers(admissionCtx, wmeta, pa, datadogConfig) diff --git a/pkg/clusteragent/admission/controllers/secret/controller.go b/pkg/clusteragent/admission/controllers/secret/controller.go index fccdce4aebc6b..38e80e0851d12 100644 --- a/pkg/clusteragent/admission/controllers/secret/controller.go +++ b/pkg/clusteragent/admission/controllers/secret/controller.go @@ -34,19 +34,19 @@ import ( // Controller is responsible for creating and refreshing the Secret object // that contains the certificate of the Admission Webhook. type Controller struct { - clientSet kubernetes.Interface - secretsLister corelisters.SecretLister - secretsSynced cache.InformerSynced - config Config - dnsNames []string - dnsNamesDigest uint64 - queue workqueue.TypedRateLimitingInterface[string] - isLeaderFunc func() bool - isLeaderNotif <-chan struct{} + clientSet kubernetes.Interface + secretsLister corelisters.SecretLister + secretsSynced cache.InformerSynced + config Config + dnsNames []string + dnsNamesDigest uint64 + queue workqueue.TypedRateLimitingInterface[string] + isLeaderFunc func() bool + leadershipStateNotif <-chan struct{} } // NewController returns a new Secret Controller. -func NewController(client kubernetes.Interface, secretInformer coreinformers.SecretInformer, isLeaderFunc func() bool, isLeaderNotif <-chan struct{}, config Config) *Controller { +func NewController(client kubernetes.Interface, secretInformer coreinformers.SecretInformer, isLeaderFunc func() bool, leadershipStateNotif <-chan struct{}, config Config) *Controller { dnsNames := generateDNSNames(config.GetNs(), config.GetSvc()) controller := &Controller{ clientSet: client, @@ -59,8 +59,8 @@ func NewController(client kubernetes.Interface, secretInformer coreinformers.Sec workqueue.DefaultTypedControllerRateLimiter[string](), workqueue.TypedRateLimitingQueueConfig[string]{Name: "secrets"}, ), - isLeaderFunc: isLeaderFunc, - isLeaderNotif: isLeaderNotif, + isLeaderFunc: isLeaderFunc, + leadershipStateNotif: leadershipStateNotif, } if _, err := secretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.handleObject, @@ -101,9 +101,11 @@ func (c *Controller) Run(stopCh <-chan struct{}) { func (c *Controller) enqueueOnLeaderNotif(stop <-chan struct{}) { for { select { - case <-c.isLeaderNotif: - log.Infof("Got a leader notification, enqueuing a reconciliation for %s/%s", c.config.GetNs(), c.config.GetName()) - c.triggerReconciliation() + case <-c.leadershipStateNotif: + if c.isLeaderFunc() { + log.Infof("Got a leader notification, enqueuing a reconciliation for %s/%s", c.config.GetNs(), c.config.GetName()) + c.triggerReconciliation() + } case <-stop: return } diff --git a/pkg/clusteragent/admission/controllers/secret/controller_test.go b/pkg/clusteragent/admission/controllers/secret/controller_test.go index 650c354a6d013..41b9adffae80f 100644 --- a/pkg/clusteragent/admission/controllers/secret/controller_test.go +++ b/pkg/clusteragent/admission/controllers/secret/controller_test.go @@ -175,7 +175,7 @@ func (f *fixture) run(stopCh <-chan struct{}) *Controller { f.client, factory.Core().V1().Secrets(), func() bool { return true }, - make(chan struct{}), + make(<-chan struct{}), cfg, ) diff --git a/pkg/clusteragent/admission/controllers/webhook/controller_base.go b/pkg/clusteragent/admission/controllers/webhook/controller_base.go index 5505337ba5838..68c1b9fd3f120 100644 --- a/pkg/clusteragent/admission/controllers/webhook/controller_base.go +++ b/pkg/clusteragent/admission/controllers/webhook/controller_base.go @@ -51,7 +51,7 @@ func NewController( validatingInformers admissionregistration.Interface, mutatingInformers admissionregistration.Interface, isLeaderFunc func() bool, - isLeaderNotif <-chan struct{}, + leadershipStateNotif <-chan struct{}, config Config, wmeta workloadmeta.Component, pa workload.PodPatcher, @@ -59,9 +59,9 @@ func NewController( demultiplexer demultiplexer.Component, ) Controller { if config.useAdmissionV1() { - return NewControllerV1(client, secretInformer, validatingInformers.V1().ValidatingWebhookConfigurations(), mutatingInformers.V1().MutatingWebhookConfigurations(), isLeaderFunc, isLeaderNotif, config, wmeta, pa, datadogConfig, demultiplexer) + return NewControllerV1(client, secretInformer, validatingInformers.V1().ValidatingWebhookConfigurations(), mutatingInformers.V1().MutatingWebhookConfigurations(), isLeaderFunc, leadershipStateNotif, config, wmeta, pa, datadogConfig, demultiplexer) } - return NewControllerV1beta1(client, secretInformer, validatingInformers.V1beta1().ValidatingWebhookConfigurations(), mutatingInformers.V1beta1().MutatingWebhookConfigurations(), isLeaderFunc, isLeaderNotif, config, wmeta, pa, datadogConfig, demultiplexer) + return NewControllerV1beta1(client, secretInformer, validatingInformers.V1beta1().ValidatingWebhookConfigurations(), mutatingInformers.V1beta1().MutatingWebhookConfigurations(), isLeaderFunc, leadershipStateNotif, config, wmeta, pa, datadogConfig, demultiplexer) } // Webhook represents an admission webhook @@ -162,7 +162,7 @@ type controllerBase struct { mutatingWebhooksSynced cache.InformerSynced //nolint:structcheck queue workqueue.TypedRateLimitingInterface[string] isLeaderFunc func() bool - isLeaderNotif <-chan struct{} + leadershipStateNotif <-chan struct{} webhooks []Webhook } @@ -186,9 +186,11 @@ func (c *controllerBase) EnabledWebhooks() []Webhook { func (c *controllerBase) enqueueOnLeaderNotif(stop <-chan struct{}) { for { select { - case <-c.isLeaderNotif: - log.Infof("Got a leader notification, enqueuing a reconciliation for %q", c.config.getWebhookName()) - c.triggerReconciliation() + case <-c.leadershipStateNotif: + if c.isLeaderFunc() { + log.Infof("Got a leader notification, enqueuing a reconciliation for %q", c.config.getWebhookName()) + c.triggerReconciliation() + } case <-stop: return } diff --git a/pkg/clusteragent/admission/controllers/webhook/controller_base_test.go b/pkg/clusteragent/admission/controllers/webhook/controller_base_test.go index d5612b96c74e0..67281c628dd92 100644 --- a/pkg/clusteragent/admission/controllers/webhook/controller_base_test.go +++ b/pkg/clusteragent/admission/controllers/webhook/controller_base_test.go @@ -35,7 +35,7 @@ func TestNewController(t *testing.T) { factory.Admissionregistration(), factory.Admissionregistration(), func() bool { return true }, - make(chan struct{}), + make(<-chan struct{}), getV1Cfg(t), wmeta, nil, @@ -52,7 +52,7 @@ func TestNewController(t *testing.T) { factory.Admissionregistration(), factory.Admissionregistration(), func() bool { return true }, - make(chan struct{}), + make(<-chan struct{}), getV1beta1Cfg(t), wmeta, nil, diff --git a/pkg/clusteragent/admission/controllers/webhook/controller_v1.go b/pkg/clusteragent/admission/controllers/webhook/controller_v1.go index 003ccbe10986a..4cd136fa776d6 100644 --- a/pkg/clusteragent/admission/controllers/webhook/controller_v1.go +++ b/pkg/clusteragent/admission/controllers/webhook/controller_v1.go @@ -53,7 +53,7 @@ func NewControllerV1( validatingWebhookInformer admissioninformers.ValidatingWebhookConfigurationInformer, mutatingWebhookInformer admissioninformers.MutatingWebhookConfigurationInformer, isLeaderFunc func() bool, - isLeaderNotif <-chan struct{}, + leadershipStateNotif <-chan struct{}, config Config, wmeta workloadmeta.Component, pa workload.PodPatcher, @@ -75,7 +75,7 @@ func NewControllerV1( workqueue.TypedRateLimitingQueueConfig[string]{Name: "webhooks"}, ) controller.isLeaderFunc = isLeaderFunc - controller.isLeaderNotif = isLeaderNotif + controller.leadershipStateNotif = leadershipStateNotif controller.webhooks = controller.generateWebhooks(wmeta, pa, datadogConfig, demultiplexer) controller.generateTemplates() diff --git a/pkg/clusteragent/admission/controllers/webhook/controller_v1_test.go b/pkg/clusteragent/admission/controllers/webhook/controller_v1_test.go index 6a52255c42e29..3c0bbb81bd5f3 100644 --- a/pkg/clusteragent/admission/controllers/webhook/controller_v1_test.go +++ b/pkg/clusteragent/admission/controllers/webhook/controller_v1_test.go @@ -1190,7 +1190,7 @@ func (f *fixtureV1) createController() (*ControllerV1, informers.SharedInformerF factory.Admissionregistration().V1().ValidatingWebhookConfigurations(), factory.Admissionregistration().V1().MutatingWebhookConfigurations(), func() bool { return true }, - make(chan struct{}), + make(<-chan struct{}), getV1Cfg(f.t), wmeta, nil, diff --git a/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1.go b/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1.go index dad86838eff33..201a784ac7b8d 100644 --- a/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1.go +++ b/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1.go @@ -54,7 +54,7 @@ func NewControllerV1beta1( validatingWebhookInformer admissioninformers.ValidatingWebhookConfigurationInformer, mutatingWebhookInformer admissioninformers.MutatingWebhookConfigurationInformer, isLeaderFunc func() bool, - isLeaderNotif <-chan struct{}, + leadershipStateNotif <-chan struct{}, config Config, wmeta workloadmeta.Component, pa workload.PodPatcher, @@ -76,7 +76,7 @@ func NewControllerV1beta1( workqueue.TypedRateLimitingQueueConfig[string]{Name: "webhooks"}, ) controller.isLeaderFunc = isLeaderFunc - controller.isLeaderNotif = isLeaderNotif + controller.leadershipStateNotif = leadershipStateNotif controller.webhooks = controller.generateWebhooks(wmeta, pa, datadogConfig, demultiplexer) controller.generateTemplates() diff --git a/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1_test.go b/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1_test.go index 62fa7713cd809..ef20ff4343a97 100644 --- a/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1_test.go +++ b/pkg/clusteragent/admission/controllers/webhook/controller_v1beta1_test.go @@ -1184,7 +1184,7 @@ func (f *fixtureV1beta1) createController() (*ControllerV1beta1, informers.Share factory.Admissionregistration().V1beta1().ValidatingWebhookConfigurations(), factory.Admissionregistration().V1beta1().MutatingWebhookConfigurations(), func() bool { return true }, - make(chan struct{}), + make(<-chan struct{}), getV1beta1Cfg(f.t), wmeta, nil, diff --git a/pkg/clusteragent/admission/patch/file_provider.go b/pkg/clusteragent/admission/patch/file_provider.go index 71df716b951ba..7dce3050a7f39 100644 --- a/pkg/clusteragent/admission/patch/file_provider.go +++ b/pkg/clusteragent/admission/patch/file_provider.go @@ -18,7 +18,8 @@ import ( // filePatchProvider this is a stub and will be used for e2e testing only type filePatchProvider struct { file string - isLeaderNotif <-chan struct{} + leadershipStateNotif <-chan struct{} + isLeaderFunc func() bool pollInterval time.Duration subscribers map[TargetObjKind]chan Request lastSuccessfulRefresh time.Time @@ -27,13 +28,14 @@ type filePatchProvider struct { var _ patchProvider = &filePatchProvider{} -func newfileProvider(file string, isLeaderNotif <-chan struct{}, clusterName string) *filePatchProvider { +func newfileProvider(file string, isLeaderFunc func() bool, leadershipStateNotif <-chan struct{}, clusterName string) *filePatchProvider { return &filePatchProvider{ - file: file, - isLeaderNotif: isLeaderNotif, - pollInterval: 15 * time.Second, - subscribers: make(map[TargetObjKind]chan Request), - clusterName: clusterName, + file: file, + leadershipStateNotif: leadershipStateNotif, + pollInterval: 15 * time.Second, + subscribers: make(map[TargetObjKind]chan Request), + clusterName: clusterName, + isLeaderFunc: isLeaderFunc, } } @@ -49,9 +51,11 @@ func (fpp *filePatchProvider) start(stopCh <-chan struct{}) { defer ticker.Stop() for { select { - case <-fpp.isLeaderNotif: - log.Info("Got a leader notification, polling from file") - fpp.process(true) + case <-fpp.leadershipStateNotif: + if fpp.isLeaderFunc() { + log.Info("Got a leader notification, polling from file") + fpp.process(true) + } case <-ticker.C: fpp.process(false) case <-stopCh: diff --git a/pkg/clusteragent/admission/patch/file_provider_test.go b/pkg/clusteragent/admission/patch/file_provider_test.go index 5916cbce5dbc6..63068a8ac836f 100644 --- a/pkg/clusteragent/admission/patch/file_provider_test.go +++ b/pkg/clusteragent/admission/patch/file_provider_test.go @@ -14,7 +14,7 @@ import ( ) func TestFileProviderProcess(t *testing.T) { - fpp := newfileProvider("testdata/auto-instru.json", make(chan struct{}), "dev") + fpp := newfileProvider("testdata/auto-instru.json", func() bool { return true }, make(<-chan struct{}), "dev") notifs := fpp.subscribe(KindDeployment) fpp.process(false) require.Len(t, notifs, 1) diff --git a/pkg/clusteragent/admission/patch/provider.go b/pkg/clusteragent/admission/patch/provider.go index 2b1469533c38a..3f4ea77b5fa12 100644 --- a/pkg/clusteragent/admission/patch/provider.go +++ b/pkg/clusteragent/admission/patch/provider.go @@ -20,14 +20,14 @@ type patchProvider interface { subscribe(kind TargetObjKind) chan Request } -func newPatchProvider(rcClient *rcclient.Client, isLeaderNotif <-chan struct{}, telemetryCollector telemetry.TelemetryCollector, clusterName string) (patchProvider, error) { +func newPatchProvider(rcClient *rcclient.Client, isLeaderFunc func() bool, leadershipStateNotif <-chan struct{}, telemetryCollector telemetry.TelemetryCollector, clusterName string) (patchProvider, error) { if pkgconfigsetup.IsRemoteConfigEnabled(pkgconfigsetup.Datadog()) { - return newRemoteConfigProvider(rcClient, isLeaderNotif, telemetryCollector, clusterName) + return newRemoteConfigProvider(rcClient, isLeaderFunc, leadershipStateNotif, telemetryCollector, clusterName) } if pkgconfigsetup.Datadog().GetBool("admission_controller.auto_instrumentation.patcher.fallback_to_file_provider") { // Use the file config provider for e2e testing only (it replaces RC as a source of configs) file := pkgconfigsetup.Datadog().GetString("admission_controller.auto_instrumentation.patcher.file_provider_path") - return newfileProvider(file, isLeaderNotif, clusterName), nil + return newfileProvider(file, isLeaderFunc, leadershipStateNotif, clusterName), nil } return nil, errors.New("remote config is disabled") } diff --git a/pkg/clusteragent/admission/patch/rc_provider.go b/pkg/clusteragent/admission/patch/rc_provider.go index e64525b03a845..8cc1580937ec1 100644 --- a/pkg/clusteragent/admission/patch/rc_provider.go +++ b/pkg/clusteragent/admission/patch/rc_provider.go @@ -20,25 +20,27 @@ import ( // remoteConfigProvider consumes tracing configs from RC and delivers them to the patcher type remoteConfigProvider struct { - client *rcclient.Client - isLeaderNotif <-chan struct{} - subscribers map[TargetObjKind]chan Request - clusterName string - telemetryCollector telemetry.TelemetryCollector + client *rcclient.Client + leadershipStateNotif <-chan struct{} + isLeaderFunc func() bool + subscribers map[TargetObjKind]chan Request + clusterName string + telemetryCollector telemetry.TelemetryCollector } var _ patchProvider = &remoteConfigProvider{} -func newRemoteConfigProvider(client *rcclient.Client, isLeaderNotif <-chan struct{}, telemetryCollector telemetry.TelemetryCollector, clusterName string) (*remoteConfigProvider, error) { +func newRemoteConfigProvider(client *rcclient.Client, isLeaderFunc func() bool, leadershipStateNotif <-chan struct{}, telemetryCollector telemetry.TelemetryCollector, clusterName string) (*remoteConfigProvider, error) { if client == nil { return nil, errors.New("remote config client not initialized") } return &remoteConfigProvider{ - client: client, - isLeaderNotif: isLeaderNotif, - subscribers: make(map[TargetObjKind]chan Request), - clusterName: clusterName, - telemetryCollector: telemetryCollector, + client: client, + leadershipStateNotif: leadershipStateNotif, + subscribers: make(map[TargetObjKind]chan Request), + clusterName: clusterName, + telemetryCollector: telemetryCollector, + isLeaderFunc: isLeaderFunc, }, nil } @@ -48,9 +50,11 @@ func (rcp *remoteConfigProvider) start(stopCh <-chan struct{}) { rcp.client.Start() for { select { - case <-rcp.isLeaderNotif: - log.Info("Got a leader notification, polling from remote-config") - rcp.process(rcp.client.GetConfigs(state.ProductAPMTracing), rcp.client.UpdateApplyStatus) + case <-rcp.leadershipStateNotif: + if rcp.isLeaderFunc() { + log.Info("Got a leader notification, polling from remote-config") + rcp.process(rcp.client.GetConfigs(state.ProductAPMTracing), rcp.client.UpdateApplyStatus) + } case <-stopCh: log.Info("Shutting down remote-config patch provider") rcp.client.Close() diff --git a/pkg/clusteragent/admission/patch/rc_provider_test.go b/pkg/clusteragent/admission/patch/rc_provider_test.go index 7f709bcb81622..0d04e551c7d3e 100644 --- a/pkg/clusteragent/admission/patch/rc_provider_test.go +++ b/pkg/clusteragent/admission/patch/rc_provider_test.go @@ -12,9 +12,9 @@ import ( "testing" "github.com/DataDog/datadog-agent/pkg/clusteragent/telemetry" - rcclient "github.com/DataDog/datadog-agent/pkg/config/remote/client" "github.com/DataDog/datadog-agent/pkg/remoteconfig/state" + "github.com/stretchr/testify/require" ) @@ -40,7 +40,7 @@ func TestProcess(t *testing.T) { ` return []byte(fmt.Sprintf(base, cluster, kind)) } - rcp, err := newRemoteConfigProvider(&rcclient.Client{}, make(chan struct{}), telemetry.NewNoopCollector(), "dev") + rcp, err := newRemoteConfigProvider(&rcclient.Client{}, func() bool { return true }, make(<-chan struct{}), telemetry.NewNoopCollector(), "dev") require.NoError(t, err) notifs := rcp.subscribe(KindDeployment) in := map[string]state.RawConfig{ diff --git a/pkg/clusteragent/admission/patch/start.go b/pkg/clusteragent/admission/patch/start.go index 5d0cd02e27ac5..d05b1158f6158 100644 --- a/pkg/clusteragent/admission/patch/start.go +++ b/pkg/clusteragent/admission/patch/start.go @@ -17,13 +17,12 @@ import ( // ControllerContext holds necessary context for the patch controller type ControllerContext struct { - IsLeaderFunc func() bool - LeaderSubscribeFunc func() <-chan struct{} - K8sClient kubernetes.Interface - RcClient *rcclient.Client - ClusterName string - ClusterID string - StopCh chan struct{} + LeadershipStateSubscribeFunc func() (notifChan <-chan struct{}, isLeader func() bool) + K8sClient kubernetes.Interface + RcClient *rcclient.Client + ClusterName string + ClusterID string + StopCh chan struct{} } // StartControllers starts the patch controllers @@ -33,11 +32,12 @@ func StartControllers(ctx ControllerContext) error { if ctx.RcClient != nil { telemetryCollector = telemetry.NewCollector(ctx.RcClient.ID, ctx.ClusterID) } - provider, err := newPatchProvider(ctx.RcClient, ctx.LeaderSubscribeFunc(), telemetryCollector, ctx.ClusterName) + leadershipStateNotif, isLeaderFunc := ctx.LeadershipStateSubscribeFunc() + provider, err := newPatchProvider(ctx.RcClient, isLeaderFunc, leadershipStateNotif, telemetryCollector, ctx.ClusterName) if err != nil { return err } - patcher := newPatcher(ctx.K8sClient, ctx.IsLeaderFunc, telemetryCollector, provider) + patcher := newPatcher(ctx.K8sClient, isLeaderFunc, telemetryCollector, provider) go provider.start(ctx.StopCh) go patcher.start(ctx.StopCh) return nil diff --git a/pkg/clusteragent/admission/start.go b/pkg/clusteragent/admission/start.go index ef69edfd4a203..9b26f699246d0 100644 --- a/pkg/clusteragent/admission/start.go +++ b/pkg/clusteragent/admission/start.go @@ -28,15 +28,14 @@ import ( // ControllerContext holds necessary context for the admission controllers type ControllerContext struct { - IsLeaderFunc func() bool - LeaderSubscribeFunc func() <-chan struct{} - SecretInformers informers.SharedInformerFactory - ValidatingInformers informers.SharedInformerFactory - MutatingInformers informers.SharedInformerFactory - Client kubernetes.Interface - StopCh chan struct{} - ValidatingStopCh chan struct{} - Demultiplexer demultiplexer.Component + LeadershipStateSubscribeFunc func() (notifChan <-chan struct{}, isLeaderFunc func() bool) + SecretInformers informers.SharedInformerFactory + ValidatingInformers informers.SharedInformerFactory + MutatingInformers informers.SharedInformerFactory + Client kubernetes.Interface + StopCh chan struct{} + ValidatingStopCh chan struct{} + Demultiplexer demultiplexer.Component } // StartControllers starts the secret and webhook controllers @@ -48,6 +47,8 @@ func StartControllers(ctx ControllerContext, wmeta workloadmeta.Component, pa wo return webhooks, nil } + notifChan, isLeaderFunc := ctx.LeadershipStateSubscribeFunc() + certConfig := secret.NewCertConfig( datadogConfig.GetDuration("admission_controller.certificate.expiration_threshold")*time.Hour, datadogConfig.GetDuration("admission_controller.certificate.validity_bound")*time.Hour) @@ -59,8 +60,8 @@ func StartControllers(ctx ControllerContext, wmeta workloadmeta.Component, pa wo secretController := secret.NewController( ctx.Client, ctx.SecretInformers.Core().V1().Secrets(), - ctx.IsLeaderFunc, - ctx.LeaderSubscribeFunc(), + isLeaderFunc, + notifChan, secretConfig, ) @@ -85,8 +86,8 @@ func StartControllers(ctx ControllerContext, wmeta workloadmeta.Component, pa wo ctx.SecretInformers.Core().V1().Secrets(), ctx.ValidatingInformers.Admissionregistration(), ctx.MutatingInformers.Admissionregistration(), - ctx.IsLeaderFunc, - ctx.LeaderSubscribeFunc(), + isLeaderFunc, + notifChan, webhookConfig, wmeta, pa, diff --git a/pkg/util/kubernetes/apiserver/leaderelection/leaderelection.go b/pkg/util/kubernetes/apiserver/leaderelection/leaderelection.go index 79e8a89895beb..2c55b5042b8ce 100644 --- a/pkg/util/kubernetes/apiserver/leaderelection/leaderelection.go +++ b/pkg/util/kubernetes/apiserver/leaderelection/leaderelection.go @@ -281,17 +281,23 @@ func (le *LeaderEngine) IsLeader() bool { return le.GetLeader() == le.HolderIdentity } -// Subscribe allows any component to receive a notification -// when the current process becomes leader. +// Subscribe allows any component to receive a notification when leadership state of the current +// process changes. +// +// The subscriber will not be notified about the leadership state change if the previous notification +// hasn't yet been consumed from the notification channel. +// // Calling Subscribe is optional, use IsLeader if the client doesn't need an event-based approach. -func (le *LeaderEngine) Subscribe() <-chan struct{} { - c := make(chan struct{}, 5) // buffered channel to avoid blocking in case of stuck subscriber +func (le *LeaderEngine) Subscribe() (leadershipChangeNotif <-chan struct{}, isLeader func() bool) { + c := make(chan struct{}, 1) le.m.Lock() le.subscribers = append(le.subscribers, c) le.m.Unlock() - return c + leadershipChangeNotif = c + isLeader = le.IsLeader + return } func detectLeases(client discovery.DiscoveryInterface) (bool, error) { diff --git a/pkg/util/kubernetes/apiserver/leaderelection/leaderelection_engine.go b/pkg/util/kubernetes/apiserver/leaderelection/leaderelection_engine.go index d134c95f17eb2..4a765a46307a3 100644 --- a/pkg/util/kubernetes/apiserver/leaderelection/leaderelection_engine.go +++ b/pkg/util/kubernetes/apiserver/leaderelection/leaderelection_engine.go @@ -154,7 +154,7 @@ func (le *LeaderEngine) newElection() (*ld.LeaderElector, error) { OnStartedLeading: func(context.Context) { le.updateLeaderIdentity(le.HolderIdentity) le.reportLeaderMetric(true) - le.notify() + le.notify() // current process gained leadership log.Infof("Started leading as %q...", le.HolderIdentity) }, // OnStoppedLeading shouldn't be called unless the election is lost. This could happen if @@ -162,6 +162,7 @@ func (le *LeaderEngine) newElection() (*ld.LeaderElector, error) { OnStoppedLeading: func() { le.updateLeaderIdentity("") le.reportLeaderMetric(false) + le.notify() // current process lost leadership log.Infof("Stopped leading %q", le.HolderIdentity) }, } @@ -219,14 +220,18 @@ func (le *LeaderEngine) reportLeaderMetric(isLeader bool) { le.leaderMetric.Set(1.0, metrics.JoinLeaderValue, strconv.FormatBool(isLeader)) } -// notify sends a notification to subscribers when the current process becomes leader. -// notify is a simplistic notifier but can be extended to send different notification -// kinds (leadership acquisition / loss) in the future if needed. +// notify sends a notification to subscribers when the leadership state of the current +// process changes func (le *LeaderEngine) notify() { le.m.Lock() defer le.m.Unlock() for _, s := range le.subscribers { + if len(s) > 0 { + // subscriber already notified about the change in leadership state + continue + } + s <- struct{}{} } } diff --git a/pkg/util/kubernetes/apiserver/leaderelection/leaderelection_test.go b/pkg/util/kubernetes/apiserver/leaderelection/leaderelection_test.go index eefce053e2e22..f07e2b9d2e7c5 100644 --- a/pkg/util/kubernetes/apiserver/leaderelection/leaderelection_test.go +++ b/pkg/util/kubernetes/apiserver/leaderelection/leaderelection_test.go @@ -202,8 +202,9 @@ func TestSubscribe(t *testing.T) { } { t.Run(fmt.Sprintf("case %d: %s", nb, tc.name), func(t *testing.T) { client := fake.NewSimpleClientset() + ctx, cancel := context.WithCancel(context.Background()) le := &LeaderEngine{ - ctx: context.Background(), + ctx: ctx, HolderIdentity: "foo", LeaseName: leaseName, LeaderNamespace: "default", @@ -214,8 +215,8 @@ func TestSubscribe(t *testing.T) { lockType: tc.lockType, } - notif1 := le.Subscribe() - notif2 := le.Subscribe() + notif1, _ := le.Subscribe() + notif2, _ := le.Subscribe() require.Len(t, le.subscribers, 2) err := tc.getTokenFunc(client) @@ -234,6 +235,39 @@ func TestSubscribe(t *testing.T) { for { select { case <-notif1: + counter1++ + require.Truef(t, le.IsLeader(), "Expected to be leader") + if counter1 > 1 { + require.Fail(t, "Received too many notifications") + break + } + + case <-notif2: + counter2++ + require.Truef(t, le.IsLeader(), "Expected to be leader") + if counter2 > 1 { + require.Fail(t, "Received too many notifications") + break + } + + case <-time.After(5 * time.Second): + require.Fail(t, "Waiting on leader notification timed out") + break + } + + if counter1 == 1 && counter2 == 1 { + break + } + } + + // simulate leadership lease loss by cancelling leader election context + cancel() + + counter1, counter2 = 0, 0 + for { + select { + case <-notif1: + require.Falsef(t, le.IsLeader(), "Expected to be a follower") counter1++ if counter1 > 1 { require.Fail(t, "Received too many notifications") @@ -241,6 +275,7 @@ func TestSubscribe(t *testing.T) { } case <-notif2: + require.Falsef(t, le.IsLeader(), "Expected to be a follower") counter2++ if counter2 > 1 { require.Fail(t, "Received too many notifications") @@ -256,6 +291,7 @@ func TestSubscribe(t *testing.T) { break } } + }) }