Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Elastic-Agent] Make monitoring settings configurable by fleet #17855

Merged
merged 8 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@
- Introduced `mage demo` command {pull}17312[17312]
- Display the stability of the agent at enroll and start. {pull}17336[17336]
- Expose stream.* variables in events {pull}17468[17468]
- Monitoring configuration reloadable {pull}17855[17855]
12 changes: 11 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ import (
type decoratorFunc = func(string, *transpiler.AST, []program.Program) ([]program.Program, error)
type filterFunc = func(*logger.Logger, *transpiler.AST) error

type reloadable interface {
Reload(cfg *config.Config) error
}

type configModifiers struct {
Filters []filterFunc
Decorators []decoratorFunc
}

func emitter(log *logger.Logger, router *router, modifiers *configModifiers) emitterFunc {
func emitter(log *logger.Logger, router *router, modifiers *configModifiers, reloadables ...reloadable) emitterFunc {
return func(c *config.Config) error {
if err := InjectAgentConfig(c); err != nil {
return err
Expand Down Expand Up @@ -62,6 +66,12 @@ func emitter(log *logger.Logger, router *router, modifiers *configModifiers) emi
}
}

for _, r := range reloadables {
if err := r.Reload(c); err != nil {
return err
}
}

return router.Dispatch(ast.HashStr(), programsToRun)
}
}
Expand Down
14 changes: 10 additions & 4 deletions x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/dir"
reporting "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter"
logreporter "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter/log"
Expand Down Expand Up @@ -50,7 +51,7 @@ func newLocal(
ctx context.Context,
log *logger.Logger,
pathConfigFile string,
config *config.Config,
rawConfig *config.Config,
) (*Local, error) {
var err error
if log == nil {
Expand All @@ -65,7 +66,7 @@ func newLocal(
}

c := localConfigDefault()
if err := config.Unpack(c); err != nil {
if err := rawConfig.Unpack(c); err != nil {
return nil, errors.New(err, "initialize local mode")
}

Expand All @@ -80,13 +81,18 @@ func newLocal(

reporter := reporting.NewReporter(localApplication.bgContext, log, localApplication.agentInfo, logR)

router, err := newRouter(log, streamFactory(localApplication.bgContext, config, nil, reporter))
monitor, err := monitoring.NewMonitor(rawConfig)
if err != nil {
return nil, errors.New(err, "failed to initialize monitoring")
}

router, err := newRouter(log, streamFactory(localApplication.bgContext, rawConfig, nil, reporter, monitor))
if err != nil {
return nil, errors.New(err, "fail to initialize pipeline router")
}

discover := discoverer(pathConfigFile, c.Management.Path)
emit := emitter(log, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}, Filters: []filterFunc{filters.ConstraintFilter}})
emit := emitter(log, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}, Filters: []filterFunc{filters.ConstraintFilter}}, monitor)

var cfgSource source
if !c.Management.Reload.Enabled {
Expand Down
9 changes: 7 additions & 2 deletions x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
reporting "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter"
fleetreporter "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter/fleet"
Expand Down Expand Up @@ -118,13 +119,17 @@ func newManaged(
}

combinedReporter := reporting.NewReporter(managedApplication.bgContext, log, agentInfo, logR, fleetR)
monitor, err := monitoring.NewMonitor(rawConfig)
if err != nil {
return nil, errors.New(err, "failed to initialize monitoring")
}

router, err := newRouter(log, streamFactory(managedApplication.bgContext, rawConfig, client, combinedReporter))
router, err := newRouter(log, streamFactory(managedApplication.bgContext, rawConfig, client, combinedReporter, monitor))
if err != nil {
return nil, errors.New(err, "fail to initialize pipeline router")
}

emit := emitter(log, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}, Filters: []filterFunc{filters.ConstraintFilter}})
emit := emitter(log, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}, Filters: []filterFunc{filters.ConstraintFilter}}, monitor)
acker, err := newActionAcker(log, agentInfo, client)
if err != nil {
return nil, err
Expand Down
9 changes: 6 additions & 3 deletions x-pack/elastic-agent/pkg/agent/application/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
)

// EventProcessor is an processor of application event
Expand All @@ -44,6 +45,7 @@ type sender interface {
type operatorStream struct {
configHandler ConfigHandler
log *logger.Logger
monitor monitoring.Monitor
}

func (b *operatorStream) Close() error {
Expand All @@ -54,10 +56,10 @@ func (b *operatorStream) Execute(cfg *configRequest) error {
return b.configHandler.HandleConfig(cfg)
}

func streamFactory(ctx context.Context, cfg *config.Config, client sender, r reporter) func(*logger.Logger, routingKey) (stream, error) {
func streamFactory(ctx context.Context, cfg *config.Config, client sender, r reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) {
return func(log *logger.Logger, id routingKey) (stream, error) {
// new operator per stream to isolate processes without using tags
operator, err := newOperator(ctx, log, id, cfg, r)
operator, err := newOperator(ctx, log, id, cfg, r, m)
if err != nil {
return nil, err
}
Expand All @@ -69,7 +71,7 @@ func streamFactory(ctx context.Context, cfg *config.Config, client sender, r rep
}
}

func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config *config.Config, r reporter) (*operation.Operator, error) {
func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config *config.Config, r reporter, m monitoring.Monitor) (*operation.Operator, error) {
operatorConfig := &operatorCfg.Config{}
if err := config.Unpack(&operatorConfig); err != nil {
return nil, err
Expand All @@ -95,5 +97,6 @@ func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config
installer,
stateResolver,
r,
m,
)
}
7 changes: 2 additions & 5 deletions x-pack/elastic-agent/pkg/agent/operation/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/noop"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/retry"
)
Expand All @@ -36,9 +36,6 @@ func getTestOperator(t *testing.T, installPath string) (*Operator, *operatorCfg.
DownloadConfig: &artifact.Config{
InstallPath: installPath,
},
MonitoringConfig: &monitoring.Config{
MonitorMetrics: false,
},
}

cfg, err := config.NewConfigFrom(operatorConfig)
Expand All @@ -56,7 +53,7 @@ func getTestOperator(t *testing.T, installPath string) (*Operator, *operatorCfg.
t.Fatal(err)
}

operator, err := NewOperator(context.Background(), l, "p1", cfg, fetcher, installer, stateResolver, nil)
operator, err := NewOperator(context.Background(), l, "p1", cfg, fetcher, installer, stateResolver, nil, noop.NewMonitor())
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 0 additions & 3 deletions x-pack/elastic-agent/pkg/agent/operation/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package config

import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/retry"
)
Expand All @@ -17,6 +16,4 @@ type Config struct {
RetryConfig *retry.Config `yaml:"retry" config:"retry"`

DownloadConfig *artifact.Config `yaml:"download" config:"download"`

MonitoringConfig *monitoring.Config `yaml:"settings.monitoring" config:"settings.monitoring"`
}
63 changes: 9 additions & 54 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,18 @@ import (

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
)

const (
monitoringName = "FLEET_MONITORING"
settingsKey = "settings"
monitoringKey = "monitoring"
outputKey = "output"
monitoringEnabledSubkey = "enabled"
)

func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) {
cfg, err := getConfigFromStep(s)
if err != nil {
return errors.New(err,
errors.TypeConfig,
"operator.handleStartSidecar failed to retrieve config from step")
}

// if monitoring is disabled and running stop it
if isEnabled := isMonitoringEnabled(o.logger, cfg); !isEnabled {
if !o.monitor.IsMonitoringEnabled() {
if o.isMonitoring {
o.logger.Info("operator.handleStartSidecar: monitoring is running and disabled, proceeding to stop")
return o.handleStopSidecar(s)
Expand Down Expand Up @@ -97,42 +86,6 @@ func monitoringTags() map[app.Tag]string {
}
}

func isMonitoringEnabled(logger *logger.Logger, cfg map[string]interface{}) bool {
settingsVal, found := cfg[settingsKey]
if !found {
logger.Error("operator.isMonitoringEnabled: settings not found in config")
return false
}

settingsMap, ok := settingsVal.(map[string]interface{})
if !ok {
logger.Error("operator.isMonitoringEnabled: settings not a map")
return false
}

monitoringVal, found := settingsMap[monitoringKey]
if !found {
logger.Error("operator.isMonitoringEnabled: settings.monitoring not found in config")
return false
}

monitoringMap, ok := monitoringVal.(map[string]interface{})
if !ok {
logger.Error("operator.isMonitoringEnabled: settings.monitoring not a map")
return false
}

enabledVal, found := monitoringMap[monitoringEnabledSubkey]
if !found {
logger.Infof("operator.isMonitoringEnabled: monitoring.enabled key not found: %v", monitoringMap)
return false
}

enabled, ok := enabledVal.(bool)

return enabled && ok
}

func (o *Operator) getMonitoringSteps(step configrequest.Step) []configrequest.Step {
// get output
config, err := getConfigFromStep(step)
Expand All @@ -159,13 +112,13 @@ func (o *Operator) getMonitoringSteps(step configrequest.Step) []configrequest.S
return nil
}

return o.generateMonitoringSteps(o.config.MonitoringConfig, step.Version, output)
return o.generateMonitoringSteps(step.Version, output)
}

func (o *Operator) generateMonitoringSteps(cfg *monitoring.Config, version string, output interface{}) []configrequest.Step {
func (o *Operator) generateMonitoringSteps(version string, output interface{}) []configrequest.Step {
var steps []configrequest.Step

if cfg.MonitorLogs {
if o.monitor.WatchLogs() {
fbConfig, any := o.getMonitoringFilebeatConfig(output)
stepID := configrequest.StepRun
if !any {
Expand All @@ -183,7 +136,7 @@ func (o *Operator) generateMonitoringSteps(cfg *monitoring.Config, version strin
steps = append(steps, filebeatStep)
}

if cfg.MonitorMetrics {
if o.monitor.WatchMetrics() {
mbConfig, any := o.getMonitoringMetricbeatConfig(output)
stepID := configrequest.StepRun
if !any {
Expand Down Expand Up @@ -217,6 +170,7 @@ func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]i
map[string]interface{}{
"type": "log",
"paths": paths,
"index": "logs-agent-default",
},
},
},
Expand Down Expand Up @@ -244,6 +198,7 @@ func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string
"metricsets": []string{"stats", "state"},
"period": "10s",
"hosts": hosts,
"index": "metrics-agent-default",
},
},
},
Expand All @@ -264,7 +219,7 @@ func (o *Operator) getLogFilePaths() []string {
defer o.appsLock.Unlock()

for _, a := range o.apps {
logPath := a.Monitor().LogPath()
logPath := a.Monitor().LogPath(a.Name(), o.pipelineID)
if logPath != "" {
paths = append(paths, logPath)
}
Expand All @@ -280,7 +235,7 @@ func (o *Operator) getMetricbeatEndpoints() []string {
defer o.appsLock.Unlock()

for _, a := range o.apps {
metricEndpoint := a.Monitor().MetricsPathPrefixed()
metricEndpoint := a.Monitor().MetricsPathPrefixed(a.Name(), o.pipelineID)
if metricEndpoint != "" {
endpoints = append(endpoints, metricEndpoint)
}
Expand Down
Loading