Skip to content

Commit

Permalink
Fix performance issues with processors scaling under agent (#35031)
Browse files Browse the repository at this point in the history
* fix performance issues with processors scaling under agent

* make linter happy

* fix test

* add comment

* move around defaultProcessors

* fix tests, add diagnostics

* fix default processor on filebeat

* change log line

* Update CHANGELOG.next.asciidoc

Adding changelog entry

---------

Co-authored-by: Pierre HILBERT <[email protected]>
(cherry picked from commit ea1293f)

# Conflicts:
#	libbeat/management/management.go
  • Loading branch information
fearful-symmetry authored and mergify[bot] committed Apr 12, 2023
1 parent 5be2585 commit 3dcbbac
Show file tree
Hide file tree
Showing 22 changed files with 212 additions and 76 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
*Affecting all Beats*
- Support for multiline zookeeper logs {issue}2496[2496]
- Fix Beats started by agent do not respect the allow_older_versions: true configuration flag {issue}34227[34227] {pull}34964[34964]
- Fix performance issues when we have a lot of inputs starting and stopping by allowing to disable global processors under fleet. {issue}35000[35000] {pull}35031[35031]

*Auditbeat*

Expand Down
7 changes: 4 additions & 3 deletions auditbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/elastic/beats/v7/libbeat/cmd"
"github.com/elastic/beats/v7/libbeat/cmd/instance"
"github.com/elastic/beats/v7/libbeat/ecs"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/publisher/processing"
"github.com/elastic/beats/v7/metricbeat/beater"
"github.com/elastic/beats/v7/metricbeat/mb/module"
Expand Down Expand Up @@ -53,13 +54,13 @@ var withECSVersion = processing.WithFields(mapstr.M{
})

// AuditbeatSettings contains the default settings for auditbeat
func AuditbeatSettings() instance.Settings {
func AuditbeatSettings(globals processors.PluginConfig) instance.Settings {
runFlags := pflag.NewFlagSet(Name, pflag.ExitOnError)
return instance.Settings{
RunFlags: runFlags,
Name: Name,
HasDashboards: true,
Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
Processing: processing.MakeDefaultSupport(true, globals, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
}
}

Expand All @@ -76,5 +77,5 @@ func Initialize(settings instance.Settings) *cmd.BeatsRootCmd {
}

func init() {
RootCmd = Initialize(AuditbeatSettings())
RootCmd = Initialize(AuditbeatSettings(nil))
}
2 changes: 1 addition & 1 deletion heartbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var withECSVersion = processing.WithFields(mapstr.M{
func HeartbeatSettings() instance.Settings {
return instance.Settings{
Name: Name,
Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithAgentMeta()),
Processing: processing.MakeDefaultSupport(true, nil, withECSVersion, processing.WithAgentMeta()),
HasDashboards: false,
}
}
Expand Down
17 changes: 17 additions & 0 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -804,9 +804,26 @@ func (b *Beat) configure(settings Settings) error {
}
b.processing, err = processingFactory(b.Info, logp.L().Named("processors"), b.RawConfig)

b.Manager.RegisterDiagnosticHook("global processors", "a list of currently configured global beat processors",
"global_processors.txt", "text/plain", b.agentDiagnosticHook)

return err
}

// agentDiagnosticHook is the callback function sent to the agent manager RegisterDiagnosticHook function
// right now, this only returns information on the global processors; however, in the future, we might find it useful
// to expand this to other components of the beat state.
// To anyone refactoring: be careful to make sure the callback is registered after the global processors are initialized
func (b *Beat) agentDiagnosticHook() []byte {
list := b.processing.Processors()

var debugBytes []byte
for _, proc := range list {
debugBytes = append(debugBytes, []byte(proc+"\n")...)
}
return debugBytes
}

func (b *Beat) loadMeta(metaPath string) error {
type meta struct {
UUID uuid.UUID `json:"uuid"`
Expand Down
18 changes: 18 additions & 0 deletions libbeat/management/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ type Manager interface {

// SetPayload Allows to add additional metadata to future requests made by the manager.
SetPayload(map[string]interface{})

// RegisterDiagnosticHook registers a callback for elastic-agent diagnostics
RegisterDiagnosticHook(name string, description string, filename string, contentType string, hook client.DiagnosticHook)
}

// PluginFunc for creating FactoryFunc if it matches a config
Expand Down Expand Up @@ -190,4 +193,19 @@ func (n *nilManager) RegisterAction(action client.Action) {}

func (n *nilManager) UnregisterAction(action client.Action) {}

<<<<<<< HEAD
func (n *nilManager) SetPayload(map[string]interface{}) {}
=======
// Enabled returns false because management is disabled.
// the nilManager is still used for shutdown on some cases,
// but that does not mean the Beat is being managed externally,
// hence it will always return false.
func (n *fallbackManager) Enabled() bool { return false }
func (n *fallbackManager) Start() error { return nil }
func (n *fallbackManager) CheckRawConfig(cfg *config.C) error { return nil }
func (n *fallbackManager) RegisterAction(action client.Action) {}
func (n *fallbackManager) UnregisterAction(action client.Action) {}
func (n *fallbackManager) SetPayload(map[string]interface{}) {}
func (n *fallbackManager) RegisterDiagnosticHook(_ string, _ string, _ string, _ string, _ client.DiagnosticHook) {
}
>>>>>>> ea1293fdb0 (Fix performance issues with processors scaling under agent (#35031))
4 changes: 2 additions & 2 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func makeReporter(beat beat.Info, settings report.Settings, cfg *conf.C) (report
outClient := outputs.NewFailoverClient(clients)
outClient = outputs.WithBackoff(outClient, config.Backoff.Init, config.Backoff.Max)

processing, err := processing.MakeDefaultSupport(true)(beat, log, conf.NewConfig())
processing, err := processing.MakeDefaultSupport(true, nil)(beat, log, conf.NewConfig())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -282,7 +282,7 @@ func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration,
clusterUUID = getClusterUUID()
}
if clusterUUID != "" {
meta.Put("cluster_uuid", clusterUUID)
_, _ = meta.Put("cluster_uuid", clusterUUID)
}

r.client.Publish(beat.Event{
Expand Down
23 changes: 21 additions & 2 deletions libbeat/processors/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,29 @@

package processors

import "github.com/elastic/elastic-agent-libs/config"
import (
"fmt"

"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
)

// PluginConfig represents the list of processors.
type PluginConfig []*config.C

// fields that should be always exported
// MandatoryExportedFields are fields that should be always exported
var MandatoryExportedFields = []string{"type"}

// NewPluginConfigFromList creates a PluginConfig from a list of raw processor config objects
func NewPluginConfigFromList(raw []mapstr.M) (PluginConfig, error) {
processors := make([]*config.C, len(raw))
for i := 0; i < len(raw); i++ {
cfg, err := config.NewConfigFrom(raw[i])
if err != nil {
return nil, fmt.Errorf("error creating processor config: %w", err)
}
processors[i] = cfg
}

return processors, nil
}
6 changes: 3 additions & 3 deletions libbeat/publisher/pipeline/stress/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ func RunTests(
) error {
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return fmt.Errorf("unpacking config failed: %v", err)
return fmt.Errorf("unpacking config failed: %w", err)
}

log := logp.L()

processing, err := processing.MakeDefaultSupport(false)(info, log, cfg)
processing, err := processing.MakeDefaultSupport(false, nil)(info, log, cfg)
if err != nil {
return err
}
Expand All @@ -81,7 +81,7 @@ func RunTests(
},
)
if err != nil {
return fmt.Errorf("loading pipeline failed: %+v", err)
return fmt.Errorf("loading pipeline failed: %w", err)
}
defer func() {
log.Info("Stop pipeline")
Expand Down
32 changes: 28 additions & 4 deletions libbeat/publisher/processing/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/elastic/beats/v7/libbeat/asset"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/fleetmode"
"github.com/elastic/beats/v7/libbeat/ecs"
"github.com/elastic/beats/v7/libbeat/features"
"github.com/elastic/beats/v7/libbeat/mapping"
Expand Down Expand Up @@ -77,14 +78,14 @@ type builtinModifier func(beat.Info) mapstr.M
// MakeDefaultBeatSupport automatically adds the `ecs.version`, `host.name` and `agent.X` fields
// to each event.
func MakeDefaultBeatSupport(normalize bool) SupportFactory {
return MakeDefaultSupport(normalize, WithECS, WithHost, WithAgentMeta())
return MakeDefaultSupport(normalize, nil, WithECS, WithHost, WithAgentMeta())
}

// MakeDefaultObserverSupport creates a new SupportFactory based on NewDefaultSupport.
// MakeDefaultObserverSupport automatically adds the `ecs.version` and `observer.X` fields
// to each event.
func MakeDefaultObserverSupport(normalize bool) SupportFactory {
return MakeDefaultSupport(normalize, WithECS, WithObserverMeta())
return MakeDefaultSupport(normalize, nil, WithECS, WithObserverMeta())
}

// MakeDefaultSupport creates a new SupportFactory for use with the publisher pipeline.
Expand All @@ -94,8 +95,11 @@ func MakeDefaultObserverSupport(normalize bool) SupportFactory {
// and `processor` settings to the event processing pipeline to be generated.
// Use WithFields, WithBeatMeta, and other to declare the builtin fields to be added
// to each event. Builtin fields can be modified using global `processors`, and `fields` only.
// the fleetDefaultProcessors argument will set the given global-level processors if the beat is currently running under fleet,
// and no other global-level processors are set.
func MakeDefaultSupport(
normalize bool,
fleetDefaultProcessors processors.PluginConfig,
modifiers ...modifier,
) SupportFactory {
return func(info beat.Info, log *logp.Logger, beatCfg *config.C) (Supporter, error) {
Expand All @@ -107,8 +111,19 @@ func MakeDefaultSupport(
if err := beatCfg.Unpack(&cfg); err != nil {
return nil, err
}
// don't try to "merge" the two lists somehow, if the supportFactory caller requests its own processors, use those
// also makes it easier to disable global processors if needed, since they're otherwise hardcoded
var rawProcessors processors.PluginConfig
// don't check the array directly, use HasField, that way processors can easily be bypassed with -E processors=[]
if fleetmode.Enabled() && !beatCfg.HasField("processors") {
log.Debugf("In fleet mode with no processors specified, defaulting to global processors")
rawProcessors = fleetDefaultProcessors

} else {
rawProcessors = cfg.Processors
}

processors, err := processors.New(cfg.Processors)
processors, err := processors.New(rawProcessors)
if err != nil {
return nil, fmt.Errorf("error initializing processors: %w", err)
}
Expand All @@ -125,7 +140,7 @@ func WithFields(fields mapstr.M) modifier {
}

// WithECS modifier adds `ecs.version` builtin fields to a processing pipeline.
var WithECS modifier = WithFields(mapstr.M{
var WithECS = WithFields(mapstr.M{
"ecs": mapstr.M{
"version": ecs.Version,
},
Expand Down Expand Up @@ -243,6 +258,15 @@ func newBuilder(
return b, nil
}

// Processors returns a string description of the processor config
func (b *builder) Processors() []string {
procList := []string{}
for _, proc := range b.processors.list {
procList = append(procList, proc.String())
}
return procList
}

// Create combines the builder configuration with the client settings
// in order to build the event processing pipeline.
//
Expand Down
34 changes: 28 additions & 6 deletions libbeat/publisher/processing/default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,30 @@ import (
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"

_ "github.com/elastic/beats/v7/libbeat/processors/add_cloud_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/add_docker_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/add_host_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/add_kubernetes_metadata"
)

func TestGenerateProcessorList(t *testing.T) {
inputCfg := []mapstr.M{
{"add_host_metadata": nil},
{"add_cloud_metadata": nil},
{"add_docker_metadata": nil},
{"add_kubernetes_metadata": nil},
}

plugins, err := processors.NewPluginConfigFromList(inputCfg)
require.NoError(t, err)

processors, err := processors.New(plugins)
require.NoError(t, err)
// make sure the processor init got the config formatted in a way it expected
require.Equal(t, 4, len(processors.List))
}

func TestProcessorsConfigs(t *testing.T) {
defaultInfo := beat.Info{
Beat: "test",
Expand Down Expand Up @@ -258,7 +280,7 @@ func TestProcessorsConfigs(t *testing.T) {

factory := test.factory
if factory == nil {
factory = MakeDefaultSupport(true)
factory = MakeDefaultSupport(true, nil)
}

support, err := factory(info, logp.L(), cfg)
Expand Down Expand Up @@ -343,7 +365,7 @@ func TestNormalization(t *testing.T) {
t.Run(name, func(t *testing.T) {
t.Parallel()

s, err := MakeDefaultSupport(test.normalize)(beat.Info{}, logp.L(), config.NewConfig())
s, err := MakeDefaultSupport(test.normalize, nil)(beat.Info{}, logp.L(), config.NewConfig())
require.NoError(t, err)

prog, err := s.Create(beat.ProcessingConfig{}, false)
Expand All @@ -364,7 +386,7 @@ func TestNormalization(t *testing.T) {
}

func BenchmarkNormalization(b *testing.B) {
s, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), config.NewConfig())
s, err := MakeDefaultSupport(true, nil)(beat.Info{}, logp.L(), config.NewConfig())
require.NoError(b, err)

prog, err := s.Create(beat.ProcessingConfig{}, false)
Expand All @@ -378,7 +400,7 @@ func BenchmarkNormalization(b *testing.B) {
}

func TestAlwaysDrop(t *testing.T) {
s, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), config.NewConfig())
s, err := MakeDefaultSupport(true, nil)(beat.Info{}, logp.L(), config.NewConfig())
require.NoError(t, err)

prog, err := s.Create(beat.ProcessingConfig{}, true)
Expand All @@ -393,7 +415,7 @@ func TestAlwaysDrop(t *testing.T) {
}

func TestDynamicFields(t *testing.T) {
factory, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), config.NewConfig())
factory, err := MakeDefaultSupport(true, nil)(beat.Info{}, logp.L(), config.NewConfig())
require.NoError(t, err)

dynFields := mapstr.NewPointer(mapstr.M{})
Expand All @@ -416,7 +438,7 @@ func TestDynamicFields(t *testing.T) {
}

func TestProcessingClose(t *testing.T) {
factory, err := MakeDefaultSupport(true)(beat.Info{}, logp.L(), config.NewConfig())
factory, err := MakeDefaultSupport(true, nil)(beat.Info{}, logp.L(), config.NewConfig())
require.NoError(t, err)

// Inject a processor in the builder that we can check if has been closed.
Expand Down
4 changes: 4 additions & 0 deletions libbeat/publisher/processing/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ type SupportFactory func(info beat.Info, log *logp.Logger, cfg *config.C) (Suppo
// If `drop` is set, then the processor generated must always drop all events.
// A Supporter needs to be closed with `Close()` to release its global resources.
type Supporter interface {
// Create a running processor interface based on the given config
Create(cfg beat.ProcessingConfig, drop bool) (beat.Processor, error)
// Processors returns a list of config strings for the given processor, for debug purposes
Processors() []string
// Close the processor supporter
Close() error
}
2 changes: 1 addition & 1 deletion metricbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func MetricbeatSettings() instance.Settings {
RunFlags: runFlags,
Name: Name,
HasDashboards: true,
Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
Processing: processing.MakeDefaultSupport(true, nil, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
}
}

Expand Down
7 changes: 4 additions & 3 deletions packetbeat/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
cmd "github.com/elastic/beats/v7/libbeat/cmd"
"github.com/elastic/beats/v7/libbeat/cmd/instance"
"github.com/elastic/beats/v7/libbeat/ecs"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/publisher/processing"
"github.com/elastic/beats/v7/packetbeat/beater"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand All @@ -49,7 +50,7 @@ var withECSVersion = processing.WithFields(mapstr.M{
var RootCmd *cmd.BeatsRootCmd

// PacketbeatSettings contains the default settings for packetbeat
func PacketbeatSettings() instance.Settings {
func PacketbeatSettings(globals processors.PluginConfig) instance.Settings {
runFlags := pflag.NewFlagSet(Name, pflag.ExitOnError)
runFlags.AddGoFlag(flag.CommandLine.Lookup("I"))
runFlags.AddGoFlag(flag.CommandLine.Lookup("t"))
Expand All @@ -61,7 +62,7 @@ func PacketbeatSettings() instance.Settings {
RunFlags: runFlags,
Name: Name,
HasDashboards: true,
Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
Processing: processing.MakeDefaultSupport(true, globals, withECSVersion, processing.WithHost, processing.WithAgentMeta()),
InputQueueSize: 400,
}
}
Expand All @@ -74,5 +75,5 @@ func Initialize(settings instance.Settings) *cmd.BeatsRootCmd {
}

func init() {
RootCmd = Initialize(PacketbeatSettings())
RootCmd = Initialize(PacketbeatSettings(nil))
}
Loading

0 comments on commit 3dcbbac

Please sign in to comment.