diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 32b2a5008e40..e49d207e293d 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -74,6 +74,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Fixed an issue where the proctitle value was being truncated. - Fixed an issue where values were incorrectly interpretted as hex data. - Fixed parsing of the `key` value when multiple keys are present. +- Fix possible resource leak if file_integrity module is used with config + reloading on Windows or Linux. {pull}6198[6198] *Filebeat* diff --git a/auditbeat/module/file_integrity/eventreader_fsnotify.go b/auditbeat/module/file_integrity/eventreader_fsnotify.go index a079a920b8c9..cac541626a87 100644 --- a/auditbeat/module/file_integrity/eventreader_fsnotify.go +++ b/auditbeat/module/file_integrity/eventreader_fsnotify.go @@ -51,19 +51,22 @@ func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) { if err := r.watcher.Start(); err != nil { return nil, errors.Wrap(err, "unable to start watcher") } - go r.consumeEvents() + go r.consumeEvents(done) r.log.Infow("Started fsnotify watcher", "file_path", r.config.Paths, "recursive", r.config.Recursive) return r.eventC, nil } -func (r *reader) consumeEvents() { +func (r *reader) consumeEvents(done <-chan struct{}) { defer close(r.eventC) defer r.watcher.Close() for { select { + case <-done: + r.log.Debug("fsnotify reader terminated") + return case event := <-r.watcher.EventChannel(): if event.Name == "" || r.config.IsExcludedPath(event.Name) { continue diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index 52a5f32d53b9..c638a09a87a1 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -224,16 +224,26 @@ type capturingReporterV2 struct { eventsC chan mb.Event } +// report writes an event to the output channel and returns true. If the output +// is closed it returns false. +func (r *capturingReporterV2) report(event mb.Event) bool { + select { + case <-r.doneC: + // Publisher is stopped. + return false + case r.eventsC <- event: + return true + } +} + // Event stores the passed-in event into the events array func (r *capturingReporterV2) Event(event mb.Event) bool { - r.eventsC <- event - return true + return r.report(event) } // Error stores the given error into the errors array. func (r *capturingReporterV2) Error(err error) bool { - r.eventsC <- mb.Event{Error: err} - return true + return r.report(mb.Event{Error: err}) } // Done returns the Done channel for this reporter. @@ -255,6 +265,9 @@ func RunPushMetricSetV2(timeout time.Duration, waitEvents int, metricSet mb.Push go func() { defer wg.Done() defer close(r.eventsC) + if closer, ok := metricSet.(mb.Closer); ok { + defer closer.Close() + } metricSet.Run(r) }()