Fix leaks with metadata processors #16349

Merged
merged 7 commits
Oct 13, 2020
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -180,6 +180,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Explicitly detect missing variables in autodiscover configuration, log them at the debug level. {issue}20568[20568] {pull}20898[20898]
- Fix `libbeat.output.write.bytes` and `libbeat.output.read.bytes` metrics of the Elasticsearch output. {issue}20752[20752] {pull}21197[21197]
- The `o365input` and `o365` module now recover from an authentication problem or other fatal errors, instead of terminating. {pull}21259[21258]
- Close processors in an orderly way when processing pipelines are no longer needed, so that they release their resources. {pull}16349[16349]

*Auditbeat*

1 change: 1 addition & 0 deletions libbeat/beat/pipeline.go
@@ -138,6 +138,7 @@ type ClientEventer interface {

type ProcessorList interface {
Processor
Close() error
All() []Processor
}

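The Close method added to ProcessorList above is what lets a pipeline release processor resources when it shuts down. The processors.Close helper invoked elsewhere in this PR (for example in add_docker_metadata and in the new integration test) is not part of this diff; a minimal sketch of such a helper, assuming it simply type-asserts an optional Closer interface:

```go
// Sketch of a Close helper for processors. Assumption: the
// processors.Close calls used elsewhere in this PR behave roughly like
// this, closing only processors that opt in via a Closer interface.
package processors

// Closer is implemented by processors that hold resources
// (goroutines, caches, watchers) that need to be released.
type Closer interface {
	Close() error
}

// Close releases the resources held by a processor, if any.
// Processors that do not implement Closer are a no-op.
func Close(p Processor) error {
	if closer, ok := p.(Closer); ok {
		return closer.Close()
	}
	return nil
}
```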
5 changes: 5 additions & 0 deletions libbeat/cmd/instance/beat.go
@@ -371,6 +371,11 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
if err != nil {
return err
}
defer func() {
if err := b.processing.Close(); err != nil {
logp.Warn("Failed to close global processing: %v", err)
}
}()

// Windows: Mark service as stopped.
// After this is run, a Beat service is considered by the OS to be stopped
7 changes: 3 additions & 4 deletions libbeat/common/docker/watcher.go
@@ -138,6 +138,7 @@ func NewWatcher(log *logp.Logger, host string, tls *TLSConfig, storeShortID bool
// Extra check to confirm that Docker is available
_, err = client.Info(context.Background())
if err != nil {
client.Close()
return nil, err
}

@@ -395,14 +396,12 @@ func (w *watcher) cleanupWorker() {
log := w.log

for {
// Wait a full period
time.Sleep(w.cleanupTimeout)

select {
case <-w.ctx.Done():
w.stopped.Done()
return
default:
// Wait a full period
case <-time.After(w.cleanupTimeout):
// Check entries for timeout
var toDelete []string
timeout := time.Now().Add(-w.cleanupTimeout)
12 changes: 12 additions & 0 deletions libbeat/processors/add_docker_metadata/add_docker_metadata.go
@@ -209,6 +209,18 @@ func (d *addDockerMetadata) Run(event *beat.Event) (*beat.Event, error) {
return event, nil
}

func (d *addDockerMetadata) Close() error {
if d.cgroups != nil {
d.cgroups.StopJanitor()
}
d.watcher.Stop()
err := processors.Close(d.sourceProcessor)
if err != nil {
return errors.Wrap(err, "closing source processor of add_docker_metadata")
}
return nil
}

func (d *addDockerMetadata) String() string {
return fmt.Sprintf("%v=[match_fields=[%v] match_pids=[%v]]",
processorName, strings.Join(d.fields, ", "), strings.Join(d.pidFields, ", "))
@@ -0,0 +1,120 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build linux darwin windows
// +build integration

package add_docker_metadata

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/docker"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/processors"
dockertest "github.com/elastic/beats/v7/libbeat/tests/docker"
"github.com/elastic/beats/v7/libbeat/tests/resources"
)

func TestAddDockerMetadata(t *testing.T) {
goroutines := resources.NewGoroutinesChecker()
defer goroutines.Check(t)

client, err := docker.NewClient(defaultConfig().Host, nil, nil)
require.NoError(t, err)

// Docker clients can affect the goroutines checker because they keep
// idle keep-alive connections, so we explicitly close them.
// These idle connections in principle wouldn't represent leaks even if
// the client is not explicitly closed because they are eventually closed.
defer client.Close()

// Start a container to have some data to enrich events
testClient, err := dockertest.NewClient()
require.NoError(t, err)
// Explicitly close the client so it doesn't affect the goroutines checker
defer testClient.Close()

image := "busybox"
cmd := []string{"sleep", "60"}
labels := map[string]string{"label": "foo"}
id, err := testClient.ContainerStart(image, cmd, labels)
require.NoError(t, err)
defer testClient.ContainerRemove(id)

info, err := testClient.ContainerInspect(id)
require.NoError(t, err)
pid := info.State.Pid

config, err := common.NewConfigFrom(map[string]interface{}{
"match_fields": []string{"cid"},
})
require.NoError(t, err)
watcherConstructor := newWatcherWith(client)
processor, err := buildDockerMetadataProcessor(logp.L(), config, watcherConstructor)
require.NoError(t, err)

t.Run("match container by container id", func(t *testing.T) {
input := &beat.Event{Fields: common.MapStr{
"cid": id,
}}
result, err := processor.Run(input)
require.NoError(t, err)

resultLabels, _ := result.Fields.GetValue("container.labels")
expectedLabels := common.MapStr{"label": "foo"}
assert.Equal(t, expectedLabels, resultLabels)
assert.Equal(t, id, result.Fields["cid"])
})

t.Run("match container by process id", func(t *testing.T) {
input := &beat.Event{Fields: common.MapStr{
"cid": id,
"process.pid": pid,
}}
result, err := processor.Run(input)
require.NoError(t, err)

resultLabels, _ := result.Fields.GetValue("container.labels")
expectedLabels := common.MapStr{"label": "foo"}
assert.Equal(t, expectedLabels, resultLabels)
assert.Equal(t, id, result.Fields["cid"])
})

t.Run("don't enrich non existing container", func(t *testing.T) {
input := &beat.Event{Fields: common.MapStr{
"cid": "notexists",
}}
result, err := processor.Run(input)
require.NoError(t, err)
assert.Equal(t, input.Fields, result.Fields)
})

err = processors.Close(processor)
require.NoError(t, err)
}

func newWatcherWith(client docker.Client) docker.WatcherConstructor {
return func(log *logp.Logger, host string, tls *docker.TLSConfig, storeShortID bool) (docker.Watcher, error) {
return docker.NewWatcherWithClient(log, client, 60*time.Second, storeShortID)
}
}
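The test above relies on resources.NewGoroutinesChecker to detect leaked goroutines, which is why the Docker clients and the processor are explicitly closed. A minimal sketch of what such a checker might look like (an assumption for illustration; the real libbeat implementation differs in details such as timeouts and reporting):

```go
// Minimal sketch of a goroutine-leak checker in the spirit of
// libbeat's resources.GoroutinesChecker (assumed behavior, not the
// actual implementation).
package resources

import (
	"runtime"
	"testing"
	"time"
)

type GoroutinesChecker struct {
	before int // goroutines running when the checker was created
}

func NewGoroutinesChecker() *GoroutinesChecker {
	return &GoroutinesChecker{before: runtime.NumGoroutine()}
}

// Check fails the test if, after a grace period, more goroutines are
// still running than when the checker was created.
func (c *GoroutinesChecker) Check(t testing.TB) {
	deadline := time.Now().Add(2 * time.Second)
	for time.Now().Before(deadline) {
		if runtime.NumGoroutine() <= c.before {
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
	t.Errorf("possible goroutine leak: %d goroutines before, %d now",
		c.before, runtime.NumGoroutine())
}
```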
@@ -16,6 +16,7 @@
// under the License.

// +build linux darwin windows
// +build !integration

package add_docker_metadata

32 changes: 24 additions & 8 deletions libbeat/processors/add_kubernetes_metadata/cache.go
@@ -31,13 +31,15 @@ type cache struct {
timeout time.Duration
deleted map[string]time.Time // key -> when should this obj be deleted
metadata map[string]common.MapStr
done chan struct{}
Review comment: should we also wait for the cache shutdown/Close to be 'completed'?

Reply (Member Author): I think that for this processor it is ok not to wait, it only needs to stop a goroutine.

}

func newCache(cleanupTimeout time.Duration) *cache {
c := &cache{
timeout: cleanupTimeout,
deleted: make(map[string]time.Time),
metadata: make(map[string]common.MapStr),
done: make(chan struct{}),
}
go c.cleanup()
return c
@@ -67,15 +69,29 @@ func (c *cache) set(key string, data common.MapStr) {
}

func (c *cache) cleanup() {
ticker := time.Tick(timeout)
for now := range ticker {
c.Lock()
for k, t := range c.deleted {
if now.After(t) {
delete(c.deleted, k)
delete(c.metadata, k)
if timeout <= 0 {
return
}

ticker := time.NewTicker(timeout)
defer ticker.Stop()
for {
select {
case <-c.done:
return
case now := <-ticker.C:
c.Lock()
for k, t := range c.deleted {
if now.After(t) {
delete(c.deleted, k)
delete(c.metadata, k)
}
}
c.Unlock()
}
c.Unlock()
}
}

func (c *cache) stop() {
close(c.done)
}
10 changes: 10 additions & 0 deletions libbeat/processors/add_kubernetes_metadata/kubernetes.go
@@ -242,6 +242,16 @@ func (k *kubernetesAnnotator) Run(event *beat.Event) (*beat.Event, error) {
return event, nil
}

func (k *kubernetesAnnotator) Close() error {
if k.watcher != nil {
k.watcher.Stop()
Review comment: Maybe I missed it, but does the watcher use the cache? If so, does this Stop here block until the watcher is done before we continue closing the cache?

Instead of channels, consider using context.Context for cancellation. Grouping tasks and waiting for completion can be done e.g. via unison.TaskGroup.

Reply (Member Author): Good catch, the watcher actually uses the cache. But I don't think this is a problem for now; the cache can be used even after being stopped, it only doesn't clean expired entries.

Changing the watcher to use context cancellation instead of Start/Stop will require some refactoring.

I will give it a try to improve this, let me know if you don't consider it blocking 🙂

Reply (Member Author): > Good catch, the watcher actually uses the cache.

Sorry, I was wrong: the informer used by add_kubernetes_metadata has a cache, but it is a different one; the cache stopped here is not used by the watcher.

}
if k.cache != nil {
k.cache.stop()
}
return nil
}

func (k *kubernetesAnnotator) addPod(pod *kubernetes.Pod) {
metadata := k.indexers.GetMetadata(pod)
for _, m := range metadata {
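A rough sketch of the context-based cancellation suggested in the review thread above, as an alternative to the channel-based stop pattern (hypothetical names and structure; this is not how the watcher in this PR is implemented):

```go
// Hypothetical sketch of context-based cancellation for a watcher-like
// component, as suggested by the reviewer (illustration only).
package watcherexample

import (
	"context"
	"time"
)

type ctxWatcher struct {
	cancel context.CancelFunc
	done   chan struct{}
}

func startWatcher(parent context.Context) *ctxWatcher {
	ctx, cancel := context.WithCancel(parent)
	w := &ctxWatcher{cancel: cancel, done: make(chan struct{})}
	go func() {
		defer close(w.done)
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// periodic watch/cleanup work would go here
			}
		}
	}()
	return w
}

// Close cancels the worker and, unlike a plain close(done) signal,
// waits until the goroutine has actually finished before returning.
func (w *ctxWatcher) Close() error {
	w.cancel()
	<-w.done
	return nil
}
```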
46 changes: 34 additions & 12 deletions libbeat/processors/add_process_metadata/add_process_metadata.go
@@ -55,11 +55,12 @@ var (
)

type addProcessMetadata struct {
config config
provider processMetadataProvider
cidProvider cidProvider
log *logp.Logger
mappings common.MapStr
config config
provider processMetadataProvider
cgroupsCache *common.Cache
cidProvider cidProvider
log *logp.Logger
mappings common.MapStr
}

type processMetadata struct {
@@ -81,16 +82,22 @@ type cidProvider interface {
}

func init() {
processors.RegisterPlugin(processorName, New)
processors.RegisterPlugin(processorName, NewWithCache)
jsprocessor.RegisterPlugin("AddProcessMetadata", New)
}

// New constructs a new add_process_metadata processor.
func New(cfg *common.Config) (processors.Processor, error) {
return newProcessMetadataProcessorWithProvider(cfg, &procCache)
return newProcessMetadataProcessorWithProvider(cfg, &procCache, false)
}

func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider processMetadataProvider) (proc processors.Processor, err error) {
// NewWithCache constructs a new add_process_metadata processor with a cache for container IDs.
// The resulting processor implements `Close()` to release the cache's resources.
func NewWithCache(cfg *common.Config) (processors.Processor, error) {
return newProcessMetadataProcessorWithProvider(cfg, &procCache, true)
}

func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider processMetadataProvider, withCache bool) (proc processors.Processor, err error) {
// Logging (each processor instance has a unique ID).
var (
id = int(instanceID.Inc())
@@ -118,21 +125,25 @@ func newProcessMetadataProcessorWithProvider(cfg *common.Config, provider proces
}
// don't use cgroup.ProcessCgroupPaths to save it from doing the work when container id disabled
if ok := containsValue(mappings, "container.id"); ok {
if config.CgroupCacheExpireTime != 0 {
if withCache && config.CgroupCacheExpireTime != 0 {
p.log.Debug("Initializing cgroup cache")
evictionListener := func(k common.Key, v common.Value) {
p.log.Debugf("Evicted cached cgroups for PID=%v", k)
}

cgroupsCache := common.NewCacheWithRemovalListener(config.CgroupCacheExpireTime, 100, evictionListener)
cgroupsCache.StartJanitor(config.CgroupCacheExpireTime)
p.cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, config.CgroupRegex, processCgroupPaths, cgroupsCache)
p.cgroupsCache = common.NewCacheWithRemovalListener(config.CgroupCacheExpireTime, 100, evictionListener)
p.cgroupsCache.StartJanitor(config.CgroupCacheExpireTime)
p.cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, config.CgroupRegex, processCgroupPaths, p.cgroupsCache)
} else {
p.cidProvider = newCidProvider(config.HostPath, config.CgroupPrefixes, config.CgroupRegex, processCgroupPaths, nil)
}

}

if withCache {
return &addProcessMetadataCloser{p}, nil
}

return &p, nil
}

@@ -253,6 +264,17 @@ func (p *addProcessMetadata) getContainerID(pid int) (string, error) {
return cid, nil
}

type addProcessMetadataCloser struct {
addProcessMetadata
}

func (p *addProcessMetadataCloser) Close() error {
if p.addProcessMetadata.cgroupsCache != nil {
p.addProcessMetadata.cgroupsCache.StopJanitor()
}
return nil
}

// String returns the processor representation formatted as a string
func (p *addProcessMetadata) String() string {
return fmt.Sprintf("%v=[match_pids=%v, mappings=%v, ignore_missing=%v, overwrite_fields=%v, restricted_fields=%v, host_path=%v, cgroup_prefixes=%v]",
@@ -651,7 +651,7 @@ func TestAddProcessMetadata(t *testing.T) {
t.Fatal(err)
}

proc, err := newProcessMetadataProcessorWithProvider(config, testProcs)
proc, err := newProcessMetadataProcessorWithProvider(config, testProcs, true)
if test.initErr == nil {
if err != nil {
t.Fatal(err)