diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e8daa5248fd..ad8d6610705 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -96,6 +96,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Prevent Elasticsearch from spewing log warnings about redundant wildcard when setting up ingest pipelines. {issue}34249[34249] {pull}34550[34550] - Gracefully handle Windows event channel not found errors in winlog input. {issue}30201[30201] {pull}34605[34605] - Fix the issue of `cometd` input worker getting closed in case of a network connection issue and an EOF error. {issue}34326[34326] {pull}34327[34327] +- Fix errors and panics due to re-used processors {pull}34761[34761] *Heartbeat* diff --git a/filebeat/channel/runner.go b/filebeat/channel/runner.go index 643bbe60412..792c3055285 100644 --- a/filebeat/channel/runner.go +++ b/filebeat/channel/runner.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/processors/add_formatted_index" "github.com/elastic/beats/v7/libbeat/publisher/pipetool" + conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -127,27 +128,27 @@ func newCommonConfigEditor( return nil, err } - var indexProcessor processors.Processor - if !config.Index.IsEmpty() { - staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version) - timestampFormat, err := fmtstr.NewTimestampFormatString(&config.Index, staticFields) - if err != nil { - return nil, err - } - indexProcessor = add_formatted_index.New(timestampFormat) - } - - userProcessors, err := processors.New(config.Processors) - if err != nil { - return nil, err - } - serviceType := config.ServiceType if serviceType == "" { serviceType = config.Module } return func(clientCfg beat.ClientConfig) (beat.ClientConfig, error) { + var indexProcessor processors.Processor + if !config.Index.IsEmpty() { + staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version) + timestampFormat, err := fmtstr.NewTimestampFormatString(&config.Index, staticFields) + if err != nil { + return clientCfg, err + } + indexProcessor = add_formatted_index.New(timestampFormat) + } + + userProcessors, err := processors.New(config.Processors) + if err != nil { + return clientCfg, err + } + meta := clientCfg.Processing.Meta.Clone() fields := clientCfg.Processing.Fields.Clone() @@ -191,6 +192,6 @@ func newCommonConfigEditor( func setOptional(to mapstr.M, key string, value string) { if value != "" { - to.Put(key, value) + _, _ = to.Put(key, value) } } diff --git a/libbeat/processors/safe_processor.go b/libbeat/processors/safe_processor.go index 0204e9c971a..86bf36e2175 100644 --- a/libbeat/processors/safe_processor.go +++ b/libbeat/processors/safe_processor.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" ) var ErrClosed = errors.New("attempt to use a closed processor") @@ -45,6 +46,7 @@ func (p *SafeProcessor) Close() (err error) { if atomic.CompareAndSwapUint32(&p.closed, 0, 1) { return Close(p.Processor) } + logp.L().Warnf("tried to close already closed %q processor", p.Processor.String()) return nil }