Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

notify leader election subscribers on leadership state change #32323

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 14 additions & 16 deletions cmd/cluster-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,13 +455,12 @@ func start(log log.Component,
if config.GetBool("admission_controller.enabled") {
if config.GetBool("admission_controller.auto_instrumentation.patcher.enabled") {
patchCtx := admissionpatch.ControllerContext{
IsLeaderFunc: le.IsLeader,
LeaderSubscribeFunc: le.Subscribe,
K8sClient: apiCl.Cl,
RcClient: rcClient,
ClusterName: clusterName,
ClusterID: clusterID,
StopCh: stopCh,
LeadershipStateSubscribeFunc: le.Subscribe,
K8sClient: apiCl.Cl,
RcClient: rcClient,
ClusterName: clusterName,
ClusterID: clusterID,
StopCh: stopCh,
}
if err := admissionpatch.StartControllers(patchCtx); err != nil {
log.Errorf("Cannot start auto instrumentation patcher: %v", err)
Expand All @@ -471,15 +470,14 @@ func start(log log.Component,
}

admissionCtx := admissionpkg.ControllerContext{
IsLeaderFunc: le.IsLeader,
LeaderSubscribeFunc: le.Subscribe,
SecretInformers: apiCl.CertificateSecretInformerFactory,
ValidatingInformers: apiCl.WebhookConfigInformerFactory,
MutatingInformers: apiCl.WebhookConfigInformerFactory,
Client: apiCl.Cl,
StopCh: stopCh,
ValidatingStopCh: validatingStopCh,
Demultiplexer: demultiplexer,
LeadershipStateSubscribeFunc: le.Subscribe,
SecretInformers: apiCl.CertificateSecretInformerFactory,
ValidatingInformers: apiCl.WebhookConfigInformerFactory,
MutatingInformers: apiCl.WebhookConfigInformerFactory,
Client: apiCl.Cl,
StopCh: stopCh,
ValidatingStopCh: validatingStopCh,
Demultiplexer: demultiplexer,
}

webhooks, err := admissionpkg.StartControllers(admissionCtx, wmeta, pa, datadogConfig)
Expand Down
32 changes: 17 additions & 15 deletions pkg/clusteragent/admission/controllers/secret/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@ import (
// Controller is responsible for creating and refreshing the Secret object
// that contains the certificate of the Admission Webhook.
type Controller struct {
clientSet kubernetes.Interface
secretsLister corelisters.SecretLister
secretsSynced cache.InformerSynced
config Config
dnsNames []string
dnsNamesDigest uint64
queue workqueue.TypedRateLimitingInterface[string]
isLeaderFunc func() bool
isLeaderNotif <-chan struct{}
clientSet kubernetes.Interface
secretsLister corelisters.SecretLister
secretsSynced cache.InformerSynced
config Config
dnsNames []string
dnsNamesDigest uint64
queue workqueue.TypedRateLimitingInterface[string]
isLeaderFunc func() bool
leadershipStateNotif <-chan struct{}
}

// NewController returns a new Secret Controller.
func NewController(client kubernetes.Interface, secretInformer coreinformers.SecretInformer, isLeaderFunc func() bool, isLeaderNotif <-chan struct{}, config Config) *Controller {
func NewController(client kubernetes.Interface, secretInformer coreinformers.SecretInformer, isLeaderFunc func() bool, leadershipStateNotif <-chan struct{}, config Config) *Controller {
dnsNames := generateDNSNames(config.GetNs(), config.GetSvc())
controller := &Controller{
clientSet: client,
Expand All @@ -59,8 +59,8 @@ func NewController(client kubernetes.Interface, secretInformer coreinformers.Sec
workqueue.DefaultTypedControllerRateLimiter[string](),
workqueue.TypedRateLimitingQueueConfig[string]{Name: "secrets"},
),
isLeaderFunc: isLeaderFunc,
isLeaderNotif: isLeaderNotif,
isLeaderFunc: isLeaderFunc,
leadershipStateNotif: leadershipStateNotif,
}
if _, err := secretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
Expand Down Expand Up @@ -101,9 +101,11 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
func (c *Controller) enqueueOnLeaderNotif(stop <-chan struct{}) {
for {
select {
case <-c.isLeaderNotif:
log.Infof("Got a leader notification, enqueuing a reconciliation for %s/%s", c.config.GetNs(), c.config.GetName())
c.triggerReconciliation()
case <-c.leadershipStateNotif:
if c.isLeaderFunc() {
log.Infof("Got a leader notification, enqueuing a reconciliation for %s/%s", c.config.GetNs(), c.config.GetName())
c.triggerReconciliation()
}
case <-stop:
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (f *fixture) run(stopCh <-chan struct{}) *Controller {
f.client,
factory.Core().V1().Secrets(),
func() bool { return true },
make(chan struct{}),
make(<-chan struct{}),
cfg,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ func NewController(
validatingInformers admissionregistration.Interface,
mutatingInformers admissionregistration.Interface,
isLeaderFunc func() bool,
isLeaderNotif <-chan struct{},
leadershipStateNotif <-chan struct{},
config Config,
wmeta workloadmeta.Component,
pa workload.PodPatcher,
datadogConfig config.Component,
demultiplexer demultiplexer.Component,
) Controller {
if config.useAdmissionV1() {
return NewControllerV1(client, secretInformer, validatingInformers.V1().ValidatingWebhookConfigurations(), mutatingInformers.V1().MutatingWebhookConfigurations(), isLeaderFunc, isLeaderNotif, config, wmeta, pa, datadogConfig, demultiplexer)
return NewControllerV1(client, secretInformer, validatingInformers.V1().ValidatingWebhookConfigurations(), mutatingInformers.V1().MutatingWebhookConfigurations(), isLeaderFunc, leadershipStateNotif, config, wmeta, pa, datadogConfig, demultiplexer)
}
return NewControllerV1beta1(client, secretInformer, validatingInformers.V1beta1().ValidatingWebhookConfigurations(), mutatingInformers.V1beta1().MutatingWebhookConfigurations(), isLeaderFunc, isLeaderNotif, config, wmeta, pa, datadogConfig, demultiplexer)
return NewControllerV1beta1(client, secretInformer, validatingInformers.V1beta1().ValidatingWebhookConfigurations(), mutatingInformers.V1beta1().MutatingWebhookConfigurations(), isLeaderFunc, leadershipStateNotif, config, wmeta, pa, datadogConfig, demultiplexer)
}

// Webhook represents an admission webhook
Expand Down Expand Up @@ -162,7 +162,7 @@ type controllerBase struct {
mutatingWebhooksSynced cache.InformerSynced //nolint:structcheck
queue workqueue.TypedRateLimitingInterface[string]
isLeaderFunc func() bool
isLeaderNotif <-chan struct{}
leadershipStateNotif <-chan struct{}
webhooks []Webhook
}

Expand All @@ -186,9 +186,11 @@ func (c *controllerBase) EnabledWebhooks() []Webhook {
func (c *controllerBase) enqueueOnLeaderNotif(stop <-chan struct{}) {
for {
select {
case <-c.isLeaderNotif:
log.Infof("Got a leader notification, enqueuing a reconciliation for %q", c.config.getWebhookName())
c.triggerReconciliation()
case <-c.leadershipStateNotif:
if c.isLeaderFunc() {
Comment on lines +189 to +190
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General question: Are we certain that the isLeaderFunc() return value is always updated before receiving the leadershipStateNotif notification?
What I’d like to confirm is that there’s no potential race condition between these two lines. For example, the notification might be sent because the DCA instance became the leader, but this information hasn’t yet propagated to the c.isLeaderFunc() output. This could result in skipping the execution of c.triggerReconciliation().

Would it make sense to add a log in the else case, stating: “Received a notification but not recognized as leader yet”?

Copy link
Contributor Author

@adel121 adel121 Dec 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The notification is never sent before the leadership status is updated fully and propagated to isLeader

Here is where notify is called. You can see that we synchronously call updateLeaderIdentity before we notify subscribers.

Finally, IsLeader has a very simple implementation. Once we updateLeaderIdentity, the change is instantly visible in isLeader func result.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to add a log in the else case, stating: “Received a notification but not recognized as leader yet”?

If it is not clear, the notification is not to tell the subscriber that the instance has become a leader. But rather to tell it that a change has occurred on the leadership state.

The subscriber is responsible to handle the logic based on the usecase, and isLeaderFunc is provided to determine the live state.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To add, I already added unit tests for the subscription feature which validates that the change will have already been propagated to isLeaderFunc when the notification is sent.

case <-notif1:
counter1++
require.Truef(t, le.IsLeader(), "Expected to be leader")

log.Infof("Got a leader notification, enqueuing a reconciliation for %q", c.config.getWebhookName())
c.triggerReconciliation()
}
case <-stop:
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestNewController(t *testing.T) {
factory.Admissionregistration(),
factory.Admissionregistration(),
func() bool { return true },
make(chan struct{}),
make(<-chan struct{}),
getV1Cfg(t),
wmeta,
nil,
Expand All @@ -52,7 +52,7 @@ func TestNewController(t *testing.T) {
factory.Admissionregistration(),
factory.Admissionregistration(),
func() bool { return true },
make(chan struct{}),
make(<-chan struct{}),
getV1beta1Cfg(t),
wmeta,
nil,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewControllerV1(
validatingWebhookInformer admissioninformers.ValidatingWebhookConfigurationInformer,
mutatingWebhookInformer admissioninformers.MutatingWebhookConfigurationInformer,
isLeaderFunc func() bool,
isLeaderNotif <-chan struct{},
leadershipStateNotif <-chan struct{},
config Config,
wmeta workloadmeta.Component,
pa workload.PodPatcher,
Expand All @@ -75,7 +75,7 @@ func NewControllerV1(
workqueue.TypedRateLimitingQueueConfig[string]{Name: "webhooks"},
)
controller.isLeaderFunc = isLeaderFunc
controller.isLeaderNotif = isLeaderNotif
controller.leadershipStateNotif = leadershipStateNotif
controller.webhooks = controller.generateWebhooks(wmeta, pa, datadogConfig, demultiplexer)
controller.generateTemplates()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1190,7 +1190,7 @@ func (f *fixtureV1) createController() (*ControllerV1, informers.SharedInformerF
factory.Admissionregistration().V1().ValidatingWebhookConfigurations(),
factory.Admissionregistration().V1().MutatingWebhookConfigurations(),
func() bool { return true },
make(chan struct{}),
make(<-chan struct{}),
getV1Cfg(f.t),
wmeta,
nil,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewControllerV1beta1(
validatingWebhookInformer admissioninformers.ValidatingWebhookConfigurationInformer,
mutatingWebhookInformer admissioninformers.MutatingWebhookConfigurationInformer,
isLeaderFunc func() bool,
isLeaderNotif <-chan struct{},
leadershipStateNotif <-chan struct{},
config Config,
wmeta workloadmeta.Component,
pa workload.PodPatcher,
Expand All @@ -76,7 +76,7 @@ func NewControllerV1beta1(
workqueue.TypedRateLimitingQueueConfig[string]{Name: "webhooks"},
)
controller.isLeaderFunc = isLeaderFunc
controller.isLeaderNotif = isLeaderNotif
controller.leadershipStateNotif = leadershipStateNotif
controller.webhooks = controller.generateWebhooks(wmeta, pa, datadogConfig, demultiplexer)
controller.generateTemplates()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,7 @@ func (f *fixtureV1beta1) createController() (*ControllerV1beta1, informers.Share
factory.Admissionregistration().V1beta1().ValidatingWebhookConfigurations(),
factory.Admissionregistration().V1beta1().MutatingWebhookConfigurations(),
func() bool { return true },
make(chan struct{}),
make(<-chan struct{}),
getV1beta1Cfg(f.t),
wmeta,
nil,
Expand Down
24 changes: 14 additions & 10 deletions pkg/clusteragent/admission/patch/file_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import (
// filePatchProvider this is a stub and will be used for e2e testing only
type filePatchProvider struct {
file string
isLeaderNotif <-chan struct{}
leadershipStateNotif <-chan struct{}
isLeaderFunc func() bool
pollInterval time.Duration
subscribers map[TargetObjKind]chan Request
lastSuccessfulRefresh time.Time
Expand All @@ -27,13 +28,14 @@ type filePatchProvider struct {

var _ patchProvider = &filePatchProvider{}

func newfileProvider(file string, isLeaderNotif <-chan struct{}, clusterName string) *filePatchProvider {
func newfileProvider(file string, isLeaderFunc func() bool, leadershipStateNotif <-chan struct{}, clusterName string) *filePatchProvider {
return &filePatchProvider{
file: file,
isLeaderNotif: isLeaderNotif,
pollInterval: 15 * time.Second,
subscribers: make(map[TargetObjKind]chan Request),
clusterName: clusterName,
file: file,
leadershipStateNotif: leadershipStateNotif,
pollInterval: 15 * time.Second,
subscribers: make(map[TargetObjKind]chan Request),
clusterName: clusterName,
isLeaderFunc: isLeaderFunc,
}
}

Expand All @@ -49,9 +51,11 @@ func (fpp *filePatchProvider) start(stopCh <-chan struct{}) {
defer ticker.Stop()
for {
select {
case <-fpp.isLeaderNotif:
log.Info("Got a leader notification, polling from file")
fpp.process(true)
case <-fpp.leadershipStateNotif:
if fpp.isLeaderFunc() {
log.Info("Got a leader notification, polling from file")
fpp.process(true)
}
case <-ticker.C:
fpp.process(false)
case <-stopCh:
Expand Down
2 changes: 1 addition & 1 deletion pkg/clusteragent/admission/patch/file_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

func TestFileProviderProcess(t *testing.T) {
fpp := newfileProvider("testdata/auto-instru.json", make(chan struct{}), "dev")
fpp := newfileProvider("testdata/auto-instru.json", func() bool { return true }, make(<-chan struct{}), "dev")
notifs := fpp.subscribe(KindDeployment)
fpp.process(false)
require.Len(t, notifs, 1)
Expand Down
6 changes: 3 additions & 3 deletions pkg/clusteragent/admission/patch/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ type patchProvider interface {
subscribe(kind TargetObjKind) chan Request
}

func newPatchProvider(rcClient *rcclient.Client, isLeaderNotif <-chan struct{}, telemetryCollector telemetry.TelemetryCollector, clusterName string) (patchProvider, error) {
func newPatchProvider(rcClient *rcclient.Client, isLeaderFunc func() bool, leadershipStateNotif <-chan struct{}, telemetryCollector telemetry.TelemetryCollector, clusterName string) (patchProvider, error) {
if pkgconfigsetup.IsRemoteConfigEnabled(pkgconfigsetup.Datadog()) {
return newRemoteConfigProvider(rcClient, isLeaderNotif, telemetryCollector, clusterName)
return newRemoteConfigProvider(rcClient, isLeaderFunc, leadershipStateNotif, telemetryCollector, clusterName)
}
if pkgconfigsetup.Datadog().GetBool("admission_controller.auto_instrumentation.patcher.fallback_to_file_provider") {
// Use the file config provider for e2e testing only (it replaces RC as a source of configs)
file := pkgconfigsetup.Datadog().GetString("admission_controller.auto_instrumentation.patcher.file_provider_path")
return newfileProvider(file, isLeaderNotif, clusterName), nil
return newfileProvider(file, isLeaderFunc, leadershipStateNotif, clusterName), nil
}
return nil, errors.New("remote config is disabled")
}
32 changes: 18 additions & 14 deletions pkg/clusteragent/admission/patch/rc_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,27 @@ import (

// remoteConfigProvider consumes tracing configs from RC and delivers them to the patcher
type remoteConfigProvider struct {
client *rcclient.Client
isLeaderNotif <-chan struct{}
subscribers map[TargetObjKind]chan Request
clusterName string
telemetryCollector telemetry.TelemetryCollector
client *rcclient.Client
leadershipStateNotif <-chan struct{}
isLeaderFunc func() bool
subscribers map[TargetObjKind]chan Request
clusterName string
telemetryCollector telemetry.TelemetryCollector
}

var _ patchProvider = &remoteConfigProvider{}

func newRemoteConfigProvider(client *rcclient.Client, isLeaderNotif <-chan struct{}, telemetryCollector telemetry.TelemetryCollector, clusterName string) (*remoteConfigProvider, error) {
func newRemoteConfigProvider(client *rcclient.Client, isLeaderFunc func() bool, leadershipStateNotif <-chan struct{}, telemetryCollector telemetry.TelemetryCollector, clusterName string) (*remoteConfigProvider, error) {
if client == nil {
return nil, errors.New("remote config client not initialized")
}
return &remoteConfigProvider{
client: client,
isLeaderNotif: isLeaderNotif,
subscribers: make(map[TargetObjKind]chan Request),
clusterName: clusterName,
telemetryCollector: telemetryCollector,
client: client,
leadershipStateNotif: leadershipStateNotif,
subscribers: make(map[TargetObjKind]chan Request),
clusterName: clusterName,
telemetryCollector: telemetryCollector,
isLeaderFunc: isLeaderFunc,
}, nil
}

Expand All @@ -48,9 +50,11 @@ func (rcp *remoteConfigProvider) start(stopCh <-chan struct{}) {
rcp.client.Start()
for {
select {
case <-rcp.isLeaderNotif:
log.Info("Got a leader notification, polling from remote-config")
rcp.process(rcp.client.GetConfigs(state.ProductAPMTracing), rcp.client.UpdateApplyStatus)
case <-rcp.leadershipStateNotif:
if rcp.isLeaderFunc() {
log.Info("Got a leader notification, polling from remote-config")
rcp.process(rcp.client.GetConfigs(state.ProductAPMTracing), rcp.client.UpdateApplyStatus)
}
case <-stopCh:
log.Info("Shutting down remote-config patch provider")
rcp.client.Close()
Expand Down
4 changes: 2 additions & 2 deletions pkg/clusteragent/admission/patch/rc_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
"testing"

"github.com/DataDog/datadog-agent/pkg/clusteragent/telemetry"

rcclient "github.com/DataDog/datadog-agent/pkg/config/remote/client"
"github.com/DataDog/datadog-agent/pkg/remoteconfig/state"

"github.com/stretchr/testify/require"
)

Expand All @@ -40,7 +40,7 @@ func TestProcess(t *testing.T) {
`
return []byte(fmt.Sprintf(base, cluster, kind))
}
rcp, err := newRemoteConfigProvider(&rcclient.Client{}, make(chan struct{}), telemetry.NewNoopCollector(), "dev")
rcp, err := newRemoteConfigProvider(&rcclient.Client{}, func() bool { return true }, make(<-chan struct{}), telemetry.NewNoopCollector(), "dev")
require.NoError(t, err)
notifs := rcp.subscribe(KindDeployment)
in := map[string]state.RawConfig{
Expand Down
Loading
Loading