From e871627e159701ec7ffdb9f8c192847bb7416041 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Wed, 5 Apr 2023 13:09:42 -0700 Subject: [PATCH 1/9] fix performance issues with processors scaling under agent --- auditbeat/cmd/root.go | 7 ++-- heartbeat/cmd/root.go | 2 +- .../report/elasticsearch/elasticsearch.go | 2 +- libbeat/processors/config.go | 23 ++++++++++-- libbeat/publisher/pipeline/stress/run.go | 2 +- libbeat/publisher/processing/default.go | 22 +++++++++--- libbeat/publisher/processing/default_test.go | 35 +++++++++++++++---- metricbeat/cmd/root.go | 2 +- packetbeat/cmd/root.go | 7 ++-- winlogbeat/cmd/root.go | 2 +- x-pack/auditbeat/cmd/root.go | 10 ++++-- x-pack/filebeat/cmd/agent.go | 3 +- x-pack/filebeat/cmd/root.go | 9 +++++ x-pack/metricbeat/cmd/agent.go | 3 +- x-pack/metricbeat/cmd/root.go | 8 ++++- x-pack/osquerybeat/cmd/root.go | 11 +++--- x-pack/packetbeat/cmd/root.go | 10 ++++-- 17 files changed, 120 insertions(+), 38 deletions(-) diff --git a/auditbeat/cmd/root.go b/auditbeat/cmd/root.go index d47e49ea36b..0ddc7b8674d 100644 --- a/auditbeat/cmd/root.go +++ b/auditbeat/cmd/root.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/beats/v7/libbeat/cmd" "github.com/elastic/beats/v7/libbeat/cmd/instance" "github.com/elastic/beats/v7/libbeat/ecs" + "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/publisher/processing" "github.com/elastic/beats/v7/metricbeat/beater" "github.com/elastic/beats/v7/metricbeat/mb/module" @@ -53,13 +54,13 @@ var withECSVersion = processing.WithFields(mapstr.M{ }) // AuditbeatSettings contains the default settings for auditbeat -func AuditbeatSettings() instance.Settings { +func AuditbeatSettings(globals processors.PluginConfig) instance.Settings { runFlags := pflag.NewFlagSet(Name, pflag.ExitOnError) return instance.Settings{ RunFlags: runFlags, Name: Name, HasDashboards: true, - Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithHost, processing.WithAgentMeta()), + Processing: processing.MakeDefaultSupport(true, globals, withECSVersion, processing.WithHost, processing.WithAgentMeta()), } } @@ -76,5 +77,5 @@ func Initialize(settings instance.Settings) *cmd.BeatsRootCmd { } func init() { - RootCmd = Initialize(AuditbeatSettings()) + RootCmd = Initialize(AuditbeatSettings(nil)) } diff --git a/heartbeat/cmd/root.go b/heartbeat/cmd/root.go index d3374648414..9ea81966b57 100644 --- a/heartbeat/cmd/root.go +++ b/heartbeat/cmd/root.go @@ -54,7 +54,7 @@ var withECSVersion = processing.WithFields(mapstr.M{ func HeartbeatSettings() instance.Settings { return instance.Settings{ Name: Name, - Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithAgentMeta()), + Processing: processing.MakeDefaultSupport(true, nil, withECSVersion, processing.WithAgentMeta()), HasDashboards: false, } } diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index 8c9bca18750..bf29ae872f1 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -149,7 +149,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *conf.C) (report outClient := outputs.NewFailoverClient(clients) outClient = outputs.WithBackoff(outClient, config.Backoff.Init, config.Backoff.Max) - processing, err := processing.MakeDefaultSupport(true)(beat, log, conf.NewConfig()) + processing, err := processing.MakeDefaultSupport(true, nil)(beat, log, conf.NewConfig()) if err != nil { return nil, err } diff --git a/libbeat/processors/config.go b/libbeat/processors/config.go index 1afd91f3ba8..7717d99adb5 100644 --- a/libbeat/processors/config.go +++ b/libbeat/processors/config.go @@ -17,10 +17,29 @@ package processors -import "github.com/elastic/elastic-agent-libs/config" +import ( + "fmt" + + "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/mapstr" +) // PluginConfig represents the list of processors. type PluginConfig []*config.C -// fields that should be always exported +// MandatoryExportedFields are fields that should be always exported var MandatoryExportedFields = []string{"type"} + +// NewPluginConfigFromList creates a PluginConfig from a list of raw processor config objects +func NewPluginConfigFromList(raw []mapstr.M) (PluginConfig, error) { + processors := make([]*config.C, len(raw)) + for i := 0; i < len(raw); i++ { + cfg, err := config.NewConfigFrom(raw[i]) + if err != nil { + return nil, fmt.Errorf("error creating processor config: %w", err) + } + processors[i] = cfg + } + + return processors, nil +} diff --git a/libbeat/publisher/pipeline/stress/run.go b/libbeat/publisher/pipeline/stress/run.go index 6f990e0c429..a3ef5451f53 100644 --- a/libbeat/publisher/pipeline/stress/run.go +++ b/libbeat/publisher/pipeline/stress/run.go @@ -61,7 +61,7 @@ func RunTests( log := logp.L() - processing, err := processing.MakeDefaultSupport(false)(info, log, cfg) + processing, err := processing.MakeDefaultSupport(false, nil)(info, log, cfg) if err != nil { return err } diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index 9b90e61fa35..c87173ef266 100644 --- a/libbeat/publisher/processing/default.go +++ b/libbeat/publisher/processing/default.go @@ -22,6 +22,7 @@ import ( "github.com/elastic/beats/v7/libbeat/asset" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common/fleetmode" "github.com/elastic/beats/v7/libbeat/ecs" "github.com/elastic/beats/v7/libbeat/features" "github.com/elastic/beats/v7/libbeat/mapping" @@ -77,14 +78,14 @@ type builtinModifier func(beat.Info) mapstr.M // MakeDefaultBeatSupport automatically adds the `ecs.version`, `host.name` and `agent.X` fields // to each event. func MakeDefaultBeatSupport(normalize bool) SupportFactory { - return MakeDefaultSupport(normalize, WithECS, WithHost, WithAgentMeta()) + return MakeDefaultSupport(normalize, nil, WithECS, WithHost, WithAgentMeta()) } // MakeDefaultObserverSupport creates a new SupportFactory based on NewDefaultSupport. // MakeDefaultObserverSupport automatically adds the `ecs.version` and `observer.X` fields // to each event. func MakeDefaultObserverSupport(normalize bool) SupportFactory { - return MakeDefaultSupport(normalize, WithECS, WithObserverMeta()) + return MakeDefaultSupport(normalize, nil, WithECS, WithObserverMeta()) } // MakeDefaultSupport creates a new SupportFactory for use with the publisher pipeline. @@ -94,8 +95,11 @@ func MakeDefaultObserverSupport(normalize bool) SupportFactory { // and `processor` settings to the event processing pipeline to be generated. // Use WithFields, WithBeatMeta, and other to declare the builtin fields to be added // to each event. Builtin fields can be modified using global `processors`, and `fields` only. +// the fleetDefaultProcessors argument will set the given global-level processors if the beat is currently running under fleet, +// and no other global-level processors are set. func MakeDefaultSupport( normalize bool, + fleetDefaultProcessors processors.PluginConfig, modifiers ...modifier, ) SupportFactory { return func(info beat.Info, log *logp.Logger, beatCfg *config.C) (Supporter, error) { @@ -107,8 +111,18 @@ func MakeDefaultSupport( if err := beatCfg.Unpack(&cfg); err != nil { return nil, err } + // don't try to "merge" the two lists somehow, if the supportFactory caller requests its own processors, use those + // also makes it easier to disable global processors if needed, since they're otherwise hardcoded + var rawProcessors processors.PluginConfig + if fleetmode.Enabled() && !beatCfg.HasField("processors") { + log.Debugf("In fleet with processors specified, defaulting to global processors") + rawProcessors = fleetDefaultProcessors + + } else { + rawProcessors = cfg.Processors + } - processors, err := processors.New(cfg.Processors) + processors, err := processors.New(rawProcessors) if err != nil { return nil, fmt.Errorf("error initializing processors: %w", err) } @@ -125,7 +139,7 @@ func WithFields(fields mapstr.M) modifier { } // WithECS modifier adds `ecs.version` builtin fields to a processing pipeline. -var WithECS modifier = WithFields(mapstr.M{ +var WithECS = WithFields(mapstr.M{ "ecs": mapstr.M{ "version": ecs.Version, }, diff --git a/libbeat/publisher/processing/default_test.go b/libbeat/publisher/processing/default_test.go index 7907063a796..7300ba9cbb7 100644 --- a/libbeat/publisher/processing/default_test.go +++ b/libbeat/publisher/processing/default_test.go @@ -33,8 +33,31 @@ import ( "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" + + _ "github.com/elastic/beats/v7/libbeat/processors/add_cloud_metadata" + _ "github.com/elastic/beats/v7/libbeat/processors/add_docker_metadata" + _ "github.com/elastic/beats/v7/libbeat/processors/add_host_metadata" + _ "github.com/elastic/beats/v7/libbeat/processors/add_kubernetes_metadata" ) +func TestGenerateProcessorList(t *testing.T) { + _ = logp.DevelopmentSetup() + inputCfg := []mapstr.M{ + {"add_host_metadata": nil}, + {"add_cloud_metadata": nil}, + {"add_docker_metadata": nil}, + {"add_kubernetes_metadata": nil}, + } + + plugins, err := processors.NewPluginConfigFromList(inputCfg) + require.NoError(t, err) + + processors, err := processors.New(plugins) + require.NoError(t, err) + // make sure the processor init got the config formatted in a way it expected + require.Equal(t, 4, processors.List) +} + func TestProcessorsConfigs(t *testing.T) { defaultInfo := beat.Info{ Beat: "test", @@ -258,7 +281,7 @@ func TestProcessorsConfigs(t *testing.T) { factory := test.factory if factory == nil { - factory = MakeDefaultSupport(true) + factory = MakeDefaultSupport(true, nil) } support, err := factory(info, logp.L(), cfg) @@ -343,7 +366,7 @@ func TestNormalization(t *testing.T) { t.Run(name, func(t *testing.T) { t.Parallel() - s, err := MakeDefaultSupport(test.normalize)(beat.Info{}, logp.L(), config.NewConfig()) + s, err := MakeDefaultSupport(test.normalize, nil)(beat.Info{}, logp.L(), config.NewConfig()) require.NoError(t, err) prog, err := s.Create(beat.ProcessingConfig{}, false) @@ -364,7 +387,7 @@ func TestNormalization(t *testing.T) { } func BenchmarkNormalization(b *testing.B) { - s, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), config.NewConfig()) + s, err := MakeDefaultSupport(true, nil)(beat.Info{}, logp.L(), config.NewConfig()) require.NoError(b, err) prog, err := s.Create(beat.ProcessingConfig{}, false) @@ -378,7 +401,7 @@ func BenchmarkNormalization(b *testing.B) { } func TestAlwaysDrop(t *testing.T) { - s, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), config.NewConfig()) + s, err := MakeDefaultSupport(true, nil)(beat.Info{}, logp.L(), config.NewConfig()) require.NoError(t, err) prog, err := s.Create(beat.ProcessingConfig{}, true) @@ -393,7 +416,7 @@ func TestAlwaysDrop(t *testing.T) { } func TestDynamicFields(t *testing.T) { - factory, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), config.NewConfig()) + factory, err := MakeDefaultSupport(true, nil)(beat.Info{}, logp.L(), config.NewConfig()) require.NoError(t, err) dynFields := mapstr.NewPointer(mapstr.M{}) @@ -416,7 +439,7 @@ func TestDynamicFields(t *testing.T) { } func TestProcessingClose(t *testing.T) { - factory, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), config.NewConfig()) + factory, err := MakeDefaultSupport(true, nil)(beat.Info{}, logp.L(), config.NewConfig()) require.NoError(t, err) // Inject a processor in the builder that we can check if has been closed. diff --git a/metricbeat/cmd/root.go b/metricbeat/cmd/root.go index ffc3169ad02..e124aacaa7f 100644 --- a/metricbeat/cmd/root.go +++ b/metricbeat/cmd/root.go @@ -58,7 +58,7 @@ func MetricbeatSettings() instance.Settings { RunFlags: runFlags, Name: Name, HasDashboards: true, - Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithHost, processing.WithAgentMeta()), + Processing: processing.MakeDefaultSupport(true, nil, withECSVersion, processing.WithHost, processing.WithAgentMeta()), } } diff --git a/packetbeat/cmd/root.go b/packetbeat/cmd/root.go index 845129266fa..4260a2e39e2 100644 --- a/packetbeat/cmd/root.go +++ b/packetbeat/cmd/root.go @@ -25,6 +25,7 @@ import ( cmd "github.com/elastic/beats/v7/libbeat/cmd" "github.com/elastic/beats/v7/libbeat/cmd/instance" "github.com/elastic/beats/v7/libbeat/ecs" + "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/publisher/processing" "github.com/elastic/beats/v7/packetbeat/beater" "github.com/elastic/elastic-agent-libs/mapstr" @@ -49,7 +50,7 @@ var withECSVersion = processing.WithFields(mapstr.M{ var RootCmd *cmd.BeatsRootCmd // PacketbeatSettings contains the default settings for packetbeat -func PacketbeatSettings() instance.Settings { +func PacketbeatSettings(globals processors.PluginConfig) instance.Settings { runFlags := pflag.NewFlagSet(Name, pflag.ExitOnError) runFlags.AddGoFlag(flag.CommandLine.Lookup("I")) runFlags.AddGoFlag(flag.CommandLine.Lookup("t")) @@ -61,7 +62,7 @@ func PacketbeatSettings() instance.Settings { RunFlags: runFlags, Name: Name, HasDashboards: true, - Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithHost, processing.WithAgentMeta()), + Processing: processing.MakeDefaultSupport(true, globals, withECSVersion, processing.WithHost, processing.WithAgentMeta()), InputQueueSize: 400, } } @@ -74,5 +75,5 @@ func Initialize(settings instance.Settings) *cmd.BeatsRootCmd { } func init() { - RootCmd = Initialize(PacketbeatSettings()) + RootCmd = Initialize(PacketbeatSettings(nil)) } diff --git a/winlogbeat/cmd/root.go b/winlogbeat/cmd/root.go index f8aaf2edf07..6b020aa839c 100644 --- a/winlogbeat/cmd/root.go +++ b/winlogbeat/cmd/root.go @@ -52,7 +52,7 @@ func WinlogbeatSettings() instance.Settings { return instance.Settings{ Name: Name, HasDashboards: true, - Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithAgentMeta()), + Processing: processing.MakeDefaultSupport(true, nil, withECSVersion, processing.WithAgentMeta()), } } diff --git a/x-pack/auditbeat/cmd/root.go b/x-pack/auditbeat/cmd/root.go index 65320d5da11..60382602060 100644 --- a/x-pack/auditbeat/cmd/root.go +++ b/x-pack/auditbeat/cmd/root.go @@ -11,6 +11,7 @@ import ( auditbeatcmd "github.com/elastic/beats/v7/auditbeat/cmd" "github.com/elastic/beats/v7/libbeat/cmd" "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/x-pack/libbeat/management" "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/proto" @@ -30,8 +31,7 @@ var RootCmd *cmd.BeatsRootCmd // auditbeatCfg is a callback registered with central management to perform any needed config transformations // before agent configs are sent to a beat func auditbeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) { - procs := defaultProcessors() - modules, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo, procs...) + modules, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo) if err != nil { return nil, fmt.Errorf("error creating input list from raw expected config: %w", err) } @@ -55,7 +55,11 @@ func auditbeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) func init() { management.ConfigTransform.SetTransform(auditbeatCfg) - settings := auditbeatcmd.AuditbeatSettings() + globalProcs, err := processors.NewPluginConfigFromList(defaultProcessors()) + if err != nil { // these are hard-coded, shouldn't fail + panic(fmt.Errorf("error creating global processors: %w", err)) + } + settings := auditbeatcmd.AuditbeatSettings(globalProcs) settings.ElasticLicensed = true RootCmd = auditbeatcmd.Initialize(settings) } diff --git a/x-pack/filebeat/cmd/agent.go b/x-pack/filebeat/cmd/agent.go index ed1be3aeaa0..c925c362dcc 100644 --- a/x-pack/filebeat/cmd/agent.go +++ b/x-pack/filebeat/cmd/agent.go @@ -15,8 +15,7 @@ import ( ) func filebeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) { - procs := defaultProcessors() - modules, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo, procs...) + modules, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo) if err != nil { return nil, fmt.Errorf("error creating input list from raw expected config: %w", err) } diff --git a/x-pack/filebeat/cmd/root.go b/x-pack/filebeat/cmd/root.go index c6f55a02379..e2e8cab87e6 100644 --- a/x-pack/filebeat/cmd/root.go +++ b/x-pack/filebeat/cmd/root.go @@ -5,8 +5,12 @@ package cmd import ( + "fmt" + fbcmd "github.com/elastic/beats/v7/filebeat/cmd" cmd "github.com/elastic/beats/v7/libbeat/cmd" + "github.com/elastic/beats/v7/libbeat/processors" + "github.com/elastic/beats/v7/libbeat/publisher/processing" "github.com/elastic/beats/v7/x-pack/libbeat/management" // Register the includes. @@ -21,6 +25,11 @@ const Name = fbcmd.Name func Filebeat() *cmd.BeatsRootCmd { management.ConfigTransform.SetTransform(filebeatCfg) settings := fbcmd.FilebeatSettings() + globalProcs, err := processors.NewPluginConfigFromList(defaultProcessors()) + if err != nil { // these are hard-coded, shouldn't fail + panic(fmt.Errorf("error creating global processors: %w", err)) + } + settings.Processing = processing.MakeDefaultSupport(true, globalProcs) settings.ElasticLicensed = true command := fbcmd.Filebeat(inputs.Init, settings) return command diff --git a/x-pack/metricbeat/cmd/agent.go b/x-pack/metricbeat/cmd/agent.go index 8ee225d4f1c..bc161b6f606 100644 --- a/x-pack/metricbeat/cmd/agent.go +++ b/x-pack/metricbeat/cmd/agent.go @@ -16,8 +16,7 @@ import ( ) func metricbeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) { - procs := defaultProcessors() - modules, err := management.CreateInputsFromStreams(rawIn, "metrics", agentInfo, procs...) + modules, err := management.CreateInputsFromStreams(rawIn, "metrics", agentInfo) if err != nil { return nil, fmt.Errorf("error creating input list from raw expected config: %w", err) } diff --git a/x-pack/metricbeat/cmd/root.go b/x-pack/metricbeat/cmd/root.go index 99fdd9cf49d..07670b2f472 100644 --- a/x-pack/metricbeat/cmd/root.go +++ b/x-pack/metricbeat/cmd/root.go @@ -6,12 +6,14 @@ package cmd import ( "flag" + "fmt" "github.com/spf13/pflag" "github.com/elastic/beats/v7/libbeat/cmd" "github.com/elastic/beats/v7/libbeat/cmd/instance" "github.com/elastic/beats/v7/libbeat/ecs" + "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/publisher/processing" "github.com/elastic/beats/v7/metricbeat/beater" mbcmd "github.com/elastic/beats/v7/metricbeat/cmd" @@ -47,12 +49,16 @@ func init() { management.ConfigTransform.SetTransform(metricbeatCfg) var runFlags = pflag.NewFlagSet(Name, pflag.ExitOnError) runFlags.AddGoFlag(flag.CommandLine.Lookup("system.hostfs")) + globalProcs, err := processors.NewPluginConfigFromList(defaultProcessors()) + if err != nil { // these are hard-coded, shouldn't fail + panic(fmt.Errorf("error creating global processors: %w", err)) + } settings := instance.Settings{ RunFlags: runFlags, Name: Name, HasDashboards: true, ElasticLicensed: true, - Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithHost, processing.WithAgentMeta()), + Processing: processing.MakeDefaultSupport(true, globalProcs, withECSVersion, processing.WithHost, processing.WithAgentMeta()), } RootCmd = cmd.GenRootCmdWithSettings(beater.DefaultCreator(), settings) RootCmd.AddCommand(cmd.GenModulesCmd(Name, "", mbcmd.BuildModulesManager)) diff --git a/x-pack/osquerybeat/cmd/root.go b/x-pack/osquerybeat/cmd/root.go index 26c99a43cf8..75bee5a6552 100644 --- a/x-pack/osquerybeat/cmd/root.go +++ b/x-pack/osquerybeat/cmd/root.go @@ -12,6 +12,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/cli" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/ecs" + "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/publisher/processing" "github.com/elastic/beats/v7/x-pack/libbeat/management" "github.com/elastic/elastic-agent-client/v7/pkg/client" @@ -43,9 +44,13 @@ var RootCmd = Osquerybeat() func Osquerybeat() *cmd.BeatsRootCmd { management.ConfigTransform.SetTransform(osquerybeatCfg) + globalProcs, err := processors.NewPluginConfigFromList(defaultProcessors()) + if err != nil { // these are hard-coded, shouldn't fail + panic(fmt.Errorf("error creating global processors: %w", err)) + } settings := instance.Settings{ Name: Name, - Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithHost, processing.WithAgentMeta()), + Processing: processing.MakeDefaultSupport(true, globalProcs, withECSVersion, processing.WithHost, processing.WithAgentMeta()), ElasticLicensed: true, } command := cmd.GenRootCmdWithSettings(beater.New, settings) @@ -97,9 +102,7 @@ func osquerybeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo rawIn.Streams = streams - procs := defaultProcessors() - - modules, err := management.CreateInputsFromStreams(rawIn, "osquery", agentInfo, procs...) + modules, err := management.CreateInputsFromStreams(rawIn, "osquery", agentInfo) if err != nil { return nil, fmt.Errorf("error creating input list from raw expected config: %w", err) } diff --git a/x-pack/packetbeat/cmd/root.go b/x-pack/packetbeat/cmd/root.go index 54cc76f05cb..f77bd827bf2 100644 --- a/x-pack/packetbeat/cmd/root.go +++ b/x-pack/packetbeat/cmd/root.go @@ -9,6 +9,7 @@ import ( "github.com/elastic/beats/v7/libbeat/cmd" "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/beats/v7/libbeat/processors" packetbeatCmd "github.com/elastic/beats/v7/packetbeat/cmd" "github.com/elastic/beats/v7/x-pack/libbeat/management" "github.com/elastic/elastic-agent-client/v7/pkg/client" @@ -32,8 +33,7 @@ var RootCmd *cmd.BeatsRootCmd // configuration generated from a raw Elastic Agent config func packetbeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) { //grab and properly format the input streams - procs := defaultProcessors() - inputStreams, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo, procs...) + inputStreams, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo) if err != nil { return nil, fmt.Errorf("error generating new stream config: %w", err) } @@ -55,7 +55,11 @@ func init() { // Register packetbeat with central management to perform any needed config // transformations before agent configs are sent to the beat during reload. management.ConfigTransform.SetTransform(packetbeatCfg) - settings := packetbeatCmd.PacketbeatSettings() + globalProcs, err := processors.NewPluginConfigFromList(defaultProcessors()) + if err != nil { // these are hard-coded, shouldn't fail + panic(fmt.Errorf("error creating global processors: %w", err)) + } + settings := packetbeatCmd.PacketbeatSettings(globalProcs) settings.ElasticLicensed = true RootCmd = packetbeatCmd.Initialize(settings) } From 763a11bbba66991022057f9ac3321b302a8c496d Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Wed, 5 Apr 2023 14:31:05 -0700 Subject: [PATCH 2/9] make linter happy --- libbeat/monitoring/report/elasticsearch/elasticsearch.go | 2 +- libbeat/publisher/pipeline/stress/run.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index bf29ae872f1..112bec5045c 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -282,7 +282,7 @@ func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration, clusterUUID = getClusterUUID() } if clusterUUID != "" { - meta.Put("cluster_uuid", clusterUUID) + _, _ = meta.Put("cluster_uuid", clusterUUID) } r.client.Publish(beat.Event{ diff --git a/libbeat/publisher/pipeline/stress/run.go b/libbeat/publisher/pipeline/stress/run.go index a3ef5451f53..ee118d502eb 100644 --- a/libbeat/publisher/pipeline/stress/run.go +++ b/libbeat/publisher/pipeline/stress/run.go @@ -56,7 +56,7 @@ func RunTests( ) error { config := defaultConfig if err := cfg.Unpack(&config); err != nil { - return fmt.Errorf("unpacking config failed: %v", err) + return fmt.Errorf("unpacking config failed: %w", err) } log := logp.L() @@ -81,7 +81,7 @@ func RunTests( }, ) if err != nil { - return fmt.Errorf("loading pipeline failed: %+v", err) + return fmt.Errorf("loading pipeline failed: %w", err) } defer func() { log.Info("Stop pipeline") From fffd782cefbed1fcecd35f2c6ae3d53edc64a3fb Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Wed, 5 Apr 2023 15:23:41 -0700 Subject: [PATCH 3/9] fix test --- libbeat/publisher/processing/default_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/publisher/processing/default_test.go b/libbeat/publisher/processing/default_test.go index 7300ba9cbb7..9aa12a5d143 100644 --- a/libbeat/publisher/processing/default_test.go +++ b/libbeat/publisher/processing/default_test.go @@ -55,7 +55,7 @@ func TestGenerateProcessorList(t *testing.T) { processors, err := processors.New(plugins) require.NoError(t, err) // make sure the processor init got the config formatted in a way it expected - require.Equal(t, 4, processors.List) + require.Equal(t, 4, len(processors.List)) } func TestProcessorsConfigs(t *testing.T) { From 5c92aa92c3d1a9783de048d5c2952df88a9a9763 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Wed, 5 Apr 2023 15:27:28 -0700 Subject: [PATCH 4/9] add comment --- libbeat/publisher/processing/default.go | 1 + 1 file changed, 1 insertion(+) diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index c87173ef266..c5787a05d4d 100644 --- a/libbeat/publisher/processing/default.go +++ b/libbeat/publisher/processing/default.go @@ -114,6 +114,7 @@ func MakeDefaultSupport( // don't try to "merge" the two lists somehow, if the supportFactory caller requests its own processors, use those // also makes it easier to disable global processors if needed, since they're otherwise hardcoded var rawProcessors processors.PluginConfig + // don't check the array directly, use HasField, that way processors can easily be bypassed with -E processors=[] if fleetmode.Enabled() && !beatCfg.HasField("processors") { log.Debugf("In fleet with processors specified, defaulting to global processors") rawProcessors = fleetDefaultProcessors From bdc0d5e2fc5a93ca7f0a95aac956584073ac626a Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Wed, 5 Apr 2023 16:09:11 -0700 Subject: [PATCH 5/9] move around defaultProcessors --- x-pack/filebeat/cmd/agent.go | 20 -------------------- x-pack/filebeat/cmd/root.go | 21 +++++++++++++++++++++ x-pack/metricbeat/cmd/agent.go | 15 --------------- x-pack/metricbeat/cmd/root.go | 14 ++++++++++++++ 4 files changed, 35 insertions(+), 35 deletions(-) diff --git a/x-pack/filebeat/cmd/agent.go b/x-pack/filebeat/cmd/agent.go index c925c362dcc..9a3f787eab6 100644 --- a/x-pack/filebeat/cmd/agent.go +++ b/x-pack/filebeat/cmd/agent.go @@ -11,7 +11,6 @@ import ( "github.com/elastic/beats/v7/x-pack/libbeat/management" "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/proto" - "github.com/elastic/elastic-agent-libs/mapstr" ) func filebeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) { @@ -36,22 +35,3 @@ func filebeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ( return configList, nil } - -func defaultProcessors() []mapstr.M { - // processors: - // - add_host_metadata: - // when.not.contains.tags: forwarded - // - add_cloud_metadata: ~ - // - add_docker_metadata: ~ - // - add_kubernetes_metadata: ~ - return []mapstr.M{ - { - "add_host_metadata": mapstr.M{ - "when.not.contains.tags": "forwarded", - }, - }, - {"add_cloud_metadata": nil}, - {"add_docker_metadata": nil}, - {"add_kubernetes_metadata": nil}, - } -} diff --git a/x-pack/filebeat/cmd/root.go b/x-pack/filebeat/cmd/root.go index e2e8cab87e6..5c6c9885f15 100644 --- a/x-pack/filebeat/cmd/root.go +++ b/x-pack/filebeat/cmd/root.go @@ -12,6 +12,7 @@ import ( "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/publisher/processing" "github.com/elastic/beats/v7/x-pack/libbeat/management" + "github.com/elastic/elastic-agent-libs/mapstr" // Register the includes. _ "github.com/elastic/beats/v7/x-pack/filebeat/include" @@ -19,6 +20,7 @@ import ( _ "github.com/elastic/beats/v7/x-pack/libbeat/include" ) +// Name is the name of the beat const Name = fbcmd.Name // Filebeat build the beat root command for executing filebeat and it's subcommands. @@ -34,3 +36,22 @@ func Filebeat() *cmd.BeatsRootCmd { command := fbcmd.Filebeat(inputs.Init, settings) return command } + +func defaultProcessors() []mapstr.M { + // processors: + // - add_host_metadata: + // when.not.contains.tags: forwarded + // - add_cloud_metadata: ~ + // - add_docker_metadata: ~ + // - add_kubernetes_metadata: ~ + return []mapstr.M{ + { + "add_host_metadata": mapstr.M{ + "when.not.contains.tags": "forwarded", + }, + }, + {"add_cloud_metadata": nil}, + {"add_docker_metadata": nil}, + {"add_kubernetes_metadata": nil}, + } +} diff --git a/x-pack/metricbeat/cmd/agent.go b/x-pack/metricbeat/cmd/agent.go index bc161b6f606..99fd85fb671 100644 --- a/x-pack/metricbeat/cmd/agent.go +++ b/x-pack/metricbeat/cmd/agent.go @@ -12,7 +12,6 @@ import ( "github.com/elastic/beats/v7/x-pack/libbeat/management" "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/proto" - "github.com/elastic/elastic-agent-libs/mapstr" ) func metricbeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) { @@ -37,17 +36,3 @@ func metricbeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) return configList, nil } - -func defaultProcessors() []mapstr.M { - // processors: - // - add_host_metadata: ~ - // - add_cloud_metadata: ~ - // - add_docker_metadata: ~ - // - add_kubernetes_metadata: ~ - return []mapstr.M{ - {"add_host_metadata": nil}, - {"add_cloud_metadata": nil}, - {"add_docker_metadata": nil}, - {"add_kubernetes_metadata": nil}, - } -} diff --git a/x-pack/metricbeat/cmd/root.go b/x-pack/metricbeat/cmd/root.go index 07670b2f472..455eb335e19 100644 --- a/x-pack/metricbeat/cmd/root.go +++ b/x-pack/metricbeat/cmd/root.go @@ -64,3 +64,17 @@ func init() { RootCmd.AddCommand(cmd.GenModulesCmd(Name, "", mbcmd.BuildModulesManager)) RootCmd.TestCmd.AddCommand(test.GenTestModulesCmd(Name, "", beater.DefaultTestModulesCreator())) } + +func defaultProcessors() []mapstr.M { + // processors: + // - add_host_metadata: ~ + // - add_cloud_metadata: ~ + // - add_docker_metadata: ~ + // - add_kubernetes_metadata: ~ + return []mapstr.M{ + {"add_host_metadata": nil}, + {"add_cloud_metadata": nil}, + {"add_docker_metadata": nil}, + {"add_kubernetes_metadata": nil}, + } +} From 31f6848e4418fc12516318d6c410a03d7be7f897 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Thu, 6 Apr 2023 11:41:28 -0700 Subject: [PATCH 6/9] fix tests, add diagnostics --- libbeat/cmd/instance/beat.go | 17 +++++++++++++++++ libbeat/management/management.go | 5 +++++ libbeat/publisher/processing/default.go | 9 +++++++++ libbeat/publisher/processing/default_test.go | 1 - libbeat/publisher/processing/processing.go | 4 ++++ x-pack/libbeat/management/managerV2.go | 5 +++++ 6 files changed, 40 insertions(+), 1 deletion(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 57064b2e708..830ec37aec2 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -847,9 +847,26 @@ func (b *Beat) configure(settings Settings) error { } b.processing, err = processingFactory(b.Info, logp.L().Named("processors"), b.RawConfig) + b.Manager.RegisterDiagnosticHook("global processors", "a list of currently configured global beat processors", + "global_processors.txt", "text/plain", b.agentDiagnosticHook) + return err } +// agentDiagnosticHook is the callback function sent to the agent manager RegisterDiagnosticHook function +// right now, this only returns information on the global processors; however, in the future, we might find it useful +// to expand this to other components of the beat state. +// To anyone refactoring: be careful to make sure the callback is registered after the global processors are initialized +func (b *Beat) agentDiagnosticHook() []byte { + list := b.processing.Processors() + + var debugBytes []byte + for _, proc := range list { + debugBytes = append(debugBytes, []byte(proc+"\n")...) + } + return debugBytes +} + func (b *Beat) loadMeta(metaPath string) error { type meta struct { UUID uuid.UUID `json:"uuid"` diff --git a/libbeat/management/management.go b/libbeat/management/management.go index d1aad3a171b..88faa48f540 100644 --- a/libbeat/management/management.go +++ b/libbeat/management/management.go @@ -101,6 +101,9 @@ type Manager interface { // SetPayload Allows to add additional metadata to future requests made by the manager. SetPayload(map[string]interface{}) + + // RegisterDiagnosticHook registers a callback for elastic-agent diagnostics + RegisterDiagnosticHook(name string, description string, filename string, contentType string, hook client.DiagnosticHook) } // ManagerFactory is the factory type for creating a config manager @@ -192,3 +195,5 @@ func (n *fallbackManager) CheckRawConfig(cfg *config.C) error { return nil } func (n *fallbackManager) RegisterAction(action client.Action) {} func (n *fallbackManager) UnregisterAction(action client.Action) {} func (n *fallbackManager) SetPayload(map[string]interface{}) {} +func (n *fallbackManager) RegisterDiagnosticHook(_ string, _ string, _ string, _ string, _ client.DiagnosticHook) { +} diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index c5787a05d4d..c993bbb446b 100644 --- a/libbeat/publisher/processing/default.go +++ b/libbeat/publisher/processing/default.go @@ -258,6 +258,15 @@ func newBuilder( return b, nil } +// Processors returns a string description of the processor config +func (b *builder) Processors() []string { + procList := []string{} + for _, proc := range b.processors.list { + procList = append(procList, proc.String()) + } + return procList +} + // Create combines the builder configuration with the client settings // in order to build the event processing pipeline. // diff --git a/libbeat/publisher/processing/default_test.go b/libbeat/publisher/processing/default_test.go index 9aa12a5d143..ef58dc97b8e 100644 --- a/libbeat/publisher/processing/default_test.go +++ b/libbeat/publisher/processing/default_test.go @@ -41,7 +41,6 @@ import ( ) func TestGenerateProcessorList(t *testing.T) { - _ = logp.DevelopmentSetup() inputCfg := []mapstr.M{ {"add_host_metadata": nil}, {"add_cloud_metadata": nil}, diff --git a/libbeat/publisher/processing/processing.go b/libbeat/publisher/processing/processing.go index c8d44dc56cd..e5d6be9581b 100644 --- a/libbeat/publisher/processing/processing.go +++ b/libbeat/publisher/processing/processing.go @@ -35,6 +35,10 @@ type SupportFactory func(info beat.Info, log *logp.Logger, cfg *config.C) (Suppo // If `drop` is set, then the processor generated must always drop all events. // A Supporter needs to be closed with `Close()` to release its global resources. type Supporter interface { + // Create a running processor interface based on the given config Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, error) + // Processors returns a list of config strings for the given processor, for debug purposes + Processors() []string + // Close the processor supporter Close() error } diff --git a/x-pack/libbeat/management/managerV2.go b/x-pack/libbeat/management/managerV2.go index 03abdfd11e8..922170177d7 100644 --- a/x-pack/libbeat/management/managerV2.go +++ b/x-pack/libbeat/management/managerV2.go @@ -165,6 +165,11 @@ func NewV2AgentManagerWithClient(config *Config, registry *reload.Registry, agen // Beats central management interface implementation // ================================ +// RegisterDiagnosticHook will register a diagnostic callback function when elastic-agent asks for a diagnostics dump +func (cm *BeatV2Manager) RegisterDiagnosticHook(name string, description string, filename string, contentType string, hook client.DiagnosticHook) { + cm.client.RegisterDiagnosticHook(name, description, filename, contentType, hook) +} + // UpdateStatus updates the manager with the current status for the beat. func (cm *BeatV2Manager) UpdateStatus(status lbmanagement.Status, msg string) { cm.mx.Lock() From 2269060363be903e11e35532400486f8417152ed Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Tue, 11 Apr 2023 11:11:47 -0700 Subject: [PATCH 7/9] fix default processor on filebeat --- x-pack/filebeat/cmd/root.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/filebeat/cmd/root.go b/x-pack/filebeat/cmd/root.go index 5c6c9885f15..9489fdb8a93 100644 --- a/x-pack/filebeat/cmd/root.go +++ b/x-pack/filebeat/cmd/root.go @@ -31,7 +31,7 @@ func Filebeat() *cmd.BeatsRootCmd { if err != nil { // these are hard-coded, shouldn't fail panic(fmt.Errorf("error creating global processors: %w", err)) } - settings.Processing = processing.MakeDefaultSupport(true, globalProcs) + settings.Processing = processing.MakeDefaultSupport(true, globalProcs, processing.WithECS, processing.WithHost, processing.WithAgentMeta()) settings.ElasticLicensed = true command := fbcmd.Filebeat(inputs.Init, settings) return command From 70d2d69a3eee2ee532f3a9ae03abb6791ffd71b0 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Tue, 11 Apr 2023 12:47:25 -0700 Subject: [PATCH 8/9] change log line --- libbeat/publisher/processing/default.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/publisher/processing/default.go b/libbeat/publisher/processing/default.go index c993bbb446b..06e7ccf9587 100644 --- a/libbeat/publisher/processing/default.go +++ b/libbeat/publisher/processing/default.go @@ -116,7 +116,7 @@ func MakeDefaultSupport( var rawProcessors processors.PluginConfig // don't check the array directly, use HasField, that way processors can easily be bypassed with -E processors=[] if fleetmode.Enabled() && !beatCfg.HasField("processors") { - log.Debugf("In fleet with processors specified, defaulting to global processors") + log.Debugf("In fleet mode with no processors specified, defaulting to global processors") rawProcessors = fleetDefaultProcessors } else { From 9467c5090045b341252fffb573d54f0d6e6ee203 Mon Sep 17 00:00:00 2001 From: Pierre HILBERT Date: Wed, 12 Apr 2023 14:31:41 +0200 Subject: [PATCH 9/9] Update CHANGELOG.next.asciidoc Adding changelog entry --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f9adc3a1424..42e2c9ea4d2 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -62,6 +62,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Update elastic-agent-system-metrics to v0.4.6 to allow builds on mips platforms. {pull}34674[34674] - The Elasticsearch output now splits large requests instead of dropping them when it receives a StatusRequestEntityTooLarge error. {pull}34911[34911] - Fix Beats started by agent do not respect the allow_older_versions: true configuration flag {issue}34227[34227] {pull}34964[34964] +- Fix performance issues when we have a lot of inputs starting and stopping by allowing to disable global processors under fleet. {issue}35000[35000] {pull}35031[35031] *Auditbeat*