Skip to content

Commit

Permalink
[ASCII-2048] use telemetry component inside the autodiscovery compone…
Browse files Browse the repository at this point in the history
…nt (#27805)
  • Loading branch information
GustavoCaso authored and jackgopack4 committed Jul 29, 2024
1 parent 17e62f3 commit 8710454
Show file tree
Hide file tree
Showing 54 changed files with 366 additions and 172 deletions.
7 changes: 5 additions & 2 deletions cmd/agent/common/autodiscovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@ var (

func setupAutoDiscovery(confSearchPaths []string, wmeta workloadmeta.Component, ac autodiscovery.Component) {
providers.InitConfigFilesReader(confSearchPaths)

acTelemetryStore := ac.GetTelemetryStore()

ac.AddConfigProvider(
providers.NewFileConfigProvider(),
providers.NewFileConfigProvider(acTelemetryStore),
config.Datadog().GetBool("autoconf_config_files_poll"),
time.Duration(config.Datadog().GetInt("autoconf_config_files_poll_interval"))*time.Second,
)
Expand Down Expand Up @@ -107,7 +110,7 @@ func setupAutoDiscovery(confSearchPaths []string, wmeta workloadmeta.Component,
for _, cp := range uniqueConfigProviders {
factory, found := ac.GetProviderCatalog()[cp.Name]
if found {
configProvider, err := factory(&cp, wmeta)
configProvider, err := factory(&cp, wmeta, acTelemetryStore)
if err != nil {
log.Errorf("Error while adding config provider %v: %v", cp.Name, err)
continue
Expand Down
39 changes: 28 additions & 11 deletions comp/core/autodiscovery/autodiscoveryimpl/autoconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ import (
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/providers"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/scheduler"
autodiscoveryStatus "github.com/DataDog/datadog-agent/comp/core/autodiscovery/status"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/telemetry"
acTelemetry "github.com/DataDog/datadog-agent/comp/core/autodiscovery/telemetry"
configComponent "github.com/DataDog/datadog-agent/comp/core/config"
flaretypes "github.com/DataDog/datadog-agent/comp/core/flare/types"
logComp "github.com/DataDog/datadog-agent/comp/core/log/def"
"github.com/DataDog/datadog-agent/comp/core/secrets"
"github.com/DataDog/datadog-agent/comp/core/status"
"github.com/DataDog/datadog-agent/comp/core/tagger"
"github.com/DataDog/datadog-agent/comp/core/telemetry"
workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
"github.com/DataDog/datadog-agent/pkg/collector/check"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"
Expand All @@ -58,6 +59,7 @@ type dependencies struct {
TaggerComp tagger.Component
Secrets secrets.Component
WMeta optional.Option[workloadmeta.Component]
Telemetry telemetry.Component
}

// AutoConfig implements the agent's autodiscovery mechanism. It is
Expand All @@ -82,6 +84,7 @@ type AutoConfig struct {
wmeta optional.Option[workloadmeta.Component]
taggerComp tagger.Component
logs logComp.Component
telemetryStore *acTelemetry.Store

// m covers the `configPollers`, `listenerCandidates`, `listeners`, and `listenerRetryStop`, but
// not the values they point to.
Expand Down Expand Up @@ -123,17 +126,18 @@ func newProvides(deps dependencies) provides {
var _ autodiscovery.Component = (*AutoConfig)(nil)

type listenerCandidate struct {
factory listeners.ServiceListenerFactory
config listeners.Config
factory listeners.ServiceListenerFactory
config listeners.Config
telemetryStore *acTelemetry.Store
}

func (l *listenerCandidate) try() (listeners.ServiceListener, error) {
return l.factory(l.config)
return l.factory(l.config, l.telemetryStore)
}

// newAutoConfig creates an AutoConfig instance and starts it.
func newAutoConfig(deps dependencies) autodiscovery.Component {
ac := createNewAutoConfig(scheduler.NewController(), deps.Secrets, deps.WMeta, deps.TaggerComp, deps.Log)
ac := createNewAutoConfig(scheduler.NewController(), deps.Secrets, deps.WMeta, deps.TaggerComp, deps.Log, deps.Telemetry)
deps.Lc.Append(fx.Hook{
OnStart: func(c context.Context) error {
ac.Start()
Expand All @@ -148,7 +152,7 @@ func newAutoConfig(deps dependencies) autodiscovery.Component {
}

// createNewAutoConfig creates an AutoConfig instance (without starting).
func createNewAutoConfig(schedulerController *scheduler.Controller, secretResolver secrets.Component, wmeta optional.Option[workloadmeta.Component], taggerComp tagger.Component, logs logComp.Component) *AutoConfig {
func createNewAutoConfig(schedulerController *scheduler.Controller, secretResolver secrets.Component, wmeta optional.Option[workloadmeta.Component], taggerComp tagger.Component, logs logComp.Component, telemetryComp telemetry.Component) *AutoConfig {
cfgMgr := newReconcilingConfigManager(secretResolver)
ac := &AutoConfig{
configPollers: make([]*configPoller, 0, 9),
Expand All @@ -168,6 +172,7 @@ func createNewAutoConfig(schedulerController *scheduler.Controller, secretResolv
wmeta: wmeta,
taggerComp: taggerComp,
logs: logs,
telemetryStore: acTelemetry.NewStore(telemetryComp),
}
return ac
}
Expand Down Expand Up @@ -386,7 +391,7 @@ func (ac *AutoConfig) AddConfigProvider(provider providers.ConfigProvider, shoul
log.Warnf("Polling interval <= 0 for AD provider: %s, deactivating polling", provider.String())
shouldPoll = false
}
cp := newConfigPoller(provider, shouldPoll, pollInterval)
cp := newConfigPoller(provider, shouldPoll, pollInterval, ac.telemetryStore)

ac.m.Lock()
defer ac.m.Unlock()
Expand Down Expand Up @@ -448,6 +453,11 @@ func (ac *AutoConfig) GetAllConfigs() []integration.Config {
return configs
}

// GetTelemetryStore returns autodiscovery telemetry store.
func (ac *AutoConfig) GetTelemetryStore() *acTelemetry.Store {
return ac.telemetryStore
}

// processNewConfig store (in template cache) and resolves a given config,
// returning the changes to be made.
func (ac *AutoConfig) processNewConfig(config integration.Config) integration.ConfigChanges {
Expand Down Expand Up @@ -497,7 +507,7 @@ func (ac *AutoConfig) addListenerCandidates(listenerConfigs []config.Listeners)
continue
}
log.Debugf("Listener %s was registered", c.Name)
ac.listenerCandidates[c.Name] = &listenerCandidate{factory: factory, config: &c}
ac.listenerCandidates[c.Name] = &listenerCandidate{factory: factory, config: &c, telemetryStore: ac.telemetryStore}
}
}

Expand Down Expand Up @@ -658,17 +668,23 @@ func (ac *AutoConfig) GetAutodiscoveryErrors() map[string]map[string]providers.E

// applyChanges applies a configChanges object. This always unschedules first.
func (ac *AutoConfig) applyChanges(changes integration.ConfigChanges) {
telemetryStorePresent := ac.telemetryStore != nil

if len(changes.Unschedule) > 0 {
for _, conf := range changes.Unschedule {
log.Tracef("Unscheduling %s\n", conf.Dump(false))
telemetry.ScheduledConfigs.Dec(conf.Provider, configType(conf))
if telemetryStorePresent {
ac.telemetryStore.ScheduledConfigs.Dec(conf.Provider, configType(conf))
}
}
}

if len(changes.Schedule) > 0 {
for _, conf := range changes.Schedule {
log.Tracef("Scheduling %s\n", conf.Dump(false))
telemetry.ScheduledConfigs.Inc(conf.Provider, configType(conf))
if telemetryStorePresent {
ac.telemetryStore.ScheduledConfigs.Inc(conf.Provider, configType(conf))
}
}
}
ac.schedulerController.ApplyChanges(changes)
Expand Down Expand Up @@ -722,6 +738,7 @@ type optionalModuleDeps struct {
TaggerComp optional.Option[tagger.Component]
Secrets secrets.Component
WMeta optional.Option[workloadmeta.Component]
Telemetry telemetry.Component
}

// OptionalModule defines the fx options when ac should be used as an optional and not started
Expand All @@ -739,5 +756,5 @@ func newOptionalAutoConfig(deps optionalModuleDeps) optional.Option[autodiscover
return optional.NewNoneOption[autodiscovery.Component]()
}
return optional.NewOption[autodiscovery.Component](
createNewAutoConfig(scheduler.NewController(), deps.Secrets, deps.WMeta, taggerComp, deps.Log))
createNewAutoConfig(scheduler.NewController(), deps.Secrets, deps.WMeta, taggerComp, deps.Log, deps.Telemetry))
}
4 changes: 3 additions & 1 deletion comp/core/autodiscovery/autodiscoveryimpl/autoconfig_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
log "github.com/DataDog/datadog-agent/comp/core/log/def"
"github.com/DataDog/datadog-agent/comp/core/tagger"
"github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl"
"github.com/DataDog/datadog-agent/comp/core/telemetry"
workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"github.com/DataDog/datadog-agent/pkg/util/optional"
Expand All @@ -40,6 +41,7 @@ type mockdependencies struct {
Params MockParams
TaggerComp tagger.Mock
LogsComp log.Component
Telemetry telemetry.Component
}

type mockprovides struct {
Expand All @@ -50,7 +52,7 @@ type mockprovides struct {
}

func newMockAutoConfig(deps mockdependencies) mockprovides {
ac := createNewAutoConfig(deps.Params.Scheduler, nil, deps.WMeta, deps.TaggerComp, deps.LogsComp)
ac := createNewAutoConfig(deps.Params.Scheduler, nil, deps.WMeta, deps.TaggerComp, deps.LogsComp, deps.Telemetry)
return mockprovides{
Comp: ac,
Endpoint: api.NewAgentEndpointProvider(ac.mockHandleRequest, "/config-check", "GET"),
Expand Down
29 changes: 16 additions & 13 deletions comp/core/autodiscovery/autodiscoveryimpl/autoconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/providers"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/providers/names"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/scheduler"
acTelemetry "github.com/DataDog/datadog-agent/comp/core/autodiscovery/telemetry"
log "github.com/DataDog/datadog-agent/comp/core/log/def"
"github.com/DataDog/datadog-agent/comp/core/secrets"
"github.com/DataDog/datadog-agent/comp/core/tagger"
"github.com/DataDog/datadog-agent/comp/core/tagger/taggerimpl"
"github.com/DataDog/datadog-agent/comp/core/telemetry"
workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
workloadmetafxmock "github.com/DataDog/datadog-agent/comp/core/workloadmeta/fx-mock"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"
Expand Down Expand Up @@ -80,7 +82,7 @@ func (l *MockListener) Stop() {
l.stopReceived = true
}

func (l *MockListener) fakeFactory(listeners.Config) (listeners.ServiceListener, error) {
func (l *MockListener) fakeFactory(listeners.Config, *acTelemetry.Store) (listeners.ServiceListener, error) {
return l, nil
}

Expand All @@ -96,7 +98,7 @@ type factoryMock struct {
returnError error
}

func (o *factoryMock) make(listeners.Config) (listeners.ServiceListener, error) {
func (o *factoryMock) make(listeners.Config, *acTelemetry.Store) (listeners.ServiceListener, error) {
o.Lock()
defer o.Unlock()
if o.callChan != nil {
Expand Down Expand Up @@ -186,15 +188,15 @@ func (suite *AutoConfigTestSuite) SetupTest() {
suite.deps = createDeps(suite.T())
}

func getAutoConfig(schedulerController *scheduler.Controller, secretResolver secrets.Component, wmeta optional.Option[workloadmeta.Component], taggerComp tagger.Component, logsComp log.Component) *AutoConfig {
ac := createNewAutoConfig(schedulerController, secretResolver, wmeta, taggerComp, logsComp)
func getAutoConfig(schedulerController *scheduler.Controller, secretResolver secrets.Component, wmeta optional.Option[workloadmeta.Component], taggerComp tagger.Component, logsComp log.Component, telemetryComp telemetry.Component) *AutoConfig {
ac := createNewAutoConfig(schedulerController, secretResolver, wmeta, taggerComp, logsComp, telemetryComp)
go ac.serviceListening()
return ac
}

func (suite *AutoConfigTestSuite) TestAddConfigProvider() {
mockResolver := MockSecretResolver{suite.T(), nil}
ac := getAutoConfig(scheduler.NewController(), &mockResolver, suite.deps.WMeta, suite.deps.TaggerComp, suite.deps.LogsComp)
ac := getAutoConfig(scheduler.NewController(), &mockResolver, suite.deps.WMeta, suite.deps.TaggerComp, suite.deps.LogsComp, suite.deps.Telemetry)
assert.Len(suite.T(), ac.configPollers, 0)
mp := &MockProvider{}
ac.AddConfigProvider(mp, false, 0)
Expand All @@ -211,7 +213,7 @@ func (suite *AutoConfigTestSuite) TestAddConfigProvider() {

func (suite *AutoConfigTestSuite) TestAddListener() {
mockResolver := MockSecretResolver{suite.T(), nil}
ac := getAutoConfig(scheduler.NewController(), &mockResolver, suite.deps.WMeta, suite.deps.TaggerComp, suite.deps.LogsComp)
ac := getAutoConfig(scheduler.NewController(), &mockResolver, suite.deps.WMeta, suite.deps.TaggerComp, suite.deps.LogsComp, suite.deps.Telemetry)
assert.Len(suite.T(), ac.listeners, 0)

ml := &MockListener{}
Expand Down Expand Up @@ -249,7 +251,7 @@ func (suite *AutoConfigTestSuite) TestDiffConfigs() {

func (suite *AutoConfigTestSuite) TestStop() {
mockResolver := MockSecretResolver{suite.T(), nil}
ac := getAutoConfig(scheduler.NewController(), &mockResolver, suite.deps.WMeta, suite.deps.TaggerComp, suite.deps.LogsComp)
ac := getAutoConfig(scheduler.NewController(), &mockResolver, suite.deps.WMeta, suite.deps.TaggerComp, suite.deps.LogsComp, suite.deps.Telemetry)

ml := &MockListener{}
listeners.Register("mock", ml.fakeFactory, ac.serviceListenerFactories)
Expand All @@ -262,7 +264,7 @@ func (suite *AutoConfigTestSuite) TestStop() {

func (suite *AutoConfigTestSuite) TestListenerRetry() {
mockResolver := MockSecretResolver{suite.T(), nil}
ac := getAutoConfig(scheduler.NewController(), &mockResolver, suite.deps.WMeta, suite.deps.TaggerComp, suite.deps.LogsComp)
ac := getAutoConfig(scheduler.NewController(), &mockResolver, suite.deps.WMeta, suite.deps.TaggerComp, suite.deps.LogsComp, suite.deps.Telemetry)

// Hack the retry delay to shorten the test run time
initialListenerCandidateIntl := listenerCandidateIntl
Expand Down Expand Up @@ -370,7 +372,7 @@ func TestResolveTemplate(t *testing.T) {
msch.Register("mock", sch, false)

mockResolver := MockSecretResolver{t, nil}
ac := getAutoConfig(msch, &mockResolver, deps.WMeta, deps.TaggerComp, deps.LogsComp)
ac := getAutoConfig(msch, &mockResolver, deps.WMeta, deps.TaggerComp, deps.LogsComp, deps.Telemetry)
tpl := integration.Config{
Name: "cpu",
ADIdentifiers: []string{"redis"},
Expand Down Expand Up @@ -407,7 +409,7 @@ func TestRemoveTemplate(t *testing.T) {

mockResolver := MockSecretResolver{t, nil}

ac := getAutoConfig(scheduler.NewController(), &mockResolver, deps.WMeta, deps.TaggerComp, deps.LogsComp)
ac := getAutoConfig(scheduler.NewController(), &mockResolver, deps.WMeta, deps.TaggerComp, deps.LogsComp, deps.Telemetry)
// Add static config
c := integration.Config{
Name: "memory",
Expand Down Expand Up @@ -460,7 +462,7 @@ func TestDecryptConfig(t *testing.T) {
},
}}

ac := getAutoConfig(scheduler.NewController(), &mockResolver, deps.WMeta, deps.TaggerComp, deps.LogsComp)
ac := getAutoConfig(scheduler.NewController(), &mockResolver, deps.WMeta, deps.TaggerComp, deps.LogsComp, deps.Telemetry)
ac.processNewService(ctx, &dummyService{ID: "abcd", ADIdentifiers: []string{"redis"}})

tpl := integration.Config{
Expand Down Expand Up @@ -504,7 +506,7 @@ func TestProcessClusterCheckConfigWithSecrets(t *testing.T) {
returnedError: nil,
},
}}
ac := getAutoConfig(scheduler.NewController(), &mockResolver, deps.WMeta, deps.TaggerComp, deps.LogsComp)
ac := getAutoConfig(scheduler.NewController(), &mockResolver, deps.WMeta, deps.TaggerComp, deps.LogsComp, deps.Telemetry)

tpl := integration.Config{
Provider: names.ClusterChecks,
Expand Down Expand Up @@ -539,7 +541,7 @@ func TestWriteConfigEndpoint(t *testing.T) {
configName := "testConfig"

mockResolver := MockSecretResolver{t, nil}
ac := getAutoConfig(scheduler.NewController(), &mockResolver, deps.WMeta, deps.TaggerComp, deps.LogsComp)
ac := getAutoConfig(scheduler.NewController(), &mockResolver, deps.WMeta, deps.TaggerComp, deps.LogsComp, deps.Telemetry)

tpl := integration.Config{
Provider: names.ClusterChecks,
Expand Down Expand Up @@ -593,6 +595,7 @@ type Deps struct {
WMeta optional.Option[workloadmeta.Component]
TaggerComp tagger.Component
LogsComp log.Component
Telemetry telemetry.Component
}

func createDeps(t *testing.T) Deps {
Expand Down
22 changes: 13 additions & 9 deletions comp/core/autodiscovery/autodiscoveryimpl/config_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,19 @@ type configPoller struct {

stopChan chan struct{}

configsMu sync.Mutex
configs map[uint64]integration.Config
configsMu sync.Mutex
configs map[uint64]integration.Config
telemetryStore *telemetry.Store
}

func newConfigPoller(provider providers.ConfigProvider, canPoll bool, interval time.Duration) *configPoller {
func newConfigPoller(provider providers.ConfigProvider, canPoll bool, interval time.Duration, telemetryStore *telemetry.Store) *configPoller {
return &configPoller{
provider: provider,
configs: make(map[uint64]integration.Config),
canPoll: canPoll,
pollInterval: interval,
stopChan: make(chan struct{}),
provider: provider,
configs: make(map[uint64]integration.Config),
canPoll: canPoll,
pollInterval: interval,
stopChan: make(chan struct{}),
telemetryStore: telemetryStore,
}
}

Expand Down Expand Up @@ -203,7 +205,9 @@ func (cp *configPoller) collectOnce(ctx context.Context, provider providers.Coll
func (cp *configPoller) collect(ctx context.Context, provider providers.CollectingConfigProvider) ([]integration.Config, []integration.Config) {
start := time.Now()
defer func() {
telemetry.PollDuration.Observe(time.Since(start).Seconds(), cp.provider.String())
if cp.telemetryStore != nil {
cp.telemetryStore.PollDuration.Observe(time.Since(start).Seconds(), cp.provider.String())
}
}()

fetched, err := provider.Collect(ctx)
Expand Down
2 changes: 2 additions & 0 deletions comp/core/autodiscovery/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/providers"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/scheduler"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/telemetry"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"
"github.com/DataDog/datadog-agent/pkg/config"
)
Expand All @@ -34,6 +35,7 @@ type Component interface {
GetIDOfCheckWithEncryptedSecrets(checkID checkid.ID) checkid.ID
GetAutodiscoveryErrors() map[string]map[string]providers.ErrorMsgSet
GetProviderCatalog() map[string]providers.ConfigProviderFactory
GetTelemetryStore() *telemetry.Store
// TODO (component): deprecate start/stop methods
Start()
Stop()
Expand Down
3 changes: 2 additions & 1 deletion comp/core/autodiscovery/listeners/cloudfoundry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
"github.com/DataDog/datadog-agent/comp/core/autodiscovery/telemetry"
"github.com/DataDog/datadog-agent/pkg/util/cloudproviders/cloudfoundry"
"github.com/DataDog/datadog-agent/pkg/util/containers"
"github.com/DataDog/datadog-agent/pkg/util/log"
Expand Down Expand Up @@ -50,7 +51,7 @@ type CloudFoundryService struct {
var _ Service = &CloudFoundryService{}

// NewCloudFoundryListener creates a CloudFoundryListener
func NewCloudFoundryListener(Config) (ServiceListener, error) {
func NewCloudFoundryListener(Config, *telemetry.Store) (ServiceListener, error) {
bbsCache, err := cloudfoundry.GetGlobalBBSCache()
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 8710454

Please sign in to comment.