diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e2a17179f89..cdea4bbe94a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -231,6 +231,7 @@ field. You can revert this change by configuring tags for the module and omittin - Fix date and timestamp formats for fortigate module {pull}19316[19316] - Add missing `default_field: false` to aws filesets fields.yml. {pull}19568[19568] - Fix tls mapping in suricata module {issue}19492[19492] {pull}19494[19494] +- Fix memory leak in tcp and unix input sources. {pull}19459[19459] *Heartbeat* diff --git a/filebeat/inputsource/common/handler.go b/filebeat/inputsource/common/handler.go index 84786086f4e..a55ee1755d5 100644 --- a/filebeat/inputsource/common/handler.go +++ b/filebeat/inputsource/common/handler.go @@ -19,6 +19,7 @@ package common import ( "bufio" + "context" "net" "github.com/pkg/errors" @@ -31,7 +32,7 @@ import ( type HandlerFactory func(config ListenerConfig) ConnectionHandler // ConnectionHandler interface provides mechanisms for handling of incoming connections -type ConnectionHandler func(CloseRef, net.Conn) error +type ConnectionHandler func(context.Context, net.Conn) error // MetadataFunc defines callback executed when a line is read from the split handler. type MetadataFunc func(net.Conn) inputsource.NetworkMetadata @@ -39,7 +40,7 @@ type MetadataFunc func(net.Conn) inputsource.NetworkMetadata // SplitHandlerFactory allows creation of a handler that has splitting capabilities. func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback MetadataFunc, callback inputsource.NetworkFunc, splitFunc bufio.SplitFunc) HandlerFactory { return func(config ListenerConfig) ConnectionHandler { - return ConnectionHandler(func(closer CloseRef, conn net.Conn) error { + return ConnectionHandler(func(ctx context.Context, conn net.Conn) error { metadata := metadataCallback(conn) maxMessageSize := uint64(config.MaxMessageSize) @@ -60,16 +61,11 @@ func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback Me scanner.Buffer(buffer, int(maxMessageSize)) for { select { - case <-closer.Done(): + case <-ctx.Done(): break default: } - // Ensure that if the Conn is already closed then dont attempt to scan again - if closer.Err() == ErrClosed { - break - } - if !scanner.Scan() { break } diff --git a/filebeat/inputsource/common/listener.go b/filebeat/inputsource/common/listener.go index f4890ccc767..0798b6aaa11 100644 --- a/filebeat/inputsource/common/listener.go +++ b/filebeat/inputsource/common/listener.go @@ -78,6 +78,14 @@ func (l *Listener) Start() error { return err } + l.ctx, l.cancel = context.WithCancel(context.Background()) + go func() { + <-l.ctx.Done() + l.Listener.Close() + }() + + l.log.Info("Started listening for " + l.family.String() + " connection") + l.wg.Add(1) go func() { defer l.wg.Done()