From 59dbbb07fe22006fce66c29544eca39f7112d050 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Tue, 7 Mar 2023 19:47:28 +0100 Subject: [PATCH 1/3] Stop re-using processors defined in the config After introducing the `SafeProcessor` wrapper in https://github.com/elastic/beats/pull/34647 we started returning errors when a processor is being used after its `Close` function has been called. This led to dropped events and error spam in logs but also confirmed that the root cause of the problem was not just a race condition on `Close` but re-used processors somewhere. After a long investigation such code that's re-using processors was finally found. This is the change that removes re-using the processors and instantiates them on each input restart. --- filebeat/channel/runner.go | 30 ++++++++++++++-------------- libbeat/processors/safe_processor.go | 2 ++ 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/filebeat/channel/runner.go b/filebeat/channel/runner.go index 643bbe60412..72bbf666bce 100644 --- a/filebeat/channel/runner.go +++ b/filebeat/channel/runner.go @@ -127,27 +127,27 @@ func newCommonConfigEditor( return nil, err } - var indexProcessor processors.Processor - if !config.Index.IsEmpty() { - staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version) - timestampFormat, err := fmtstr.NewTimestampFormatString(&config.Index, staticFields) - if err != nil { - return nil, err - } - indexProcessor = add_formatted_index.New(timestampFormat) - } - - userProcessors, err := processors.New(config.Processors) - if err != nil { - return nil, err - } - serviceType := config.ServiceType if serviceType == "" { serviceType = config.Module } return func(clientCfg beat.ClientConfig) (beat.ClientConfig, error) { + var indexProcessor processors.Processor + if !config.Index.IsEmpty() { + staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version) + timestampFormat, err := fmtstr.NewTimestampFormatString(&config.Index, staticFields) + if err != nil { + return clientCfg, err + } + indexProcessor = add_formatted_index.New(timestampFormat) + } + + userProcessors, err := processors.New(config.Processors) + if err != nil { + return clientCfg, err + } + meta := clientCfg.Processing.Meta.Clone() fields := clientCfg.Processing.Fields.Clone() diff --git a/libbeat/processors/safe_processor.go b/libbeat/processors/safe_processor.go index 0204e9c971a..86bf36e2175 100644 --- a/libbeat/processors/safe_processor.go +++ b/libbeat/processors/safe_processor.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" ) var ErrClosed = errors.New("attempt to use a closed processor") @@ -45,6 +46,7 @@ func (p *SafeProcessor) Close() (err error) { if atomic.CompareAndSwapUint32(&p.closed, 0, 1) { return Close(p.Processor) } + logp.L().Warnf("tried to close already closed %q processor", p.Processor.String()) return nil } From c90f3de638fea4861f26e26473e58e8133de9f80 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Tue, 7 Mar 2023 20:08:54 +0100 Subject: [PATCH 2/3] Fix linter issues --- filebeat/channel/runner.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/filebeat/channel/runner.go b/filebeat/channel/runner.go index 72bbf666bce..792c3055285 100644 --- a/filebeat/channel/runner.go +++ b/filebeat/channel/runner.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/processors/add_formatted_index" "github.com/elastic/beats/v7/libbeat/publisher/pipetool" + conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -191,6 +192,6 @@ func newCommonConfigEditor( func setOptional(to mapstr.M, key string, value string) { if value != "" { - to.Put(key, value) + _, _ = to.Put(key, value) } } From ce675d5396e32a9ff3c04156929ee035b01bd35e Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Tue, 7 Mar 2023 20:10:26 +0100 Subject: [PATCH 3/3] Add changelog entry --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e8daa5248fd..ad8d6610705 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -96,6 +96,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Prevent Elasticsearch from spewing log warnings about redundant wildcard when setting up ingest pipelines. {issue}34249[34249] {pull}34550[34550] - Gracefully handle Windows event channel not found errors in winlog input. {issue}30201[30201] {pull}34605[34605] - Fix the issue of `cometd` input worker getting closed in case of a network connection issue and an EOF error. {issue}34326[34326] {pull}34327[34327] +- Fix errors and panics due to re-used processors {pull}34761[34761] *Heartbeat*