From f1b595066bde461f8268e13752be111a2ded5413 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 9 May 2019 12:45:32 +0200 Subject: [PATCH 1/4] Fix goroutine leak on initialization failures of log input --- CHANGELOG.next.asciidoc | 1 + filebeat/input/log/input.go | 12 ++++++++++++ 2 files changed, 13 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 4b8f0c108ce..2cff6a14010 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -92,6 +92,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add missing Kubernetes metadata fields to Filebeat CoreDNS module, and fix a documentation error. {pull}11591[11591] - Reduce memory usage if long lines are truncated to fit `max_bytes` limit. The line buffer is copied into a smaller buffer now. This allows the runtime to release unused memory earlier. {pull}11524[11524] - Fix memory leak in Filebeat pipeline acker. {pull}12063[12063] +- Fix goroutine leak caused on initialization failures of log input. {pull}12125[12125] *Heartbeat* diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 3841fc23973..ff7ff10090f 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -76,6 +76,7 @@ func NewInput( outlet channel.Connector, context input.Context, ) (input.Input, error) { + cleanupNeeded := true // Note: underlying output. // The input and harvester do have different requirements @@ -87,11 +88,21 @@ func NewInput( if err != nil { return nil, err } + defer func() { + if cleanupNeeded { + out.Close() + } + }() // stateOut will only be unblocked if the beat is shut down. // otherwise it can block on a full publisher pipeline, so state updates // can be forwarded correctly to the registrar. stateOut := channel.CloseOnSignal(channel.SubOutlet(out), context.BeatDone) + defer func() { + if cleanupNeeded { + stateOut.Close() + } + }() meta := context.Meta if len(meta) == 0 { @@ -137,6 +148,7 @@ func NewInput( logp.Info("Configured paths: %v", p.config.Paths) + cleanupNeeded = false return p, nil } From 7792ab675b2fab24a0b1205735baffd9688774ee Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 9 May 2019 13:12:15 +0200 Subject: [PATCH 2/4] Create outputs once config is checked --- filebeat/input/log/input.go | 64 +++++++++++++++---------------------- 1 file changed, 26 insertions(+), 38 deletions(-) diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index ff7ff10090f..701ec384522 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -76,48 +76,18 @@ func NewInput( outlet channel.Connector, context input.Context, ) (input.Input, error) { - cleanupNeeded := true - - // Note: underlying output. - // The input and harvester do have different requirements - // on the timings the outlets must be closed/unblocked. - // The outlet generated here is the underlying outlet, only closed - // once all workers have been shut down. - // For state updates and events, separate sub-outlets will be used. - out, err := outlet(cfg, context.DynamicFields) - if err != nil { - return nil, err - } - defer func() { - if cleanupNeeded { - out.Close() - } - }() - - // stateOut will only be unblocked if the beat is shut down. - // otherwise it can block on a full publisher pipeline, so state updates - // can be forwarded correctly to the registrar. - stateOut := channel.CloseOnSignal(channel.SubOutlet(out), context.BeatDone) - defer func() { - if cleanupNeeded { - stateOut.Close() - } - }() - meta := context.Meta if len(meta) == 0 { meta = nil } p := &Input{ - config: defaultConfig, - cfg: cfg, - harvesters: harvester.NewRegistry(), - outlet: out, - stateOutlet: stateOut, - states: file.NewStates(), - done: context.Done, - meta: meta, + config: defaultConfig, + cfg: cfg, + harvesters: harvester.NewRegistry(), + states: file.NewStates(), + done: context.Done, + meta: meta, } if err := cfg.Unpack(&p.config); err != nil { @@ -132,7 +102,7 @@ func NewInput( // Create empty harvester to check if configs are fine // TODO: Do config validation instead - _, err = p.createHarvester(file.State{}, nil) + _, err := p.createHarvester(file.State{}, nil) if err != nil { return nil, err } @@ -146,9 +116,27 @@ func NewInput( return nil, err } + // Note: underlying output. + // The input and harvester do have different requirements + // on the timings the outlets must be closed/unblocked. + // The outlet generated here is the underlying outlet, only closed + // once all workers have been shut down. + // For state updates and events, separate sub-outlets will be used. + out, err := outlet(cfg, context.DynamicFields) + if err != nil { + return nil, err + } + + // stateOut will only be unblocked if the beat is shut down. + // otherwise it can block on a full publisher pipeline, so state updates + // can be forwarded correctly to the registrar. + stateOut := channel.CloseOnSignal(channel.SubOutlet(out), context.BeatDone) + + p.outlet = out + p.stateOutlet = stateOut + logp.Info("Configured paths: %v", p.config.Paths) - cleanupNeeded = false return p, nil } From 245f79f527ba32e0139a2cade0507eff3f8b285d Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 9 May 2019 13:59:48 +0200 Subject: [PATCH 3/4] Revert "Create outputs once config is checked" This reverts commit 7792ab675b2fab24a0b1205735baffd9688774ee. --- filebeat/input/log/input.go | 64 ++++++++++++++++++++++--------------- 1 file changed, 38 insertions(+), 26 deletions(-) diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 701ec384522..ff7ff10090f 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -76,18 +76,48 @@ func NewInput( outlet channel.Connector, context input.Context, ) (input.Input, error) { + cleanupNeeded := true + + // Note: underlying output. + // The input and harvester do have different requirements + // on the timings the outlets must be closed/unblocked. + // The outlet generated here is the underlying outlet, only closed + // once all workers have been shut down. + // For state updates and events, separate sub-outlets will be used. + out, err := outlet(cfg, context.DynamicFields) + if err != nil { + return nil, err + } + defer func() { + if cleanupNeeded { + out.Close() + } + }() + + // stateOut will only be unblocked if the beat is shut down. + // otherwise it can block on a full publisher pipeline, so state updates + // can be forwarded correctly to the registrar. + stateOut := channel.CloseOnSignal(channel.SubOutlet(out), context.BeatDone) + defer func() { + if cleanupNeeded { + stateOut.Close() + } + }() + meta := context.Meta if len(meta) == 0 { meta = nil } p := &Input{ - config: defaultConfig, - cfg: cfg, - harvesters: harvester.NewRegistry(), - states: file.NewStates(), - done: context.Done, - meta: meta, + config: defaultConfig, + cfg: cfg, + harvesters: harvester.NewRegistry(), + outlet: out, + stateOutlet: stateOut, + states: file.NewStates(), + done: context.Done, + meta: meta, } if err := cfg.Unpack(&p.config); err != nil { @@ -102,7 +132,7 @@ func NewInput( // Create empty harvester to check if configs are fine // TODO: Do config validation instead - _, err := p.createHarvester(file.State{}, nil) + _, err = p.createHarvester(file.State{}, nil) if err != nil { return nil, err } @@ -116,27 +146,9 @@ func NewInput( return nil, err } - // Note: underlying output. - // The input and harvester do have different requirements - // on the timings the outlets must be closed/unblocked. - // The outlet generated here is the underlying outlet, only closed - // once all workers have been shut down. - // For state updates and events, separate sub-outlets will be used. - out, err := outlet(cfg, context.DynamicFields) - if err != nil { - return nil, err - } - - // stateOut will only be unblocked if the beat is shut down. - // otherwise it can block on a full publisher pipeline, so state updates - // can be forwarded correctly to the registrar. - stateOut := channel.CloseOnSignal(channel.SubOutlet(out), context.BeatDone) - - p.outlet = out - p.stateOutlet = stateOut - logp.Info("Configured paths: %v", p.config.Paths) + cleanupNeeded = false return p, nil } From 0c897e42327957a59b9de4d1b1bfde800bad997c Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 9 May 2019 15:20:16 +0200 Subject: [PATCH 4/4] Imitate cleanup package --- filebeat/input/log/input.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index ff7ff10090f..ea19c20033b 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -77,6 +77,11 @@ func NewInput( context input.Context, ) (input.Input, error) { cleanupNeeded := true + cleanupIfNeeded := func(f func() error) { + if cleanupNeeded { + f() + } + } // Note: underlying output. // The input and harvester do have different requirements @@ -88,21 +93,13 @@ func NewInput( if err != nil { return nil, err } - defer func() { - if cleanupNeeded { - out.Close() - } - }() + defer cleanupIfNeeded(out.Close) // stateOut will only be unblocked if the beat is shut down. // otherwise it can block on a full publisher pipeline, so state updates // can be forwarded correctly to the registrar. stateOut := channel.CloseOnSignal(channel.SubOutlet(out), context.BeatDone) - defer func() { - if cleanupNeeded { - stateOut.Close() - } - }() + defer cleanupIfNeeded(stateOut.Close) meta := context.Meta if len(meta) == 0 {