Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry ability to configsync comp #32390

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/otel-agent/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,16 @@ func makeCommands(globalParams *subcommands.GlobalParams) *cobra.Command {

const configFlag = "config"
const coreConfigFlag = "core-config"
const syncDelayFlag = "sync-delay"
const syncDelayFlag = "sync-delay" // TODO: Change this to sync-on-init-timeout
const syncTimeoutFlag = "sync-to"

func flags(reg *featuregate.Registry, cfgs *subcommands.GlobalParams) *flag.FlagSet {
flagSet := new(flag.FlagSet)
flagSet.Var(cfgs, configFlag, "Locations to the config file(s), note that only a"+
" single location can be set per flag entry e.g. `--config=file:/path/to/first --config=file:path/to/second`.")
flagSet.StringVar(&cfgs.CoreConfPath, coreConfigFlag, "", "Location to the Datadog Agent config file.")
flagSet.DurationVar(&cfgs.SyncDelay, syncDelayFlag, 0, "Delay before first config sync.")
flagSet.DurationVar(&cfgs.SyncTimeout, syncTimeoutFlag, 3*time.Second, "Timeout for sync requests.")
flagSet.DurationVar(&cfgs.SyncOnInitTimeout, syncDelayFlag, 0, "How long should config sync retry at initialization before failing.")
flagSet.DurationVar(&cfgs.SyncTimeout, syncTimeoutFlag, 3*time.Second, "Timeout for config sync requests.")

flagSet.Func("set",
"Set arbitrary component config property. The component has to be defined in the config file and the flag"+
Expand Down
16 changes: 2 additions & 14 deletions cmd/otel-agent/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,7 @@ func runOTelAgentCommand(ctx context.Context, params *subcommands.GlobalParams,
}),
logfx.Module(),
fetchonlyimpl.Module(),
// TODO: don't rely on this pattern; remove this `ModuleWithParams` thing
// and instead adapt OptionalModule to allow parameter passing naturally.
// See: https://github.com/DataDog/datadog-agent/pull/28386
configsyncimpl.ModuleWithParams(),
fx.Provide(func() configsyncimpl.Params {
return configsyncimpl.NewParams(params.SyncTimeout, params.SyncDelay, true)
}),
configsyncimpl.Module(configsyncimpl.NewParams(params.SyncTimeout, true, params.SyncOnInitTimeout)),
converterfx.Module(),
fx.Provide(func(cp converter.Component, _ configsync.Component) confmap.Converter {
return cp
Expand Down Expand Up @@ -191,13 +185,7 @@ func runOTelAgentCommand(ctx context.Context, params *subcommands.GlobalParams,
fx.Invoke(func(_ collectordef.Component, _ defaultforwarder.Forwarder, _ optional.Option[logsagentpipeline.Component]) {
}),

// TODO: don't rely on this pattern; remove this `ModuleWithParams` thing
// and instead adapt OptionalModule to allow parameter passing naturally.
// See: https://github.com/DataDog/datadog-agent/pull/28386
configsyncimpl.ModuleWithParams(),
fx.Provide(func() configsyncimpl.Params {
return configsyncimpl.NewParams(params.SyncTimeout, params.SyncDelay, true)
}),
configsyncimpl.Module(configsyncimpl.NewParams(params.SyncTimeout, true, params.SyncOnInitTimeout)),

remoteTaggerFx.Module(tagger.RemoteParams{
RemoteTarget: func(c coreconfig.Component) (string, error) { return fmt.Sprintf(":%v", c.GetInt("cmd_port")), nil },
Expand Down
14 changes: 7 additions & 7 deletions cmd/otel-agent/subcommands/subcommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ import (
// A pointer to this type is passed to SubcommandFactory's, but its contents
// are not valid until Cobra calls the subcommand's Run or RunE function.
type GlobalParams struct {
ConfPaths []string
Sets []string
CoreConfPath string
ConfigName string
LoggerName string
SyncDelay time.Duration
SyncTimeout time.Duration
ConfPaths []string
Sets []string
CoreConfPath string
ConfigName string
LoggerName string
SyncOnInitTimeout time.Duration
SyncTimeout time.Duration
}

// Set is called by Cobra when a flag is set.
Expand Down
2 changes: 1 addition & 1 deletion cmd/process-agent/command/main_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func runApp(ctx context.Context, globalParams *GlobalParams) error {
fetchonlyimpl.Module(),

// Provide configsync module
configsyncimpl.Module(),
configsyncimpl.Module(configsyncimpl.NewDefaultParams()),

// Provide autoexit module
autoexitimpl.Module(),
Expand Down
2 changes: 1 addition & 1 deletion cmd/security-agent/main_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (s *service) Run(svcctx context.Context) error {
statusimpl.Module(),

fetchonlyimpl.Module(),
configsyncimpl.Module(),
configsyncimpl.Module(configsyncimpl.NewDefaultParams()),
// Force the instantiation of the component
fx.Invoke(func(_ configsync.Component) {}),
autoexitimpl.Module(),
Expand Down
2 changes: 1 addition & 1 deletion cmd/security-agent/subcommands/start/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func Commands(globalParams *command.GlobalParams) []*cobra.Command {
}),
statusimpl.Module(),
fetchonlyimpl.Module(),
configsyncimpl.Module(),
configsyncimpl.Module(configsyncimpl.NewDefaultParams()),
// Force the instantiation of the component
fx.Invoke(func(_ configsync.Component) {}),
autoexitimpl.Module(),
Expand Down
2 changes: 1 addition & 1 deletion cmd/trace-agent/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func runTraceAgentProcess(ctx context.Context, cliParams *Params, defaultConfPat
zstdfx.Module(),
trace.Bundle(),
fetchonlyimpl.Module(),
configsyncimpl.Module(),
configsyncimpl.Module(configsyncimpl.NewDefaultParams()),
// Force the instantiation of the components
fx.Invoke(func(_ traceagent.Component, _ configsync.Component, _ autoexit.Component) {}),
)
Expand Down
40 changes: 17 additions & 23 deletions comp/core/configsync/configsyncimpl/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,10 @@ type dependencies struct {
}

// Module defines the fx options for this component.
func Module() fxutil.Module {
return fxutil.Component(
fx.Provide(newComponent),
fx.Supply(Params{}),
)
}

// ModuleWithParams defines the fx options for this component, but
// requires additionally specifying custom Params from the fx App, to be
// passed to the constructor.
func ModuleWithParams() fxutil.Module {
func Module(params Params) fxutil.Module {
return fxutil.Component(
fx.Provide(newComponent),
fx.Supply(params),
)
}

Expand All @@ -64,13 +55,13 @@ type configSync struct {
}

// newComponent checks if the component was enabled as per the config and return a enable/disabled configsync
func newComponent(deps dependencies) configsync.Component {
func newComponent(deps dependencies) (configsync.Component, error) {
agentIPCPort := deps.Config.GetInt("agent_ipc.port")
configRefreshIntervalSec := deps.Config.GetInt("agent_ipc.config_refresh_interval")

if agentIPCPort <= 0 || configRefreshIntervalSec <= 0 {
deps.Log.Infof("configsync disabled (agent_ipc.port: %d | agent_ipc.config_refresh_interval: %d)", agentIPCPort, configRefreshIntervalSec)
return configSync{}
return configSync{}, nil
}

deps.Log.Infof("configsync enabled (agent_ipc '%s:%d' | agent_ipc.config_refresh_interval: %d)", deps.Config.GetString("agent_ipc.host"), agentIPCPort, configRefreshIntervalSec)
Expand All @@ -79,7 +70,7 @@ func newComponent(deps dependencies) configsync.Component {

// newConfigSync creates a new configSync component.
// agentIPCPort and configRefreshIntervalSec must be strictly positive.
func newConfigSync(deps dependencies, agentIPCPort int, configRefreshIntervalSec int) configsync.Component {
func newConfigSync(deps dependencies, agentIPCPort int, configRefreshIntervalSec int) (configsync.Component, error) {
agentIPCHost := deps.Config.GetString("agent_ipc.host")

url := &url.URL{
Expand All @@ -102,17 +93,20 @@ func newConfigSync(deps dependencies, agentIPCPort int, configRefreshIntervalSec
enabled: true,
}

if deps.SyncParams.OnInit {
if deps.SyncParams.Delay != 0 {
select {
case <-ctx.Done(): //context cancelled
// TODO: this component should return an error
if deps.SyncParams.OnInitSync {
deps.Log.Infof("triggering configsync on init (will retry for %s)", deps.SyncParams.OnInitSyncTimeout)
deadline := time.Now().Add(deps.SyncParams.OnInitSyncTimeout)
for {
if err := configSync.updater(); err == nil {
break
}
if time.Now().After(deadline) {
cancel()
return nil
case <-time.After(deps.SyncParams.Delay):
return nil, deps.Log.Errorf("failed to sync config at startup, is the core agent listening on '%s' ?", url.String())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just checking whether it's intentional, but if this error is returned due to all retries failing, that will cause the app to exit due to fx graph construction failing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think starting the process in a failed state is no OK. Especially with k8s which might restart it and fix the error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, I think that makes sense.

}
time.Sleep(2 * time.Second)
}
configSync.updater()
deps.Log.Infof("triggering configsync on init succeeded")
}

// start and stop the routine in fx hooks
Expand All @@ -127,5 +121,5 @@ func newConfigSync(deps dependencies, agentIPCPort int, configRefreshIntervalSec
},
})

return configSync
return configSync, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestOptionalModule(t *testing.T) {
comp := fxutil.Test[configsync.Component](t, fx.Options(
core.MockBundle(),
fetchonlyimpl.Module(),
Module(),
Module(Params{}),
fx.Populate(&cfg),
fx.Replace(config.MockParams{Overrides: overrides}),
))
Expand Down
10 changes: 7 additions & 3 deletions comp/core/configsync/configsyncimpl/module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,32 @@ import (

pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNewConfigSync(t *testing.T) {
t.Run("enabled", func(t *testing.T) {
deps := makeDeps(t)
deps.Config.Set("agent_ipc.port", 1234, pkgconfigmodel.SourceFile)
deps.Config.Set("agent_ipc.config_refresh_interval", 30, pkgconfigmodel.SourceFile)
comp := newComponent(deps)
comp, err := newComponent(deps)
require.NoError(t, err)
assert.True(t, comp.(configSync).enabled)
})

t.Run("disabled ipc port zero", func(t *testing.T) {
deps := makeDeps(t)
deps.Config.Set("agent_ipc.port", 0, pkgconfigmodel.SourceFile)
comp := newComponent(deps)
comp, err := newComponent(deps)
require.NoError(t, err)
assert.False(t, comp.(configSync).enabled)
})

t.Run("disabled config refresh interval zero", func(t *testing.T) {
deps := makeDeps(t)
deps.Config.Set("agent_ipc.config_refresh_interval", 0, pkgconfigmodel.SourceFile)
comp := newComponent(deps)
comp, err := newComponent(deps)
require.NoError(t, err)
assert.False(t, comp.(configSync).enabled)
})
}
21 changes: 15 additions & 6 deletions comp/core/configsync/configsyncimpl/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,26 @@ import "time"

// Params defines the parameters for the configsync component.
type Params struct {
// Timeout is the timeout use for each call to the core-agent
Timeout time.Duration
Delay time.Duration
OnInit bool
// OnInitSync makes configsync synchronize the configuration at initialization and fails init if we can get the
// configuration from the core agent
OnInitSync bool
// OnInitSyncTimeout represents how long configsync should retry to synchronize configuration at init
OnInitSyncTimeout time.Duration
}

// NewParams creates a new instance of Params
func NewParams(to time.Duration, delay time.Duration, sync bool) Params {
func NewParams(syncTimeout time.Duration, syncOnInit bool, syncOnInitTimeout time.Duration) Params {
params := Params{
Timeout: to,
Delay: delay,
OnInit: sync,
Timeout: syncTimeout,
OnInitSync: syncOnInit,
OnInitSyncTimeout: syncOnInitTimeout,
}
return params
}

// NewDefaultParams returns the default params for configsync
func NewDefaultParams() Params {
return Params{}
}
10 changes: 6 additions & 4 deletions comp/core/configsync/configsyncimpl/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ import (
pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
)

func (cs *configSync) updater() {
func (cs *configSync) updater() error {
cs.Log.Debugf("Pulling new configuration from agent-core at '%s'", cs.url.String())
cfg, err := fetchConfig(cs.ctx, cs.client, cs.Authtoken.Get(), cs.url.String())
if err != nil {
if cs.connected {
cs.Log.Warnf("Failed to fetch config from core agent: %v", err)
cs.Log.Warnf("Loosed connectivity to core-agent to fetch config: %v", err)
hush-hush marked this conversation as resolved.
Show resolved Hide resolved
cs.connected = false
} else {
cs.Log.Debugf("Failed to fetch config from core agent: %v", err)
}
return
return err
}

if cs.connected {
Expand Down Expand Up @@ -62,6 +63,7 @@ func (cs *configSync) updater() {
updateConfig(cs.Config, key, value)
}
}
return nil
}

func (cs *configSync) runWithInterval(refreshInterval time.Duration) {
Expand All @@ -80,7 +82,7 @@ func (cs *configSync) runWithChan(ch <-chan time.Time) {
case <-cs.ctx.Done():
return
case <-ch:
cs.updater()
_ = cs.updater()
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion comp/core/configsync/configsyncimpl/test_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func makeDeps(t *testing.T) dependencies {
return fxutil.Test[dependencies](t, fx.Options(
core.MockBundle(),
fetchonlyimpl.MockModule(), // use the mock to avoid trying to read the file
fx.Supply(NewParams(0, 0, false)),
fx.Supply(NewParams(0, false, 0)),
))
}

Expand Down
15 changes: 5 additions & 10 deletions comp/forwarder/defaultforwarder/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ func newForwarder(dep dependencies) provides {

func createOptions(params Params, config config.Component, log log.Component) *Options {
var options *Options
keysPerDomain := getMultipleEndpoints(config, log)
keysPerDomain, err := utils.GetMultipleEndpoints(config)
if err != nil {
log.Error("Misconfiguration of agent endpoints: ", err)
}

if !params.withResolver {
options = NewOptions(config, log, keysPerDomain)
} else {
Expand All @@ -61,15 +65,6 @@ func createOptions(params Params, config config.Component, log log.Component) *O
return options
}

func getMultipleEndpoints(config config.Component, log log.Component) map[string][]string {
// Inject the config to make sure we can call GetMultipleEndpoints.
keysPerDomain, err := utils.GetMultipleEndpoints(config)
if err != nil {
log.Error("Misconfiguration of agent endpoints: ", err)
}
return keysPerDomain
}

// NewForwarder returns a new forwarder component.
//
//nolint:revive
Expand Down
Loading