Skip to content

Commit

Permalink
Stop re-using processors defined in the config (#34761)
Browse files Browse the repository at this point in the history
* Stop re-using processors defined in the config

After introducing the `SafeProcessor` wrapper in
#34647 we started returning
errors when a processor is being used after its `Close` function has
been called.

This led to dropped events and error spam in logs but also confirmed
that the root cause of the problem was not just a race condition on
`Close` but re-used processors somewhere.

After a long investigation such code that's re-using processors was
finally found.

This is the change that removes re-using the processors and
instantiates them on each input restart.

* Fix linter issues

* Add changelog entry

(cherry picked from commit 5cfe62c)
  • Loading branch information
rdner authored and mergify[bot] committed Mar 7, 2023
1 parent a17995e commit 9adb1bd
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Prevent Elasticsearch from spewing log warnings about redundant wildcard when setting up ingest pipelines. {issue}34249[34249] {pull}34550[34550]
- Gracefully handle Windows event channel not found errors in winlog input. {issue}30201[30201] {pull}34605[34605]
- Fix the issue of `cometd` input worker getting closed in case of a network connection issue and an EOF error. {issue}34326[34326] {pull}34327[34327]
- Fix errors and panics due to re-used processors {pull}34761[34761]

*Heartbeat*

Expand Down
33 changes: 17 additions & 16 deletions filebeat/channel/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/add_formatted_index"
"github.com/elastic/beats/v7/libbeat/publisher/pipetool"

conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -127,27 +128,27 @@ func newCommonConfigEditor(
return nil, err
}

var indexProcessor processors.Processor
if !config.Index.IsEmpty() {
staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version)
timestampFormat, err := fmtstr.NewTimestampFormatString(&config.Index, staticFields)
if err != nil {
return nil, err
}
indexProcessor = add_formatted_index.New(timestampFormat)
}

userProcessors, err := processors.New(config.Processors)
if err != nil {
return nil, err
}

serviceType := config.ServiceType
if serviceType == "" {
serviceType = config.Module
}

return func(clientCfg beat.ClientConfig) (beat.ClientConfig, error) {
var indexProcessor processors.Processor
if !config.Index.IsEmpty() {
staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version)
timestampFormat, err := fmtstr.NewTimestampFormatString(&config.Index, staticFields)
if err != nil {
return clientCfg, err
}
indexProcessor = add_formatted_index.New(timestampFormat)
}

userProcessors, err := processors.New(config.Processors)
if err != nil {
return clientCfg, err
}

meta := clientCfg.Processing.Meta.Clone()
fields := clientCfg.Processing.Fields.Clone()

Expand Down Expand Up @@ -191,6 +192,6 @@ func newCommonConfigEditor(

func setOptional(to mapstr.M, key string, value string) {
if value != "" {
to.Put(key, value)
_, _ = to.Put(key, value)
}
}
2 changes: 2 additions & 0 deletions libbeat/processors/safe_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

var ErrClosed = errors.New("attempt to use a closed processor")
Expand All @@ -45,6 +46,7 @@ func (p *SafeProcessor) Close() (err error) {
if atomic.CompareAndSwapUint32(&p.closed, 0, 1) {
return Close(p.Processor)
}
logp.L().Warnf("tried to close already closed %q processor", p.Processor.String())
return nil
}

Expand Down

0 comments on commit 9adb1bd

Please sign in to comment.