Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #19459 to 7.x: [Filebeat] Fix reference leak in TCP and Unix socket inputs #19501

Merged
merged 1 commit into from
Jul 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ field. You can revert this change by configuring tags for the module and omittin
- Fix date and timestamp formats for fortigate module {pull}19316[19316]
- Add missing `default_field: false` to aws filesets fields.yml. {pull}19568[19568]
- Fix tls mapping in suricata module {issue}19492[19492] {pull}19494[19494]
- Fix memory leak in tcp and unix input sources. {pull}19459[19459]

*Heartbeat*

Expand Down
12 changes: 4 additions & 8 deletions filebeat/inputsource/common/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package common

import (
"bufio"
"context"
"net"

"github.com/pkg/errors"
Expand All @@ -31,15 +32,15 @@ import (
type HandlerFactory func(config ListenerConfig) ConnectionHandler

// ConnectionHandler interface provides mechanisms for handling of incoming connections
type ConnectionHandler func(CloseRef, net.Conn) error
type ConnectionHandler func(context.Context, net.Conn) error

// MetadataFunc defines callback executed when a line is read from the split handler.
type MetadataFunc func(net.Conn) inputsource.NetworkMetadata

// SplitHandlerFactory allows creation of a handler that has splitting capabilities.
func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback MetadataFunc, callback inputsource.NetworkFunc, splitFunc bufio.SplitFunc) HandlerFactory {
return func(config ListenerConfig) ConnectionHandler {
return ConnectionHandler(func(closer CloseRef, conn net.Conn) error {
return ConnectionHandler(func(ctx context.Context, conn net.Conn) error {
metadata := metadataCallback(conn)
maxMessageSize := uint64(config.MaxMessageSize)

Expand All @@ -60,16 +61,11 @@ func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback Me
scanner.Buffer(buffer, int(maxMessageSize))
for {
select {
case <-closer.Done():
case <-ctx.Done():
break
default:
}

// Ensure that if the Conn is already closed then dont attempt to scan again
if closer.Err() == ErrClosed {
break
}

if !scanner.Scan() {
break
}
Expand Down
8 changes: 8 additions & 0 deletions filebeat/inputsource/common/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ func (l *Listener) Start() error {
return err
}

l.ctx, l.cancel = context.WithCancel(context.Background())
go func() {
<-l.ctx.Done()
l.Listener.Close()
}()

l.log.Info("Started listening for " + l.family.String() + " connection")

l.wg.Add(1)
go func() {
defer l.wg.Done()
Expand Down