Skip to content

Commit

Permalink
Cleanup K8S APIClient clients and informers, fix short watch on Helm …
Browse files Browse the repository at this point in the history
…and Orch. Explorer informers
  • Loading branch information
vboulineau committed Nov 22, 2023
1 parent 5df3863 commit d6df23a
Show file tree
Hide file tree
Showing 20 changed files with 155 additions and 261 deletions.
19 changes: 8 additions & 11 deletions cmd/cluster-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func start(log log.Component, config config.Component, telemetry telemetry.Compo
}()

// Setup healthcheck port
var healthPort = pkgconfig.Datadog.GetInt("health_port")
healthPort := pkgconfig.Datadog.GetInt("health_port")
if healthPort > 0 {
err := healthprobe.Serve(mainCtx, healthPort)
if err != nil {
Expand Down Expand Up @@ -219,15 +219,13 @@ func start(log log.Component, config config.Component, telemetry telemetry.Compo
eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "datadog-cluster-agent"})

ctx := apiserver.ControllerContext{
InformerFactory: apiCl.InformerFactory,
WPAClient: apiCl.WPAClient,
WPAInformerFactory: apiCl.WPAInformerFactory,
DDClient: apiCl.DDClient,
DDInformerFactory: apiCl.DynamicInformerFactory,
Client: apiCl.Cl,
IsLeaderFunc: le.IsLeader,
EventRecorder: eventRecorder,
StopCh: stopCh,
InformerFactory: apiCl.InformerFactory,
DynamicClient: apiCl.DynamicInformerCl,
DynamicInformerFactory: apiCl.DynamicInformerFactory,
Client: apiCl.InformerCl,
IsLeaderFunc: le.IsLeader,
EventRecorder: eventRecorder,
StopCh: stopCh,
}

if aggErr := apiserver.StartControllers(ctx); aggErr != nil {
Expand Down Expand Up @@ -330,7 +328,6 @@ func start(log log.Component, config config.Component, telemetry telemetry.Compo
SecretInformers: apiCl.CertificateSecretInformerFactory,
WebhookInformers: apiCl.WebhookConfigInformerFactory,
Client: apiCl.Cl,
DiscoveryClient: apiCl.DiscoveryCl,
StopCh: stopCh,
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/cluster-agent/subcommands/start/compliance.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func startCompliance(senderManager sender.SenderManager, stopper startstop.Stopp
func wrapKubernetesClient(apiCl *apiserver.APIClient, isLeader func() bool) compliance.KubernetesProvider {
return func(ctx context.Context) (dynamic.Interface, discovery.DiscoveryInterface, error) {
if isLeader() {
return apiCl.DynamicCl, apiCl.DiscoveryCl, nil
return apiCl.DynamicCl, apiCl.Cl.Discovery(), nil
}
return nil, nil, compliance.ErrIncompatibleEnvironment
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/security-agent/subcommands/check/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func dumpComplianceEvents(reportFile string, events []*compliance.CheckEvent) er
if err != nil {
return fmt.Errorf("could not marshal events map: %w", err)
}
if err := os.WriteFile(reportFile, b, 0644); err != nil {
if err := os.WriteFile(reportFile, b, 0o644); err != nil {
return fmt.Errorf("could not write report file in %q: %w", reportFile, err)
}
return nil
Expand Down Expand Up @@ -271,7 +271,7 @@ func complianceKubernetesProvider(_ctx context.Context) (dynamic.Interface, disc
if err != nil {
return nil, nil, err
}
return apiCl.DynamicCl, apiCl.DiscoveryCl, nil
return apiCl.DynamicCl, apiCl.Cl.Discovery(), nil
}

type fakeResolver struct {
Expand Down
6 changes: 2 additions & 4 deletions pkg/clusteragent/admission/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/common"
"github.com/DataDog/datadog-agent/pkg/util/log"

"k8s.io/client-go/discovery"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
Expand All @@ -31,7 +30,6 @@ type ControllerContext struct {
SecretInformers informers.SharedInformerFactory
WebhookInformers informers.SharedInformerFactory
Client kubernetes.Interface
DiscoveryClient discovery.DiscoveryInterface
StopCh chan struct{}
}

Expand All @@ -58,12 +56,12 @@ func StartControllers(ctx ControllerContext) error {
secretConfig,
)

nsSelectorEnabled, err := useNamespaceSelector(ctx.DiscoveryClient)
nsSelectorEnabled, err := useNamespaceSelector(ctx.Client.Discovery())
if err != nil {
return err
}

v1Enabled, err := UseAdmissionV1(ctx.DiscoveryClient)
v1Enabled, err := UseAdmissionV1(ctx.Client.Discovery())
if err != nil {
return err
}
Expand Down
19 changes: 12 additions & 7 deletions pkg/clusteragent/externalmetrics/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

apierr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/metrics/pkg/apis/external_metrics"
"sigs.k8s.io/custom-metrics-apiserver/pkg/provider"

Expand Down Expand Up @@ -61,6 +62,7 @@ func NewDatadogMetricProvider(ctx context.Context, apiCl *apiserver.APIClient) (
splitBatchBackoffOnErrors := config.Datadog.GetBool("external_metrics_provider.split_batches_with_backoff")
autogenNamespace := common.GetResourcesNamespace()
autogenEnabled := config.Datadog.GetBool("external_metrics_provider.enable_datadogmetric_autogen")
wpaEnabled := config.Datadog.GetBool("external_metrics_provider.wpa_controller")

provider := &datadogMetricProvider{
apiCl: apiCl,
Expand All @@ -80,6 +82,11 @@ func NewDatadogMetricProvider(ctx context.Context, apiCl *apiserver.APIClient) (
}
go metricsRetriever.Run(ctx.Done())

var wpaInformer dynamicinformer.DynamicSharedInformerFactory
if wpaEnabled {
wpaInformer = apiCl.DynamicInformerFactory
}

// Start AutoscalerWatcher, only leader will flag DatadogMetrics as Active/Inactive
// WPAInformerFactory is nil when WPA is not used. AutoscalerWatcher will check value itself.
autoscalerWatcher, err := NewAutoscalerWatcher(
Expand All @@ -89,27 +96,25 @@ func NewDatadogMetricProvider(ctx context.Context, apiCl *apiserver.APIClient) (
autogenNamespace,
apiCl.Cl,
apiCl.InformerFactory,
apiCl.WPAInformerFactory,
wpaInformer,
le.IsLeader,
&provider.store,
)
if err != nil {
return nil, fmt.Errorf("Unabled to create DatadogMetricProvider as AutoscalerWatcher failed with: %v", err)
}
apiCl.InformerFactory.Start(ctx.Done())
if apiCl.WPAInformerFactory != nil {
apiCl.WPAInformerFactory.Start(ctx.Done())
}
go autoscalerWatcher.Run(ctx.Done())

// We shift controller refresh period from retrieverRefreshPeriod to maximize the probability to have new data from DD
controller, err := NewDatadogMetricController(apiCl.DDClient, apiCl.DynamicInformerFactory, le.IsLeader, &provider.store)
controller, err := NewDatadogMetricController(apiCl.DynamicCl, apiCl.DynamicInformerFactory, le.IsLeader, &provider.store)
if err != nil {
return nil, fmt.Errorf("Unable to create DatadogMetricProvider as DatadogMetric Controller failed with: %v", err)
}

// Start informers & controllers (informers can be started multiple times)
apiCl.DynamicInformerFactory.Start(ctx.Done())
apiCl.InformerFactory.Start(ctx.Done())

go autoscalerWatcher.Run(ctx.Done())
go controller.Run(ctx)

return provider, nil
Expand Down
10 changes: 4 additions & 6 deletions pkg/collector/corechecks/cluster/helm/helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/metrics/servicecheck"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver"
"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/DataDog/datadog-agent/pkg/util/pointer"
)

const (
Expand Down Expand Up @@ -119,7 +120,6 @@ func (hc *HelmCheck) Configure(senderManager sender.SenderManager, integrationCo

hc.setSharedInformerFactory(apiClient)
hc.startTS = time.Now()

hc.informersStopCh = make(chan struct{})

return nil
Expand Down Expand Up @@ -205,13 +205,11 @@ func (hc *HelmCheck) setupInformers() error {
}

func (hc *HelmCheck) setSharedInformerFactory(apiClient *apiserver.APIClient) {
hc.informerFactory = informers.NewSharedInformerFactoryWithOptions(
apiClient.Cl,
hc.getInformersResyncPeriod(),
hc.informerFactory = apiClient.GetInformerWithOptions(
pointer.Ptr(hc.getInformersResyncPeriod()),
informers.WithTweakListOptions(func(opts *metav1.ListOptions) {
opts.LabelSelector = labelSelector
}),
)
}))
}

func (hc *HelmCheck) allTags(release *release, storageDriver helmStorage, includeRevision bool) []string {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var (
// NewAPIServiceFactory returns a new APIService metric family generator factory.
func NewAPIServiceFactory(client *apiserver.APIClient) customresource.RegistryFactory {
return &apiserviceFactory{
client: client.APISClient,
client: client.APISInformerClient,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var (
// metric family generator factory.
func NewCustomResourceDefinitionFactory(client *apiserver.APIClient) customresource.RegistryFactory {
return &crdFactory{
client: client.CRDClient,
client: client.CRDInformerClient,
}
}

Expand Down
11 changes: 3 additions & 8 deletions pkg/collector/corechecks/cluster/ksm/kubernetes_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func (k *KSMCheck) Configure(senderManager sender.SenderManager, integrationConf
}

// Discover resources that are currently available
resources, err := discoverResources(c.DiscoveryCl)
resources, err := discoverResources(c.Cl.Discovery())
if err != nil {
return err
}
Expand Down Expand Up @@ -294,9 +294,9 @@ func (k *KSMCheck) Configure(senderManager sender.SenderManager, integrationConf

builder.WithFamilyGeneratorFilter(allowDenyList)

builder.WithKubeClient(c.Cl)
builder.WithKubeClient(c.InformerCl)

builder.WithVPAClient(c.VPAClient)
builder.WithVPAClient(c.VPAInformerClient)

ctx, cancel := context.WithCancel(context.Background())
k.cancel = cancel
Expand Down Expand Up @@ -417,11 +417,6 @@ func (k *KSMCheck) discoverCustomResources(c *apiserver.APIClient, collectors []
}

func manageResourcesReplacement(c *apiserver.APIClient, factories []customresource.RegistryFactory, resources []*v1.APIResourceList) []customresource.RegistryFactory {
if c.DiscoveryCl == nil {
log.Warn("Kubernetes discovery client has not been properly initialized")
return factories
}

// backwards/forwards compatibility resource factories are only
// registered if they're needed, otherwise they'd overwrite the default
// ones that ship with ksm
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ func NewAPIServerDiscoveryProvider() *APIServerDiscoveryProvider {
// Discover returns collectors to enable based on information exposed by the API server.
func (p *APIServerDiscoveryProvider) Discover(inventory *inventory.CollectorInventory) ([]collectors.Collector, error) {
groups, resources, err := GetServerGroupsAndResources()

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -74,7 +73,7 @@ func GetServerGroupsAndResources() ([]*v1.APIGroup, []*v1.APIResourceList, error
return nil, nil, err
}

groups, resources, err := client.DiscoveryCl.ServerGroupsAndResources()
groups, resources, err := client.Cl.Discovery().ServerGroupsAndResources()
if err != nil {
if !discovery.IsGroupDiscoveryFailedError(err) {
return nil, nil, err
Expand Down
17 changes: 9 additions & 8 deletions pkg/collector/corechecks/cluster/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,16 +203,17 @@ func (o *OrchestratorCheck) Cancel() {
}

func getOrchestratorInformerFactory(apiClient *apiserver.APIClient) *collectors.OrchestratorInformerFactory {
of := &collectors.OrchestratorInformerFactory{
InformerFactory: informers.NewSharedInformerFactory(apiClient.Cl, defaultResyncInterval),
CRDInformerFactory: externalversions.NewSharedInformerFactory(apiClient.CRDClient, defaultResyncInterval),
DynamicInformerFactory: dynamicinformer.NewDynamicSharedInformerFactory(apiClient.DynamicCl, defaultResyncInterval),
VPAInformerFactory: vpai.NewSharedInformerFactory(apiClient.VPAClient, defaultResyncInterval),
}

tweakListOptions := func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", "").String()
}
of.UnassignedPodInformerFactory = informers.NewSharedInformerFactoryWithOptions(apiClient.Cl, defaultResyncInterval, informers.WithTweakListOptions(tweakListOptions))

of := &collectors.OrchestratorInformerFactory{
InformerFactory: informers.NewSharedInformerFactoryWithOptions(apiClient.InformerCl, defaultResyncInterval),
CRDInformerFactory: externalversions.NewSharedInformerFactory(apiClient.CRDInformerClient, defaultResyncInterval),
DynamicInformerFactory: dynamicinformer.NewDynamicSharedInformerFactory(apiClient.DynamicInformerCl, defaultResyncInterval),
VPAInformerFactory: vpai.NewSharedInformerFactory(apiClient.VPAInformerClient, defaultResyncInterval),
UnassignedPodInformerFactory: informers.NewSharedInformerFactoryWithOptions(apiClient.InformerCl, defaultResyncInterval, informers.WithTweakListOptions(tweakListOptions)),
}

return of
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestOrchestratorCheckSafeReSchedule(t *testing.T) {
client := fake.NewSimpleClientset()
vpaClient := vpa.NewSimpleClientset()
crdClient := crd.NewSimpleClientset()
cl := &apiserver.APIClient{Cl: client, VPAClient: vpaClient, CRDClient: crdClient}
cl := &apiserver.APIClient{InformerCl: client, VPAInformerClient: vpaClient, CRDInformerClient: crdClient}
orchCheck := OrchestratorFactory().(*OrchestratorCheck)
orchCheck.apiClient = cl

Expand Down Expand Up @@ -85,7 +85,6 @@ func TestOrchestratorCheckSafeReSchedule(t *testing.T) {
writeNode(t, client, "2")

assert.True(t, waitTimeout(&wg, 2*time.Second))

}

func writeNode(t *testing.T, client *fake.Clientset, version string) {
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,7 @@ func InitConfig(config Config) {
config.BindEnvAndSetDefault("kubernetes_collect_metadata_tags", true)
config.BindEnvAndSetDefault("kubernetes_metadata_tag_update_freq", 60) // Polling frequency of the Agent to the DCA in seconds (gets the local cache if the DCA is disabled)
config.BindEnvAndSetDefault("kubernetes_apiserver_client_timeout", 10)
config.BindEnvAndSetDefault("kubernetes_apiserver_informer_client_timeout", 0)
config.BindEnvAndSetDefault("kubernetes_map_services_on_ip", false) // temporary opt-out of the new mapping logic
config.BindEnvAndSetDefault("kubernetes_apiserver_use_protobuf", false)
config.BindEnvAndSetDefault("kubernetes_ad_tags_disabled", []string{})
Expand Down
Loading

0 comments on commit d6df23a

Please sign in to comment.