Skip to content

Commit

Permalink
[Elastic-Agent] Make monitoring settings configurable by fleet (elast…
Browse files Browse the repository at this point in the history
…ic#17855)

[Elastic-Agent] Make monitoring settings configurable by fleet (elastic#17855)
  • Loading branch information
michalpristas committed Apr 22, 2020
1 parent 448f345 commit 2fa49c8
Show file tree
Hide file tree
Showing 16 changed files with 232 additions and 166 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@
- Introduced `mage demo` command {pull}17312[17312]
- Display the stability of the agent at enroll and start. {pull}17336[17336]
- Expose stream.* variables in events {pull}17468[17468]
- Monitoring configuration reloadable {pull}17855[17855]
12 changes: 11 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ import (
type decoratorFunc = func(string, *transpiler.AST, []program.Program) ([]program.Program, error)
type filterFunc = func(*logger.Logger, *transpiler.AST) error

type reloadable interface {
Reload(cfg *config.Config) error
}

type configModifiers struct {
Filters []filterFunc
Decorators []decoratorFunc
}

func emitter(log *logger.Logger, router *router, modifiers *configModifiers) emitterFunc {
func emitter(log *logger.Logger, router *router, modifiers *configModifiers, reloadables ...reloadable) emitterFunc {
return func(c *config.Config) error {
if err := InjectAgentConfig(c); err != nil {
return err
Expand Down Expand Up @@ -62,6 +66,12 @@ func emitter(log *logger.Logger, router *router, modifiers *configModifiers) emi
}
}

for _, r := range reloadables {
if err := r.Reload(c); err != nil {
return err
}
}

return router.Dispatch(ast.HashStr(), programsToRun)
}
}
Expand Down
14 changes: 10 additions & 4 deletions x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/dir"
reporting "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter"
logreporter "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter/log"
Expand Down Expand Up @@ -50,7 +51,7 @@ func newLocal(
ctx context.Context,
log *logger.Logger,
pathConfigFile string,
config *config.Config,
rawConfig *config.Config,
) (*Local, error) {
var err error
if log == nil {
Expand All @@ -65,7 +66,7 @@ func newLocal(
}

c := localConfigDefault()
if err := config.Unpack(c); err != nil {
if err := rawConfig.Unpack(c); err != nil {
return nil, errors.New(err, "initialize local mode")
}

Expand All @@ -80,13 +81,18 @@ func newLocal(

reporter := reporting.NewReporter(localApplication.bgContext, log, localApplication.agentInfo, logR)

router, err := newRouter(log, streamFactory(localApplication.bgContext, config, nil, reporter))
monitor, err := monitoring.NewMonitor(rawConfig)
if err != nil {
return nil, errors.New(err, "failed to initialize monitoring")
}

router, err := newRouter(log, streamFactory(localApplication.bgContext, rawConfig, nil, reporter, monitor))
if err != nil {
return nil, errors.New(err, "fail to initialize pipeline router")
}

discover := discoverer(pathConfigFile, c.Management.Path)
emit := emitter(log, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}, Filters: []filterFunc{filters.ConstraintFilter}})
emit := emitter(log, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}, Filters: []filterFunc{filters.ConstraintFilter}}, monitor)

var cfgSource source
if !c.Management.Reload.Enabled {
Expand Down
9 changes: 7 additions & 2 deletions x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
reporting "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter"
fleetreporter "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter/fleet"
Expand Down Expand Up @@ -118,13 +119,17 @@ func newManaged(
}

combinedReporter := reporting.NewReporter(managedApplication.bgContext, log, agentInfo, logR, fleetR)
monitor, err := monitoring.NewMonitor(rawConfig)
if err != nil {
return nil, errors.New(err, "failed to initialize monitoring")
}

router, err := newRouter(log, streamFactory(managedApplication.bgContext, rawConfig, client, combinedReporter))
router, err := newRouter(log, streamFactory(managedApplication.bgContext, rawConfig, client, combinedReporter, monitor))
if err != nil {
return nil, errors.New(err, "fail to initialize pipeline router")
}

emit := emitter(log, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}, Filters: []filterFunc{filters.ConstraintFilter}})
emit := emitter(log, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}, Filters: []filterFunc{filters.ConstraintFilter}}, monitor)
acker, err := newActionAcker(log, agentInfo, client)
if err != nil {
return nil, err
Expand Down
9 changes: 6 additions & 3 deletions x-pack/elastic-agent/pkg/agent/application/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
)

// EventProcessor is an processor of application event
Expand All @@ -44,6 +45,7 @@ type sender interface {
type operatorStream struct {
configHandler ConfigHandler
log *logger.Logger
monitor monitoring.Monitor
}

func (b *operatorStream) Close() error {
Expand All @@ -54,10 +56,10 @@ func (b *operatorStream) Execute(cfg *configRequest) error {
return b.configHandler.HandleConfig(cfg)
}

func streamFactory(ctx context.Context, cfg *config.Config, client sender, r reporter) func(*logger.Logger, routingKey) (stream, error) {
func streamFactory(ctx context.Context, cfg *config.Config, client sender, r reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) {
return func(log *logger.Logger, id routingKey) (stream, error) {
// new operator per stream to isolate processes without using tags
operator, err := newOperator(ctx, log, id, cfg, r)
operator, err := newOperator(ctx, log, id, cfg, r, m)
if err != nil {
return nil, err
}
Expand All @@ -69,7 +71,7 @@ func streamFactory(ctx context.Context, cfg *config.Config, client sender, r rep
}
}

func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config *config.Config, r reporter) (*operation.Operator, error) {
func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config *config.Config, r reporter, m monitoring.Monitor) (*operation.Operator, error) {
operatorConfig := &operatorCfg.Config{}
if err := config.Unpack(&operatorConfig); err != nil {
return nil, err
Expand All @@ -95,5 +97,6 @@ func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config
installer,
stateResolver,
r,
m,
)
}
7 changes: 2 additions & 5 deletions x-pack/elastic-agent/pkg/agent/operation/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/noop"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/retry"
)
Expand All @@ -36,9 +36,6 @@ func getTestOperator(t *testing.T, installPath string) (*Operator, *operatorCfg.
DownloadConfig: &artifact.Config{
InstallPath: installPath,
},
MonitoringConfig: &monitoring.Config{
MonitorMetrics: false,
},
}

cfg, err := config.NewConfigFrom(operatorConfig)
Expand All @@ -56,7 +53,7 @@ func getTestOperator(t *testing.T, installPath string) (*Operator, *operatorCfg.
t.Fatal(err)
}

operator, err := NewOperator(context.Background(), l, "p1", cfg, fetcher, installer, stateResolver, nil)
operator, err := NewOperator(context.Background(), l, "p1", cfg, fetcher, installer, stateResolver, nil, noop.NewMonitor())
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 0 additions & 3 deletions x-pack/elastic-agent/pkg/agent/operation/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package config

import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/retry"
)
Expand All @@ -17,6 +16,4 @@ type Config struct {
RetryConfig *retry.Config `yaml:"retry" config:"retry"`

DownloadConfig *artifact.Config `yaml:"download" config:"download"`

MonitoringConfig *monitoring.Config `yaml:"settings.monitoring" config:"settings.monitoring"`
}
63 changes: 9 additions & 54 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,18 @@ import (

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
)

const (
monitoringName = "FLEET_MONITORING"
settingsKey = "settings"
monitoringKey = "monitoring"
outputKey = "output"
monitoringEnabledSubkey = "enabled"
)

func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) {
cfg, err := getConfigFromStep(s)
if err != nil {
return errors.New(err,
errors.TypeConfig,
"operator.handleStartSidecar failed to retrieve config from step")
}

// if monitoring is disabled and running stop it
if isEnabled := isMonitoringEnabled(o.logger, cfg); !isEnabled {
if !o.monitor.IsMonitoringEnabled() {
if o.isMonitoring {
o.logger.Info("operator.handleStartSidecar: monitoring is running and disabled, proceeding to stop")
return o.handleStopSidecar(s)
Expand Down Expand Up @@ -97,42 +86,6 @@ func monitoringTags() map[app.Tag]string {
}
}

func isMonitoringEnabled(logger *logger.Logger, cfg map[string]interface{}) bool {
settingsVal, found := cfg[settingsKey]
if !found {
logger.Error("operator.isMonitoringEnabled: settings not found in config")
return false
}

settingsMap, ok := settingsVal.(map[string]interface{})
if !ok {
logger.Error("operator.isMonitoringEnabled: settings not a map")
return false
}

monitoringVal, found := settingsMap[monitoringKey]
if !found {
logger.Error("operator.isMonitoringEnabled: settings.monitoring not found in config")
return false
}

monitoringMap, ok := monitoringVal.(map[string]interface{})
if !ok {
logger.Error("operator.isMonitoringEnabled: settings.monitoring not a map")
return false
}

enabledVal, found := monitoringMap[monitoringEnabledSubkey]
if !found {
logger.Infof("operator.isMonitoringEnabled: monitoring.enabled key not found: %v", monitoringMap)
return false
}

enabled, ok := enabledVal.(bool)

return enabled && ok
}

func (o *Operator) getMonitoringSteps(step configrequest.Step) []configrequest.Step {
// get output
config, err := getConfigFromStep(step)
Expand All @@ -159,13 +112,13 @@ func (o *Operator) getMonitoringSteps(step configrequest.Step) []configrequest.S
return nil
}

return o.generateMonitoringSteps(o.config.MonitoringConfig, step.Version, output)
return o.generateMonitoringSteps(step.Version, output)
}

func (o *Operator) generateMonitoringSteps(cfg *monitoring.Config, version string, output interface{}) []configrequest.Step {
func (o *Operator) generateMonitoringSteps(version string, output interface{}) []configrequest.Step {
var steps []configrequest.Step

if cfg.MonitorLogs {
if o.monitor.WatchLogs() {
fbConfig, any := o.getMonitoringFilebeatConfig(output)
stepID := configrequest.StepRun
if !any {
Expand All @@ -183,7 +136,7 @@ func (o *Operator) generateMonitoringSteps(cfg *monitoring.Config, version strin
steps = append(steps, filebeatStep)
}

if cfg.MonitorMetrics {
if o.monitor.WatchMetrics() {
mbConfig, any := o.getMonitoringMetricbeatConfig(output)
stepID := configrequest.StepRun
if !any {
Expand Down Expand Up @@ -217,6 +170,7 @@ func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]i
map[string]interface{}{
"type": "log",
"paths": paths,
"index": "logs-agent-default",
},
},
},
Expand All @@ -242,6 +196,7 @@ func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string
"metricsets": []string{"stats", "state"},
"period": "10s",
"hosts": hosts,
"index": "metrics-agent-default",
},
},
},
Expand All @@ -260,7 +215,7 @@ func (o *Operator) getLogFilePaths() []string {
defer o.appsLock.Unlock()

for _, a := range o.apps {
logPath := a.Monitor().LogPath()
logPath := a.Monitor().LogPath(a.Name(), o.pipelineID)
if logPath != "" {
paths = append(paths, logPath)
}
Expand All @@ -276,7 +231,7 @@ func (o *Operator) getMetricbeatEndpoints() []string {
defer o.appsLock.Unlock()

for _, a := range o.apps {
metricEndpoint := a.Monitor().MetricsPathPrefixed()
metricEndpoint := a.Monitor().MetricsPathPrefixed(a.Name(), o.pipelineID)
if metricEndpoint != "" {
endpoints = append(endpoints, metricEndpoint)
}
Expand Down
Loading

0 comments on commit 2fa49c8

Please sign in to comment.