From bb79569dcf4723acd06eeba00d0baf8974961d8f Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Fri, 16 Oct 2020 15:54:00 +0200 Subject: [PATCH 01/13] [Ingest Manager] Use local temp instead of system one (#21883) [Ingest Manager] Use local temp instead of system one (#21883) --- x-pack/elastic-agent/CHANGELOG.next.asciidoc | 1 + .../pkg/agent/application/paths/paths.go | 16 ++++++++++++++++ .../artifact/install/atomic/atomic_installer.go | 4 +++- .../install/atomic/atomic_installer_test.go | 6 ++++-- 4 files changed, 24 insertions(+), 3 deletions(-) diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index deae2522773..d01c8a1c7bf 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -16,6 +16,7 @@ - Include inputs in action store actions {pull}21298[21298] - Fix issue where inputs without processors defined would panic {pull}21628[21628] - Partial extracted beat result in failure to spawn beat {issue}21718[21718] +- Use local temp instead of system one {pull}21883[21883] ==== New features diff --git a/x-pack/elastic-agent/pkg/agent/application/paths/paths.go b/x-pack/elastic-agent/pkg/agent/application/paths/paths.go index b646f3796ba..fca3dbd8828 100644 --- a/x-pack/elastic-agent/pkg/agent/application/paths/paths.go +++ b/x-pack/elastic-agent/pkg/agent/application/paths/paths.go @@ -10,14 +10,20 @@ import ( "os" "path/filepath" "strings" + "sync" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" ) +const ( + tempSubdir = "tmp" +) + var ( topPath string configPath string logsPath string + tmpCreator sync.Once ) func init() { @@ -37,6 +43,16 @@ func Top() string { return topPath } +// TempDir returns agent temp dir located within data dir. +func TempDir() string { + tmpDir := filepath.Join(Data(), tempSubdir) + tmpCreator.Do(func() { + // create tempdir as it probably don't exists + os.MkdirAll(tmpDir, 0750) + }) + return tmpDir +} + // Home returns a directory where binary lives func Home() string { return versionedHome(topPath) diff --git a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go index 5e26436bfc4..3dc0dbe232a 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go +++ b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go @@ -9,6 +9,8 @@ import ( "io/ioutil" "os" "path/filepath" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" ) type embeddedInstaller interface { @@ -31,7 +33,7 @@ func NewInstaller(i embeddedInstaller) (*Installer, error) { // Install performs installation of program in a specific version. 
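 // After this change the staging directory comes from paths.TempDir
 // (the "tmp" directory inside the agent's own data dir) rather than
 // os.TempDir, per the patch below.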
func (i *Installer) Install(ctx context.Context, programName, version, installDir string) error { // tar installer uses Dir of installDir to determine location of unpack - tempDir, err := ioutil.TempDir(os.TempDir(), "elastic-agent-install") + tempDir, err := ioutil.TempDir(paths.TempDir(), "elastic-agent-install") if err != nil { return err } diff --git a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer_test.go b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer_test.go index d6266659b7d..a0bfa213ca7 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer_test.go +++ b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer_test.go @@ -14,6 +14,8 @@ import ( "testing" "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" ) func TestOKInstall(t *testing.T) { @@ -25,7 +27,7 @@ func TestOKInstall(t *testing.T) { assert.NoError(t, err) ctx := context.Background() - installDir := filepath.Join(os.TempDir(), "install_dir") + installDir := filepath.Join(paths.TempDir(), "install_dir") wg.Add(1) go func() { @@ -59,7 +61,7 @@ func TestContextCancelledInstall(t *testing.T) { assert.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) - installDir := filepath.Join(os.TempDir(), "install_dir") + installDir := filepath.Join(paths.TempDir(), "install_dir") wg.Add(1) go func() { From 1f08e354d6b847da920fc96e113b54df26c47ffd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Fri, 16 Oct 2020 16:00:49 +0200 Subject: [PATCH 02/13] Add tests of reader of filestream input (#21814) ## What does this PR do? This PR adds tests for `logFile` in the `filestream` input. This element of the architecture is responsible for reading directly from the disk and closing the reader if the state or the position meets the configured criteria. Conditions tested in the PR: - file is removed - file is renamed - file is truncated - file is inactive for a time - file reader reaches EOF - timeout of the file reader is reached --- filebeat/input/filestream/filestream.go | 10 +- filebeat/input/filestream/filestream_test.go | 136 ++++++++++++++++++ .../filestream/filestream_test_non_windows.go | 104 ++++++++++++++ 3 files changed, 243 insertions(+), 7 deletions(-) create mode 100644 filebeat/input/filestream/filestream_test.go create mode 100644 filebeat/input/filestream/filestream_test_non_windows.go diff --git a/filebeat/input/filestream/filestream.go b/filebeat/input/filestream/filestream.go index 4d42bbf6242..1a559c67e06 100644 --- a/filebeat/input/filestream/filestream.go +++ b/filebeat/input/filestream/filestream.go @@ -138,20 +138,16 @@ func (f *logFile) Read(buf []byte) (int, error) { } func (f *logFile) startFileMonitoringIfNeeded() { - if f.closeInactive == 0 && f.closeAfterInterval == 0 { - return - } - - if f.closeInactive > 0 { + if f.closeInactive > 0 || f.closeRemoved || f.closeRenamed { f.tg.Go(func(ctx unison.Canceler) error { - f.closeIfTimeout(ctx) + f.periodicStateCheck(ctx) return nil }) } if f.closeAfterInterval > 0 { f.tg.Go(func(ctx unison.Canceler) error { - f.periodicStateCheck(ctx) + f.closeIfTimeout(ctx) return nil }) } diff --git a/filebeat/input/filestream/filestream_test.go b/filebeat/input/filestream/filestream_test.go new file mode 100644 index 00000000000..329fa0ad55f --- /dev/null +++ b/filebeat/input/filestream/filestream_test.go @@ -0,0 +1,136 @@ +// Licensed to Elasticsearch B.V. 
under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package filestream + +import ( + "context" + "io" + "io/ioutil" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/logp" +) + +func TestLogFileTimedClosing(t *testing.T) { + testCases := map[string]struct { + inactive time.Duration + closeEOF bool + afterInterval time.Duration + expectedErr error + }{ + "read from file and close inactive": { + inactive: 2 * time.Second, + expectedErr: ErrClosed, + }, + "read from file and close after interval": { + afterInterval: 3 * time.Second, + expectedErr: ErrClosed, + }, + "read from file and close on EOF": { + closeEOF: true, + expectedErr: io.EOF, + }, + } + + for name, test := range testCases { + test := test + + f := createTestLogFile() + defer f.Close() + defer os.Remove(f.Name()) + + t.Run(name, func(t *testing.T) { + reader, err := newFileReader( + logp.L(), + context.TODO(), + f, + readerConfig{}, + closerConfig{ + OnStateChange: stateChangeCloserConfig{ + CheckInterval: 1 * time.Second, + Inactive: test.inactive, + }, + Reader: readerCloserConfig{ + OnEOF: test.closeEOF, + AfterInterval: test.afterInterval, + }, + }, + ) + if err != nil { + t.Fatalf("error while creating logReader: %+v", err) + } + + err = readUntilError(reader) + + assert.Equal(t, test.expectedErr, err) + }) + } +} + +func TestLogFileTruncated(t *testing.T) { + f := createTestLogFile() + defer f.Close() + defer os.Remove(f.Name()) + + reader, err := newFileReader(logp.L(), context.TODO(), f, readerConfig{}, closerConfig{}) + if err != nil { + t.Fatalf("error while creating logReader: %+v", err) + } + + buf := make([]byte, 1024) + _, err = reader.Read(buf) + assert.Nil(t, err) + + err = f.Truncate(0) + if err != nil { + t.Fatalf("error while truncating file: %+v", err) + } + + err = readUntilError(reader) + + assert.Equal(t, ErrFileTruncate, err) +} + +func createTestLogFile() *os.File { + f, err := ioutil.TempFile("", "filestream_reader_test") + if err != nil { + panic(err) + } + content := []byte("first log line\nanother interesting line\na third log message\n") + if _, err := f.Write(content); err != nil { + panic(err) + } + if _, err := f.Seek(0, io.SeekStart); err != nil { + panic(err) + } + return f +} + +func readUntilError(reader *logFile) error { + buf := make([]byte, 1024) + _, err := reader.Read(buf) + for err == nil { + buf := make([]byte, 1024) + _, err = reader.Read(buf) + } + return err +} diff --git a/filebeat/input/filestream/filestream_test_non_windows.go b/filebeat/input/filestream/filestream_test_non_windows.go new file mode 100644 index 00000000000..9c2b33ed3de --- /dev/null +++ b/filebeat/input/filestream/filestream_test_non_windows.go @@ -0,0 +1,104 @@ +// Licensed to Elasticsearch B.V. 
under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build !windows + +package filestream + +import ( + "context" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/logp" +) + +// these tests are separated as one cannot delete/rename files +// while another process is working with it on Windows +func TestLogFileRenamed(t *testing.T) { + f := createTestLogFile() + defer f.Close() + + renamedFile := f.Name() + ".renamed" + + reader, err := newFileReader( + logp.L(), + context.TODO(), + f, + readerConfig{}, + closerConfig{ + OnStateChange: stateChangeCloserConfig{ + CheckInterval: 1 * time.Second, + Renamed: true, + }, + }, + ) + if err != nil { + t.Fatalf("error while creating logReader: %+v", err) + } + + buf := make([]byte, 1024) + _, err = reader.Read(buf) + assert.Nil(t, err) + + err = os.Rename(f.Name(), renamedFile) + if err != nil { + t.Fatalf("error while renaming file: %+v", err) + } + + err = readUntilError(reader) + os.Remove(renamedFile) + + assert.Equal(t, ErrClosed, err) +} + +func TestLogFileRemoved(t *testing.T) { + f := createTestLogFile() + defer f.Close() + + reader, err := newFileReader( + logp.L(), + context.TODO(), + f, + readerConfig{}, + closerConfig{ + OnStateChange: stateChangeCloserConfig{ + CheckInterval: 1 * time.Second, + Removed: true, + }, + }, + ) + if err != nil { + t.Fatalf("error while creating logReader: %+v", err) + } + + buf := make([]byte, 1024) + _, err = reader.Read(buf) + assert.Nil(t, err) + + err = os.Remove(f.Name()) + if err != nil { + t.Fatalf("error while remove file: %+v", err) + } + + err = readUntilError(reader) + + assert.Equal(t, ErrClosed, err) +} From 9333376466d33542354479862ecfebce2723d177 Mon Sep 17 00:00:00 2001 From: Victor Martinez Date: Fri, 16 Oct 2020 16:06:09 +0100 Subject: [PATCH 03/13] [CI] lint stage doesn't produce test reports (#21888) --- Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index 6eef1b2d0a8..70cefee034b 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -74,7 +74,7 @@ pipeline { } steps { withGithubNotify(context: 'Lint') { - withBeatsEnv(archive: true, id: 'lint') { + withBeatsEnv(archive: false, id: 'lint') { dumpVariables() cmd(label: 'make check', script: 'make check') } From 73dbb23daa3b0d8ca0f3bb7d8f8e0e89baa2e32c Mon Sep 17 00:00:00 2001 From: Toby McLaughlin Date: Sat, 17 Oct 2020 01:45:18 +1030 Subject: [PATCH 04/13] [docs] Remove extra word in autodiscover docs (#21871) --- libbeat/docs/shared-autodiscover.asciidoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/docs/shared-autodiscover.asciidoc b/libbeat/docs/shared-autodiscover.asciidoc index c7993c29bef..df0ea4d2e02 100644 --- a/libbeat/docs/shared-autodiscover.asciidoc +++ 
b/libbeat/docs/shared-autodiscover.asciidoc @@ -24,7 +24,7 @@ start/stop events. This ensures you don't need to worry about state, but only de The Docker autodiscover provider watches for Docker containers to start and stop. -These are the available fields during within config templating. The `docker.*` fields will be available on each emitted event. +These are the fields available within config templating. The `docker.*` fields will be available on each emitted event. event: * host @@ -130,7 +130,7 @@ endif::[] The Kubernetes autodiscover provider watches for Kubernetes nodes, pods, services to start, update, and stop. -These are the available fields during within config templating. The `kubernetes.*` fields will be available on each emitted event. +These are the fields available within config templating. The `kubernetes.*` fields will be available on each emitted event. [float] ====== Generic fields: From f936a45b3863bff20d26d9bfbd410779ffe5dc65 Mon Sep 17 00:00:00 2001 From: Victor Martinez Date: Fri, 16 Oct 2020 16:34:59 +0100 Subject: [PATCH 05/13] [CI] Add stage name in the step (#21887) --- Jenkinsfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile b/Jenkinsfile index 70cefee034b..52c579ab7f5 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -220,7 +220,7 @@ def target(Map args = [:]) { // make commands use -C while mage commands require the dir(folder) // let's support this scenario with the location variable. dir(isMage ? directory : '') { - cmd(label: "${command}", script: "${command}") + cmd(label: "${args.id?.trim() ? args.id : env.STAGE_NAME} - ${command}", script: "${command}") } } } From 4427fa59213839f075858266d94ca4f495e9d514 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Fri, 16 Oct 2020 17:54:27 +0200 Subject: [PATCH 06/13] Refactor docker watcher to fix flaky test and other small issues (#21851) Refactor docker watcher to fix some small issues and improve testability: * Actually release resources of previous connections when reconnecting. * Watcher uses a clock that can be mocked in tests for time-sensitive functionality. * Use nanoseconds-precision from events timestamps, this is important to avoid duplicated events on reconnection. * Fix logger initialization (it was being initialized as docker.docker). * Refactor test helpers to have more control on test watcher when needed. * Some other code refactors. --- CHANGELOG.next.asciidoc | 1 + libbeat/common/docker/watcher.go | 282 ++++++++++++++------------ libbeat/common/docker/watcher_test.go | 139 ++++++++----- 3 files changed, 242 insertions(+), 180 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e2b5844c192..51255305f42 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -186,6 +186,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix `libbeat.output.write.bytes` and `libbeat.output.read.bytes` metrics of the Elasticsearch output. {issue}20752[20752] {pull}21197[21197] - The `o365input` and `o365` module now recover from an authentication problem or other fatal errors, instead of terminating. {pull}21259[21258] - Orderly close processors when processing pipelines are not needed anymore to release their resources. {pull}16349[16349] +- Fix memory leak and events duplication in docker autodiscover and add_docker_metadata. 
{pull}21851[21851] *Auditbeat* diff --git a/libbeat/common/docker/watcher.go b/libbeat/common/docker/watcher.go index 2421c232eee..4145423209a 100644 --- a/libbeat/common/docker/watcher.go +++ b/libbeat/common/docker/watcher.go @@ -20,7 +20,8 @@ package docker import ( - "fmt" + "context" + "io" "net/http" "sync" "time" @@ -29,7 +30,6 @@ import ( "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/filters" "github.com/docker/go-connections/tlsconfig" - "golang.org/x/net/context" "github.com/elastic/beats/v7/libbeat/common/bus" "github.com/elastic/beats/v7/libbeat/logp" @@ -39,7 +39,6 @@ import ( const ( shortIDLen = 12 dockerRequestTimeout = 10 * time.Second - dockerWatchRequestTimeout = 60 * time.Minute dockerEventsWatchPityTimerInterval = 10 * time.Second dockerEventsWatchPityTimerTimeout = 10 * time.Minute ) @@ -74,20 +73,30 @@ type TLSConfig struct { type watcher struct { sync.RWMutex - log *logp.Logger - client Client - ctx context.Context - stop context.CancelFunc - containers map[string]*Container - deleted map[string]time.Time // deleted annotations key -> last access time - cleanupTimeout time.Duration - lastValidTimestamp int64 - lastWatchReceivedEventTime time.Time - stopped sync.WaitGroup - bus bus.Bus - shortID bool // whether to store short ID in "containers" too + log *logp.Logger + client Client + ctx context.Context + stop context.CancelFunc + containers map[string]*Container + deleted map[string]time.Time // deleted annotations key -> last access time + cleanupTimeout time.Duration + clock clock + stopped sync.WaitGroup + bus bus.Bus + shortID bool // whether to store short ID in "containers" too } +// clock is an interface used to provide mocked time on testing +type clock interface { + Now() time.Time +} + +// systemClock implements the clock interface using the system clock via the time package +type systemClock struct{} + +// Now returns the current time +func (*systemClock) Now() time.Time { return time.Now() } + // Container info retrieved by the watcher type Container struct { ID string @@ -147,8 +156,6 @@ func NewWatcher(log *logp.Logger, host string, tls *TLSConfig, storeShortID bool // NewWatcherWithClient creates a new Watcher from a given Docker client func NewWatcherWithClient(log *logp.Logger, client Client, cleanupTimeout time.Duration, storeShortID bool) (Watcher, error) { - log = log.Named("docker") - ctx, cancel := context.WithCancel(context.Background()) return &watcher{ log: log, @@ -160,6 +167,7 @@ func NewWatcherWithClient(log *logp.Logger, client Client, cleanupTimeout time.D cleanupTimeout: cleanupTimeout, bus: bus.New(log, "docker"), shortID: storeShortID, + clock: &systemClock{}, }, nil } @@ -177,7 +185,7 @@ func (w *watcher) Container(ID string) *Container { // Update last access time if it's deleted if ok { w.Lock() - w.deleted[container.ID] = time.Now() + w.deleted[container.ID] = w.clock.Now() w.Unlock() } @@ -201,7 +209,6 @@ func (w *watcher) Containers() map[string]*Container { func (w *watcher) Start() error { // Do initial scan of existing containers w.log.Debug("Start docker containers scanner") - w.lastValidTimestamp = time.Now().Unix() w.Lock() defer w.Unlock() @@ -236,108 +243,124 @@ func (w *watcher) Start() error { func (w *watcher) Stop() { w.stop() + w.stopped.Wait() } func (w *watcher) watch() { - log := w.log + defer w.stopped.Done() filter := filters.NewArgs() filter.Add("type", "container") - for { + // Ticker to restart the watcher when no events are received after some time. 
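+	// The ticker fires every dockerEventsWatchPityTimerInterval; when a
+	// tick finds that no event has arrived within
+	// dockerEventsWatchPityTimerTimeout, watch() returns false and the
+	// outer loop sleeps briefly and reconnects.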
+ tickChan := time.NewTicker(dockerEventsWatchPityTimerInterval) + defer tickChan.Stop() + + lastValidTimestamp := w.clock.Now() + + watch := func() bool { + lastReceivedEventTime := w.clock.Now() + + w.log.Debugf("Fetching events since %s", lastValidTimestamp) + options := types.EventsOptions{ - Since: fmt.Sprintf("%d", w.lastValidTimestamp), + Since: lastValidTimestamp.Format(time.RFC3339Nano), Filters: filter, } - log.Debugf("Fetching events since %s", options.Since) - ctx, cancel := context.WithTimeout(w.ctx, dockerWatchRequestTimeout) + ctx, cancel := context.WithCancel(w.ctx) defer cancel() events, errors := w.client.Events(ctx, options) - - //ticker for timeout to restart watcher when no events are received - w.lastWatchReceivedEventTime = time.Now() - tickChan := time.NewTicker(dockerEventsWatchPityTimerInterval) - defer tickChan.Stop() - - WATCH: for { select { case event := <-events: - log.Debugf("Got a new docker event: %v", event) - w.lastValidTimestamp = event.Time - w.lastWatchReceivedEventTime = time.Now() - - // Add / update - if event.Action == "start" || event.Action == "update" { - filter := filters.NewArgs() - filter.Add("id", event.Actor.ID) - - containers, err := w.listContainers(types.ContainerListOptions{ - Filters: filter, - }) - if err != nil || len(containers) != 1 { - log.Errorf("Error getting container info: %v", err) - continue - } - container := containers[0] - - w.Lock() - w.containers[event.Actor.ID] = container - if w.shortID { - w.containers[event.Actor.ID[:shortIDLen]] = container - } - // un-delete if it's flagged (in case of update or recreation) - delete(w.deleted, event.Actor.ID) - w.Unlock() - - w.bus.Publish(bus.Event{ - "start": true, - "container": container, - }) - } - - // Delete - if event.Action == "die" { - container := w.Container(event.Actor.ID) - if container != nil { - w.bus.Publish(bus.Event{ - "stop": true, - "container": container, - }) - } - - w.Lock() - w.deleted[event.Actor.ID] = time.Now() - w.Unlock() + w.log.Debugf("Got a new docker event: %v", event) + lastValidTimestamp = time.Unix(event.Time, event.TimeNano) + lastReceivedEventTime = w.clock.Now() + + switch event.Action { + case "start", "update": + w.containerUpdate(event) + case "die": + w.containerDelete(event) } - case err := <-errors: - // Restart watch call - if err == context.DeadlineExceeded { - log.Info("Context deadline exceeded for docker request, restarting watch call") - } else { - log.Errorf("Error watching for docker events: %+v", err) + switch err { + case io.EOF: + // Client disconnected, watch is not done, reconnect + w.log.Debug("EOF received in events stream, restarting watch call") + case context.DeadlineExceeded: + w.log.Debug("Context deadline exceeded for docker request, restarting watch call") + case context.Canceled: + // Parent context has been canceled, watch is done. 
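+				// Returning true tells the caller's retry loop to stop
+				// for good instead of sleeping and reconnecting.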
+ return true + default: + w.log.Errorf("Error watching for docker events: %+v", err) } - - time.Sleep(1 * time.Second) - break WATCH - + return false case <-tickChan.C: - if time.Since(w.lastWatchReceivedEventTime) > dockerEventsWatchPityTimerTimeout { - log.Infof("No events received within %s, restarting watch call", dockerEventsWatchPityTimerTimeout) - time.Sleep(1 * time.Second) - break WATCH + if time.Since(lastReceivedEventTime) > dockerEventsWatchPityTimerTimeout { + w.log.Infof("No events received within %s, restarting watch call", dockerEventsWatchPityTimerTimeout) + return false } - case <-w.ctx.Done(): - log.Debug("Watcher stopped") - w.stopped.Done() - return + w.log.Debug("Watcher stopped") + return true } } + } + for { + done := watch() + if done { + return + } + // Wait before trying to reconnect + time.Sleep(1 * time.Second) + } +} + +func (w *watcher) containerUpdate(event events.Message) { + filter := filters.NewArgs() + filter.Add("id", event.Actor.ID) + + containers, err := w.listContainers(types.ContainerListOptions{ + Filters: filter, + }) + if err != nil || len(containers) != 1 { + w.log.Errorf("Error getting container info: %v", err) + return + } + container := containers[0] + + w.Lock() + w.containers[event.Actor.ID] = container + if w.shortID { + w.containers[event.Actor.ID[:shortIDLen]] = container + } + // un-delete if it's flagged (in case of update or recreation) + delete(w.deleted, event.Actor.ID) + w.Unlock() + + w.bus.Publish(bus.Event{ + "start": true, + "container": container, + }) +} + +func (w *watcher) containerDelete(event events.Message) { + container := w.Container(event.Actor.ID) + + w.Lock() + w.deleted[event.Actor.ID] = w.clock.Now() + w.Unlock() + + if container != nil { + w.bus.Publish(bus.Event{ + "stop": true, + "container": container, + }) } } @@ -393,49 +416,52 @@ func (w *watcher) listContainers(options types.ContainerListOptions) ([]*Contain // Clean up deleted containers after they are not used anymore func (w *watcher) cleanupWorker() { - log := w.log + defer w.stopped.Done() for { select { case <-w.ctx.Done(): - w.stopped.Done() return // Wait a full period case <-time.After(w.cleanupTimeout): - // Check entries for timeout - var toDelete []string - timeout := time.Now().Add(-w.cleanupTimeout) - w.RLock() - for key, lastSeen := range w.deleted { - if lastSeen.Before(timeout) { - log.Debugf("Removing container %s after cool down timeout", key) - toDelete = append(toDelete, key) - } - } - w.RUnlock() - - // Delete timed out entries: - for _, key := range toDelete { - container := w.Container(key) - if container != nil { - w.bus.Publish(bus.Event{ - "delete": true, - "container": container, - }) - } - } + w.runCleanup() + } + } +} - w.Lock() - for _, key := range toDelete { - delete(w.deleted, key) - delete(w.containers, key) - if w.shortID { - delete(w.containers, key[:shortIDLen]) - } - } - w.Unlock() +func (w *watcher) runCleanup() { + // Check entries for timeout + var toDelete []string + timeout := w.clock.Now().Add(-w.cleanupTimeout) + w.RLock() + for key, lastSeen := range w.deleted { + if lastSeen.Before(timeout) { + w.log.Debugf("Removing container %s after cool down timeout", key) + toDelete = append(toDelete, key) + } + } + w.RUnlock() + + // Delete timed out entries: + for _, key := range toDelete { + container := w.Container(key) + if container != nil { + w.bus.Publish(bus.Event{ + "delete": true, + "container": container, + }) + } + } + + w.Lock() + for _, key := range toDelete { + delete(w.deleted, key) + 
delete(w.containers, key) + if w.shortID { + delete(w.containers, key[:shortIDLen]) } } + w.Unlock() } // ListenStart returns a bus listener to receive container started events, with a `container` key holding it diff --git a/libbeat/common/docker/watcher_test.go b/libbeat/common/docker/watcher_test.go index ec53fbdeb73..a0de0567af4 100644 --- a/libbeat/common/docker/watcher_test.go +++ b/libbeat/common/docker/watcher_test.go @@ -21,6 +21,7 @@ package docker import ( "errors" + "sync" "testing" "time" @@ -37,7 +38,7 @@ type MockClient struct { containers [][]types.Container // event list to send on Events call events []interface{} - + // done channel is closed when the client has sent all events done chan interface{} } @@ -71,7 +72,7 @@ func (m *MockClient) ContainerInspect(ctx context.Context, container string) (ty } func TestWatcherInitialization(t *testing.T) { - watcher := runWatcher(t, true, + watcher := runAndWait(testWatcher(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -90,7 +91,8 @@ func TestWatcherInitialization(t *testing.T) { }, }, }, - nil) + nil, + )) assert.Equal(t, map[string]*Container{ "0332dbd79e20": &Container{ @@ -109,7 +111,7 @@ func TestWatcherInitialization(t *testing.T) { } func TestWatcherInitializationShortID(t *testing.T) { - watcher := runWatcherShortID(t, true, + watcher := runAndWait(testWatcherShortID(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -128,7 +130,9 @@ func TestWatcherInitializationShortID(t *testing.T) { }, }, }, - nil, true) + nil, + true, + )) assert.Equal(t, map[string]*Container{ "1234567890123": &Container{ @@ -154,7 +158,7 @@ func TestWatcherInitializationShortID(t *testing.T) { } func TestWatcherAddEvents(t *testing.T) { - watcher := runWatcher(t, true, + watcher := runAndWait(testWatcher(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -188,7 +192,7 @@ func TestWatcherAddEvents(t *testing.T) { }, }, }, - ) + )) assert.Equal(t, map[string]*Container{ "0332dbd79e20": &Container{ @@ -207,7 +211,7 @@ func TestWatcherAddEvents(t *testing.T) { } func TestWatcherAddEventsShortID(t *testing.T) { - watcher := runWatcherShortID(t, true, + watcher := runAndWait(testWatcherShortID(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -242,7 +246,7 @@ func TestWatcherAddEventsShortID(t *testing.T) { }, }, true, - ) + )) assert.Equal(t, map[string]*Container{ "1234567890123": &Container{ @@ -261,7 +265,7 @@ func TestWatcherAddEventsShortID(t *testing.T) { } func TestWatcherUpdateEvent(t *testing.T) { - watcher := runWatcher(t, true, + watcher := runAndWait(testWatcher(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -295,7 +299,7 @@ func TestWatcherUpdateEvent(t *testing.T) { }, }, }, - ) + )) assert.Equal(t, map[string]*Container{ "0332dbd79e20": &Container{ @@ -309,7 +313,7 @@ func TestWatcherUpdateEvent(t *testing.T) { } func TestWatcherUpdateEventShortID(t *testing.T) { - watcher := runWatcherShortID(t, true, + watcher := runAndWait(testWatcherShortID(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -344,7 +348,7 @@ func TestWatcherUpdateEventShortID(t *testing.T) { }, }, true, - ) + )) assert.Equal(t, map[string]*Container{ "1234567890123": &Container{ @@ -358,9 +362,7 @@ func TestWatcherUpdateEventShortID(t *testing.T) { } func TestWatcherDie(t *testing.T) { - t.Skip("flaky test: https://github.com/elastic/beats/issues/7906") - - watcher := runWatcher(t, false, + watcher, clientDone := testWatcher(t, false, [][]types.Container{ 
[]types.Container{ types.Container{ @@ -381,32 +383,37 @@ func TestWatcherDie(t *testing.T) { }, }, ) + + clock := newTestClock() + watcher.clock = clock + + stopListener := watcher.ListenStop() + + watcher.Start() defer watcher.Stop() // Check it doesn't get removed while we request meta for the container for i := 0; i < 18; i++ { watcher.Container("0332dbd79e20") - assert.Equal(t, 1, len(watcher.Containers())) - time.Sleep(50 * time.Millisecond) - } - - // Checks a max of 10s for the watcher containers to be updated - for i := 0; i < 100; i++ { - // Now it should get removed - time.Sleep(100 * time.Millisecond) - - if len(watcher.Containers()) == 0 { + clock.Sleep(watcher.cleanupTimeout / 2) + watcher.runCleanup() + if !assert.Equal(t, 1, len(watcher.Containers())) { break } } + // Wait to be sure that the delete event has been processed + <-clientDone + <-stopListener.Events() + + // Check that after the cleanup period the container is removed + clock.Sleep(watcher.cleanupTimeout + 1*time.Second) + watcher.runCleanup() assert.Equal(t, 0, len(watcher.Containers())) } func TestWatcherDieShortID(t *testing.T) { - t.Skip("flaky test: https://github.com/elastic/beats/issues/7906") - - watcher := runWatcherShortID(t, false, + watcher, clientDone := testWatcherShortID(t, false, [][]types.Container{ []types.Container{ types.Container{ @@ -428,33 +435,40 @@ func TestWatcherDieShortID(t *testing.T) { }, true, ) + + clock := newTestClock() + watcher.clock = clock + + stopListener := watcher.ListenStop() + + watcher.Start() defer watcher.Stop() // Check it doesn't get removed while we request meta for the container for i := 0; i < 18; i++ { watcher.Container("0332dbd79e20") - assert.Equal(t, 1, len(watcher.Containers())) - time.Sleep(50 * time.Millisecond) - } - - // Checks a max of 10s for the watcher containers to be updated - for i := 0; i < 100; i++ { - // Now it should get removed - time.Sleep(100 * time.Millisecond) - - if len(watcher.Containers()) == 0 { + clock.Sleep(watcher.cleanupTimeout / 2) + watcher.runCleanup() + if !assert.Equal(t, 1, len(watcher.Containers())) { break } } + // Wait to be sure that the delete event has been processed + <-clientDone + <-stopListener.Events() + + // Check that after the cleanup period the container is removed + clock.Sleep(watcher.cleanupTimeout + 1*time.Second) + watcher.runCleanup() assert.Equal(t, 0, len(watcher.Containers())) } -func runWatcher(t *testing.T, kill bool, containers [][]types.Container, events []interface{}) *watcher { - return runWatcherShortID(t, kill, containers, events, false) +func testWatcher(t *testing.T, kill bool, containers [][]types.Container, events []interface{}) (*watcher, chan interface{}) { + return testWatcherShortID(t, kill, containers, events, false) } -func runWatcherShortID(t *testing.T, kill bool, containers [][]types.Container, events []interface{}, enable bool) *watcher { +func testWatcherShortID(t *testing.T, kill bool, containers [][]types.Container, events []interface{}, enable bool) (*watcher, chan interface{}) { logp.TestingSetup() client := &MockClient{ @@ -472,16 +486,37 @@ func runWatcherShortID(t *testing.T, kill bool, containers [][]types.Container, t.Fatal("'watcher' was supposed to be pointer to the watcher structure") } - err = watcher.Start() - if err != nil { - t.Fatal(err) - } + return watcher, client.done +} - <-client.done - if kill { - watcher.Stop() - watcher.stopped.Wait() - } +func runAndWait(w *watcher, done chan interface{}) *watcher { + w.Start() + <-done + w.Stop() + return w +} + 
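+// testClock is a deterministic clock for these tests: Now() advances the
+// stored time by one nanosecond per call, so successive timestamps are
+// strictly increasing, and Sleep(d) advances it by d without blocking.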
+type testClock struct { + sync.Mutex + + now time.Time +} + +func newTestClock() *testClock { + return &testClock{now: time.Time{}} +} + +func (c *testClock) Now() time.Time { + c.Lock() + defer c.Unlock() + + c.now = c.now.Add(1) + return c.now +} + +func (c *testClock) Sleep(d time.Duration) { + c.Lock() + defer c.Unlock() - return watcher + c.now = c.now.Add(d) } From f2a1ba304a9074515ef1d6f618813f6dbb7f7011 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Fri, 16 Oct 2020 18:00:03 -0400 Subject: [PATCH 07/13] [libbeat] Fix potential deadlock in the disk queue + add more unit tests (#21930) --- .../publisher/queue/diskqueue/core_loop.go | 27 +- .../queue/diskqueue/core_loop_test.go | 370 +++++++++++++++++- .../publisher/queue/diskqueue/reader_loop.go | 8 +- 3 files changed, 400 insertions(+), 5 deletions(-) diff --git a/libbeat/publisher/queue/diskqueue/core_loop.go b/libbeat/publisher/queue/diskqueue/core_loop.go index 638d9da2f40..77f4aadb47f 100644 --- a/libbeat/publisher/queue/diskqueue/core_loop.go +++ b/libbeat/publisher/queue/diskqueue/core_loop.go @@ -169,8 +169,16 @@ func (dq *diskQueue) handleReaderLoopResponse(response readerLoopResponse) { // A segment in the writing list can't be finished writing, // so we don't check the endOffset. segment = dq.segments.writing[0] + if response.err != nil { + // Errors reading a writing segment are awkward since we can't discard + // them until the writer loop is done with them. Instead we just seek + // to the end of the current data region. If we're lucky this lets us + // skip the intervening errors; if not, the segment will be cleaned up + // after the writer loop is done with it. + dq.segments.nextReadOffset = segment.endOffset + } } - segment.framesRead = uint64(dq.segments.nextReadFrameID - segment.firstFrameID) + segment.framesRead += response.frameCount // If there was an error, report it. if response.err != nil { @@ -346,6 +354,16 @@ func (dq *diskQueue) maybeReadPending() { // A read request is already pending return } + // Check if the next reading segment has already been completely read. (This + // can happen if it was being written and read simultaneously.) In this case + // we should move it to the acking list and proceed to the next segment. + if len(dq.segments.reading) > 0 && + dq.segments.nextReadOffset >= dq.segments.reading[0].endOffset { + dq.segments.acking = append(dq.segments.acking, dq.segments.reading[0]) + dq.segments.reading = dq.segments.reading[1:] + dq.segments.nextReadOffset = 0 + } + // Get the next available segment from the reading or writing lists. segment := dq.segments.readingSegment() if segment == nil || dq.segments.nextReadOffset >= segmentOffset(segment.endOffset) { @@ -353,7 +371,12 @@ func (dq *diskQueue) maybeReadPending() { return } if dq.segments.nextReadOffset == 0 { - // If we're reading the beginning of this segment, assign its firstFrameID. + // If we're reading the beginning of this segment, assign its firstFrameID + // so we can recognize its acked frames later. + // The first segment we read might not have its initial nextReadOffset + // set to 0 if the segment was already partially read on a previous run. + // However that can only happen when nextReadFrameID == 0, so we don't + // need to do anything in that case. 
segment.firstFrameID = dq.segments.nextReadFrameID } request := readerLoopRequest{ diff --git a/libbeat/publisher/queue/diskqueue/core_loop_test.go b/libbeat/publisher/queue/diskqueue/core_loop_test.go index b5f0d301d15..309a145968d 100644 --- a/libbeat/publisher/queue/diskqueue/core_loop_test.go +++ b/libbeat/publisher/queue/diskqueue/core_loop_test.go @@ -17,7 +17,12 @@ package diskqueue -import "testing" +import ( + "fmt" + "testing" + + "github.com/elastic/beats/v7/libbeat/logp" +) func TestProducerWriteRequest(t *testing.T) { dq := &diskQueue{settings: DefaultSettings()} @@ -92,3 +97,366 @@ func TestHandleWriterLoopResponse(t *testing.T) { dq.segments.writing[0].endOffset) } } + +func TestHandleReaderLoopResponse(t *testing.T) { + // handleReaderLoopResponse should: + // - advance segments.{nextReadFrameID, nextReadOffset} by the values in + // response.{frameCount, byteCount} + // - advance the target segment's framesRead field by response.frameCount + // - if reading[0] encountered an error or was completely read, move it from + // the reading list to the acking list and reset nextReadOffset to zero + // - if writing[0] encountered an error, advance nextReadOffset to the + // segment's current endOffset (we can't discard the active writing + // segment like we do for errors in the reading list, but we can still + // mark the remaining data as processed) + + testCases := map[string]struct { + // The segment structure to start with before calling maybeReadPending + segments diskQueueSegments + response readerLoopResponse + + expectedFrameID frameID + expectedOffset segmentOffset + expectedACKingSegment *segmentID + }{ + "completely read first reading segment": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadFrameID: 5, + }, + response: readerLoopResponse{ + frameCount: 10, + byteCount: 1000, + }, + expectedFrameID: 15, + expectedOffset: 0, + expectedACKingSegment: segmentIDRef(1), + }, + "read first half of first reading segment": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadFrameID: 5, + }, + response: readerLoopResponse{ + frameCount: 5, + byteCount: 500, + }, + expectedFrameID: 10, + expectedOffset: 500, + }, + "read second half of first reading segment": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadFrameID: 5, + nextReadOffset: 500, + }, + response: readerLoopResponse{ + frameCount: 5, + byteCount: 500, + }, + expectedFrameID: 10, + expectedOffset: 0, + expectedACKingSegment: segmentIDRef(1), + }, + "read of first reading segment aborted by error": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadFrameID: 5, + }, + response: readerLoopResponse{ + frameCount: 1, + byteCount: 100, + err: fmt.Errorf("something bad happened"), + }, + expectedFrameID: 6, + expectedOffset: 0, + expectedACKingSegment: segmentIDRef(1), + }, + "completely read first writing segment": { + segments: diskQueueSegments{ + writing: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadFrameID: 5, + }, + response: readerLoopResponse{ + frameCount: 10, + byteCount: 1000, + }, + expectedFrameID: 15, + expectedOffset: 1000, + }, + "read first half of first writing segment": { + segments: diskQueueSegments{ + writing: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadFrameID: 5, + }, + response: readerLoopResponse{ + frameCount: 5, + byteCount: 500, + }, + expectedFrameID: 
10, + expectedOffset: 500, + }, + "read second half of first writing segment": { + segments: diskQueueSegments{ + writing: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadOffset: 500, + nextReadFrameID: 5, + }, + response: readerLoopResponse{ + frameCount: 5, + byteCount: 500, + }, + expectedFrameID: 10, + expectedOffset: 1000, + }, + "error reading a writing segments skips remaining data": { + segments: diskQueueSegments{ + writing: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadFrameID: 5, + }, + response: readerLoopResponse{ + frameCount: 1, + byteCount: 100, + err: fmt.Errorf("something bad happened"), + }, + expectedFrameID: 6, + expectedOffset: 1000, + }, + } + + for description, test := range testCases { + dq := &diskQueue{ + logger: logp.L(), + settings: DefaultSettings(), + segments: test.segments, + } + dq.handleReaderLoopResponse(test.response) + + if dq.segments.nextReadFrameID != test.expectedFrameID { + t.Errorf("%s: expected nextReadFrameID = %d, got %d", + description, test.expectedFrameID, dq.segments.nextReadFrameID) + } + if dq.segments.nextReadOffset != test.expectedOffset { + t.Errorf("%s: expected nextReadOffset = %d, got %d", + description, test.expectedOffset, dq.segments.nextReadOffset) + } + if test.expectedACKingSegment != nil { + if len(dq.segments.acking) == 0 { + t.Errorf("%s: expected acking segment %d, got none", + description, *test.expectedACKingSegment) + } else if dq.segments.acking[0].id != *test.expectedACKingSegment { + t.Errorf("%s: expected acking segment %d, got %d", + description, *test.expectedACKingSegment, dq.segments.acking[0].id) + } + } else if len(dq.segments.acking) != 0 { + t.Errorf("%s: expected no acking segment, got %v", + description, *dq.segments.acking[0]) + } + } +} + +func TestMaybeReadPending(t *testing.T) { + // maybeReadPending should: + // - If any unread data is available in a reading or writing segment, + // send a readerLoopRequest for the full amount available in the + // first such segment. + // - When creating a readerLoopRequest that includes the beginning of + // a segment (startOffset == 0), set that segment's firstFrameID + // to segments.nextReadFrameID (so ACKs based on frame ID can be linked + // back to the segment that generated them). + // - If the first reading segment has already been completely read (which + // can happen if it was read while still in the writing list), move it to + // the acking list and set segments.nextReadOffset to 0. + + testCases := map[string]struct { + // The segment structure to start with before calling maybeReadPending + segments diskQueueSegments + // The request we expect to see on the reader loop's request channel, + // or nil if there should be none. + expectedRequest *readerLoopRequest + // The segment ID we expect to see in the acking list, or nil for none. 
+ expectedACKingSegment *segmentID + }{ + "read one full segment": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + // The next read request should start with frame 5 + nextReadFrameID: 5, + }, + expectedRequest: &readerLoopRequest{ + segment: &queueSegment{id: 1}, + startFrameID: 5, + startOffset: 0, + endOffset: 1000, + }, + }, + "read the end of a segment": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + // The next read request should start with frame 5 + nextReadFrameID: 5, + // Start reading at position 500 + nextReadOffset: 500, + }, + expectedRequest: &readerLoopRequest{ + segment: &queueSegment{id: 1}, + startFrameID: 5, + // Should be reading from nextReadOffset (500) to the end of + // the segment (1000). + startOffset: 500, + endOffset: 1000, + }, + }, + "ignore writing segments if reading is available": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + writing: []*queueSegment{ + {id: 2, endOffset: 1000}, + }, + }, + expectedRequest: &readerLoopRequest{ + segment: &queueSegment{id: 1}, + startOffset: 0, + endOffset: 1000, + }, + }, + "do nothing if no segments are available": { + segments: diskQueueSegments{}, + expectedRequest: nil, + }, + "read the writing segment if no reading segments are available": { + segments: diskQueueSegments{ + writing: []*queueSegment{ + {id: 2, endOffset: 1000}, + }, + nextReadOffset: 500, + }, + expectedRequest: &readerLoopRequest{ + segment: &queueSegment{id: 2}, + startOffset: 500, + endOffset: 1000, + }, + }, + "do nothing if the writing segment has already been fully read": { + segments: diskQueueSegments{ + writing: []*queueSegment{ + {id: 2, endOffset: 1000}, + }, + nextReadOffset: 1000, + }, + expectedRequest: nil, + }, + "skip the first reading segment if it's already been fully read": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + {id: 2, endOffset: 500}, + }, + nextReadOffset: 1000, + }, + expectedRequest: &readerLoopRequest{ + segment: &queueSegment{id: 2}, + startOffset: 0, + endOffset: 500, + }, + expectedACKingSegment: segmentIDRef(1), + }, + "move empty reading segment to the acking list if it's the only one": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadOffset: 1000, + }, + expectedRequest: nil, + expectedACKingSegment: segmentIDRef(1), + }, + } + + for description, test := range testCases { + dq := &diskQueue{ + settings: DefaultSettings(), + segments: test.segments, + readerLoop: &readerLoop{ + requestChan: make(chan readerLoopRequest, 1), + }, + } + firstFrameID := test.segments.nextReadFrameID + dq.maybeReadPending() + select { + case request := <-dq.readerLoop.requestChan: + if test.expectedRequest == nil { + t.Errorf("%s: expected no read request, got %v", + description, request) + break + } + if !equalReaderLoopRequests(request, *test.expectedRequest) { + t.Errorf("%s: expected request %v, got %v", + description, *test.expectedRequest, request) + } + if request.startOffset == 0 && + request.segment.firstFrameID != firstFrameID { + t.Errorf( + "%s: maybeReadPending should update firstFrameID", description) + } + default: + if test.expectedRequest != nil { + t.Errorf("%s: expected read request %v, got none", + description, test.expectedRequest) + } + } + if test.expectedACKingSegment != nil { + if len(dq.segments.acking) != 1 { + t.Errorf("%s: expected acking segment %v, got none", + 
description, *test.expectedACKingSegment) + } else if dq.segments.acking[0].id != *test.expectedACKingSegment { + t.Errorf("%s: expected acking segment %v, got %v", + description, *test.expectedACKingSegment, dq.segments.acking[0].id) + } + if dq.segments.nextReadOffset != 0 { + t.Errorf("%s: expected read offset 0 after acking segment, got %v", + description, dq.segments.nextReadOffset) + } + } else if len(dq.segments.acking) != 0 { + t.Errorf("%s: expected no acking segment, got %v", + description, *dq.segments.acking[0]) + } + } +} + +func segmentIDRef(id segmentID) *segmentID { + return &id +} + +func equalReaderLoopRequests( + r0 readerLoopRequest, r1 readerLoopRequest, +) bool { + // We compare segment ids rather than segment pointers because it's + // awkward to include the same pointer repeatedly in the test definition. + return r0.startOffset == r1.startOffset && + r0.endOffset == r1.endOffset && + r0.segment.id == r1.segment.id && + r0.startFrameID == r1.startFrameID +} diff --git a/libbeat/publisher/queue/diskqueue/reader_loop.go b/libbeat/publisher/queue/diskqueue/reader_loop.go index dc2bb95777f..5b30f03e81d 100644 --- a/libbeat/publisher/queue/diskqueue/reader_loop.go +++ b/libbeat/publisher/queue/diskqueue/reader_loop.go @@ -35,6 +35,8 @@ type readerLoopResponse struct { frameCount uint64 // The number of bytes successfully read from the requested segment file. + // If this is less than (endOffset - startOffset) from the original request, + // then err is guaranteed to be non-nil. byteCount uint64 // If there was an error in the segment file (i.e. inconsistent data), the @@ -100,7 +102,8 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon return readerLoopResponse{err: err} } defer handle.Close() - _, err = handle.Seek(segmentHeaderSize+int64(request.startOffset), 0) + _, err = handle.Seek( + segmentHeaderSize+int64(request.startOffset), os.SEEK_SET) if err != nil { return readerLoopResponse{err: err} } @@ -137,7 +140,7 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon } // We are done with this request if: - // - there was an error reading the frame, + // - there was an error reading the frame // - there are no more frames to read, or // - we have reached the end of the requested region if err != nil || frame == nil || byteCount >= targetLength { @@ -166,6 +169,7 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon // nextFrame reads and decodes one frame from the given file handle, as long // it does not exceed the given length bound. The returned frame leaves the // segment and frame IDs unset. +// The returned error will be set if and only if the returned frame is nil. func (rl *readerLoop) nextFrame( handle *os.File, maxLength uint64, ) (*readFrame, error) { From 7c7261054383c87014286df6249aa95cf4ea6a21 Mon Sep 17 00:00:00 2001 From: DeDe Morton Date: Fri, 16 Oct 2020 15:15:28 -0700 Subject: [PATCH 08/13] Add 7.7.1 relnotes to 7.8 docs (#21937) (#21941) * Add 7.7.1 changelog * Fix 15838 issue placement in CHANGELOG (#19105) Fix for https://github.com/elastic/beats/issues/15838 has first arrived in 7.6.1, not 7.5.0. 
Verification: https://github.com/elastic/beats/compare/v7.6.0...v7.6.1 * Add relnotes link Co-authored-by: Grzegorz Banasiak Co-authored-by: Grzegorz Banasiak --- CHANGELOG.asciidoc | 39 ++++++++++++++++++++++++++++++++++- libbeat/docs/release.asciidoc | 1 + 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index c4d0f48005f..1dfbb2fb889 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -457,6 +457,38 @@ https://github.com/elastic/beats/compare/v7.7.0...v7.8.0[View commits] - Add support for event IDs 4673,4674,4697,4698,4699,4700,4701,4702,4768,4769,4770,4771,4776,4778,4779,4964 to the Security module. {pull}17517[17517] - Add registry and code signature information and ECS categorization fields for sysmon module. {pull}18058[18058] +[[release-notes-7.7.1]] +=== Beats version 7.7.1 +https://github.com/elastic/beats/compare/v7.7.0...v7.7.1[View commits] + +==== Bugfixes + +*Affecting all Beats* + +- Fix `keystore add` command hanging on Windows. {issue}18649[18649] {pull}18654[18654] + +*Filebeat* + +- Unescape filenames in SQS messages to resolve file paths correctly. {pull}18370[18370] +- Improve failure handler for Cisco ASA and FTD pipelines to avoid mapping temporary fields. {issue}18391[18391] {pull}18392[18392] +- Fix `source.address` field not being set for the Nginx `ingress_controller` fileset. {pull}18511[18511] +- Fix Google Cloud `audit` fileset to only take in fields that are explicitly defined by the fileset. {issue}18465[18465] {pull}18472[18472] +- Fix rate limit related issue in the `httpjson` input for the Okta module. {issue}18530[18530] {pull}18534[18534] +- Fix Cisco ASA and FTD parsing errors caused by NAT fields that contain a hostname instead of an IP. {issue}14034[14034] {pull}18376[18376] +- Fix PANW module to use correct mappings for bytes and packets counters. {issue}18522[18522] {pull}18525[18525] +- Fix Office 365 ingest failures caused by IP addresses surrounded by square brackets. {issue}18587[18587] {pull}18591[18591] + +*Metricbeat* + +- Fix `tags_filter` setting to work correctly for the AWS `cloudwatch` metricset. {pull}18524[18524] + +==== Added + +*Filebeat* + +- Add support for Google Application Default Credentials to the Google Pub/Sub input and Google Cloud modules. {pull}15668[15668] +- Make `decode_cef` processor GA. {pull}17944[17944] + [[release-notes-7.7.0]] === Beats version 7.7.0 https://github.com/elastic/beats/compare/v7.6.2...v7.7.0[View commits] @@ -729,6 +761,12 @@ https://github.com/elastic/beats/compare/v7.6.0...v7.6.1[View commits] - Fix timeout option of GCP functions. {issue}16282[16282] {pull}16287[16287] +==== Added + +*Winlogbeat* + +- Made the event parser more lenient w.r.t. invalid event log definition version numbers. {issue}15838[15838] + [[release-notes-7.6.0]] === Beats version 7.6.0 https://github.com/elastic/beats/compare/v7.5.1...v7.6.0[View commits] @@ -1101,7 +1139,6 @@ processing events. (CVE-2019-17596) See https://www.elastic.co/community/securit - Fill `event.provider`. {pull}13937[13937] - Add support for user management events to the Security module. {pull}13530[13530] -- Made the event parser more lenient w.r.t. invalid event log definition version numbers. {issue}15838[15838] ==== Deprecated diff --git a/libbeat/docs/release.asciidoc b/libbeat/docs/release.asciidoc index 24e0ee43651..90dd214787a 100644 --- a/libbeat/docs/release.asciidoc +++ b/libbeat/docs/release.asciidoc @@ -13,6 +13,7 @@ upgrade. 
* <> * <> * <> +* <> * <> * <> * <> From eeee0008b6f4d815b4ae54c88061db1bc00f4111 Mon Sep 17 00:00:00 2001 From: DeDe Morton Date: Fri, 16 Oct 2020 16:36:59 -0700 Subject: [PATCH 09/13] Apply name changes to elastic agent docs (#21549) * Apply name changes to elastic agent docs * Temporarily comment out image * Remove reviewer notes --- .../docs/elastic-agent-command-line.asciidoc | 18 +++++++++--------- ...lastic-agent-configuration-example.asciidoc | 6 +++--- .../docs/elastic-agent-configuration.asciidoc | 2 +- .../docs/run-elastic-agent.asciidoc | 14 +++++++------- .../docs/running-on-kubernetes.asciidoc | 2 +- .../docs/unenroll-elastic-agent.asciidoc | 2 +- 6 files changed, 22 insertions(+), 22 deletions(-) diff --git a/x-pack/elastic-agent/docs/elastic-agent-command-line.asciidoc b/x-pack/elastic-agent/docs/elastic-agent-command-line.asciidoc index e102d5b4787..49ddbdce466 100644 --- a/x-pack/elastic-agent/docs/elastic-agent-command-line.asciidoc +++ b/x-pack/elastic-agent/docs/elastic-agent-command-line.asciidoc @@ -46,10 +46,10 @@ elastic-agent enroll [--ca-sha256 ] === Options `kibana_url`:: -Required. URL of the {kib} endpoint where {ingest-manager} is running. +Required. URL of the {kib} endpoint where {fleet} is running. `enrollment_token`:: -Required. Enrollment token generated by {ingest-manager}. You can use the same +Required. Enrollment token generated by {fleet}. You can use the same enrollment token for multiple agents. `--ca-sha256 `:: @@ -60,7 +60,7 @@ verification. Comma-separated list of root certificates used for server verification. `--force`:: -Force overwrite of current configuration without prompting for confirmation. +Force overwrite of current policy without prompting for confirmation. This flag is helpful when using automation software or scripted deployments. `--help`:: @@ -125,9 +125,9 @@ elastic-agent help enroll [[elastic-agent-inspect-command]] == elastic-agent inspect -Show the current {agent} configuration. +Show the current {agent} policy. -If no parameters are specified, shows the full {agent} configuration. +If no parameters are specified, shows the full {agent} policy. [discrete] === Synopsis @@ -145,7 +145,7 @@ elastic-agent inspect output [--output ] [--program ] [discrete] === Options -`output`:: Display the current configuration for the output. This command +`output`:: Display the current policy for the output. This command accepts additional flags: + -- @@ -197,7 +197,7 @@ elastic-agent run [global-flags] These flags are valid whenever you run `elastic-agent` on the command line. `-c `:: -The configuration file to use. If not specified, {agent} uses +The policy file to use. If not specified, {agent} uses `{path.home}/elastic-agent.yml`. `--e`:: @@ -209,7 +209,7 @@ The environment in which the agent will run. //TODO: Clarify what we mean by environment by showing an example. `--path.config `:: -The directory where {agent} looks for its configuration file. The default +The directory where {agent} looks for its policy file. The default varies by platform. `--path.data `:: @@ -220,7 +220,7 @@ If not specified, {agent} uses `{path.home}/data`. `--path.home `:: The home directory of {agent}. `path.home` determines the location of the -configuration files and data directory. +policy files and data directory. + If not specified, {agent} uses the current working directory. 
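A quick sketch of how the global flags documented above combine on the
command line; the paths and policy file name here are illustrative, not
product defaults:

[source,shell]
----
elastic-agent run \
  -c /opt/elastic-agent/elastic-agent.yml \
  --path.home /opt/elastic-agent \
  --path.data /opt/elastic-agent/data
----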
diff --git a/x-pack/elastic-agent/docs/elastic-agent-configuration-example.asciidoc b/x-pack/elastic-agent/docs/elastic-agent-configuration-example.asciidoc index b5f0ed0aef6..cd4747b268e 100644 --- a/x-pack/elastic-agent/docs/elastic-agent-configuration-example.asciidoc +++ b/x-pack/elastic-agent/docs/elastic-agent-configuration-example.asciidoc @@ -1,10 +1,10 @@ -[[elastic-agent-configuration-example]] +[[elastic-agent-policy-example]] [role="xpack"] -= Configuration example += Policy example beta[] -The following example shows a full list of configuration options: +The following example shows a full list of policy options: [source,yaml] ---- diff --git a/x-pack/elastic-agent/docs/elastic-agent-configuration.asciidoc b/x-pack/elastic-agent/docs/elastic-agent-configuration.asciidoc index d72c572370c..98ba4a9b424 100644 --- a/x-pack/elastic-agent/docs/elastic-agent-configuration.asciidoc +++ b/x-pack/elastic-agent/docs/elastic-agent-configuration.asciidoc @@ -18,7 +18,7 @@ and send the logs and metrics to the same {es} instance. To alter this behavior, configure the output and other configuration settings. When running the agent standalone, specify configuration settings in the `elastic-agent.yml` file. When using {fleet}, do not modify settings in -the `elastic-agent.yml` file. Instead, use {ingest-manager} in {kib} to change +the `elastic-agent.yml` file. Instead, use {fleet} in {kib} to change settings. TIP: To get started quickly, you can use {fleet} to generate a standalone diff --git a/x-pack/elastic-agent/docs/run-elastic-agent.asciidoc b/x-pack/elastic-agent/docs/run-elastic-agent.asciidoc index 7c48084b8fb..34bb2481f7f 100644 --- a/x-pack/elastic-agent/docs/run-elastic-agent.asciidoc +++ b/x-pack/elastic-agent/docs/run-elastic-agent.asciidoc @@ -12,8 +12,8 @@ configure and manage the agent. == Run in {fleet} mode With _fleet mode_, you manage {agent} remotely. The agent uses a trusted {kib} -instance to retrieve configurations and report agent events. This trusted {kib} -instance must have {ingest-manager} and {fleet} enabled. +instance to retrieve policies and report agent events. This trusted {kib} +instance must have {fleet} enabled. To create a trusted communication channel between {agent} and {kib}, enroll the agent to {fleet}. @@ -22,14 +22,14 @@ To enroll an {agent} to {fleet}: . Stop {agent}, if it's already running. -. In {ingest-manager}, click **Settings** and change the defaults, if necessary. +. In {fleet}, click **Settings** and change the defaults, if necessary. For self-managed installations, set the URLs for {es} and {kib}, including the http ports, then save your changes. + [role="screenshot"] -image::images/kibana-ingest-manager-settings.png[{ingest-manager} settings] +//image::images/kibana-fleet-settings.png[{fleet} settings] -. Select **{fleet}**, then click **Add agent** to get an enrollment token. See +. Select **Agents**, then click **Add agent** to get an enrollment token. See <> for detailed steps. . Change to the directory where {agent} is installed, and enroll the agent to @@ -60,8 +60,8 @@ To start {agent} manually, run: include::{beats-repo-dir}/x-pack/elastic-agent/docs/tab-widgets/run-standalone-widget.asciidoc[] -Use the `-c` flag to specify the configuration file. If no configuration file is -specified, {agent} uses the default configuration, `elastic-agent.yml`, which is +Use the `-c` flag to specify the policy file. 
If no policy file is +specified, {agent} uses the default policy, `elastic-agent.yml`, which is located in the same directory as {agent}. For configuration options, see <>. diff --git a/x-pack/elastic-agent/docs/running-on-kubernetes.asciidoc b/x-pack/elastic-agent/docs/running-on-kubernetes.asciidoc index 19b4628fde9..fc211baabac 100644 --- a/x-pack/elastic-agent/docs/running-on-kubernetes.asciidoc +++ b/x-pack/elastic-agent/docs/running-on-kubernetes.asciidoc @@ -44,7 +44,7 @@ curl -L -O https://raw.githubusercontent.com/elastic/beats/{branch}/deploy/kuber By default, {agent} is enrolled to an existing Kibana deployment, if present using the specified credentials. FLEET_ENROLLMENT_TOKEN parameter is used to connect Agent to the -corresponding Ingest Management configuration. It is suggested to connect Daemonset Agents to a node scope configuration +corresponding {agent} policy. It is suggested to connect Daemonset Agents to a node scope configuration and Deployment Agent to a cluster scope configuration. Then Kubernetes package will be deployed enabling cluster scope datasets using cluster scope configuration while node scope datasets will be enabled under node scope configuration. diff --git a/x-pack/elastic-agent/docs/unenroll-elastic-agent.asciidoc b/x-pack/elastic-agent/docs/unenroll-elastic-agent.asciidoc index cd77fc3dde3..78c7fab9cf9 100644 --- a/x-pack/elastic-agent/docs/unenroll-elastic-agent.asciidoc +++ b/x-pack/elastic-agent/docs/unenroll-elastic-agent.asciidoc @@ -4,7 +4,7 @@ You can unenroll an agent to invalidate the API key used to connect to {es}. -. In {ingest-manager}, select **{fleet}**. +. In {fleet}, select **Agents**. . Under Agents, choose **Unenroll** from the **Actions** menu next to the agent you want to unenroll. From 9dc2f8c3e873a62b4d0aaac5abc63633c61fa56a Mon Sep 17 00:00:00 2001 From: Chris Mark Date: Mon, 19 Oct 2020 11:17:33 +0300 Subject: [PATCH 10/13] Kubernetes leaderelection improvements (#21896) --- libbeat/autodiscover/providers/kubernetes/kubernetes.go | 8 ++++++-- libbeat/common/kubernetes/util.go | 6 +++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index 190c646ef0c..e0c5dd103c0 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -249,9 +249,13 @@ func NewLeaderElectionManager( } else { id = "beats-leader-" + uuid.String() } + ns, err := kubernetes.InClusterNamespace() + if err != nil { + ns = "default" + } lease := metav1.ObjectMeta{ Name: cfg.LeaderLease, - Namespace: "default", + Namespace: ns, } metaUID := lease.GetObjectMeta().GetUID() lem.leaderElection = leaderelection.LeaderElectionConfig{ @@ -262,7 +266,7 @@ func NewLeaderElectionManager( Identity: id, }, }, - ReleaseOnCancel: true, + ReleaseOnCancel: false, LeaseDuration: 15 * time.Second, RenewDeadline: 10 * time.Second, RetryPeriod: 2 * time.Second, diff --git a/libbeat/common/kubernetes/util.go b/libbeat/common/kubernetes/util.go index ff60a7fa591..a92c81e6d21 100644 --- a/libbeat/common/kubernetes/util.go +++ b/libbeat/common/kubernetes/util.go @@ -101,7 +101,7 @@ func DiscoverKubernetesNode(log *logp.Logger, host string, inCluster bool, clien } ctx := context.TODO() if inCluster { - ns, err := inClusterNamespace() + ns, err := InClusterNamespace() if err != nil { log.Errorf("kubernetes: Couldn't get namespace when beat is in cluster with error: %+v", err.Error()) 
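// The serviceaccount namespace could not be read (for example, when the
// serviceaccount files are not mounted), so discovery cannot scope the
// in-cluster pod lookup and falls back to the provided default node name.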
return defaultNode @@ -158,9 +158,9 @@ func machineID() string { return "" } -// inClusterNamespace gets namespace from serviceaccount when beat is in cluster. +// InClusterNamespace gets namespace from serviceaccount when beat is in cluster. // code borrowed from client-go with some changes. -func inClusterNamespace() (string, error) { +func InClusterNamespace() (string, error) { // get namespace associated with the service account token, if available data, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace") if err != nil { From 12e77167fceb6015595a885a8bf8280cba51248d Mon Sep 17 00:00:00 2001 From: Chris Mark Date: Mon, 19 Oct 2020 12:31:23 +0300 Subject: [PATCH 11/13] Update docs.asciidoc (#21849) --- x-pack/metricbeat/module/istio/proxy/_meta/docs.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/metricbeat/module/istio/proxy/_meta/docs.asciidoc b/x-pack/metricbeat/module/istio/proxy/_meta/docs.asciidoc index 4f7aa03a9ef..87a15d72a94 100644 --- a/x-pack/metricbeat/module/istio/proxy/_meta/docs.asciidoc +++ b/x-pack/metricbeat/module/istio/proxy/_meta/docs.asciidoc @@ -21,7 +21,7 @@ them. Here is an example configuration that can be used for that purpose: metricbeat.autodiscover: providers: - type: kubernetes - include_annotations: ["prometheus.io.scrape"] + node: ${NODE_NAME} templates: - condition: contains: From 78856ca0404d7abb4d703a200ecefbbb2d436640 Mon Sep 17 00:00:00 2001 From: Victor Martinez Date: Mon, 19 Oct 2020 10:37:45 +0100 Subject: [PATCH 12/13] [CI] Use google storage to keep artifacts (#21910) --- Jenkinsfile | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 52c579ab7f5..4099e820f97 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -384,7 +384,7 @@ def archiveTestOutput(Map args = [:]) { script: 'rm -rf ve || true; find . -type d -name vendor -exec rm -r {} \\;') } else { log(level: 'INFO', text: 'Delete folders that are causing exceptions (See JENKINS-58421) is disabled for Windows.') } junitAndStore(allowEmptyResults: true, keepLongStdio: true, testResults: args.testResults, stashedTestReports: stashedTestReports, id: args.id) - tar(file: "test-build-artifacts-${args.id}.tgz", dir: '.', archive: true, allowMissing: true) + tarAndUploadArtifacts(file: "test-build-artifacts-${args.id}.tgz", location: '.') } catchError(buildResult: 'SUCCESS', message: 'Failed to archive the build test results', stageResult: 'SUCCESS') { def folder = cmd(label: 'Find system-tests', returnStdout: true, script: 'python .ci/scripts/search_system_tests.py').trim() @@ -393,12 +393,25 @@ def archiveTestOutput(Map args = [:]) { // TODO: nodeOS() should support ARM def os_suffix = isArm() ? 
'linux' : nodeOS() def name = folder.replaceAll('/', '-').replaceAll('\\\\', '-').replaceAll('build', '').replaceAll('^-', '') + '-' + os_suffix - tar(file: "${name}.tgz", archive: true, dir: folder) + tarAndUploadArtifacts(file: "${name}.tgz", location: folder) } } } } +/** +* Wrapper to tar and upload artifacts to Google Storage to avoid exhausting +* disk space on the Jenkins instance +*/ +def tarAndUploadArtifacts(Map args = [:]) { + tar(file: args.file, dir: args.location, archive: false, allowMissing: true) + googleStorageUpload(bucket: "gs://${JOB_GCS_BUCKET}/${env.JOB_NAME}-${env.BUILD_ID}", + credentialsId: "${JOB_GCS_CREDENTIALS}", + pattern: "${args.file}", + sharedPublicly: true, + showInline: true) +} + /** * This method executes a closure with credentials for cloud test * environments. From ee7d3298eaa6cab43ab708bce88e86af8bfb67d0 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Mon, 19 Oct 2020 14:12:50 +0200 Subject: [PATCH 13/13] [Ingest Manager] Prevent reporting ecs version twice (#21616) [Ingest Manager] Prevent reporting ecs version twice (#21616) --- x-pack/elastic-agent/CHANGELOG.next.asciidoc | 1 + x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index d01c8a1c7bf..64d1a3b589b 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -15,6 +15,7 @@ - Copy Action store on upgrade {pull}21298[21298] - Include inputs in action store actions {pull}21298[21298] - Fix issue where inputs without processors defined would panic {pull}21628[21628] +- Prevent reporting ecs version twice {pull}21616[21616] - Partial extracted beat result in failure to spawn beat {issue}21718[21718] - Use local temp instead of system one {pull}21883[21883] diff --git a/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go b/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go index 53b9f377fcd..2b4617bc2bd 100644 --- a/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go +++ b/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go @@ -113,7 +113,6 @@ func (b *Monitor) EnrichArgs(process, pipelineID string, args []string, isSideca logFile = fmt.Sprintf("%s-json.log", logFile) appendix = append(appendix, "-E", "logging.json=true", - "-E", "logging.ecs=true", "-E", "logging.files.path="+loggingPath, "-E", "logging.files.name="+logFile, "-E", "logging.files.keepfiles=7",
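// NOTE: per the CHANGELOG entry in this commit ("Prevent reporting ecs version
// twice"), dropping "logging.ecs=true" here is presumably safe because the
// spawned Beat's JSON logging already carries the ecs version, so injecting
// the setting again made the field appear twice per log line; the surrounding
// logging.files.* flags are left unchanged.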