diff --git a/beater/beater.go b/beater/beater.go index 02f172abc6a..501d36f341d 100644 --- a/beater/beater.go +++ b/beater/beater.go @@ -31,6 +31,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/instrumentation" "github.com/elastic/beats/v7/libbeat/logp" @@ -67,66 +68,43 @@ func NewCreator(args CreatorParams) beat.Creator { if err := checkConfig(logger); err != nil { return nil, err } - var esOutputCfg *common.Config - if isElasticsearchOutput(b) { - esOutputCfg = b.Config.Output.Config() + bt := &beater{ + rawConfig: ucfg, + stopped: false, + logger: logger, + wrapRunServer: args.WrapRunServer, } - beaterConfig, err := config.NewConfig(ucfg, esOutputCfg) + esOutputCfg := elasticsearchOutputConfig(b) + + var err error + bt.config, err = config.NewConfig(bt.rawConfig, esOutputCfg) if err != nil { return nil, err } - // send configs to telemetry - recordConfigs(b.Info, beaterConfig, ucfg, logger) - bt := &beater{ - config: beaterConfig, - stopped: false, - logger: logger, - wrapRunServer: args.WrapRunServer, - } + // setup pipelines if explicitly directed to or setup --pipelines and config is not set at all, + // and apm-server is not running supervised by Elastic Agent + shouldSetupPipelines := bt.config.Register.Ingest.Pipeline.IsEnabled() || + (b.InSetupCmd && bt.config.Register.Ingest.Pipeline.Enabled == nil) + runningUnderElasticAgent := b.Manager != nil && b.Manager.Enabled() - // setup pipelines if explicitly directed to or setup --pipelines and config is not set at all - shouldSetupPipelines := beaterConfig.Register.Ingest.Pipeline.IsEnabled() || - (b.InSetupCmd && beaterConfig.Register.Ingest.Pipeline.Enabled == nil) - if isElasticsearchOutput(b) && shouldSetupPipelines { - logger.Info("Registering pipeline callback") + if esOutputCfg != nil && shouldSetupPipelines && !runningUnderElasticAgent { + bt.logger.Info("Registering pipeline callback") err := bt.registerPipelineCallback(b) if err != nil { return nil, err } } else { - logger.Info("No pipeline callback registered") + bt.logger.Info("No pipeline callback registered") } - return bt, nil - } -} -// checkConfig verifies the global configuration doesn't use unsupported settings -func checkConfig(logger *logp.Logger) error { - cfg, err := cfgfile.Load("", nil) - if err != nil { - // responsibility for failing to load configuration lies elsewhere - // this is not reachable after going through normal beat creation - return nil - } - - var s struct { - Dashboards *common.Config `config:"setup.dashboards"` - } - if err := cfg.Unpack(&s); err != nil { - return err - } - if s.Dashboards != nil { - if s.Dashboards.Enabled() { - return errSetupDashboardRemoved - } - logger.Warn(errSetupDashboardRemoved) + return bt, nil } - return nil } type beater struct { + rawConfig *common.Config config *config.Config logger *logp.Logger wrapRunServer func(RunServerFunc) RunServerFunc @@ -140,6 +118,35 @@ type beater struct { // or a fatal error occurs. func (bt *beater) Run(b *beat.Beat) error { + done := make(chan struct{}) + + var reloadOnce sync.Once + var reloadable = reload.ReloadableFunc(func(ucfg *reload.ConfigWithMeta) error { + var err error + // Elastic Agent might call ReloadableFunc many times, but we only need to act upon the first call, + // during startup. This might change when APM Server is included in Fleet + reloadOnce.Do(func() { + defer close(done) + var cfg *config.Config + cfg, err = config.NewConfig(ucfg.Config, elasticsearchOutputConfig(b)) + if err != nil { + bt.logger.Warn("Could not parse configuration from Elastic Agent ", err) + } + bt.config = cfg + bt.rawConfig = ucfg.Config + bt.logger.Info("Applying configuration from Elastic Agent... ") + }) + return err + }) + if b.Manager != nil && b.Manager.Enabled() { + bt.logger.Info("Running under Elastic Agent, waiting for configuration... ") + reload.Register.MustRegister("inputs", reloadable) + <-done + } + + // send configs to telemetry + recordConfigs(b.Info, bt.config, bt.rawConfig, bt.logger) + tracer, tracerServer, err := bt.initTracing(b) if err != nil { return err @@ -212,8 +219,36 @@ func (bt *beater) Run(b *beat.Beat) error { }) } -func isElasticsearchOutput(b *beat.Beat) bool { - return b.Config != nil && b.Config.Output.Name() == "elasticsearch" +// checkConfig verifies the global configuration doesn't use unsupported settings +func checkConfig(logger *logp.Logger) error { + cfg, err := cfgfile.Load("", nil) + if err != nil { + // responsibility for failing to load configuration lies elsewhere + // this is not reachable after going through normal beat creation + return nil + } + + var s struct { + Dashboards *common.Config `config:"setup.dashboards"` + } + if err := cfg.Unpack(&s); err != nil { + return err + } + if s.Dashboards != nil { + if s.Dashboards.Enabled() { + return errSetupDashboardRemoved + } + logger.Warn(errSetupDashboardRemoved) + } + return nil +} + +// elasticsearchOutputConfig returns nil if the output is not elasticsearch +func elasticsearchOutputConfig(b *beat.Beat) *common.Config { + if b.Config != nil && b.Config.Output.Name() == "elasticsearch" { + return b.Config.Output.Config() + } + return nil } func (bt *beater) registerPipelineCallback(b *beat.Beat) error {