Skip to content

Commit

Permalink
Cherry-pick #19459 to 7.8: [Filebeat] Fix reference leak in TCP and U…
Browse files Browse the repository at this point in the history
…nix socket inputs (#19502)

* [Filebeat] Fix reference leak in TCP and Unix socket inputs (#19459)

The tcp and unix input sources were leaking references causing a memory leak.
When an accepted connection ended inputsource/common.Closer was
supposed to delete the pointer that it held to the connection, but due to a code
error `delete` was being called on the wrong map.

Instead of modifying the common.Closer I replaced it with a cancellable context.Context which
is designed to propagate signals from parent to children and requires less code.

(cherry picked from commit 61f4846)

* Update vendor and notice
  • Loading branch information
andrewkroh authored Jul 16, 2020
1 parent 31b3a4a commit 66293c5
Show file tree
Hide file tree
Showing 37 changed files with 2,099 additions and 1,093 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix a rate limit related issue in httpjson input for Okta module. {issue}18530[18530] {pull}18534[18534]
- Fix tls mapping in suricata module {issue}19492[19492] {pull}19494[19494]
- Fix Cisco ASA dissect pattern for 313008 & 313009 messages. {pull}19149[19149]
- Fix memory leak in tcp and unix input sources. {pull}19459[19459]
- Fix bug with empty filter values in system/service {pull}19812[19812]

*Heartbeat*
Expand Down
48 changes: 46 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1680,6 +1680,15 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

--------------------------------------------------------------------
Dependency: github.com/elastic/go-concert
Version: v0.0.3
License type (autodetected): Apache-2.0
./vendor/github.com/elastic/go-concert/LICENSE:
--------------------------------------------------------------------
Apache License 2.0


--------------------------------------------------------------------
Dependency: github.com/elastic/go-libaudit
Version: v0.4.0
Expand Down Expand Up @@ -7853,7 +7862,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------
Dependency: golang.org/x/lint
Revision: fdd1cda4f05f
Revision: 910be7a94367
License type (autodetected): BSD-3-Clause
./vendor/golang.org/x/lint/LICENSE:
--------------------------------------------------------------------
Expand Down Expand Up @@ -7885,6 +7894,41 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------
Dependency: golang.org/x/mod
Version: v0.1.1
Revision: c90efee705ee
License type (autodetected): BSD-3-Clause
./vendor/golang.org/x/mod/LICENSE:
--------------------------------------------------------------------
Copyright (c) 2009 The Go Authors. All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------
Dependency: golang.org/x/net
Revision: 16171245cfb2
Expand Down Expand Up @@ -8091,7 +8135,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------
Dependency: golang.org/x/tools
Revision: 7b8e75db28f4
Revision: b320d3a0f5a2
License type (autodetected): BSD-3-Clause
./vendor/golang.org/x/tools/LICENSE:
--------------------------------------------------------------------
Expand Down
12 changes: 4 additions & 8 deletions filebeat/inputsource/common/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package common

import (
"bufio"
"context"
"net"

"github.com/pkg/errors"
Expand All @@ -31,15 +32,15 @@ import (
type HandlerFactory func(config ListenerConfig) ConnectionHandler

// ConnectionHandler interface provides mechanisms for handling of incoming connections
type ConnectionHandler func(CloseRef, net.Conn) error
type ConnectionHandler func(context.Context, net.Conn) error

// MetadataFunc defines callback executed when a line is read from the split handler.
type MetadataFunc func(net.Conn) inputsource.NetworkMetadata

// SplitHandlerFactory allows creation of a handler that has splitting capabilities.
func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback MetadataFunc, callback inputsource.NetworkFunc, splitFunc bufio.SplitFunc) HandlerFactory {
return func(config ListenerConfig) ConnectionHandler {
return ConnectionHandler(func(closer CloseRef, conn net.Conn) error {
return ConnectionHandler(func(ctx context.Context, conn net.Conn) error {
metadata := metadataCallback(conn)
maxMessageSize := uint64(config.MaxMessageSize)

Expand All @@ -60,16 +61,11 @@ func SplitHandlerFactory(family Family, logger *logp.Logger, metadataCallback Me
scanner.Buffer(buffer, int(maxMessageSize))
for {
select {
case <-closer.Done():
case <-ctx.Done():
break
default:
}

// Ensure that if the Conn is already closed then dont attempt to scan again
if closer.Err() == ErrClosed {
break
}

if !scanner.Scan() {
break
}
Expand Down
29 changes: 17 additions & 12 deletions filebeat/inputsource/common/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ package common
import (
"bufio"
"bytes"
"context"
"net"
"strings"
"sync"

"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/go-concert/ctxtool"
)

// Family represents the type of connection we're handling
Expand All @@ -51,9 +53,9 @@ type Listener struct {
config *ListenerConfig
family Family
wg sync.WaitGroup
done chan struct{}
log *logp.Logger
closer *Closer
ctx context.Context
cancel context.CancelFunc
clientsCount atomic.Int
handlerFactory HandlerFactory
listenerFactory ListenerFactory
Expand All @@ -63,10 +65,8 @@ type Listener struct {
func NewListener(family Family, location string, handlerFactory HandlerFactory, listenerFactory ListenerFactory, config *ListenerConfig) *Listener {
return &Listener{
config: config,
done: make(chan struct{}),
family: family,
log: logp.NewLogger(string(family)).With("address", location),
closer: NewCloser(nil),
handlerFactory: handlerFactory,
listenerFactory: listenerFactory,
}
Expand All @@ -80,7 +80,12 @@ func (l *Listener) Start() error {
return err
}

l.closer.SetCallback(func() { l.Listener.Close() })
l.ctx, l.cancel = context.WithCancel(context.Background())
go func() {
<-l.ctx.Done()
l.Listener.Close()
}()

l.log.Info("Started listening for " + l.family.String() + " connection")

l.wg.Add(1)
Expand All @@ -101,22 +106,21 @@ func (l *Listener) run() {
conn, err := l.Listener.Accept()
if err != nil {
select {
case <-l.closer.Done():
case <-l.ctx.Done():
return
default:
l.log.Debugw("Can not accept the connection", "error", err)
continue
}
}

handler := l.handlerFactory(*l.config)
closer := WithCloser(l.closer, func() { conn.Close() })

l.wg.Add(1)
go func() {
defer logp.Recover("recovering from a " + l.family.String() + " client crash")
defer l.wg.Done()
defer closer.Close()

ctx, cancel := ctxtool.WithFunc(l.ctx, func() { conn.Close() })
defer cancel()

l.registerHandler()
defer l.unregisterHandler()
Expand All @@ -128,7 +132,8 @@ func (l *Listener) run() {
l.log.Debugw("New client", "remote_address", conn.RemoteAddr(), "total", l.clientsCount.Load())
}

err := handler(closer, conn)
handler := l.handlerFactory(*l.config)
err := handler(ctx, conn)
if err != nil {
l.log.Debugw("client error", "error", err)
}
Expand All @@ -148,7 +153,7 @@ func (l *Listener) run() {
// Stop stops accepting new incoming connections and Close any active clients
func (l *Listener) Stop() {
l.log.Info("Stopping" + l.family.String() + "server")
l.closer.Close()
l.cancel()
l.wg.Wait()
l.log.Info(l.family.String() + " server stopped")
}
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ require (
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4
github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2
github.com/elastic/ecs v1.5.0
github.com/elastic/go-concert v0.0.3
github.com/elastic/go-libaudit v0.4.0
github.com/elastic/go-licenser v0.2.1
github.com/elastic/go-lookslike v0.3.0
Expand Down Expand Up @@ -148,14 +149,14 @@ require (
go.uber.org/multierr v1.3.0
go.uber.org/zap v1.14.0
golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f
golang.org/x/lint v0.0.0-20200130185559-910be7a94367
golang.org/x/net v0.0.0-20200202094626-16171245cfb2
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e
golang.org/x/text v0.3.2
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4
golang.org/x/tools v0.0.0-20200216192241-b320d3a0f5a2
google.golang.org/api v0.15.0
google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb
google.golang.org/grpc v1.27.1
Expand Down
11 changes: 11 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ github.com/elastic/ecs v1.5.0 h1:/VEIBsRU4ecq2+U3RPfKNc6bFyomP6qnthYEcQZu8GU=
github.com/elastic/ecs v1.5.0/go.mod h1:pgiLbQsijLOJvFR8OTILLu0Ni/R/foUNg0L+T6mU9b4=
github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270 h1:cWPqxlPtir4RoQVCpGSRXmLqjEHpJKbR60rxh1nQZY4=
github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270/go.mod h1:Msl1pdboCbArMF/nSCDUXgQuWTeoMmE/z8607X+k7ng=
github.com/elastic/go-concert v0.0.3 h1:f0F4WOi8tBOFIgwA7YbHRQ+Ok8vR+/qFrG7vYvbpX5Q=
github.com/elastic/go-concert v0.0.3/go.mod h1:9MtFarjXroUgmm0m6HY3NSe1XiKhdktiNRRj9hWvIaM=
github.com/elastic/go-libaudit v0.4.0 h1:pxLCycMJKW91W8ZmZT74DQmryTZuXryKESo6sXdu1XY=
github.com/elastic/go-libaudit v0.4.0/go.mod h1:lNJ7gX+arohEQTwqinAc8xycVuFNqsaunba1mwcBdvE=
github.com/elastic/go-licenser v0.2.1 h1:K76YI6XR2LRpewLGwhrTqasXZcNJG2yHY4/jit/IXGY=
Expand Down Expand Up @@ -694,6 +696,8 @@ go.opencensus.io v0.22.2 h1:75k/FF0Q2YM8QYo07VPddOLBslDt1MZOdEslOHvmzAs=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo=
go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.3.0 h1:sFPn2GLc3poCkfrpIXGhBD2X0CMIo4Q/zSULXrj/+uc=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
Expand Down Expand Up @@ -731,10 +735,13 @@ golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac/go.mod h1:6SW0HCj/g11FgYtHl
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f h1:J5lckAjkw6qYlOZNj90mLYNTEKDvWeuc1yieZ8qUzUE=
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs=
golang.org/x/lint v0.0.0-20200130185559-910be7a94367 h1:0IiAsCRByjO2QjX7ZPkw5oU9x+n1YqRL802rjC0c3Aw=
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee h1:WG0RUwxtNT4qqaXX3DPA8zHFNm/D9xaBpxzHt1WcA/E=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/net v0.0.0-20170114055629-f2499483f923/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -840,9 +847,13 @@ golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4 h1:Toz2IK7k8rbltAXwNAxKcn9OzqyNfMUhUNjz3sL0NMk=
golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200216192241-b320d3a0f5a2 h1:0sfSpGSa544Fwnbot3Oxq/U6SXqjty6Jy/3wRhVS7ig=
golang.org/x/tools v0.0.0-20200216192241-b320d3a0f5a2/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
Expand Down
Loading

0 comments on commit 66293c5

Please sign in to comment.