Skip to content

Commit

Permalink
Add retry ability to configsync comp
Browse files Browse the repository at this point in the history
  • Loading branch information
hush-hush committed Dec 20, 2024
1 parent 1c318b7 commit 2a2c5a2
Show file tree
Hide file tree
Showing 14 changed files with 71 additions and 78 deletions.
6 changes: 3 additions & 3 deletions cmd/otel-agent/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,16 @@ func makeCommands(globalParams *subcommands.GlobalParams) *cobra.Command {

const configFlag = "config"
const coreConfigFlag = "core-config"
const syncDelayFlag = "sync-delay"
const syncDelayFlag = "sync-delay" // TODO: Change this to sync-on-init-timeout
const syncTimeoutFlag = "sync-to"

func flags(reg *featuregate.Registry, cfgs *subcommands.GlobalParams) *flag.FlagSet {
flagSet := new(flag.FlagSet)
flagSet.Var(cfgs, configFlag, "Locations to the config file(s), note that only a"+
" single location can be set per flag entry e.g. `--config=file:/path/to/first --config=file:path/to/second`.")
flagSet.StringVar(&cfgs.CoreConfPath, coreConfigFlag, "", "Location to the Datadog Agent config file.")
flagSet.DurationVar(&cfgs.SyncDelay, syncDelayFlag, 0, "Delay before first config sync.")
flagSet.DurationVar(&cfgs.SyncTimeout, syncTimeoutFlag, 3*time.Second, "Timeout for sync requests.")
flagSet.DurationVar(&cfgs.SyncOnInitTimeout, syncDelayFlag, 0, "How long should config sync retry at initialization before failing.")
flagSet.DurationVar(&cfgs.SyncTimeout, syncTimeoutFlag, 3*time.Second, "Timeout for config sync requests.")

flagSet.Func("set",
"Set arbitrary component config property. The component has to be defined in the config file and the flag"+
Expand Down
16 changes: 2 additions & 14 deletions cmd/otel-agent/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,7 @@ func runOTelAgentCommand(ctx context.Context, params *subcommands.GlobalParams,
}),
logfx.Module(),
fetchonlyimpl.Module(),
// TODO: don't rely on this pattern; remove this `OptionalModuleWithParams` thing
// and instead adapt OptionalModule to allow parameter passing naturally.
// See: https://github.com/DataDog/datadog-agent/pull/28386
configsyncimpl.OptionalModuleWithParams(),
fx.Provide(func() configsyncimpl.Params {
return configsyncimpl.NewParams(params.SyncTimeout, params.SyncDelay, true)
}),
configsyncimpl.OptionalModule(configsyncimpl.NewParams(params.SyncTimeout, true, params.SyncOnInitTimeout)),
converterfx.Module(),
fx.Provide(func(cp converter.Component, _ optional.Option[configsync.Component]) confmap.Converter {
return cp
Expand Down Expand Up @@ -191,13 +185,7 @@ func runOTelAgentCommand(ctx context.Context, params *subcommands.GlobalParams,
fx.Invoke(func(_ collectordef.Component, _ defaultforwarder.Forwarder, _ optional.Option[logsagentpipeline.Component]) {
}),

// TODO: don't rely on this pattern; remove this `OptionalModuleWithParams` thing
// and instead adapt OptionalModule to allow parameter passing naturally.
// See: https://github.com/DataDog/datadog-agent/pull/28386
configsyncimpl.OptionalModuleWithParams(),
fx.Provide(func() configsyncimpl.Params {
return configsyncimpl.NewParams(params.SyncTimeout, params.SyncDelay, true)
}),
configsyncimpl.OptionalModule(configsyncimpl.NewParams(params.SyncTimeout, true, params.SyncOnInitTimeout)),

remoteTaggerFx.Module(tagger.RemoteParams{
RemoteTarget: func(c coreconfig.Component) (string, error) { return fmt.Sprintf(":%v", c.GetInt("cmd_port")), nil },
Expand Down
14 changes: 7 additions & 7 deletions cmd/otel-agent/subcommands/subcommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ import (
// A pointer to this type is passed to SubcommandFactory's, but its contents
// are not valid until Cobra calls the subcommand's Run or RunE function.
type GlobalParams struct {
ConfPaths []string
Sets []string
CoreConfPath string
ConfigName string
LoggerName string
SyncDelay time.Duration
SyncTimeout time.Duration
ConfPaths []string
Sets []string
CoreConfPath string
ConfigName string
LoggerName string
SyncOnInitTimeout time.Duration
SyncTimeout time.Duration
}

// Set is called by Cobra when a flag is set.
Expand Down
2 changes: 1 addition & 1 deletion cmd/process-agent/command/main_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func runApp(ctx context.Context, globalParams *GlobalParams) error {
fetchonlyimpl.Module(),

// Provide configsync module
configsyncimpl.OptionalModule(),
configsyncimpl.OptionalModule(configsyncimpl.NewDefaultParams()),

// Provide autoexit module
autoexitimpl.Module(),
Expand Down
2 changes: 1 addition & 1 deletion cmd/security-agent/main_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (s *service) Run(svcctx context.Context) error {
statusimpl.Module(),

fetchonlyimpl.Module(),
configsyncimpl.OptionalModule(),
configsyncimpl.OptionalModule(configsyncimpl.NewDefaultParams()),
// Force the instantiation of the component
fx.Invoke(func(_ optional.Option[configsync.Component]) {}),
autoexitimpl.Module(),
Expand Down
2 changes: 1 addition & 1 deletion cmd/security-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command {
}),
statusimpl.Module(),
fetchonlyimpl.Module(),
configsyncimpl.OptionalModule(),
configsyncimpl.OptionalModule(configsyncimpl.NewDefaultParams()),
// Force the instantiation of the component
fx.Invoke(func(_ optional.Option[configsync.Component]) {}),
autoexitimpl.Module(),
Expand Down
2 changes: 1 addition & 1 deletion cmd/trace-agent/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func runTraceAgentProcess(ctx context.Context, cliParams *Params, defaultConfPat
zstdfx.Module(),
trace.Bundle(),
fetchonlyimpl.Module(),
configsyncimpl.OptionalModule(),
configsyncimpl.OptionalModule(configsyncimpl.NewDefaultParams()),
// Force the instantiation of the components
fx.Invoke(func(_ traceagent.Component, _ optional.Option[configsync.Component], _ autoexit.Component) {}),
)
Expand Down
46 changes: 21 additions & 25 deletions comp/core/configsync/configsyncimpl/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,10 @@ type dependencies struct {
}

// OptionalModule defines the fx options for this component.
func OptionalModule() fxutil.Module {
return fxutil.Component(
fx.Provide(newOptionalConfigSync),
fx.Supply(Params{}),
)
}

// OptionalModuleWithParams defines the fx options for this component, but
// requires additionally specifying custom Params from the fx App, to be
// passed to the constructor.
func OptionalModuleWithParams() fxutil.Module {
func OptionalModule(params Params) fxutil.Module {
return fxutil.Component(
fx.Provide(newOptionalConfigSync),
fx.Supply(params),
)
}

Expand All @@ -64,21 +55,23 @@ type configSync struct {
}

// newOptionalConfigSync checks if the component was enabled as per the config, and returns an optional.Option
func newOptionalConfigSync(deps dependencies) optional.Option[configsync.Component] {
func newOptionalConfigSync(deps dependencies) (optional.Option[configsync.Component], error) {
agentIPCPort := deps.Config.GetInt("agent_ipc.port")
configRefreshIntervalSec := deps.Config.GetInt("agent_ipc.config_refresh_interval")

if agentIPCPort <= 0 || configRefreshIntervalSec <= 0 {
return optional.NewNoneOption[configsync.Component]()
deps.Log.Infof("configsync disabled (agent_ipc.port: %d | agent_ipc.config_refresh_interval: %d)", agentIPCPort, configRefreshIntervalSec)
return optional.NewNoneOption[configsync.Component](), nil
}

configSync := newConfigSync(deps, agentIPCPort, configRefreshIntervalSec)
return optional.NewOption(configSync)
deps.Log.Infof("configsync enabled (agent_ipc '%s:%d' | agent_ipc.config_refresh_interval: %d)", deps.Config.GetString("agent_ipc.host"), agentIPCPort, configRefreshIntervalSec)
configSync, err := newConfigSync(deps, agentIPCPort, configRefreshIntervalSec)
return optional.NewOption(configSync), err
}

// newConfigSync creates a new configSync component.
// agentIPCPort and configRefreshIntervalSec must be strictly positive.
func newConfigSync(deps dependencies, agentIPCPort int, configRefreshIntervalSec int) configsync.Component {
func newConfigSync(deps dependencies, agentIPCPort int, configRefreshIntervalSec int) (configsync.Component, error) {
agentIPCHost := deps.Config.GetString("agent_ipc.host")

url := &url.URL{
Expand All @@ -100,17 +93,20 @@ func newConfigSync(deps dependencies, agentIPCPort int, configRefreshIntervalSec
ctx: ctx,
}

if deps.SyncParams.OnInit {
if deps.SyncParams.Delay != 0 {
select {
case <-ctx.Done(): //context cancelled
// TODO: this component should return an error
if deps.SyncParams.OnInitSync {
deps.Log.Infof("triggering configsync on init (will retry for %s)", deps.SyncParams.OnInitSyncTimeout)
deadline := time.Now().Add(deps.SyncParams.OnInitSyncTimeout)
for {
if err := configSync.updater(); err == nil {
break
}
if time.Now().After(deadline) {
cancel()
return nil
case <-time.After(deps.SyncParams.Delay):
return nil, deps.Log.Errorf("failed to sync config at startup, is the core agent listening on '%s' ?", url.String())
}
time.Sleep(2 * time.Second)
}
configSync.updater()
deps.Log.Infof("triggering configsync on init succeeded")
}

// start and stop the routine in fx hooks
Expand All @@ -125,5 +121,5 @@ func newConfigSync(deps dependencies, agentIPCPort int, configRefreshIntervalSec
},
})

return configSync
return configSync, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestOptionalModule(t *testing.T) {
csopt := fxutil.Test[optional.Option[configsync.Component]](t, fx.Options(
core.MockBundle(),
fetchonlyimpl.Module(),
OptionalModule(),
OptionalModule(Params{}),
fx.Populate(&cfg),
fx.Replace(config.MockParams{Overrides: overrides}),
))
Expand Down
9 changes: 6 additions & 3 deletions comp/core/configsync/configsyncimpl/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,26 @@ func TestNewOptionalConfigSync(t *testing.T) {
deps := makeDeps(t)
deps.Config.Set("agent_ipc.port", 1234, pkgconfigmodel.SourceFile)
deps.Config.Set("agent_ipc.config_refresh_interval", 30, pkgconfigmodel.SourceFile)
optConfigSync := newOptionalConfigSync(deps)
optConfigSync, err := newOptionalConfigSync(deps)
require.NoError(t, err)
_, ok := optConfigSync.Get()
require.True(t, ok)
})

t.Run("disabled ipc port zero", func(t *testing.T) {
deps := makeDeps(t)
deps.Config.Set("agent_ipc.port", 0, pkgconfigmodel.SourceFile)
optConfigSync := newOptionalConfigSync(deps)
optConfigSync, err := newOptionalConfigSync(deps)
require.NoError(t, err)
_, ok := optConfigSync.Get()
require.False(t, ok)
})

t.Run("disabled config refresh interval zero", func(t *testing.T) {
deps := makeDeps(t)
deps.Config.Set("agent_ipc.config_refresh_interval", 0, pkgconfigmodel.SourceFile)
optConfigSync := newOptionalConfigSync(deps)
optConfigSync, err := newOptionalConfigSync(deps)
require.NoError(t, err)
_, ok := optConfigSync.Get()
require.False(t, ok)
})
Expand Down
21 changes: 15 additions & 6 deletions comp/core/configsync/configsyncimpl/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,26 @@ import "time"

// Params defines the parameters for the configsync component.
type Params struct {
// Timeout is the timeout use for each call to the core-agent
Timeout time.Duration
Delay time.Duration
OnInit bool
// OnInitSync makes configsync synchronize the configuration at initialization and fails init if we can get the
// configuration from the core agent
OnInitSync bool
// OnInitSyncTimeout represents how long configsync should retry to synchronize configuration at init
OnInitSyncTimeout time.Duration
}

// NewParams creates a new instance of Params
func NewParams(to time.Duration, delay time.Duration, sync bool) Params {
func NewParams(syncTimeout time.Duration, syncOnInit bool, syncOnInitTimeout time.Duration) Params {
params := Params{
Timeout: to,
Delay: delay,
OnInit: sync,
Timeout: syncTimeout,
OnInitSync: syncOnInit,
OnInitSyncTimeout: syncOnInitTimeout,
}
return params
}

// NewDefaultParams returns the default params for configsync
func NewDefaultParams() Params {
return Params{}
}
10 changes: 6 additions & 4 deletions comp/core/configsync/configsyncimpl/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ import (
pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
)

func (cs *configSync) updater() {
func (cs *configSync) updater() error {
cs.Log.Debugf("Pulling new configuration from agent-core at '%s'", cs.url.String())
cfg, err := fetchConfig(cs.ctx, cs.client, cs.Authtoken.Get(), cs.url.String())
if err != nil {
if cs.connected {
cs.Log.Warnf("Failed to fetch config from core agent: %v", err)
cs.Log.Warnf("Loosed connectivity to core-agent to fetch config: %v", err)
cs.connected = false
} else {
cs.Log.Debugf("Failed to fetch config from core agent: %v", err)
}
return
return err
}

if cs.connected {
Expand Down Expand Up @@ -62,6 +63,7 @@ func (cs *configSync) updater() {
updateConfig(cs.Config, key, value)
}
}
return nil
}

func (cs *configSync) runWithInterval(refreshInterval time.Duration) {
Expand All @@ -80,7 +82,7 @@ func (cs *configSync) runWithChan(ch <-chan time.Time) {
case <-cs.ctx.Done():
return
case <-ch:
cs.updater()
_ = cs.updater()
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion comp/core/configsync/configsyncimpl/test_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func makeDeps(t *testing.T) dependencies {
return fxutil.Test[dependencies](t, fx.Options(
core.MockBundle(),
fetchonlyimpl.MockModule(), // use the mock to avoid trying to read the file
fx.Supply(NewParams(0, 0, false)),
fx.Supply(NewParams(0, false, 0)),
))
}

Expand Down
15 changes: 5 additions & 10 deletions comp/forwarder/defaultforwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ func newForwarder(dep dependencies) provides {

func createOptions(params Params, config config.Component, log log.Component) *Options {
var options *Options
keysPerDomain := getMultipleEndpoints(config, log)
keysPerDomain, err := utils.GetMultipleEndpoints(config)
if err != nil {
log.Error("Misconfiguration of agent endpoints: ", err)
}

if !params.withResolver {
options = NewOptions(config, log, keysPerDomain)
} else {
Expand All @@ -61,15 +65,6 @@ func createOptions(params Params, config config.Component, log log.Component) *O
return options
}

func getMultipleEndpoints(config config.Component, log log.Component) map[string][]string {
// Inject the config to make sure we can call GetMultipleEndpoints.
keysPerDomain, err := utils.GetMultipleEndpoints(config)
if err != nil {
log.Error("Misconfiguration of agent endpoints: ", err)
}
return keysPerDomain
}

// NewForwarder returns a new forwarder component.
//
//nolint:revive
Expand Down

0 comments on commit 2a2c5a2

Please sign in to comment.