diff --git a/x-pack/elastic-agent/CHANGELOG.asciidoc b/x-pack/elastic-agent/CHANGELOG.asciidoc index 312dc4589c15..99ac6b776c40 100644 --- a/x-pack/elastic-agent/CHANGELOG.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.asciidoc @@ -33,3 +33,4 @@ - Introduced `mage demo` command {pull}17312[17312] - Display the stability of the agent at enroll and start. {pull}17336[17336] - Expose stream.* variables in events {pull}17468[17468] +- Monitoring configuration reloadable {pull}17855[17855] diff --git a/x-pack/elastic-agent/pkg/agent/application/emitter.go b/x-pack/elastic-agent/pkg/agent/application/emitter.go index 2e4c07bf0ddb..2279d78565d9 100644 --- a/x-pack/elastic-agent/pkg/agent/application/emitter.go +++ b/x-pack/elastic-agent/pkg/agent/application/emitter.go @@ -17,12 +17,16 @@ import ( type decoratorFunc = func(string, *transpiler.AST, []program.Program) ([]program.Program, error) type filterFunc = func(*logger.Logger, *transpiler.AST) error +type reloadable interface { + Reload(cfg *config.Config) error +} + type configModifiers struct { Filters []filterFunc Decorators []decoratorFunc } -func emitter(log *logger.Logger, router *router, modifiers *configModifiers) emitterFunc { +func emitter(log *logger.Logger, router *router, modifiers *configModifiers, reloadables ...reloadable) emitterFunc { return func(c *config.Config) error { if err := InjectAgentConfig(c); err != nil { return err @@ -62,6 +66,12 @@ func emitter(log *logger.Logger, router *router, modifiers *configModifiers) emi } } + for _, r := range reloadables { + if err := r.Reload(c); err != nil { + return err + } + } + return router.Dispatch(ast.HashStr(), programsToRun) } } diff --git a/x-pack/elastic-agent/pkg/agent/application/local_mode.go b/x-pack/elastic-agent/pkg/agent/application/local_mode.go index 58a05bc86d04..65a4927a55d3 100644 --- a/x-pack/elastic-agent/pkg/agent/application/local_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/local_mode.go @@ -13,6 +13,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/dir" reporting "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter" logreporter "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter/log" @@ -50,7 +51,7 @@ func newLocal( ctx context.Context, log *logger.Logger, pathConfigFile string, - config *config.Config, + rawConfig *config.Config, ) (*Local, error) { var err error if log == nil { @@ -65,7 +66,7 @@ func newLocal( } c := localConfigDefault() - if err := config.Unpack(c); err != nil { + if err := rawConfig.Unpack(c); err != nil { return nil, errors.New(err, "initialize local mode") } @@ -80,13 +81,18 @@ func newLocal( reporter := reporting.NewReporter(localApplication.bgContext, log, localApplication.agentInfo, logR) - router, err := newRouter(log, streamFactory(localApplication.bgContext, config, nil, reporter)) + monitor, err := monitoring.NewMonitor(rawConfig) + if err != nil { + return nil, errors.New(err, "failed to initialize monitoring") + } + + router, err := newRouter(log, streamFactory(localApplication.bgContext, rawConfig, nil, reporter, monitor)) if err != nil { return nil, errors.New(err, "fail to initialize pipeline router") } discover := discoverer(pathConfigFile, c.Management.Path) - emit := emitter(log, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}, Filters: []filterFunc{filters.ConstraintFilter}}) + emit := emitter(log, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}, Filters: []filterFunc{filters.ConstraintFilter}}, monitor) var cfgSource source if !c.Management.Reload.Enabled { diff --git a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go index 5267c8fdfda2..6431ec039754 100644 --- a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go @@ -19,6 +19,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" reporting "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter" fleetreporter "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/reporter/fleet" @@ -118,13 +119,17 @@ func newManaged( } combinedReporter := reporting.NewReporter(managedApplication.bgContext, log, agentInfo, logR, fleetR) + monitor, err := monitoring.NewMonitor(rawConfig) + if err != nil { + return nil, errors.New(err, "failed to initialize monitoring") + } - router, err := newRouter(log, streamFactory(managedApplication.bgContext, rawConfig, client, combinedReporter)) + router, err := newRouter(log, streamFactory(managedApplication.bgContext, rawConfig, client, combinedReporter, monitor)) if err != nil { return nil, errors.New(err, "fail to initialize pipeline router") } - emit := emitter(log, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}, Filters: []filterFunc{filters.ConstraintFilter}}) + emit := emitter(log, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}, Filters: []filterFunc{filters.ConstraintFilter}}, monitor) acker, err := newActionAcker(log, agentInfo, client) if err != nil { return nil, err diff --git a/x-pack/elastic-agent/pkg/agent/application/stream.go b/x-pack/elastic-agent/pkg/agent/application/stream.go index ab3e6426093f..1e8a1f100487 100644 --- a/x-pack/elastic-agent/pkg/agent/application/stream.go +++ b/x-pack/elastic-agent/pkg/agent/application/stream.go @@ -18,6 +18,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring" ) // EventProcessor is an processor of application event @@ -44,6 +45,7 @@ type sender interface { type operatorStream struct { configHandler ConfigHandler log *logger.Logger + monitor monitoring.Monitor } func (b *operatorStream) Close() error { @@ -54,10 +56,10 @@ func (b *operatorStream) Execute(cfg *configRequest) error { return b.configHandler.HandleConfig(cfg) } -func streamFactory(ctx context.Context, cfg *config.Config, client sender, r reporter) func(*logger.Logger, routingKey) (stream, error) { +func streamFactory(ctx context.Context, cfg *config.Config, client sender, r reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) { return func(log *logger.Logger, id routingKey) (stream, error) { // new operator per stream to isolate processes without using tags - operator, err := newOperator(ctx, log, id, cfg, r) + operator, err := newOperator(ctx, log, id, cfg, r, m) if err != nil { return nil, err } @@ -69,7 +71,7 @@ func streamFactory(ctx context.Context, cfg *config.Config, client sender, r rep } } -func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config *config.Config, r reporter) (*operation.Operator, error) { +func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config *config.Config, r reporter, m monitoring.Monitor) (*operation.Operator, error) { operatorConfig := &operatorCfg.Config{} if err := config.Unpack(&operatorConfig); err != nil { return nil, err @@ -95,5 +97,6 @@ func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config installer, stateResolver, r, + m, ) } diff --git a/x-pack/elastic-agent/pkg/agent/operation/common_test.go b/x-pack/elastic-agent/pkg/agent/operation/common_test.go index a50b4dfc7a03..10a3aab90d1c 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/common_test.go +++ b/x-pack/elastic-agent/pkg/agent/operation/common_test.go @@ -17,7 +17,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/noop" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/retry" ) @@ -36,9 +36,6 @@ func getTestOperator(t *testing.T, installPath string) (*Operator, *operatorCfg. DownloadConfig: &artifact.Config{ InstallPath: installPath, }, - MonitoringConfig: &monitoring.Config{ - MonitorMetrics: false, - }, } cfg, err := config.NewConfigFrom(operatorConfig) @@ -56,7 +53,7 @@ func getTestOperator(t *testing.T, installPath string) (*Operator, *operatorCfg. t.Fatal(err) } - operator, err := NewOperator(context.Background(), l, "p1", cfg, fetcher, installer, stateResolver, nil) + operator, err := NewOperator(context.Background(), l, "p1", cfg, fetcher, installer, stateResolver, nil, noop.NewMonitor()) if err != nil { t.Fatal(err) } diff --git a/x-pack/elastic-agent/pkg/agent/operation/config/config.go b/x-pack/elastic-agent/pkg/agent/operation/config/config.go index 74a4212e83ca..f1c15df70071 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/config/config.go +++ b/x-pack/elastic-agent/pkg/agent/operation/config/config.go @@ -6,7 +6,6 @@ package config import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/retry" ) @@ -17,6 +16,4 @@ type Config struct { RetryConfig *retry.Config `yaml:"retry" config:"retry"` DownloadConfig *artifact.Config `yaml:"download" config:"download"` - - MonitoringConfig *monitoring.Config `yaml:"settings.monitoring" config:"settings.monitoring"` } diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go index 8157bf9e876d..6c5e52e68b10 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go @@ -9,29 +9,18 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring" ) const ( monitoringName = "FLEET_MONITORING" - settingsKey = "settings" - monitoringKey = "monitoring" outputKey = "output" monitoringEnabledSubkey = "enabled" ) func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) { - cfg, err := getConfigFromStep(s) - if err != nil { - return errors.New(err, - errors.TypeConfig, - "operator.handleStartSidecar failed to retrieve config from step") - } - // if monitoring is disabled and running stop it - if isEnabled := isMonitoringEnabled(o.logger, cfg); !isEnabled { + if !o.monitor.IsMonitoringEnabled() { if o.isMonitoring { o.logger.Info("operator.handleStartSidecar: monitoring is running and disabled, proceeding to stop") return o.handleStopSidecar(s) @@ -97,42 +86,6 @@ func monitoringTags() map[app.Tag]string { } } -func isMonitoringEnabled(logger *logger.Logger, cfg map[string]interface{}) bool { - settingsVal, found := cfg[settingsKey] - if !found { - logger.Error("operator.isMonitoringEnabled: settings not found in config") - return false - } - - settingsMap, ok := settingsVal.(map[string]interface{}) - if !ok { - logger.Error("operator.isMonitoringEnabled: settings not a map") - return false - } - - monitoringVal, found := settingsMap[monitoringKey] - if !found { - logger.Error("operator.isMonitoringEnabled: settings.monitoring not found in config") - return false - } - - monitoringMap, ok := monitoringVal.(map[string]interface{}) - if !ok { - logger.Error("operator.isMonitoringEnabled: settings.monitoring not a map") - return false - } - - enabledVal, found := monitoringMap[monitoringEnabledSubkey] - if !found { - logger.Infof("operator.isMonitoringEnabled: monitoring.enabled key not found: %v", monitoringMap) - return false - } - - enabled, ok := enabledVal.(bool) - - return enabled && ok -} - func (o *Operator) getMonitoringSteps(step configrequest.Step) []configrequest.Step { // get output config, err := getConfigFromStep(step) @@ -159,13 +112,13 @@ func (o *Operator) getMonitoringSteps(step configrequest.Step) []configrequest.S return nil } - return o.generateMonitoringSteps(o.config.MonitoringConfig, step.Version, output) + return o.generateMonitoringSteps(step.Version, output) } -func (o *Operator) generateMonitoringSteps(cfg *monitoring.Config, version string, output interface{}) []configrequest.Step { +func (o *Operator) generateMonitoringSteps(version string, output interface{}) []configrequest.Step { var steps []configrequest.Step - if cfg.MonitorLogs { + if o.monitor.WatchLogs() { fbConfig, any := o.getMonitoringFilebeatConfig(output) stepID := configrequest.StepRun if !any { @@ -183,7 +136,7 @@ func (o *Operator) generateMonitoringSteps(cfg *monitoring.Config, version strin steps = append(steps, filebeatStep) } - if cfg.MonitorMetrics { + if o.monitor.WatchMetrics() { mbConfig, any := o.getMonitoringMetricbeatConfig(output) stepID := configrequest.StepRun if !any { @@ -217,6 +170,7 @@ func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]i map[string]interface{}{ "type": "log", "paths": paths, + "index": "logs-agent-default", }, }, }, @@ -242,6 +196,7 @@ func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string "metricsets": []string{"stats", "state"}, "period": "10s", "hosts": hosts, + "index": "metrics-agent-default", }, }, }, @@ -260,7 +215,7 @@ func (o *Operator) getLogFilePaths() []string { defer o.appsLock.Unlock() for _, a := range o.apps { - logPath := a.Monitor().LogPath() + logPath := a.Monitor().LogPath(a.Name(), o.pipelineID) if logPath != "" { paths = append(paths, logPath) } @@ -276,7 +231,7 @@ func (o *Operator) getMetricbeatEndpoints() []string { defer o.appsLock.Unlock() for _, a := range o.apps { - metricEndpoint := a.Monitor().MetricsPathPrefixed() + metricEndpoint := a.Monitor().MetricsPathPrefixed(a.Name(), o.pipelineID) if metricEndpoint != "" { endpoints = append(endpoints, metricEndpoint) } diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go index 4ae08d4001c7..826e5bbd03d6 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go @@ -15,7 +15,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/beats" + monitoringConfig "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/retry" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" @@ -23,26 +23,27 @@ import ( func TestGenerateSteps(t *testing.T) { const sampleOutput = "sample-output" - operator, _ := getMonitorableTestOperator(t, "tests/scripts") type testCase struct { Name string - Config *monitoring.Config + Config *monitoringConfig.MonitoringConfig ExpectedSteps int FilebeatStep bool MetricbeatStep bool } testCases := []testCase{ - testCase{"NO monitoring", &monitoring.Config{MonitorLogs: false, MonitorMetrics: false}, 0, false, false}, - testCase{"FB monitoring", &monitoring.Config{MonitorLogs: true, MonitorMetrics: false}, 1, true, false}, - testCase{"MB monitoring", &monitoring.Config{MonitorLogs: false, MonitorMetrics: true}, 1, false, true}, - testCase{"ALL monitoring", &monitoring.Config{MonitorLogs: true, MonitorMetrics: true}, 2, true, true}, + {"NO monitoring", &monitoringConfig.MonitoringConfig{MonitorLogs: false, MonitorMetrics: false}, 0, false, false}, + {"FB monitoring", &monitoringConfig.MonitoringConfig{MonitorLogs: true, MonitorMetrics: false}, 1, true, false}, + {"MB monitoring", &monitoringConfig.MonitoringConfig{MonitorLogs: false, MonitorMetrics: true}, 1, false, true}, + {"ALL monitoring", &monitoringConfig.MonitoringConfig{MonitorLogs: true, MonitorMetrics: true}, 2, true, true}, } for _, tc := range testCases { t.Run(tc.Name, func(t *testing.T) { - steps := operator.generateMonitoringSteps(tc.Config, "8.0", sampleOutput) + m := &testMonitor{monitorLogs: tc.Config.MonitorLogs, monitorMetrics: tc.Config.MonitorMetrics} + operator, _ := getMonitorableTestOperator(t, "tests/scripts", m) + steps := operator.generateMonitoringSteps("8.0", sampleOutput) if actualSteps := len(steps); actualSteps != tc.ExpectedSteps { t.Fatalf("invalid number of steps, expected %v, got %v", tc.ExpectedSteps, actualSteps) } @@ -91,7 +92,7 @@ func checkStep(t *testing.T, stepName string, expectedOutput interface{}, s conf } } -func getMonitorableTestOperator(t *testing.T, installPath string) (*Operator, *operatorCfg.Config) { +func getMonitorableTestOperator(t *testing.T, installPath string, m monitoring.Monitor) (*Operator, *operatorCfg.Config) { operatorConfig := &operatorCfg.Config{ RetryConfig: &retry.Config{ Enabled: true, @@ -104,9 +105,6 @@ func getMonitorableTestOperator(t *testing.T, installPath string) (*Operator, *o InstallPath: installPath, OperatingSystem: "darwin", }, - MonitoringConfig: &monitoring.Config{ - MonitorMetrics: true, - }, } cfg, err := config.NewConfigFrom(operatorConfig) @@ -124,19 +122,19 @@ func getMonitorableTestOperator(t *testing.T, installPath string) (*Operator, *o t.Fatal(err) } ctx := context.Background() - operator, err := NewOperator(ctx, l, "p1", cfg, fetcher, installer, stateResolver, nil) + + operator, err := NewOperator(ctx, l, "p1", cfg, fetcher, installer, stateResolver, nil, m) if err != nil { t.Fatal(err) } - monitor := beats.NewMonitor("dummmy", "p1234", &artifact.Config{OperatingSystem: "linux", InstallPath: "/install/path"}, true, true) - operator.apps["dummy"] = &testMonitorableApp{monitor: monitor} + operator.apps["dummy"] = &testMonitorableApp{monitor: m} return operator, operatorConfig } type testMonitorableApp struct { - monitor *beats.Monitor + monitor monitoring.Monitor } func (*testMonitorableApp) Name() string { return "" } @@ -147,3 +145,53 @@ func (*testMonitorableApp) Configure(_ context.Context, config map[string]interf } func (*testMonitorableApp) State() state.State { return state.State{} } func (a *testMonitorableApp) Monitor() monitoring.Monitor { return a.monitor } + +type testMonitor struct { + monitorLogs bool + monitorMetrics bool +} + +// EnrichArgs enriches arguments provided to application, in order to enable +// monitoring +func (b *testMonitor) EnrichArgs(_ string, _ string, args []string) []string { return args } + +// Cleanup cleans up all drops. +func (b *testMonitor) Cleanup(string, string) error { return nil } + +// Prepare executes steps in order for monitoring to work correctly +func (b *testMonitor) Prepare(string, string, int, int) error { return nil } + +// LogPath describes a path where application stores logs. Empty if +// application is not monitorable +func (b *testMonitor) LogPath(string, string) string { + if !b.monitorLogs { + return "" + } + return "path" +} + +// MetricsPath describes a location where application exposes metrics +// collectable by metricbeat. +func (b *testMonitor) MetricsPath(string, string) string { + if !b.monitorMetrics { + return "" + } + return "path" +} + +// MetricsPathPrefixed return metrics path prefixed with http+ prefix. +func (b *testMonitor) MetricsPathPrefixed(string, string) string { + return "http+path" +} + +// Reload reloads state based on configuration. +func (b *testMonitor) Reload(cfg *config.Config) error { return nil } + +// IsMonitoringEnabled returns true if monitoring is configured. +func (b *testMonitor) IsMonitoringEnabled() bool { return b.monitorLogs || b.monitorMetrics } + +// WatchLogs return true if monitoring is configured and monitoring logs is enabled. +func (b *testMonitor) WatchLogs() bool { return b.monitorLogs } + +// WatchMetrics return true if monitoring is configured and monitoring metrics is enabled. +func (b *testMonitor) WatchMetrics() bool { return b.monitorMetrics } diff --git a/x-pack/elastic-agent/pkg/agent/operation/operator.go b/x-pack/elastic-agent/pkg/agent/operation/operator.go index ed72f3884d8b..4a2b2208ee8c 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operator.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operator.go @@ -41,6 +41,7 @@ type Operator struct { handlers map[string]handleFunc stateResolver *stateresolver.StateResolver eventProcessor callbackHooks + monitor monitoring.Monitor isMonitoring bool apps map[string]Application @@ -61,7 +62,8 @@ func NewOperator( fetcher download.Downloader, installer install.Installer, stateResolver *stateresolver.StateResolver, - eventProcessor callbackHooks) (*Operator, error) { + eventProcessor callbackHooks, + monitor monitoring.Monitor) (*Operator, error) { operatorConfig := defaultOperatorConfig() if err := config.Unpack(&operatorConfig); err != nil { @@ -86,6 +88,7 @@ func NewOperator( stateResolver: stateResolver, apps: make(map[string]Application), eventProcessor: eventProcessor, + monitor: monitor, } operator.initHandlerMap() @@ -98,10 +101,6 @@ func NewOperator( func defaultOperatorConfig() *operatorCfg.Config { return &operatorCfg.Config{ - MonitoringConfig: &monitoring.Config{ - MonitorLogs: false, - MonitorMetrics: false, - }, RetryConfig: &retry.Config{ Enabled: false, RetriesCount: 0, @@ -254,9 +253,7 @@ func (o *Operator) getApp(p Descriptor) (Application, error) { return nil, fmt.Errorf("descriptor is not an app.Specifier") } - monitor := monitoring.NewMonitor(isMonitorable(p), p.BinaryName(), o.pipelineID, o.config.DownloadConfig, o.config.MonitoringConfig.MonitorLogs, o.config.MonitoringConfig.MonitorMetrics) - - a, err := app.NewApplication(o.bgContext, p.ID(), p.BinaryName(), o.pipelineID, specifier, factory, o.config, o.logger, o.eventProcessor.OnFailing, monitor) + a, err := app.NewApplication(o.bgContext, p.ID(), p.BinaryName(), o.pipelineID, specifier, factory, o.config, o.logger, o.eventProcessor.OnFailing, o.monitor) if err != nil { return nil, err } diff --git a/x-pack/elastic-agent/pkg/core/plugin/app/app.go b/x-pack/elastic-agent/pkg/core/plugin/app/app.go index dcfd33851e4e..be361354a5e2 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/app/app.go +++ b/x-pack/elastic-agent/pkg/core/plugin/app/app.go @@ -139,7 +139,7 @@ func (a *Application) Stop() { } // cleanup drops - a.monitor.Cleanup() + a.monitor.Cleanup(a.name, a.pipelineID) } } @@ -199,7 +199,7 @@ func (a *Application) waitProc(proc *os.Process) <-chan *os.ProcessState { } func (a *Application) reportCrash(ctx context.Context) { - a.monitor.Cleanup() + a.monitor.Cleanup(a.name, a.pipelineID) // TODO: reporting crash if a.failureReporter != nil { diff --git a/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/beats/beats_monitor.go b/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/beats/beats_monitor.go index eda80bf1ffe2..14d191c54779 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/beats/beats_monitor.go +++ b/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/beats/beats_monitor.go @@ -12,60 +12,84 @@ import ( "unicode" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" + monitoringConfig "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/config" ) const httpPlusPrefix = "http+" +type wrappedConfig struct { + MonitoringConfig *monitoringConfig.MonitoringConfig `config:"settings.monitoring" yaml:"settings.monitoring"` +} + // Monitor is a monitoring interface providing information about the way // how beat is monitored type Monitor struct { - pipelineID string - - process string - monitoringEndpoint string - loggingPath string - - monitorLogs bool - monitorMetrics bool + operatingSystem string + config *monitoringConfig.MonitoringConfig + installPath string } // NewMonitor creates a beats monitor. -func NewMonitor(process, pipelineID string, downloadConfig *artifact.Config, monitorLogs, monitorMetrics bool) *Monitor { - var monitoringEndpoint, loggingPath string - - if monitorMetrics { - monitoringEndpoint = getMonitoringEndpoint(process, downloadConfig.OS(), pipelineID) - } - if monitorLogs { - loggingPath = getLoggingFileDirectory(downloadConfig.InstallPath, downloadConfig.OS(), pipelineID) +func NewMonitor(downloadConfig *artifact.Config) *Monitor { + return &Monitor{ + operatingSystem: downloadConfig.OS(), + installPath: downloadConfig.InstallPath, + config: &monitoringConfig.MonitoringConfig{}, } +} - return &Monitor{ - pipelineID: pipelineID, - process: process, - monitoringEndpoint: monitoringEndpoint, - loggingPath: loggingPath, - monitorLogs: monitorLogs, - monitorMetrics: monitorMetrics, +// Reload reloads state of the monitoring based on config. +func (b *Monitor) Reload(rawConfig *config.Config) error { + cfg := &wrappedConfig{} + if err := rawConfig.Unpack(&cfg); err != nil { + return err } + + b.config = cfg.MonitoringConfig + return nil +} + +// IsMonitoringEnabled returns true if monitoring is enabled. +func (b *Monitor) IsMonitoringEnabled() bool { return b.config.Enabled } + +// WatchLogs returns true if monitoring is enabled and monitor should watch logs. +func (b *Monitor) WatchLogs() bool { return b.config.Enabled && b.config.MonitorLogs } + +// WatchMetrics returns true if monitoring is enabled and monitor should watch metrics. +func (b *Monitor) WatchMetrics() bool { return b.config.Enabled && b.config.MonitorMetrics } + +func (b *Monitor) generateMonitoringEndpoint(process, pipelineID string) string { + return getMonitoringEndpoint(process, b.operatingSystem, pipelineID) +} + +func (b *Monitor) generateLoggingFile(process, pipelineID string) string { + return getLoggingFile(process, b.operatingSystem, b.installPath, pipelineID) +} + +func (b *Monitor) generateLoggingPath(process, pipelineID string) string { + return filepath.Dir(b.generateLoggingFile(process, pipelineID)) + } // EnrichArgs enriches arguments provided to application, in order to enable // monitoring -func (b *Monitor) EnrichArgs(args []string) []string { +func (b *Monitor) EnrichArgs(process, pipelineID string, args []string) []string { appendix := make([]string, 0, 7) - if b.monitoringEndpoint != "" { + monitoringEndpoint := b.generateMonitoringEndpoint(process, pipelineID) + if monitoringEndpoint != "" { appendix = append(appendix, "-E", "http.enabled=true", - "-E", "http.host="+b.monitoringEndpoint, + "-E", "http.host="+monitoringEndpoint, ) } - if b.loggingPath != "" { + loggingPath := b.generateLoggingPath(process, pipelineID) + if loggingPath != "" { appendix = append(appendix, - "-E", "logging.files.path="+b.loggingPath, - "-E", "logging.files.name="+b.process, + "-E", "logging.files.path="+loggingPath, + "-E", "logging.files.name="+process, "-E", "logging.files.keepfiles=7", "-E", "logging.files.permission=0644", "-E", "logging.files.interval=1h", @@ -76,9 +100,9 @@ func (b *Monitor) EnrichArgs(args []string) []string { } // Cleanup removes -func (b *Monitor) Cleanup() error { +func (b *Monitor) Cleanup(process, pipelineID string) error { // do not cleanup logs, they might not be all processed - drop := b.monitoringDrop() + drop := b.monitoringDrop(process, pipelineID) if drop == "" { return nil } @@ -87,9 +111,9 @@ func (b *Monitor) Cleanup() error { } // Prepare executes steps in order for monitoring to work correctly -func (b *Monitor) Prepare(uid, gid int) error { - drops := []string{b.loggingPath} - if drop := b.monitoringDrop(); drop != "" { +func (b *Monitor) Prepare(process, pipelineID string, uid, gid int) error { + drops := []string{b.generateLoggingPath(process, pipelineID)} + if drop := b.monitoringDrop(process, pipelineID); drop != "" { drops = append(drops, drop) } @@ -120,31 +144,31 @@ func (b *Monitor) Prepare(uid, gid int) error { // LogPath describes a path where application stores logs. Empty if // application is not monitorable -func (b *Monitor) LogPath() string { - if !b.monitorLogs { +func (b *Monitor) LogPath(process, pipelineID string) string { + if !b.WatchLogs() { return "" } - return b.loggingPath + return b.generateLoggingFile(process, pipelineID) } // MetricsPath describes a location where application exposes metrics // collectable by metricbeat. -func (b *Monitor) MetricsPath() string { - if !b.monitorMetrics { +func (b *Monitor) MetricsPath(process, pipelineID string) string { + if !b.WatchMetrics() { return "" } - return b.monitoringEndpoint + return b.generateMonitoringEndpoint(process, pipelineID) } // MetricsPathPrefixed return metrics path prefixed with http+ prefix. -func (b *Monitor) MetricsPathPrefixed() string { - return httpPlusPrefix + b.MetricsPath() +func (b *Monitor) MetricsPathPrefixed(process, pipelineID string) string { + return httpPlusPrefix + b.MetricsPath(process, pipelineID) } -func (b *Monitor) monitoringDrop() string { - return monitoringDrop(b.monitoringEndpoint) +func (b *Monitor) monitoringDrop(process, pipelineID string) string { + return monitoringDrop(b.generateMonitoringEndpoint(process, pipelineID)) } func monitoringDrop(path string) (drop string) { diff --git a/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/config.go b/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/config/config.go similarity index 67% rename from x-pack/elastic-agent/pkg/core/plugin/app/monitoring/config.go rename to x-pack/elastic-agent/pkg/core/plugin/app/monitoring/config/config.go index c5e2eba6a57b..7f5a5b4bffe8 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/config.go +++ b/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/config/config.go @@ -2,10 +2,11 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -package monitoring +package config -// Config describes a configuration of a monitoring -type Config struct { +// MonitoringConfig describes a configuration of a monitoring +type MonitoringConfig struct { + Enabled bool `yaml:"enabled" config:"enabled"` MonitorLogs bool `yaml:"logs" config:"logs"` MonitorMetrics bool `yaml:"metrics" config:"metrics"` } diff --git a/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/monitor.go b/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/monitor.go index dd4744de8ee4..67fd1a96aee4 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/monitor.go +++ b/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/monitor.go @@ -6,27 +6,35 @@ package monitoring import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/beats" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/noop" ) // Monitor is a monitoring interface providing information about the way // how application is monitored type Monitor interface { - EnrichArgs([]string) []string - Prepare(uid, gid int) error - Cleanup() error - LogPath() string - MetricsPath() string - MetricsPathPrefixed() string + LogPath(process, pipelineID string) string + MetricsPath(process, pipelineID string) string + MetricsPathPrefixed(process, pipelineID string) string + + Prepare(process, pipelineID string, uid, gid int) error + EnrichArgs(string, string, []string) []string + Cleanup(process, pipelineID string) error + Reload(cfg *config.Config) error + IsMonitoringEnabled() bool + WatchLogs() bool + WatchMetrics() bool +} + +type wrappedConfig struct { + DownloadConfig *artifact.Config `yaml:"download" config:"download"` } // NewMonitor creates a monitor based on a process configuration. -func NewMonitor(isMonitorable bool, process, pipelineID string, downloadConfig *artifact.Config, monitorLogs, monitorMetrics bool) Monitor { - if !isMonitorable { - return noop.NewMonitor() +func NewMonitor(config *config.Config) (Monitor, error) { + cfg := &wrappedConfig{} + if err := config.Unpack(&cfg); err != nil { + return nil, err } - - // so far we support only beats monitoring - return beats.NewMonitor(process, pipelineID, downloadConfig, monitorLogs, monitorMetrics) + return beats.NewMonitor(cfg.DownloadConfig), nil } diff --git a/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/noop/noop_monitor.go b/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/noop/noop_monitor.go index 93e8c2c46dcf..f3b49602f692 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/noop/noop_monitor.go +++ b/x-pack/elastic-agent/pkg/core/plugin/app/monitoring/noop/noop_monitor.go @@ -4,6 +4,8 @@ package noop +import "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" + // Monitor is a monitoring interface providing information about the way // how beat is monitored type Monitor struct { @@ -16,33 +18,45 @@ func NewMonitor() *Monitor { // EnrichArgs enriches arguments provided to application, in order to enable // monitoring -func (b *Monitor) EnrichArgs(args []string) []string { +func (b *Monitor) EnrichArgs(_ string, _ string, args []string) []string { return args } // Cleanup cleans up all drops. -func (b *Monitor) Cleanup() error { +func (b *Monitor) Cleanup(string, string) error { return nil } // Prepare executes steps in order for monitoring to work correctly -func (b *Monitor) Prepare(uid, gid int) error { +func (b *Monitor) Prepare(string, string, int, int) error { return nil } // LogPath describes a path where application stores logs. Empty if // application is not monitorable -func (b *Monitor) LogPath() string { +func (b *Monitor) LogPath(string, string) string { return "" } // MetricsPath describes a location where application exposes metrics // collectable by metricbeat. -func (b *Monitor) MetricsPath() string { +func (b *Monitor) MetricsPath(string, string) string { return "" } // MetricsPathPrefixed return metrics path prefixed with http+ prefix. -func (b *Monitor) MetricsPathPrefixed() string { +func (b *Monitor) MetricsPathPrefixed(string, string) string { return "" } + +// Reload reloads state based on configuration. +func (b *Monitor) Reload(cfg *config.Config) error { return nil } + +// IsMonitoringEnabled returns true if monitoring is configured. +func (b *Monitor) IsMonitoringEnabled() bool { return false } + +// WatchLogs return true if monitoring is configured and monitoring logs is enabled. +func (b *Monitor) WatchLogs() bool { return false } + +// WatchMetrics return true if monitoring is configured and monitoring metrics is enabled. +func (b *Monitor) WatchMetrics() bool { return false } diff --git a/x-pack/elastic-agent/pkg/core/plugin/app/start.go b/x-pack/elastic-agent/pkg/core/plugin/app/start.go index e7f8c3f677aa..00684753a0d3 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/app/start.go +++ b/x-pack/elastic-agent/pkg/core/plugin/app/start.go @@ -57,7 +57,7 @@ func (a *Application) Start(ctx context.Context, cfg map[string]interface{}) (er } }() - if err := a.monitor.Prepare(a.uid, a.gid); err != nil { + if err := a.monitor.Prepare(a.name, a.pipelineID, a.uid, a.gid); err != nil { return err } @@ -80,7 +80,7 @@ func (a *Application) Start(ctx context.Context, cfg map[string]interface{}) (er a.limiter.Add() } - spec.Args = a.monitor.EnrichArgs(spec.Args) + spec.Args = a.monitor.EnrichArgs(a.name, a.pipelineID, spec.Args) // specify beat name to avoid data lock conflicts // as for https://github.com/elastic/beats/v7/pull/14030 more than one instance