Skip to content

Commit

Permalink
[8.7](backport #2485) Better callback registration/deregistration in …
Browse files Browse the repository at this point in the history
…host provider's lifecycle (#2486)

* Better callback registration/deregistration in host provider's lifecycle (#2485)

* Implement CloseableProvider

* Make hostProvider implement CloseableProvider

* Update test to be more resilient

* Move feature flags parsing from standalone configuration to late in the Agent constructor

* Add explanatory comment for closeProvider function

* Running mage fmt

* Remove unused function

* Add Close() to controller interface and implement it

* Call composable controller's Close() method

* Use buffered channel to prevent blocking

* Don't block until Run() is running

* Call composable controller's Close() in tests

(cherry picked from commit 86c3395)

# Conflicts:
#	internal/pkg/agent/application/application.go
#	internal/pkg/agent/cmd/run.go

* Fixing conflicts

---------

Co-authored-by: Shaunak Kashyap <[email protected]>
  • Loading branch information
mergify[bot] and ycombinator authored Apr 11, 2023
1 parent 721f56b commit 18de34e
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 61 deletions.
37 changes: 20 additions & 17 deletions internal/pkg/agent/application/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent/pkg/features"

"go.elastic.co/apm"

Expand All @@ -25,7 +26,6 @@ import (
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/component/runtime"
"github.com/elastic/elastic-agent/pkg/core/logger"
"github.com/elastic/elastic-agent/pkg/features"
)

// New creates a new Agent and bootstrap the required subsystem.
Expand All @@ -38,40 +38,36 @@ func New(
tracer *apm.Tracer,
disableMonitoring bool,
modifiers ...component.PlatformModifier,
) (*coordinator.Coordinator, error) {
) (*coordinator.Coordinator, composable.Controller, error) {
platform, err := component.LoadPlatformDetail(modifiers...)
if err != nil {
return nil, fmt.Errorf("failed to gather system information: %w", err)
return nil, nil, fmt.Errorf("failed to gather system information: %w", err)
}
log.Info("Gathered system information")

specs, err := component.LoadRuntimeSpecs(paths.Components(), platform)
if err != nil {
return nil, fmt.Errorf("failed to detect inputs and outputs: %w", err)
return nil, nil, fmt.Errorf("failed to detect inputs and outputs: %w", err)
}
log.With("inputs", specs.Inputs()).Info("Detected available inputs and outputs")

caps, err := capabilities.Load(paths.AgentCapabilitiesPath(), log)
if err != nil {
return nil, fmt.Errorf("failed to determine capabilities: %w", err)
return nil, nil, fmt.Errorf("failed to determine capabilities: %w", err)
}
log.Info("Determined allowed capabilities")

pathConfigFile := paths.ConfigFile()
rawConfig, err := config.LoadFile(pathConfigFile)
if err != nil {
return nil, fmt.Errorf("failed to load configuration: %w", err)
return nil, nil, fmt.Errorf("failed to load configuration: %w", err)
}
if err := info.InjectAgentConfig(rawConfig); err != nil {
return nil, fmt.Errorf("failed to load configuration: %w", err)
return nil, nil, fmt.Errorf("failed to load configuration: %w", err)
}
cfg, err := configuration.NewFromConfig(rawConfig)
if err != nil {
return nil, fmt.Errorf("failed to load configuration: %w", err)
}

if err := features.Apply(rawConfig); err != nil {
return nil, fmt.Errorf("could not parse and apply feature flags config: %w", err)
return nil, nil, fmt.Errorf("failed to load configuration: %w", err)
}

// monitoring is not supported in bootstrap mode https://github.com/elastic/elastic-agent/issues/1761
Expand All @@ -89,7 +85,7 @@ func New(
cfg.Settings.GRPC,
)
if err != nil {
return nil, fmt.Errorf("failed to initialize runtime manager: %w", err)
return nil, nil, fmt.Errorf("failed to initialize runtime manager: %w", err)
}

var configMgr coordinator.ConfigManager
Expand All @@ -114,7 +110,7 @@ func New(
var store storage.Store
store, cfg, err = mergeFleetConfig(rawConfig)
if err != nil {
return nil, err
return nil, nil, err
}

if configuration.IsFleetServerBootstrap(cfg.Fleet) {
Expand All @@ -131,15 +127,15 @@ func New(

managed, err = newManagedConfigManager(log, agentInfo, cfg, store, runtime)
if err != nil {
return nil, err
return nil, nil, err
}
configMgr = managed
}
}

composable, err := composable.New(log, rawConfig, composableManaged)
if err != nil {
return nil, errors.New(err, "failed to initialize composable controller")
return nil, nil, errors.New(err, "failed to initialize composable controller")
}

coord := coordinator.New(log, logLevel, agentInfo, specs, reexec, upgrader, runtime, configMgr, composable, caps, monitor, isManaged, compModifiers...)
Expand All @@ -148,7 +144,14 @@ func New(
// coordinator, so it must be set here once the coordinator is created
managed.coord = coord
}
return coord, nil

// It is important that feature flags from configuration are applied as late as possible. This will ensure that
// any feature flag change callbacks are registered before they get called by `features.Apply`.
if err := features.Apply(rawConfig); err != nil {
return nil, nil, fmt.Errorf("could not parse and apply feature flags config: %w", err)
}

return coord, composable, nil
}

func mergeFleetConfig(rawConfig *config.Config) (storage.Store, *configuration.Configuration, error) {
Expand Down
3 changes: 2 additions & 1 deletion internal/pkg/agent/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,11 @@ func run(override cfgOverrider, modifiers ...component.PlatformModifier) error {
l.Info("APM instrumentation disabled")
}

coord, err := application.New(l, baseLogger, logLvl, agentInfo, rex, tracer, configuration.IsFleetServerBootstrap(cfg.Fleet), modifiers...)
coord, composable, err := application.New(l, baseLogger, logLvl, agentInfo, rex, tracer, configuration.IsFleetServerBootstrap(cfg.Fleet), modifiers...)
if err != nil {
return err
}
defer composable.Close()

serverStopFn, err := setupMetrics(l, cfg.Settings.DownloadConfig.OS(), cfg.Settings.MonitoringConfig, tracer, coord)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/agent/vars/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func WaitForVariables(ctx context.Context, l *logger.Logger, cfg *config.Config,
if err != nil {
return nil, fmt.Errorf("failed to create composable controller: %w", err)
}
defer composable.Close()

hasTimeout := false
if wait > time.Duration(0) {
Expand Down
32 changes: 32 additions & 0 deletions internal/pkg/composable/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ type Controller interface {

// Watch returns the channel to watch for variable changes.
Watch() <-chan []*transpiler.Vars

// Close closes the controller, allowing for any resource
// cleanup and such.
Close()
}

// controller manages the state of the providers current context.
Expand Down Expand Up @@ -251,6 +255,34 @@ func (c *controller) Watch() <-chan []*transpiler.Vars {
return c.ch
}

// Close closes the controller, allowing for any resource
// cleanup and such.
func (c *controller) Close() {
// Attempt to close all closeable context providers.
for name, state := range c.contextProviders {
cp, ok := state.provider.(corecomp.CloseableProvider)
if !ok {
continue
}

if err := cp.Close(); err != nil {
c.logger.Errorf("unable to close context provider %q: %s", name, err.Error())
}
}

// Attempt to close all closeable dynamic providers.
for name, state := range c.dynamicProviders {
cp, ok := state.provider.(corecomp.CloseableProvider)
if !ok {
continue
}

if err := cp.Close(); err != nil {
c.logger.Errorf("unable to close dynamic provider %q: %s", name, err.Error())
}
}
}

type contextProviderState struct {
context.Context

Expand Down
4 changes: 4 additions & 0 deletions internal/pkg/composable/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ func TestCancellation(t *testing.T) {
t.Run(fmt.Sprintf("test run %d", i), func(t *testing.T) {
c, err := composable.New(log, cfg, false)
require.NoError(t, err)
defer c.Close()

ctx, cancelFn := context.WithTimeout(context.Background(), timeout)
defer cancelFn()
err = c.Run(ctx)
Expand All @@ -205,6 +207,8 @@ func TestCancellation(t *testing.T) {
t.Run("immediate cancellation", func(t *testing.T) {
c, err := composable.New(log, cfg, false)
require.NoError(t, err)
defer c.Close()

ctx, cancelFn := context.WithTimeout(context.Background(), 0)
cancelFn()
err = c.Run(ctx)
Expand Down
55 changes: 33 additions & 22 deletions internal/pkg/composable/providers/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ import (
"github.com/elastic/elastic-agent/pkg/core/logger"
)

// DefaultCheckInterval is the default timeout used to check if any host information has changed.
const DefaultCheckInterval = 5 * time.Minute
const (
// DefaultCheckInterval is the default timeout used to check if any host information has changed.
DefaultCheckInterval = 5 * time.Minute

fqdnFeatureFlagCallbackID = "host_provider"
)

func init() {
composable.Providers.MustAddContextProvider("host", ContextProviderBuilder)
Expand All @@ -34,6 +38,10 @@ type contextProvider struct {

CheckInterval time.Duration `config:"check_interval"`

// fqdnFFChangeCh is used to signal when the FQDN
// feature flag has changed
fqdnFFChangeCh chan struct{}

// used by testing
fetcher infoFetcher
}
Expand All @@ -49,29 +57,14 @@ func (c *contextProvider) Run(comm corecomp.ContextProviderComm) error {
return errors.New(err, "failed to set mapping", errors.TypeUnexpected)
}

const fqdnFeatureFlagCallbackID = "host_provider"
fqdnFFChangeCh := make(chan struct{})
err = features.AddFQDNOnChangeCallback(
onFQDNFeatureFlagChange(fqdnFFChangeCh),
fqdnFeatureFlagCallbackID,
)
if err != nil {
return fmt.Errorf("unable to add FQDN onChange callback in host provider: %w", err)
}

defer func() {
features.RemoveFQDNOnChangeCallback(fqdnFeatureFlagCallbackID)
close(fqdnFFChangeCh)
}()

// Update context when any host information changes.
for {
t := time.NewTimer(c.CheckInterval)
select {
case <-comm.Done():
t.Stop()
return comm.Err()
case <-fqdnFFChangeCh:
case <-c.fqdnFFChangeCh:
case <-t.C:
}

Expand All @@ -92,13 +85,21 @@ func (c *contextProvider) Run(comm corecomp.ContextProviderComm) error {
}
}

func onFQDNFeatureFlagChange(fqdnFFChangeCh chan struct{}) features.BoolValueOnChangeCallback {
return func(new, old bool) {
// FQDN feature flag was toggled, so notify on channel
fqdnFFChangeCh <- struct{}{}
func (c *contextProvider) onFQDNFeatureFlagChange(new, old bool) {
// FQDN feature flag was toggled, so notify on channel
select {
case c.fqdnFFChangeCh <- struct{}{}:
default:
}
}

func (c *contextProvider) Close() error {
features.RemoveFQDNOnChangeCallback(fqdnFeatureFlagCallbackID)
close(c.fqdnFFChangeCh)

return nil
}

// ContextProviderBuilder builds the context provider.
func ContextProviderBuilder(log *logger.Logger, c *config.Config, _ bool) (corecomp.ContextProvider, error) {
p := &contextProvider{
Expand All @@ -114,6 +115,16 @@ func ContextProviderBuilder(log *logger.Logger, c *config.Config, _ bool) (corec
if p.CheckInterval <= 0 {
p.CheckInterval = DefaultCheckInterval
}

p.fqdnFFChangeCh = make(chan struct{}, 1)
err := features.AddFQDNOnChangeCallback(
p.onFQDNFeatureFlagChange,
fqdnFeatureFlagCallbackID,
)
if err != nil {
return nil, fmt.Errorf("unable to add FQDN onChange callback in host provider: %w", err)
}

return p, nil
}

Expand Down
21 changes: 9 additions & 12 deletions internal/pkg/composable/providers/host/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ func TestFQDNFeatureFlagToggle(t *testing.T) {
provider, err := builder(log, c, true)
require.NoError(t, err)

hostProvider, ok := provider.(*contextProvider)
require.True(t, ok)
defer func() {
err := hostProvider.Close()
require.NoError(t, err)
}()

ctx, cancel := context.WithCancel(context.Background())
defer func() {
cancel()
Expand All @@ -109,26 +116,16 @@ func TestFQDNFeatureFlagToggle(t *testing.T) {

// Track the number of times hostProvider.fetcher is called.
numCalled := 0
hostProvider, ok := provider.(*contextProvider)
require.True(t, ok)

hostProvider.fetcher = func() (map[string]interface{}, error) {
numCalled++
return nil, nil
}

// Run the provider
go func() {
err = provider.Run(comm)
err = hostProvider.Run(comm)
}()

// Wait long enough for provider.Run to register
// the FQDN feature flag onChange callback.
numCallbacks := features.NumFQDNOnChangeCallbacks()
require.Eventually(t, func() bool {
return features.NumFQDNOnChangeCallbacks() == numCallbacks+1
}, 100*time.Millisecond, 10*time.Millisecond)

// Trigger the FQDN feature flag callback by
// toggling the FQDN feature flag
err = features.Apply(config.MustNewConfigFrom(map[string]interface{}{
Expand All @@ -143,7 +140,7 @@ func TestFQDNFeatureFlagToggle(t *testing.T) {
// - once, right after the provider is run, and
// - once again, when the FQDN feature flag callback is triggered
return numCalled == 2
}, 100*time.Millisecond, 10*time.Millisecond)
}, 10*time.Second, 100*time.Millisecond)
}

func returnHostMapping(log *logger.Logger) infoFetcher {
Expand Down
8 changes: 8 additions & 0 deletions internal/pkg/core/composable/providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,11 @@ type ContextProvider interface {
// Run runs the context provider.
Run(ContextProviderComm) error
}

// CloseableProvider is an interface that providers may choose to implement
// if it makes sense for them, e.g. if they have any resources that need
// cleaning up after the provider's (final) run.
type CloseableProvider interface {
// Close is called after all runs of the provider have finished.
Close() error
}
9 changes: 0 additions & 9 deletions pkg/features/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,6 @@ func RemoveFQDNOnChangeCallback(id string) {
delete(current.fqdnCallbacks, id)
}

// NumFQDNOnChangeCallbacks returns the number of FQDN onChange
// callbacks currently registered. Useful for testing.
func NumFQDNOnChangeCallbacks() int {
current.mu.RLock()
defer current.mu.RUnlock()

return len(current.fqdnCallbacks)
}

// setFQDN sets the value of the FQDN flag in Flags.
func (f *Flags) setFQDN(newValue bool) {
f.mu.Lock()
Expand Down

0 comments on commit 18de34e

Please sign in to comment.