From 53f2eab716d42e6316a9e8ed47475910c03eb69b Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Fri, 26 Jun 2020 10:15:02 -0400 Subject: [PATCH 1/4] Fix reference leak in TCP and Unix socket inputs The tcp and unix input sources were leaking references causing a memory leak. When an accepted connection ended inputsource/common.Closer was supposed to delete the pointer that it held to the connection, but due to a code error `delete` was being called on the wrong map. --- CHANGELOG.next.asciidoc | 1 + filebeat/inputsource/common/closeref.go | 4 +- filebeat/inputsource/common/closeref_test.go | 54 ++++++++++++++++++++ 3 files changed, 58 insertions(+), 1 deletion(-) create mode 100644 filebeat/inputsource/common/closeref_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7a9746fa51a..aeffe760f77 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -196,6 +196,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix `netflow` module to support 7 bytepad for IPFIX template. {issue}18098[18098] - Fix Cisco ASA dissect pattern for 313008 & 313009 messages. {pull}19149[19149] - Fix date and timestamp formats for fortigate module {pull}19316[19316] +- Fix memory leak in tcp and unix imput sources. {pull}19459[19459] *Heartbeat* diff --git a/filebeat/inputsource/common/closeref.go b/filebeat/inputsource/common/closeref.go index ec53d2639a4..b7ea97cd6a2 100644 --- a/filebeat/inputsource/common/closeref.go +++ b/filebeat/inputsource/common/closeref.go @@ -69,6 +69,8 @@ func (c *Closer) Close() { // propagate close to children. if c.children != nil { for child := range c.children { + // Remove parent to prevent circular references (and deadlock). + child.parent = nil child.Close() } c.children = nil @@ -78,7 +80,7 @@ func (c *Closer) Close() { c.mu.Unlock() if c.parent != nil { - c.removeChild(c) + c.parent.removeChild(c) } } diff --git a/filebeat/inputsource/common/closeref_test.go b/filebeat/inputsource/common/closeref_test.go new file mode 100644 index 00000000000..83bb941a011 --- /dev/null +++ b/filebeat/inputsource/common/closeref_test.go @@ -0,0 +1,54 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package common + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCloser(t *testing.T) { + var closeCount int + closerFunc := func() { + closeCount++ + } + + t.Run("parent closes children", func(t *testing.T) { + closeCount = 0 + parent := NewCloser(closerFunc) + WithCloser(parent, closerFunc) + + parent.Close() + assert.Equal(t, 2, closeCount) + }) + + t.Run("children are released from parent", func(t *testing.T) { + closeCount = 0 + parent := NewCloser(closerFunc) + child := WithCloser(parent, closerFunc) + + child.Close() + assert.Equal(t, 1, closeCount) + + // Inspect internal state to verify all children were removed. + parent.mu.Lock() + assert.Len(t, parent.children, 0) + parent.mu.Unlock() + }) +} From 4b2f1045812a4b6623506d8b6f50ceae511f596b Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Fri, 26 Jun 2020 16:12:25 -0400 Subject: [PATCH 2/4] Replace common.Closer with context.Context The inputsource/common.Closer was managing bidirectional links between parents and children. Anytime you closed an instance it would close all of its children and also remove itself from its parents list of children (this is where the bug was). Every instance has its own mutex. While recursively closing children it was easy to run into a deadlock because the parent holds a lock while closing its children and then the child must edit the parent to remove itself so it also tries to acquire the parent lock. Instead of modifying the common.Closer I replaced it with a cancellable context.Context which is designed to propagate signals from parent to children and requires less code. --- filebeat/inputsource/common/closeref.go | 139 ------------------- filebeat/inputsource/common/closeref_test.go | 54 ------- filebeat/inputsource/common/handler.go | 12 +- filebeat/inputsource/common/listener.go | 28 ++-- 4 files changed, 22 insertions(+), 211 deletions(-) delete mode 100644 filebeat/inputsource/common/closeref.go delete mode 100644 filebeat/inputsource/common/closeref_test.go diff --git a/filebeat/inputsource/common/closeref.go b/filebeat/inputsource/common/closeref.go deleted file mode 100644 index b7ea97cd6a2..00000000000 --- a/filebeat/inputsource/common/closeref.go +++ /dev/null @@ -1,139 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package common - -import ( - "sync" - - "github.com/pkg/errors" -) - -// CloserFunc is the function called by the Closer on `Close()`. -type CloserFunc func() - -// ErrClosed is returned when the Closer is closed. -var ErrClosed = errors.New("closer is closed") - -// CloseRef implements a subset of the context.Context interface and it's use to synchronize -// the shutdown of multiple go-routines. -type CloseRef interface { - Done() <-chan struct{} - Err() error -} - -// Closer implements a shutdown strategy when dealing with multiples go-routines, it creates a tree -// of Closer, when you call `Close()` on a parent the `Close()` method will be called on the current -// closer and any of the childs it may have and will remove the current node from the parent. -// -// NOTE: The `Close()` is reentrant but will propage the close only once. -type Closer struct { - mu sync.Mutex - done chan struct{} - err error - parent *Closer - children map[*Closer]struct{} - callback CloserFunc -} - -// Close closes the closes and propagates the close to any child, on close the close callback will -// be called, this can be used for custom cleanup like closing a socket. -func (c *Closer) Close() { - c.mu.Lock() - if c.err != nil { - c.mu.Unlock() - return - } - - // Close the channel first so that all processing in Handle() ends first - close(c.done) - - if c.callback != nil { - c.callback() - } - - // propagate close to children. - if c.children != nil { - for child := range c.children { - // Remove parent to prevent circular references (and deadlock). - child.parent = nil - child.Close() - } - c.children = nil - } - - c.err = ErrClosed - c.mu.Unlock() - - if c.parent != nil { - c.parent.removeChild(c) - } -} - -// Done returns the synchronization channel, the channel will be closed if `Close()` was called on -// the current node or any parent it may have. -func (c *Closer) Done() <-chan struct{} { - return c.done -} - -// SetCallback sets the underlying callback function invoked -// when the Closer is Closed. -func (c *Closer) SetCallback(callback CloserFunc) { - c.callback = callback -} - -// Err returns an error if the Closer was already closed. -func (c *Closer) Err() error { - c.mu.Lock() - err := c.err - c.mu.Unlock() - return err -} - -func (c *Closer) removeChild(child *Closer) { - c.mu.Lock() - delete(c.children, child) - c.mu.Unlock() -} - -func (c *Closer) addChild(child *Closer) { - c.mu.Lock() - if c.children == nil { - c.children = make(map[*Closer]struct{}) - } - c.children[child] = struct{}{} - c.mu.Unlock() -} - -// WithCloser wraps a new closer into a child of an existing closer. -func WithCloser(parent *Closer, fn CloserFunc) *Closer { - child := &Closer{ - done: make(chan struct{}), - parent: parent, - callback: fn, - } - parent.addChild(child) - return child -} - -// NewCloser creates a new Closer. -func NewCloser(fn CloserFunc) *Closer { - return &Closer{ - done: make(chan struct{}), - callback: fn, - } -} diff --git a/filebeat/inputsource/common/closeref_test.go b/filebeat/inputsource/common/closeref_test.go deleted file mode 100644 index 83bb941a011..00000000000 --- a/filebeat/inputsource/common/closeref_test.go +++ /dev/null @@ -1,54 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package common - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestCloser(t *testing.T) { - var closeCount int - closerFunc := func() { - closeCount++ - } - - t.Run("parent closes children", func(t *testing.T) { - closeCount = 0 - parent := NewCloser(closerFunc) - WithCloser(parent, closerFunc) - - parent.Close() - assert.Equal(t, 2, closeCount) - }) - - t.Run("children are released from parent", func(t *testing.T) { - closeCount = 0 - parent := NewCloser(closerFunc) - child := WithCloser(parent, closerFunc) - - child.Close() - assert.Equal(t, 1, closeCount) - - // Inspect internal state to verify all children were removed. - parent.mu.Lock() - assert.Len(t, parent.children, 0) - parent.mu.Unlock() - }) -} diff --git a/filebeat/inputsource/common/handler.go b/filebeat/inputsource/common/handler.go index 84786086f4e..a55ee1755d5 100644 --- a/filebeat/inputsource/common/handler.go +++ b/filebeat/inputsource/common/handler.go @@ -19,6 +19,7 @@ package common import ( "bufio" + "context" "net" "github.com/pkg/errors" @@ -31,7 +32,7 @@ import ( type HandlerFactory func(config ListenerConfig) ConnectionHandler // ConnectionHandler interface provides mechanisms for handling of incoming connections -type ConnectionHandler func(CloseRef, net.Conn) error +type ConnectionHandler func(context.Context, net.Conn) error // MetadataFunc defines callback executed when a line is read from the split handler. type MetadataFunc func(net.Conn) inputsource.NetworkMetadata @@ -39,7 +40,7 @@ type MetadataFunc func(net.Conn) inputsource.NetworkMetadata // SplitHandlerFactory allows creation of a handler that has splitting capabilities. func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback MetadataFunc, callback inputsource.NetworkFunc, splitFunc bufio.SplitFunc) HandlerFactory { return func(config ListenerConfig) ConnectionHandler { - return ConnectionHandler(func(closer CloseRef, conn net.Conn) error { + return ConnectionHandler(func(ctx context.Context, conn net.Conn) error { metadata := metadataCallback(conn) maxMessageSize := uint64(config.MaxMessageSize) @@ -60,16 +61,11 @@ func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback Me scanner.Buffer(buffer, int(maxMessageSize)) for { select { - case <-closer.Done(): + case <-ctx.Done(): break default: } - // Ensure that if the Conn is already closed then dont attempt to scan again - if closer.Err() == ErrClosed { - break - } - if !scanner.Scan() { break } diff --git a/filebeat/inputsource/common/listener.go b/filebeat/inputsource/common/listener.go index 9d686f922a6..4692e474930 100644 --- a/filebeat/inputsource/common/listener.go +++ b/filebeat/inputsource/common/listener.go @@ -20,6 +20,7 @@ package common import ( "bufio" "bytes" + "context" "net" "strings" "sync" @@ -51,9 +52,9 @@ type Listener struct { config *ListenerConfig family Family wg sync.WaitGroup - done chan struct{} log *logp.Logger - closer *Closer + ctx context.Context + cancel context.CancelFunc clientsCount atomic.Int handlerFactory HandlerFactory listenerFactory ListenerFactory @@ -63,10 +64,8 @@ type Listener struct { func NewListener(family Family, location string, handlerFactory HandlerFactory, listenerFactory ListenerFactory, config *ListenerConfig) *Listener { return &Listener{ config: config, - done: make(chan struct{}), family: family, log: logp.NewLogger(string(family)).With("address", location), - closer: NewCloser(nil), handlerFactory: handlerFactory, listenerFactory: listenerFactory, } @@ -80,7 +79,12 @@ func (l *Listener) Start() error { return err } - l.closer.SetCallback(func() { l.Listener.Close() }) + l.ctx, l.cancel = context.WithCancel(context.Background()) + go func() { + <-l.ctx.Done() + l.Listener.Close() + }() + l.log.Info("Started listening for " + l.family.String() + " connection") l.wg.Add(1) @@ -101,7 +105,7 @@ func (l *Listener) run() { conn, err := l.Listener.Accept() if err != nil { select { - case <-l.closer.Done(): + case <-l.ctx.Done(): return default: l.log.Debugw("Can not accept the connection", "error", err) @@ -110,13 +114,17 @@ func (l *Listener) run() { } handler := l.handlerFactory(*l.config) - closer := WithCloser(l.closer, func() { conn.Close() }) + ctx, cancel := context.WithCancel(l.ctx) + go func() { + <-ctx.Done() + conn.Close() + }() l.wg.Add(1) go func() { defer logp.Recover("recovering from a " + l.family.String() + " client crash") defer l.wg.Done() - defer closer.Close() + defer cancel() l.registerHandler() defer l.unregisterHandler() @@ -128,7 +136,7 @@ func (l *Listener) run() { l.log.Debugw("New client", "remote_address", conn.RemoteAddr(), "total", l.clientsCount.Load()) } - err := handler(closer, conn) + err := handler(ctx, conn) if err != nil { l.log.Debugw("client error", "error", err) } @@ -148,7 +156,7 @@ func (l *Listener) run() { // Stop stops accepting new incoming connections and Close any active clients func (l *Listener) Stop() { l.log.Info("Stopping" + l.family.String() + "server") - l.closer.Close() + l.cancel() l.wg.Wait() l.log.Info(l.family.String() + " server stopped") } From b524da03f23260040492f68f620f2c55af4dc74c Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Mon, 29 Jun 2020 15:45:43 -0400 Subject: [PATCH 3/4] Use ctxtool.WithFunc --- CHANGELOG.next.asciidoc | 2 +- filebeat/inputsource/common/listener.go | 11 ++++------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index aeffe760f77..51ad3eaea8b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -196,7 +196,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix `netflow` module to support 7 bytepad for IPFIX template. {issue}18098[18098] - Fix Cisco ASA dissect pattern for 313008 & 313009 messages. {pull}19149[19149] - Fix date and timestamp formats for fortigate module {pull}19316[19316] -- Fix memory leak in tcp and unix imput sources. {pull}19459[19459] +- Fix memory leak in tcp and unix input sources. {pull}19459[19459] *Heartbeat* diff --git a/filebeat/inputsource/common/listener.go b/filebeat/inputsource/common/listener.go index 4692e474930..c3ed0362887 100644 --- a/filebeat/inputsource/common/listener.go +++ b/filebeat/inputsource/common/listener.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/go-concert/ctxtool" ) // Family represents the type of connection we're handling @@ -113,17 +114,12 @@ func (l *Listener) run() { } } - handler := l.handlerFactory(*l.config) - ctx, cancel := context.WithCancel(l.ctx) - go func() { - <-ctx.Done() - conn.Close() - }() - l.wg.Add(1) go func() { defer logp.Recover("recovering from a " + l.family.String() + " client crash") defer l.wg.Done() + + ctx, cancel := ctxtool.WithFunc(l.ctx, func() { conn.Close() }) defer cancel() l.registerHandler() @@ -136,6 +132,7 @@ func (l *Listener) run() { l.log.Debugw("New client", "remote_address", conn.RemoteAddr(), "total", l.clientsCount.Load()) } + handler := l.handlerFactory(*l.config) err := handler(ctx, conn) if err != nil { l.log.Debugw("client error", "error", err) From b028bb114392c2d9a8a8c681fcaba690fa0731f6 Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Mon, 29 Jun 2020 17:14:38 -0400 Subject: [PATCH 4/4] Update go.sum --- go.sum | 1 + 1 file changed, 1 insertion(+) diff --git a/go.sum b/go.sum index 86a09edca2e..e1ff648fc32 100644 --- a/go.sum +++ b/go.sum @@ -727,6 +727,7 @@ go.opencensus.io v0.22.2 h1:75k/FF0Q2YM8QYo07VPddOLBslDt1MZOdEslOHvmzAs= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo= go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/multierr v1.3.0 h1:sFPn2GLc3poCkfrpIXGhBD2X0CMIo4Q/zSULXrj/+uc= go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=