Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add initial support for beats central management #4228

Merged
merged 8 commits into from
Oct 1, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 79 additions & 44 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/instrumentation"
"github.com/elastic/beats/v7/libbeat/logp"
Expand Down Expand Up @@ -67,66 +68,43 @@ func NewCreator(args CreatorParams) beat.Creator {
if err := checkConfig(logger); err != nil {
return nil, err
}
var esOutputCfg *common.Config
if isElasticsearchOutput(b) {
esOutputCfg = b.Config.Output.Config()
bt := &beater{
rawConfig: ucfg,
stopped: false,
logger: logger,
wrapRunServer: args.WrapRunServer,
}

beaterConfig, err := config.NewConfig(ucfg, esOutputCfg)
esOutputCfg := elasticsearchOutputConfig(b)

var err error
bt.config, err = config.NewConfig(bt.rawConfig, esOutputCfg)
if err != nil {
return nil, err
}
// send configs to telemetry
recordConfigs(b.Info, beaterConfig, ucfg, logger)

bt := &beater{
config: beaterConfig,
stopped: false,
logger: logger,
wrapRunServer: args.WrapRunServer,
}
// setup pipelines if explicitly directed to or setup --pipelines and config is not set at all,
// and apm-server is not running supervised by Elastic Agent
shouldSetupPipelines := bt.config.Register.Ingest.Pipeline.IsEnabled() ||
(b.InSetupCmd && bt.config.Register.Ingest.Pipeline.Enabled == nil)
runningUnderElasticAgent := b.Manager != nil && b.Manager.Enabled()

// setup pipelines if explicitly directed to or setup --pipelines and config is not set at all
shouldSetupPipelines := beaterConfig.Register.Ingest.Pipeline.IsEnabled() ||
(b.InSetupCmd && beaterConfig.Register.Ingest.Pipeline.Enabled == nil)
if isElasticsearchOutput(b) && shouldSetupPipelines {
logger.Info("Registering pipeline callback")
if esOutputCfg != nil && shouldSetupPipelines && !runningUnderElasticAgent {
bt.logger.Info("Registering pipeline callback")
err := bt.registerPipelineCallback(b)
if err != nil {
return nil, err
}
} else {
logger.Info("No pipeline callback registered")
bt.logger.Info("No pipeline callback registered")
}
return bt, nil
}
}

// checkConfig verifies the global configuration doesn't use unsupported settings
func checkConfig(logger *logp.Logger) error {
cfg, err := cfgfile.Load("", nil)
if err != nil {
// responsibility for failing to load configuration lies elsewhere
// this is not reachable after going through normal beat creation
return nil
}

var s struct {
Dashboards *common.Config `config:"setup.dashboards"`
}
if err := cfg.Unpack(&s); err != nil {
return err
}
if s.Dashboards != nil {
if s.Dashboards.Enabled() {
return errSetupDashboardRemoved
}
logger.Warn(errSetupDashboardRemoved)
return bt, nil
}
return nil
}

type beater struct {
rawConfig *common.Config
config *config.Config
logger *logp.Logger
wrapRunServer func(RunServerFunc) RunServerFunc
Expand All @@ -140,6 +118,35 @@ type beater struct {
// or a fatal error occurs.
func (bt *beater) Run(b *beat.Beat) error {

done := make(chan struct{})

var reloadOnce sync.Once
var reloadable = reload.ReloadableFunc(func(ucfg *reload.ConfigWithMeta) error {
var err error
// Elastic Agent might call ReloadableFunc many times, but we only need to act upon the first call,
// during startup. This might change when APM Server is included in Fleet
reloadOnce.Do(func() {
defer close(done)
var cfg *config.Config
cfg, err = config.NewConfig(ucfg.Config, elasticsearchOutputConfig(b))
if err != nil {
bt.logger.Warn("Could not parse configuration from Elastic Agent ", err)
}
bt.config = cfg
bt.rawConfig = ucfg.Config
bt.logger.Info("Applying configuration from Elastic Agent... ")
})
return err
})
if b.Manager != nil && b.Manager.Enabled() {
bt.logger.Info("Running under Elastic Agent, waiting for configuration... ")
reload.Register.MustRegister("inputs", reloadable)
<-done
}

// send configs to telemetry
recordConfigs(b.Info, bt.config, bt.rawConfig, bt.logger)

tracer, tracerServer, err := bt.initTracing(b)
if err != nil {
return err
Expand Down Expand Up @@ -212,8 +219,36 @@ func (bt *beater) Run(b *beat.Beat) error {
})
}

func isElasticsearchOutput(b *beat.Beat) bool {
return b.Config != nil && b.Config.Output.Name() == "elasticsearch"
// checkConfig verifies the global configuration doesn't use unsupported settings
func checkConfig(logger *logp.Logger) error {
cfg, err := cfgfile.Load("", nil)
if err != nil {
// responsibility for failing to load configuration lies elsewhere
// this is not reachable after going through normal beat creation
return nil
}

var s struct {
Dashboards *common.Config `config:"setup.dashboards"`
}
if err := cfg.Unpack(&s); err != nil {
return err
}
if s.Dashboards != nil {
if s.Dashboards.Enabled() {
return errSetupDashboardRemoved
}
logger.Warn(errSetupDashboardRemoved)
}
return nil
}

// elasticsearchOutputConfig returns nil if the output is not elasticsearch
func elasticsearchOutputConfig(b *beat.Beat) *common.Config {
if b.Config != nil && b.Config.Output.Name() == "elasticsearch" {
return b.Config.Output.Config()
}
return nil
}

func (bt *beater) registerPipelineCallback(b *beat.Beat) error {
Expand Down