Skip to content

Commit

Permalink
[backport 7.62.x] [bugfix] Add retry ability to configsync comp (#32390
Browse files Browse the repository at this point in the history
…) (#32649)

Co-authored-by: maxime mouial <[email protected]>
  • Loading branch information
truthbk and hush-hush authored Jan 3, 2025
1 parent 7f09cc1 commit 808ffbd
Show file tree
Hide file tree
Showing 14 changed files with 68 additions and 76 deletions.
6 changes: 3 additions & 3 deletions cmd/otel-agent/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,16 @@ func makeCommands(globalParams *subcommands.GlobalParams) *cobra.Command {

const configFlag = "config"
const coreConfigFlag = "core-config"
const syncDelayFlag = "sync-delay"
const syncDelayFlag = "sync-delay" // TODO: Change this to sync-on-init-timeout
const syncTimeoutFlag = "sync-to"

func flags(reg *featuregate.Registry, cfgs *subcommands.GlobalParams) *flag.FlagSet {
flagSet := new(flag.FlagSet)
flagSet.Var(cfgs, configFlag, "Locations to the config file(s), note that only a"+
" single location can be set per flag entry e.g. `--config=file:/path/to/first --config=file:path/to/second`.")
flagSet.StringVar(&cfgs.CoreConfPath, coreConfigFlag, "", "Location to the Datadog Agent config file.")
flagSet.DurationVar(&cfgs.SyncDelay, syncDelayFlag, 0, "Delay before first config sync.")
flagSet.DurationVar(&cfgs.SyncTimeout, syncTimeoutFlag, 3*time.Second, "Timeout for sync requests.")
flagSet.DurationVar(&cfgs.SyncOnInitTimeout, syncDelayFlag, 0, "How long should config sync retry at initialization before failing.")
flagSet.DurationVar(&cfgs.SyncTimeout, syncTimeoutFlag, 3*time.Second, "Timeout for config sync requests.")

flagSet.Func("set",
"Set arbitrary component config property. The component has to be defined in the config file and the flag"+
Expand Down
16 changes: 2 additions & 14 deletions cmd/otel-agent/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,7 @@ func runOTelAgentCommand(ctx context.Context, params *subcommands.GlobalParams,
}),
logfx.Module(),
fetchonlyimpl.Module(),
// TODO: don't rely on this pattern; remove this `ModuleWithParams` thing
// and instead adapt OptionalModule to allow parameter passing naturally.
// See: https://github.com/DataDog/datadog-agent/pull/28386
configsyncimpl.ModuleWithParams(),
fx.Provide(func() configsyncimpl.Params {
return configsyncimpl.NewParams(params.SyncTimeout, params.SyncDelay, true)
}),
configsyncimpl.Module(configsyncimpl.NewParams(params.SyncTimeout, true, params.SyncOnInitTimeout)),
converterfx.Module(),
fx.Provide(func(cp converter.Component, _ configsync.Component) confmap.Converter {
return cp
Expand Down Expand Up @@ -191,13 +185,7 @@ func runOTelAgentCommand(ctx context.Context, params *subcommands.GlobalParams,
fx.Invoke(func(_ collectordef.Component, _ defaultforwarder.Forwarder, _ optional.Option[logsagentpipeline.Component]) {
}),

// TODO: don't rely on this pattern; remove this `ModuleWithParams` thing
// and instead adapt OptionalModule to allow parameter passing naturally.
// See: https://github.com/DataDog/datadog-agent/pull/28386
configsyncimpl.ModuleWithParams(),
fx.Provide(func() configsyncimpl.Params {
return configsyncimpl.NewParams(params.SyncTimeout, params.SyncDelay, true)
}),
configsyncimpl.Module(configsyncimpl.NewParams(params.SyncTimeout, true, params.SyncOnInitTimeout)),

remoteTaggerFx.Module(tagger.RemoteParams{
RemoteTarget: func(c coreconfig.Component) (string, error) { return fmt.Sprintf(":%v", c.GetInt("cmd_port")), nil },
Expand Down
14 changes: 7 additions & 7 deletions cmd/otel-agent/subcommands/subcommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ import (
// A pointer to this type is passed to SubcommandFactory's, but its contents
// are not valid until Cobra calls the subcommand's Run or RunE function.
type GlobalParams struct {
ConfPaths []string
Sets []string
CoreConfPath string
ConfigName string
LoggerName string
SyncDelay time.Duration
SyncTimeout time.Duration
ConfPaths []string
Sets []string
CoreConfPath string
ConfigName string
LoggerName string
SyncOnInitTimeout time.Duration
SyncTimeout time.Duration
}

// Set is called by Cobra when a flag is set.
Expand Down
2 changes: 1 addition & 1 deletion cmd/process-agent/command/main_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func runApp(ctx context.Context, globalParams *GlobalParams) error {
fetchonlyimpl.Module(),

// Provide configsync module
configsyncimpl.Module(),
configsyncimpl.Module(configsyncimpl.NewDefaultParams()),

// Provide autoexit module
autoexitimpl.Module(),
Expand Down
2 changes: 1 addition & 1 deletion cmd/security-agent/main_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (s *service) Run(svcctx context.Context) error {
statusimpl.Module(),

fetchonlyimpl.Module(),
configsyncimpl.Module(),
configsyncimpl.Module(configsyncimpl.NewDefaultParams()),
// Force the instantiation of the component
fx.Invoke(func(_ configsync.Component) {}),
autoexitimpl.Module(),
Expand Down
2 changes: 1 addition & 1 deletion cmd/security-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command {
}),
statusimpl.Module(),
fetchonlyimpl.Module(),
configsyncimpl.Module(),
configsyncimpl.Module(configsyncimpl.NewDefaultParams()),
// Force the instantiation of the component
fx.Invoke(func(_ configsync.Component) {}),
autoexitimpl.Module(),
Expand Down
2 changes: 1 addition & 1 deletion cmd/trace-agent/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func runTraceAgentProcess(ctx context.Context, cliParams *Params, defaultConfPat
zstdfx.Module(),
trace.Bundle(),
fetchonlyimpl.Module(),
configsyncimpl.Module(),
configsyncimpl.Module(configsyncimpl.NewDefaultParams()),
// Force the instantiation of the components
fx.Invoke(func(_ traceagent.Component, _ configsync.Component, _ autoexit.Component) {}),
)
Expand Down
40 changes: 17 additions & 23 deletions comp/core/configsync/configsyncimpl/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,10 @@ type dependencies struct {
}

// Module defines the fx options for this component.
func Module() fxutil.Module {
return fxutil.Component(
fx.Provide(newComponent),
fx.Supply(Params{}),
)
}

// ModuleWithParams defines the fx options for this component, but
// requires additionally specifying custom Params from the fx App, to be
// passed to the constructor.
func ModuleWithParams() fxutil.Module {
func Module(params Params) fxutil.Module {
return fxutil.Component(
fx.Provide(newComponent),
fx.Supply(params),
)
}

Expand All @@ -64,13 +55,13 @@ type configSync struct {
}

// newComponent checks if the component was enabled as per the config and return a enable/disabled configsync
func newComponent(deps dependencies) configsync.Component {
func newComponent(deps dependencies) (configsync.Component, error) {
agentIPCPort := deps.Config.GetInt("agent_ipc.port")
configRefreshIntervalSec := deps.Config.GetInt("agent_ipc.config_refresh_interval")

if agentIPCPort <= 0 || configRefreshIntervalSec <= 0 {
deps.Log.Infof("configsync disabled (agent_ipc.port: %d | agent_ipc.config_refresh_interval: %d)", agentIPCPort, configRefreshIntervalSec)
return configSync{}
return configSync{}, nil
}

deps.Log.Infof("configsync enabled (agent_ipc '%s:%d' | agent_ipc.config_refresh_interval: %d)", deps.Config.GetString("agent_ipc.host"), agentIPCPort, configRefreshIntervalSec)
Expand All @@ -79,7 +70,7 @@ func newComponent(deps dependencies) configsync.Component {

// newConfigSync creates a new configSync component.
// agentIPCPort and configRefreshIntervalSec must be strictly positive.
func newConfigSync(deps dependencies, agentIPCPort int, configRefreshIntervalSec int) configsync.Component {
func newConfigSync(deps dependencies, agentIPCPort int, configRefreshIntervalSec int) (configsync.Component, error) {
agentIPCHost := deps.Config.GetString("agent_ipc.host")

url := &url.URL{
Expand All @@ -102,17 +93,20 @@ func newConfigSync(deps dependencies, agentIPCPort int, configRefreshIntervalSec
enabled: true,
}

if deps.SyncParams.OnInit {
if deps.SyncParams.Delay != 0 {
select {
case <-ctx.Done(): //context cancelled
// TODO: this component should return an error
if deps.SyncParams.OnInitSync {
deps.Log.Infof("triggering configsync on init (will retry for %s)", deps.SyncParams.OnInitSyncTimeout)
deadline := time.Now().Add(deps.SyncParams.OnInitSyncTimeout)
for {
if err := configSync.updater(); err == nil {
break
}
if time.Now().After(deadline) {
cancel()
return nil
case <-time.After(deps.SyncParams.Delay):
return nil, deps.Log.Errorf("failed to sync config at startup, is the core agent listening on '%s' ?", url.String())
}
time.Sleep(2 * time.Second)
}
configSync.updater()
deps.Log.Infof("triggering configsync on init succeeded")
}

// start and stop the routine in fx hooks
Expand All @@ -127,5 +121,5 @@ func newConfigSync(deps dependencies, agentIPCPort int, configRefreshIntervalSec
},
})

return configSync
return configSync, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestOptionalModule(t *testing.T) {
comp := fxutil.Test[configsync.Component](t, fx.Options(
core.MockBundle(),
fetchonlyimpl.Module(),
Module(),
Module(Params{}),
fx.Populate(&cfg),
fx.Replace(config.MockParams{Overrides: overrides}),
))
Expand Down
10 changes: 7 additions & 3 deletions comp/core/configsync/configsyncimpl/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,32 @@ import (

pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewConfigSync(t *testing.T) {
t.Run("enabled", func(t *testing.T) {
deps := makeDeps(t)
deps.Config.Set("agent_ipc.port", 1234, pkgconfigmodel.SourceFile)
deps.Config.Set("agent_ipc.config_refresh_interval", 30, pkgconfigmodel.SourceFile)
comp := newComponent(deps)
comp, err := newComponent(deps)
require.NoError(t, err)
assert.True(t, comp.(configSync).enabled)
})

t.Run("disabled ipc port zero", func(t *testing.T) {
deps := makeDeps(t)
deps.Config.Set("agent_ipc.port", 0, pkgconfigmodel.SourceFile)
comp := newComponent(deps)
comp, err := newComponent(deps)
require.NoError(t, err)
assert.False(t, comp.(configSync).enabled)
})

t.Run("disabled config refresh interval zero", func(t *testing.T) {
deps := makeDeps(t)
deps.Config.Set("agent_ipc.config_refresh_interval", 0, pkgconfigmodel.SourceFile)
comp := newComponent(deps)
comp, err := newComponent(deps)
require.NoError(t, err)
assert.False(t, comp.(configSync).enabled)
})
}
21 changes: 15 additions & 6 deletions comp/core/configsync/configsyncimpl/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,26 @@ import "time"

// Params defines the parameters for the configsync component.
type Params struct {
// Timeout is the timeout use for each call to the core-agent
Timeout time.Duration
Delay time.Duration
OnInit bool
// OnInitSync makes configsync synchronize the configuration at initialization and fails init if we can get the
// configuration from the core agent
OnInitSync bool
// OnInitSyncTimeout represents how long configsync should retry to synchronize configuration at init
OnInitSyncTimeout time.Duration
}

// NewParams creates a new instance of Params
func NewParams(to time.Duration, delay time.Duration, sync bool) Params {
func NewParams(syncTimeout time.Duration, syncOnInit bool, syncOnInitTimeout time.Duration) Params {
params := Params{
Timeout: to,
Delay: delay,
OnInit: sync,
Timeout: syncTimeout,
OnInitSync: syncOnInit,
OnInitSyncTimeout: syncOnInitTimeout,
}
return params
}

// NewDefaultParams returns the default params for configsync
func NewDefaultParams() Params {
return Params{}
}
10 changes: 6 additions & 4 deletions comp/core/configsync/configsyncimpl/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ import (
pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
)

func (cs *configSync) updater() {
func (cs *configSync) updater() error {
cs.Log.Debugf("Pulling new configuration from agent-core at '%s'", cs.url.String())
cfg, err := fetchConfig(cs.ctx, cs.client, cs.Authtoken.Get(), cs.url.String())
if err != nil {
if cs.connected {
cs.Log.Warnf("Failed to fetch config from core agent: %v", err)
cs.Log.Warnf("Loosed connectivity to core-agent to fetch config: %v", err)
cs.connected = false
} else {
cs.Log.Debugf("Failed to fetch config from core agent: %v", err)
}
return
return err
}

if cs.connected {
Expand Down Expand Up @@ -62,6 +63,7 @@ func (cs *configSync) updater() {
updateConfig(cs.Config, key, value)
}
}
return nil
}

func (cs *configSync) runWithInterval(refreshInterval time.Duration) {
Expand All @@ -80,7 +82,7 @@ func (cs *configSync) runWithChan(ch <-chan time.Time) {
case <-cs.ctx.Done():
return
case <-ch:
cs.updater()
_ = cs.updater()
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion comp/core/configsync/configsyncimpl/test_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func makeDeps(t *testing.T) dependencies {
return fxutil.Test[dependencies](t, fx.Options(
core.MockBundle(),
fetchonlyimpl.MockModule(), // use the mock to avoid trying to read the file
fx.Supply(NewParams(0, 0, false)),
fx.Supply(NewParams(0, false, 0)),
))
}

Expand Down
15 changes: 5 additions & 10 deletions comp/forwarder/defaultforwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ func newForwarder(dep dependencies) provides {

func createOptions(params Params, config config.Component, log log.Component) *Options {
var options *Options
keysPerDomain := getMultipleEndpoints(config, log)
keysPerDomain, err := utils.GetMultipleEndpoints(config)
if err != nil {
log.Error("Misconfiguration of agent endpoints: ", err)
}

if !params.withResolver {
options = NewOptions(config, log, keysPerDomain)
} else {
Expand All @@ -61,15 +65,6 @@ func createOptions(params Params, config config.Component, log log.Component) *O
return options
}

func getMultipleEndpoints(config config.Component, log log.Component) map[string][]string {
// Inject the config to make sure we can call GetMultipleEndpoints.
keysPerDomain, err := utils.GetMultipleEndpoints(config)
if err != nil {
log.Error("Misconfiguration of agent endpoints: ", err)
}
return keysPerDomain
}

// NewForwarder returns a new forwarder component.
//
//nolint:revive
Expand Down

0 comments on commit 808ffbd

Please sign in to comment.