From 3ceef38aed1e83fb574401b19a3625cdb2c26d98 Mon Sep 17 00:00:00 2001 From: Maxime mouial Date: Thu, 19 Dec 2024 13:30:21 +0100 Subject: [PATCH] Add retry ability to configsync comp --- cmd/otel-agent/command/command.go | 6 +-- cmd/otel-agent/subcommands/run/command.go | 8 +--- cmd/otel-agent/subcommands/subcommands.go | 14 +++--- cmd/process-agent/command/main_common.go | 2 +- cmd/security-agent/main_windows.go | 2 +- .../subcommands/start/command.go | 2 +- cmd/trace-agent/subcommands/run/command.go | 2 +- comp/core/configsync/configsyncimpl/module.go | 46 +++++++++---------- .../configsyncimpl/module_integration_test.go | 2 +- .../configsync/configsyncimpl/module_test.go | 9 ++-- comp/core/configsync/configsyncimpl/params.go | 21 ++++++--- comp/core/configsync/configsyncimpl/sync.go | 10 ++-- .../configsync/configsyncimpl/test_common.go | 2 +- comp/forwarder/defaultforwarder/forwarder.go | 15 ++---- 14 files changed, 70 insertions(+), 71 deletions(-) diff --git a/cmd/otel-agent/command/command.go b/cmd/otel-agent/command/command.go index 94e3d2c3f70e3a..7ea79949cb799a 100644 --- a/cmd/otel-agent/command/command.go +++ b/cmd/otel-agent/command/command.go @@ -82,7 +82,7 @@ func makeCommands(globalParams *subcommands.GlobalParams) *cobra.Command { const configFlag = "config" const coreConfigFlag = "core-config" -const syncDelayFlag = "sync-delay" +const syncDelayFlag = "sync-delay" // TODO: Change this to sync-on-init-timeout const syncTimeoutFlag = "sync-to" func flags(reg *featuregate.Registry, cfgs *subcommands.GlobalParams) *flag.FlagSet { @@ -90,8 +90,8 @@ func flags(reg *featuregate.Registry, cfgs *subcommands.GlobalParams) *flag.Flag flagSet.Var(cfgs, configFlag, "Locations to the config file(s), note that only a"+ " single location can be set per flag entry e.g. `--config=file:/path/to/first --config=file:path/to/second`.") flagSet.StringVar(&cfgs.CoreConfPath, coreConfigFlag, "", "Location to the Datadog Agent config file.") - flagSet.DurationVar(&cfgs.SyncDelay, syncDelayFlag, 0, "Delay before first config sync.") - flagSet.DurationVar(&cfgs.SyncTimeout, syncTimeoutFlag, 3*time.Second, "Timeout for sync requests.") + flagSet.DurationVar(&cfgs.SyncOnInitTimeout, syncDelayFlag, 0, "How long should config sync retry at initialization before failing.") + flagSet.DurationVar(&cfgs.SyncTimeout, syncTimeoutFlag, 3*time.Second, "Timeout for config sync requests.") flagSet.Func("set", "Set arbitrary component config property. The component has to be defined in the config file and the flag"+ diff --git a/cmd/otel-agent/subcommands/run/command.go b/cmd/otel-agent/subcommands/run/command.go index 08c14729981574..99a5c3aff1befb 100644 --- a/cmd/otel-agent/subcommands/run/command.go +++ b/cmd/otel-agent/subcommands/run/command.go @@ -191,13 +191,7 @@ func runOTelAgentCommand(ctx context.Context, params *subcommands.GlobalParams, fx.Invoke(func(_ collectordef.Component, _ defaultforwarder.Forwarder, _ optional.Option[logsagentpipeline.Component]) { }), - // TODO: don't rely on this pattern; remove this `OptionalModuleWithParams` thing - // and instead adapt OptionalModule to allow parameter passing naturally. - // See: https://github.com/DataDog/datadog-agent/pull/28386 - configsyncimpl.OptionalModuleWithParams(), - fx.Provide(func() configsyncimpl.Params { - return configsyncimpl.NewParams(params.SyncTimeout, params.SyncDelay, true) - }), + configsyncimpl.OptionalModule(configsyncimpl.NewParams(params.SyncTimeout, true, params.SyncOnInitTimeout)), remoteTaggerFx.Module(tagger.RemoteParams{ RemoteTarget: func(c coreconfig.Component) (string, error) { return fmt.Sprintf(":%v", c.GetInt("cmd_port")), nil }, diff --git a/cmd/otel-agent/subcommands/subcommands.go b/cmd/otel-agent/subcommands/subcommands.go index 1d3dbe688724f5..4abed724947ee6 100644 --- a/cmd/otel-agent/subcommands/subcommands.go +++ b/cmd/otel-agent/subcommands/subcommands.go @@ -16,13 +16,13 @@ import ( // A pointer to this type is passed to SubcommandFactory's, but its contents // are not valid until Cobra calls the subcommand's Run or RunE function. type GlobalParams struct { - ConfPaths []string - Sets []string - CoreConfPath string - ConfigName string - LoggerName string - SyncDelay time.Duration - SyncTimeout time.Duration + ConfPaths []string + Sets []string + CoreConfPath string + ConfigName string + LoggerName string + SyncOnInitTimeout time.Duration + SyncTimeout time.Duration } // Set is called by Cobra when a flag is set. diff --git a/cmd/process-agent/command/main_common.go b/cmd/process-agent/command/main_common.go index dcb9287e74f04b..d0f79399b412c2 100644 --- a/cmd/process-agent/command/main_common.go +++ b/cmd/process-agent/command/main_common.go @@ -156,7 +156,7 @@ func runApp(ctx context.Context, globalParams *GlobalParams) error { fetchonlyimpl.Module(), // Provide configsync module - configsyncimpl.OptionalModule(), + configsyncimpl.OptionalModule(configsyncimpl.NewDefaultParams()), // Provide autoexit module autoexitimpl.Module(), diff --git a/cmd/security-agent/main_windows.go b/cmd/security-agent/main_windows.go index a729e7a2003f01..079866beb75192 100644 --- a/cmd/security-agent/main_windows.go +++ b/cmd/security-agent/main_windows.go @@ -166,7 +166,7 @@ func (s *service) Run(svcctx context.Context) error { statusimpl.Module(), fetchonlyimpl.Module(), - configsyncimpl.OptionalModule(), + configsyncimpl.OptionalModule(configsyncimpl.NewDefaultParams()), // Force the instantiation of the component fx.Invoke(func(_ optional.Option[configsync.Component]) {}), autoexitimpl.Module(), diff --git a/cmd/security-agent/subcommands/start/command.go b/cmd/security-agent/subcommands/start/command.go index 2a3dd883dcd844..4f1493ada77c9c 100644 --- a/cmd/security-agent/subcommands/start/command.go +++ b/cmd/security-agent/subcommands/start/command.go @@ -174,7 +174,7 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command { }), statusimpl.Module(), fetchonlyimpl.Module(), - configsyncimpl.OptionalModule(), + configsyncimpl.OptionalModule(configsyncimpl.NewDefaultParams()), // Force the instantiation of the component fx.Invoke(func(_ optional.Option[configsync.Component]) {}), autoexitimpl.Module(), diff --git a/cmd/trace-agent/subcommands/run/command.go b/cmd/trace-agent/subcommands/run/command.go index 1d0788501ffae2..fbec5d7a6aa34d 100644 --- a/cmd/trace-agent/subcommands/run/command.go +++ b/cmd/trace-agent/subcommands/run/command.go @@ -113,7 +113,7 @@ func runTraceAgentProcess(ctx context.Context, cliParams *Params, defaultConfPat zstdfx.Module(), trace.Bundle(), fetchonlyimpl.Module(), - configsyncimpl.OptionalModule(), + configsyncimpl.OptionalModule(configsyncimpl.NewDefaultParams()), // Force the instantiation of the components fx.Invoke(func(_ traceagent.Component, _ optional.Option[configsync.Component], _ autoexit.Component) {}), ) diff --git a/comp/core/configsync/configsyncimpl/module.go b/comp/core/configsync/configsyncimpl/module.go index 0b77c1370de204..097d2c132f4b23 100644 --- a/comp/core/configsync/configsyncimpl/module.go +++ b/comp/core/configsync/configsyncimpl/module.go @@ -36,19 +36,10 @@ type dependencies struct { } // OptionalModule defines the fx options for this component. -func OptionalModule() fxutil.Module { - return fxutil.Component( - fx.Provide(newOptionalConfigSync), - fx.Supply(Params{}), - ) -} - -// OptionalModuleWithParams defines the fx options for this component, but -// requires additionally specifying custom Params from the fx App, to be -// passed to the constructor. -func OptionalModuleWithParams() fxutil.Module { +func OptionalModule(params Params) fxutil.Module { return fxutil.Component( fx.Provide(newOptionalConfigSync), + fx.Supply(params), ) } @@ -64,21 +55,23 @@ type configSync struct { } // newOptionalConfigSync checks if the component was enabled as per the config, and returns an optional.Option -func newOptionalConfigSync(deps dependencies) optional.Option[configsync.Component] { +func newOptionalConfigSync(deps dependencies) (optional.Option[configsync.Component], error) { agentIPCPort := deps.Config.GetInt("agent_ipc.port") configRefreshIntervalSec := deps.Config.GetInt("agent_ipc.config_refresh_interval") if agentIPCPort <= 0 || configRefreshIntervalSec <= 0 { - return optional.NewNoneOption[configsync.Component]() + deps.Log.Infof("configsync disabled (agent_ipc.port: %d | agent_ipc.config_refresh_interval: %d)", agentIPCPort, configRefreshIntervalSec) + return optional.NewNoneOption[configsync.Component](), nil } - configSync := newConfigSync(deps, agentIPCPort, configRefreshIntervalSec) - return optional.NewOption(configSync) + deps.Log.Infof("configsync enabled (agent_ipc '%s:%d' | agent_ipc.config_refresh_interval: %d)", deps.Config.GetString("agent_ipc.host"), agentIPCPort, configRefreshIntervalSec) + configSync, err := newConfigSync(deps, agentIPCPort, configRefreshIntervalSec) + return optional.NewOption(configSync), err } // newConfigSync creates a new configSync component. // agentIPCPort and configRefreshIntervalSec must be strictly positive. -func newConfigSync(deps dependencies, agentIPCPort int, configRefreshIntervalSec int) configsync.Component { +func newConfigSync(deps dependencies, agentIPCPort int, configRefreshIntervalSec int) (configsync.Component, error) { agentIPCHost := deps.Config.GetString("agent_ipc.host") url := &url.URL{ @@ -100,17 +93,20 @@ func newConfigSync(deps dependencies, agentIPCPort int, configRefreshIntervalSec ctx: ctx, } - if deps.SyncParams.OnInit { - if deps.SyncParams.Delay != 0 { - select { - case <-ctx.Done(): //context cancelled - // TODO: this component should return an error + if deps.SyncParams.OnInitSync { + deps.Log.Infof("triggering configsync on init (will retry for %s)", deps.SyncParams.OnInitSyncTimeout) + deadline := time.Now().Add(deps.SyncParams.OnInitSyncTimeout) + for { + if err := configSync.updater(); err == nil { + break + } + if time.Now().After(deadline) { cancel() - return nil - case <-time.After(deps.SyncParams.Delay): + return nil, deps.Log.Errorf("failed to sync config at startup, is the core agent listening on '%s' ?", url.String()) } + time.Sleep(2 * time.Second) } - configSync.updater() + deps.Log.Infof("triggering configsync on init succeeded") } // start and stop the routine in fx hooks @@ -125,5 +121,5 @@ func newConfigSync(deps dependencies, agentIPCPort int, configRefreshIntervalSec }, }) - return configSync + return configSync, nil } diff --git a/comp/core/configsync/configsyncimpl/module_integration_test.go b/comp/core/configsync/configsyncimpl/module_integration_test.go index e4cfa8ec1ba637..bee0a23d3ee8f3 100644 --- a/comp/core/configsync/configsyncimpl/module_integration_test.go +++ b/comp/core/configsync/configsyncimpl/module_integration_test.go @@ -47,7 +47,7 @@ func TestOptionalModule(t *testing.T) { csopt := fxutil.Test[optional.Option[configsync.Component]](t, fx.Options( core.MockBundle(), fetchonlyimpl.Module(), - OptionalModule(), + OptionalModule(Params{}), fx.Populate(&cfg), fx.Replace(config.MockParams{Overrides: overrides}), )) diff --git a/comp/core/configsync/configsyncimpl/module_test.go b/comp/core/configsync/configsyncimpl/module_test.go index 076288737a65ef..3cd3aacc22c768 100644 --- a/comp/core/configsync/configsyncimpl/module_test.go +++ b/comp/core/configsync/configsyncimpl/module_test.go @@ -18,7 +18,8 @@ func TestNewOptionalConfigSync(t *testing.T) { deps := makeDeps(t) deps.Config.Set("agent_ipc.port", 1234, pkgconfigmodel.SourceFile) deps.Config.Set("agent_ipc.config_refresh_interval", 30, pkgconfigmodel.SourceFile) - optConfigSync := newOptionalConfigSync(deps) + optConfigSync, err := newOptionalConfigSync(deps) + require.NoError(t, err) _, ok := optConfigSync.Get() require.True(t, ok) }) @@ -26,7 +27,8 @@ func TestNewOptionalConfigSync(t *testing.T) { t.Run("disabled ipc port zero", func(t *testing.T) { deps := makeDeps(t) deps.Config.Set("agent_ipc.port", 0, pkgconfigmodel.SourceFile) - optConfigSync := newOptionalConfigSync(deps) + optConfigSync, err := newOptionalConfigSync(deps) + require.NoError(t, err) _, ok := optConfigSync.Get() require.False(t, ok) }) @@ -34,7 +36,8 @@ func TestNewOptionalConfigSync(t *testing.T) { t.Run("disabled config refresh interval zero", func(t *testing.T) { deps := makeDeps(t) deps.Config.Set("agent_ipc.config_refresh_interval", 0, pkgconfigmodel.SourceFile) - optConfigSync := newOptionalConfigSync(deps) + optConfigSync, err := newOptionalConfigSync(deps) + require.NoError(t, err) _, ok := optConfigSync.Get() require.False(t, ok) }) diff --git a/comp/core/configsync/configsyncimpl/params.go b/comp/core/configsync/configsyncimpl/params.go index 79442bce595371..84de8bfeeb2fb4 100644 --- a/comp/core/configsync/configsyncimpl/params.go +++ b/comp/core/configsync/configsyncimpl/params.go @@ -10,17 +10,26 @@ import "time" // Params defines the parameters for the configsync component. type Params struct { + // Timeout is the timeout use for each call to the core-agent Timeout time.Duration - Delay time.Duration - OnInit bool + // OnInitSync makes configsync synchronize the configuration at initialization and fails init if we can get the + // configuration from the core agent + OnInitSync bool + // OnInitSyncTimeout represents how long configsync should retry to synchronize configuration at init + OnInitSyncTimeout time.Duration } // NewParams creates a new instance of Params -func NewParams(to time.Duration, delay time.Duration, sync bool) Params { +func NewParams(syncTimeout time.Duration, syncOnInit bool, syncOnInitTimeout time.Duration) Params { params := Params{ - Timeout: to, - Delay: delay, - OnInit: sync, + Timeout: syncTimeout, + OnInitSync: syncOnInit, + OnInitSyncTimeout: syncOnInitTimeout, } return params } + +// NewDefaultParams returns the default params for configsync +func NewDefaultParams() Params { + return Params{} +} diff --git a/comp/core/configsync/configsyncimpl/sync.go b/comp/core/configsync/configsyncimpl/sync.go index e22cdeae254ce8..fab7228dd305f4 100644 --- a/comp/core/configsync/configsyncimpl/sync.go +++ b/comp/core/configsync/configsyncimpl/sync.go @@ -17,16 +17,17 @@ import ( pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model" ) -func (cs *configSync) updater() { +func (cs *configSync) updater() error { + cs.Log.Debugf("Pulling new configuration from agent-core at '%s'", cs.url.String()) cfg, err := fetchConfig(cs.ctx, cs.client, cs.Authtoken.Get(), cs.url.String()) if err != nil { if cs.connected { - cs.Log.Warnf("Failed to fetch config from core agent: %v", err) + cs.Log.Warnf("Loosed connectivity to core-agent to fetch config: %v", err) cs.connected = false } else { cs.Log.Debugf("Failed to fetch config from core agent: %v", err) } - return + return err } if cs.connected { @@ -62,6 +63,7 @@ func (cs *configSync) updater() { updateConfig(cs.Config, key, value) } } + return nil } func (cs *configSync) runWithInterval(refreshInterval time.Duration) { @@ -80,7 +82,7 @@ func (cs *configSync) runWithChan(ch <-chan time.Time) { case <-cs.ctx.Done(): return case <-ch: - cs.updater() + _ = cs.updater() } } } diff --git a/comp/core/configsync/configsyncimpl/test_common.go b/comp/core/configsync/configsyncimpl/test_common.go index c6699ef0f1a025..19a7c9108f8c16 100644 --- a/comp/core/configsync/configsyncimpl/test_common.go +++ b/comp/core/configsync/configsyncimpl/test_common.go @@ -28,7 +28,7 @@ func makeDeps(t *testing.T) dependencies { return fxutil.Test[dependencies](t, fx.Options( core.MockBundle(), fetchonlyimpl.MockModule(), // use the mock to avoid trying to read the file - fx.Supply(NewParams(0, 0, false)), + fx.Supply(NewParams(0, false, 0)), )) } diff --git a/comp/forwarder/defaultforwarder/forwarder.go b/comp/forwarder/defaultforwarder/forwarder.go index 64cc62cd636dfa..9cacf54934a093 100644 --- a/comp/forwarder/defaultforwarder/forwarder.go +++ b/comp/forwarder/defaultforwarder/forwarder.go @@ -46,7 +46,11 @@ func newForwarder(dep dependencies) provides { func createOptions(params Params, config config.Component, log log.Component) *Options { var options *Options - keysPerDomain := getMultipleEndpoints(config, log) + keysPerDomain, err := utils.GetMultipleEndpoints(config) + if err != nil { + log.Error("Misconfiguration of agent endpoints: ", err) + } + if !params.withResolver { options = NewOptions(config, log, keysPerDomain) } else { @@ -61,15 +65,6 @@ func createOptions(params Params, config config.Component, log log.Component) *O return options } -func getMultipleEndpoints(config config.Component, log log.Component) map[string][]string { - // Inject the config to make sure we can call GetMultipleEndpoints. - keysPerDomain, err := utils.GetMultipleEndpoints(config) - if err != nil { - log.Error("Misconfiguration of agent endpoints: ", err) - } - return keysPerDomain -} - // NewForwarder returns a new forwarder component. // //nolint:revive