From 9adb1bd7555e4e099db25b751a4bfefabe8a05c1 Mon Sep 17 00:00:00 2001 From: Denis Date: Tue, 7 Mar 2023 23:16:21 +0100 Subject: [PATCH] Stop re-using processors defined in the config (#34761) * Stop re-using processors defined in the config After introducing the `SafeProcessor` wrapper in https://github.com/elastic/beats/pull/34647 we started returning errors when a processor is being used after its `Close` function has been called. This led to dropped events and error spam in logs but also confirmed that the root cause of the problem was not just a race condition on `Close` but re-used processors somewhere. After a long investigation such code that's re-using processors was finally found. This is the change that removes re-using the processors and instantiates them on each input restart. * Fix linter issues * Add changelog entry (cherry picked from commit 5cfe62cb2f371e89f1af9e5e9c4f7144624d029c) --- CHANGELOG.next.asciidoc | 1 + filebeat/channel/runner.go | 33 ++++++++++++++-------------- libbeat/processors/safe_processor.go | 2 ++ 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 96d3e6f7757..594f9be5a09 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -92,6 +92,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Prevent Elasticsearch from spewing log warnings about redundant wildcard when setting up ingest pipelines. {issue}34249[34249] {pull}34550[34550] - Gracefully handle Windows event channel not found errors in winlog input. {issue}30201[30201] {pull}34605[34605] - Fix the issue of `cometd` input worker getting closed in case of a network connection issue and an EOF error. {issue}34326[34326] {pull}34327[34327] +- Fix errors and panics due to re-used processors {pull}34761[34761] *Heartbeat* diff --git a/filebeat/channel/runner.go b/filebeat/channel/runner.go index 643bbe60412..792c3055285 100644 --- a/filebeat/channel/runner.go +++ b/filebeat/channel/runner.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/processors/add_formatted_index" "github.com/elastic/beats/v7/libbeat/publisher/pipetool" + conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -127,27 +128,27 @@ func newCommonConfigEditor( return nil, err } - var indexProcessor processors.Processor - if !config.Index.IsEmpty() { - staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version) - timestampFormat, err := fmtstr.NewTimestampFormatString(&config.Index, staticFields) - if err != nil { - return nil, err - } - indexProcessor = add_formatted_index.New(timestampFormat) - } - - userProcessors, err := processors.New(config.Processors) - if err != nil { - return nil, err - } - serviceType := config.ServiceType if serviceType == "" { serviceType = config.Module } return func(clientCfg beat.ClientConfig) (beat.ClientConfig, error) { + var indexProcessor processors.Processor + if !config.Index.IsEmpty() { + staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version) + timestampFormat, err := fmtstr.NewTimestampFormatString(&config.Index, staticFields) + if err != nil { + return clientCfg, err + } + indexProcessor = add_formatted_index.New(timestampFormat) + } + + userProcessors, err := processors.New(config.Processors) + if err != nil { + return clientCfg, err + } + meta := clientCfg.Processing.Meta.Clone() fields := clientCfg.Processing.Fields.Clone() @@ -191,6 +192,6 @@ func newCommonConfigEditor( func setOptional(to mapstr.M, key string, value string) { if value != "" { - to.Put(key, value) + _, _ = to.Put(key, value) } } diff --git a/libbeat/processors/safe_processor.go b/libbeat/processors/safe_processor.go index 0204e9c971a..86bf36e2175 100644 --- a/libbeat/processors/safe_processor.go +++ b/libbeat/processors/safe_processor.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" ) var ErrClosed = errors.New("attempt to use a closed processor") @@ -45,6 +46,7 @@ func (p *SafeProcessor) Close() (err error) { if atomic.CompareAndSwapUint32(&p.closed, 0, 1) { return Close(p.Processor) } + logp.L().Warnf("tried to close already closed %q processor", p.Processor.String()) return nil }