diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index c4d0f48005f..1dfbb2fb889 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -457,6 +457,38 @@ https://github.com/elastic/beats/compare/v7.7.0...v7.8.0[View commits] - Add support for event IDs 4673,4674,4697,4698,4699,4700,4701,4702,4768,4769,4770,4771,4776,4778,4779,4964 to the Security module. {pull}17517[17517] - Add registry and code signature information and ECS categorization fields for sysmon module. {pull}18058[18058] +[[release-notes-7.7.1]] +=== Beats version 7.7.1 +https://github.com/elastic/beats/compare/v7.7.0...v7.7.1[View commits] + +==== Bugfixes + +*Affecting all Beats* + +- Fix `keystore add` command hanging on Windows. {issue}18649[18649] {pull}18654[18654] + +*Filebeat* + +- Unescape filenames in SQS messages to resolve file paths correctly. {pull}18370[18370] +- Improve failure handler for Cisco ASA and FTD pipelines to avoid mapping temporary fields. {issue}18391[18391] {pull}18392[18392] +- Fix `source.address` field not being set for the Nginx `ingress_controller` fileset. {pull}18511[18511] +- Fix Google Cloud `audit` fileset to only take in fields that are explicitly defined by the fileset. {issue}18465[18465] {pull}18472[18472] +- Fix rate limit related issue in the `httpjson` input for the Okta module. {issue}18530[18530] {pull}18534[18534] +- Fix Cisco ASA and FTD parsing errors caused by NAT fields that contain a hostname instead of an IP. {issue}14034[14034] {pull}18376[18376] +- Fix PANW module to use correct mappings for bytes and packets counters. {issue}18522[18522] {pull}18525[18525] +- Fix Office 365 ingest failures caused by IP addresses surrounded by square brackets. {issue}18587[18587] {pull}18591[18591] + +*Metricbeat* + +- Fix `tags_filter` setting to work correctly for the AWS `cloudwatch` metricset. {pull}18524[18524] + +==== Added + +*Filebeat* + +- Add support for Google Application Default Credentials to the Google Pub/Sub input and Google Cloud modules. {pull}15668[15668] +- Make `decode_cef` processor GA. {pull}17944[17944] + [[release-notes-7.7.0]] === Beats version 7.7.0 https://github.com/elastic/beats/compare/v7.6.2...v7.7.0[View commits] @@ -729,6 +761,12 @@ https://github.com/elastic/beats/compare/v7.6.0...v7.6.1[View commits] - Fix timeout option of GCP functions. {issue}16282[16282] {pull}16287[16287] +==== Added + +*Winlogbeat* + +- Made the event parser more lenient w.r.t. invalid event log definition version numbers. {issue}15838[15838] + [[release-notes-7.6.0]] === Beats version 7.6.0 https://github.com/elastic/beats/compare/v7.5.1...v7.6.0[View commits] @@ -1101,7 +1139,6 @@ processing events. (CVE-2019-17596) See https://www.elastic.co/community/securit - Fill `event.provider`. {pull}13937[13937] - Add support for user management events to the Security module. {pull}13530[13530] -- Made the event parser more lenient w.r.t. invalid event log definition version numbers. {issue}15838[15838] ==== Deprecated diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index e2b5844c192..51255305f42 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -186,6 +186,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix `libbeat.output.write.bytes` and `libbeat.output.read.bytes` metrics of the Elasticsearch output. {issue}20752[20752] {pull}21197[21197] - The `o365input` and `o365` module now recover from an authentication problem or other fatal errors, instead of terminating. 
{pull}21259[21258] - Orderly close processors when processing pipelines are not needed anymore to release their resources. {pull}16349[16349] +- Fix memory leak and events duplication in docker autodiscover and add_docker_metadata. {pull}21851[21851] *Auditbeat* diff --git a/Jenkinsfile b/Jenkinsfile index 6eef1b2d0a8..4099e820f97 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -74,7 +74,7 @@ pipeline { } steps { withGithubNotify(context: 'Lint') { - withBeatsEnv(archive: true, id: 'lint') { + withBeatsEnv(archive: false, id: 'lint') { dumpVariables() cmd(label: 'make check', script: 'make check') } @@ -220,7 +220,7 @@ def target(Map args = [:]) { // make commands use -C while mage commands require the dir(folder) // let's support this scenario with the location variable. dir(isMage ? directory : '') { - cmd(label: "${command}", script: "${command}") + cmd(label: "${args.id?.trim() ? args.id : env.STAGE_NAME} - ${command}", script: "${command}") } } } @@ -384,7 +384,7 @@ def archiveTestOutput(Map args = [:]) { script: 'rm -rf ve || true; find . -type d -name vendor -exec rm -r {} \\;') } else { log(level: 'INFO', text: 'Delete folders that are causing exceptions (See JENKINS-58421) is disabled for Windows.') } junitAndStore(allowEmptyResults: true, keepLongStdio: true, testResults: args.testResults, stashedTestReports: stashedTestReports, id: args.id) - tar(file: "test-build-artifacts-${args.id}.tgz", dir: '.', archive: true, allowMissing: true) + tarAndUploadArtifacts(file: "test-build-artifacts-${args.id}.tgz", location: '.') } catchError(buildResult: 'SUCCESS', message: 'Failed to archive the build test results', stageResult: 'SUCCESS') { def folder = cmd(label: 'Find system-tests', returnStdout: true, script: 'python .ci/scripts/search_system_tests.py').trim() @@ -393,12 +393,25 @@ def archiveTestOutput(Map args = [:]) { // TODO: nodeOS() should support ARM def os_suffix = isArm() ? 'linux' : nodeOS() def name = folder.replaceAll('/', '-').replaceAll('\\\\', '-').replaceAll('build', '').replaceAll('^-', '') + '-' + os_suffix - tar(file: "${name}.tgz", archive: true, dir: folder) + tarAndUploadArtifacts(file: "${name}.tgz", location: folder) } } } } +/** +* Wrapper to tar and upload artifacts to Google Storage to avoid killing the +* disk space of the jenkins instance +*/ +def tarAndUploadArtifacts(Map args = [:]) { + tar(file: args.file, dir: args.location, archive: false, allowMissing: true) + googleStorageUpload(bucket: "gs://${JOB_GCS_BUCKET}/${env.JOB_NAME}-${env.BUILD_ID}", + credentialsId: "${JOB_GCS_CREDENTIALS}", + pattern: "${args.file}", + sharedPublicly: true, + showInline: true) +} + /** * This method executes a closure with credentials for cloud test * environments. 
diff --git a/filebeat/input/filestream/filestream.go b/filebeat/input/filestream/filestream.go index 4d42bbf6242..1a559c67e06 100644 --- a/filebeat/input/filestream/filestream.go +++ b/filebeat/input/filestream/filestream.go @@ -138,20 +138,16 @@ func (f *logFile) Read(buf []byte) (int, error) { } func (f *logFile) startFileMonitoringIfNeeded() { - if f.closeInactive == 0 && f.closeAfterInterval == 0 { - return - } - - if f.closeInactive > 0 { + if f.closeInactive > 0 || f.closeRemoved || f.closeRenamed { f.tg.Go(func(ctx unison.Canceler) error { - f.closeIfTimeout(ctx) + f.periodicStateCheck(ctx) return nil }) } if f.closeAfterInterval > 0 { f.tg.Go(func(ctx unison.Canceler) error { - f.periodicStateCheck(ctx) + f.closeIfTimeout(ctx) return nil }) } diff --git a/filebeat/input/filestream/filestream_test.go b/filebeat/input/filestream/filestream_test.go new file mode 100644 index 00000000000..329fa0ad55f --- /dev/null +++ b/filebeat/input/filestream/filestream_test.go @@ -0,0 +1,136 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package filestream + +import ( + "context" + "io" + "io/ioutil" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/logp" +) + +func TestLogFileTimedClosing(t *testing.T) { + testCases := map[string]struct { + inactive time.Duration + closeEOF bool + afterInterval time.Duration + expectedErr error + }{ + "read from file and close inactive": { + inactive: 2 * time.Second, + expectedErr: ErrClosed, + }, + "read from file and close after interval": { + afterInterval: 3 * time.Second, + expectedErr: ErrClosed, + }, + "read from file and close on EOF": { + closeEOF: true, + expectedErr: io.EOF, + }, + } + + for name, test := range testCases { + test := test + + f := createTestLogFile() + defer f.Close() + defer os.Remove(f.Name()) + + t.Run(name, func(t *testing.T) { + reader, err := newFileReader( + logp.L(), + context.TODO(), + f, + readerConfig{}, + closerConfig{ + OnStateChange: stateChangeCloserConfig{ + CheckInterval: 1 * time.Second, + Inactive: test.inactive, + }, + Reader: readerCloserConfig{ + OnEOF: test.closeEOF, + AfterInterval: test.afterInterval, + }, + }, + ) + if err != nil { + t.Fatalf("error while creating logReader: %+v", err) + } + + err = readUntilError(reader) + + assert.Equal(t, test.expectedErr, err) + }) + } +} + +func TestLogFileTruncated(t *testing.T) { + f := createTestLogFile() + defer f.Close() + defer os.Remove(f.Name()) + + reader, err := newFileReader(logp.L(), context.TODO(), f, readerConfig{}, closerConfig{}) + if err != nil { + t.Fatalf("error while creating logReader: %+v", err) + } + + buf := make([]byte, 1024) + _, err = reader.Read(buf) + assert.Nil(t, err) + + err = f.Truncate(0) + if err != nil { + t.Fatalf("error while truncating file: %+v", err) + } + + err = readUntilError(reader) + + assert.Equal(t, ErrFileTruncate, err) +} + +func createTestLogFile() *os.File { + f, err := ioutil.TempFile("", "filestream_reader_test") + if err != nil { + panic(err) + } + content := []byte("first log line\nanother interesting line\na third log message\n") + if _, err := f.Write(content); err != nil { + panic(err) + } + if _, err := f.Seek(0, io.SeekStart); err != nil { + panic(err) + } + return f +} + +func readUntilError(reader *logFile) error { + buf := make([]byte, 1024) + _, err := reader.Read(buf) + for err == nil { + buf := make([]byte, 1024) + _, err = reader.Read(buf) + } + return err +} diff --git a/filebeat/input/filestream/filestream_test_non_windows.go b/filebeat/input/filestream/filestream_test_non_windows.go new file mode 100644 index 00000000000..9c2b33ed3de --- /dev/null +++ b/filebeat/input/filestream/filestream_test_non_windows.go @@ -0,0 +1,104 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// +build !windows + +package filestream + +import ( + "context" + "os" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/logp" +) + +// these tests are separated as one cannot delete/rename files +// while another process is working with it on Windows +func TestLogFileRenamed(t *testing.T) { + f := createTestLogFile() + defer f.Close() + + renamedFile := f.Name() + ".renamed" + + reader, err := newFileReader( + logp.L(), + context.TODO(), + f, + readerConfig{}, + closerConfig{ + OnStateChange: stateChangeCloserConfig{ + CheckInterval: 1 * time.Second, + Renamed: true, + }, + }, + ) + if err != nil { + t.Fatalf("error while creating logReader: %+v", err) + } + + buf := make([]byte, 1024) + _, err = reader.Read(buf) + assert.Nil(t, err) + + err = os.Rename(f.Name(), renamedFile) + if err != nil { + t.Fatalf("error while renaming file: %+v", err) + } + + err = readUntilError(reader) + os.Remove(renamedFile) + + assert.Equal(t, ErrClosed, err) +} + +func TestLogFileRemoved(t *testing.T) { + f := createTestLogFile() + defer f.Close() + + reader, err := newFileReader( + logp.L(), + context.TODO(), + f, + readerConfig{}, + closerConfig{ + OnStateChange: stateChangeCloserConfig{ + CheckInterval: 1 * time.Second, + Removed: true, + }, + }, + ) + if err != nil { + t.Fatalf("error while creating logReader: %+v", err) + } + + buf := make([]byte, 1024) + _, err = reader.Read(buf) + assert.Nil(t, err) + + err = os.Remove(f.Name()) + if err != nil { + t.Fatalf("error while remove file: %+v", err) + } + + err = readUntilError(reader) + + assert.Equal(t, ErrClosed, err) +} diff --git a/libbeat/autodiscover/providers/kubernetes/kubernetes.go b/libbeat/autodiscover/providers/kubernetes/kubernetes.go index 190c646ef0c..e0c5dd103c0 100644 --- a/libbeat/autodiscover/providers/kubernetes/kubernetes.go +++ b/libbeat/autodiscover/providers/kubernetes/kubernetes.go @@ -249,9 +249,13 @@ func NewLeaderElectionManager( } else { id = "beats-leader-" + uuid.String() } + ns, err := kubernetes.InClusterNamespace() + if err != nil { + ns = "default" + } lease := metav1.ObjectMeta{ Name: cfg.LeaderLease, - Namespace: "default", + Namespace: ns, } metaUID := lease.GetObjectMeta().GetUID() lem.leaderElection = leaderelection.LeaderElectionConfig{ @@ -262,7 +266,7 @@ func NewLeaderElectionManager( Identity: id, }, }, - ReleaseOnCancel: true, + ReleaseOnCancel: false, LeaseDuration: 15 * time.Second, RenewDeadline: 10 * time.Second, RetryPeriod: 2 * time.Second, diff --git a/libbeat/common/docker/watcher.go b/libbeat/common/docker/watcher.go index 2421c232eee..4145423209a 100644 --- a/libbeat/common/docker/watcher.go +++ b/libbeat/common/docker/watcher.go @@ -20,7 +20,8 @@ package docker import ( - "fmt" + "context" + "io" "net/http" "sync" "time" @@ -29,7 +30,6 @@ import ( "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/filters" "github.com/docker/go-connections/tlsconfig" - "golang.org/x/net/context" "github.com/elastic/beats/v7/libbeat/common/bus" "github.com/elastic/beats/v7/libbeat/logp" @@ -39,7 +39,6 @@ import ( const ( shortIDLen = 12 dockerRequestTimeout = 10 * time.Second - dockerWatchRequestTimeout = 60 * time.Minute dockerEventsWatchPityTimerInterval = 10 * time.Second dockerEventsWatchPityTimerTimeout = 10 * time.Minute ) @@ -74,20 +73,30 @@ type TLSConfig struct { type watcher struct { sync.RWMutex - log *logp.Logger - client Client - ctx context.Context - stop context.CancelFunc - containers map[string]*Container - deleted 
map[string]time.Time // deleted annotations key -> last access time - cleanupTimeout time.Duration - lastValidTimestamp int64 - lastWatchReceivedEventTime time.Time - stopped sync.WaitGroup - bus bus.Bus - shortID bool // whether to store short ID in "containers" too + log *logp.Logger + client Client + ctx context.Context + stop context.CancelFunc + containers map[string]*Container + deleted map[string]time.Time // deleted annotations key -> last access time + cleanupTimeout time.Duration + clock clock + stopped sync.WaitGroup + bus bus.Bus + shortID bool // whether to store short ID in "containers" too } +// clock is an interface used to provide mocked time on testing +type clock interface { + Now() time.Time +} + +// systemClock implements the clock interface using the system clock via the time package +type systemClock struct{} + +// Now returns the current time +func (*systemClock) Now() time.Time { return time.Now() } + // Container info retrieved by the watcher type Container struct { ID string @@ -147,8 +156,6 @@ func NewWatcher(log *logp.Logger, host string, tls *TLSConfig, storeShortID bool // NewWatcherWithClient creates a new Watcher from a given Docker client func NewWatcherWithClient(log *logp.Logger, client Client, cleanupTimeout time.Duration, storeShortID bool) (Watcher, error) { - log = log.Named("docker") - ctx, cancel := context.WithCancel(context.Background()) return &watcher{ log: log, @@ -160,6 +167,7 @@ func NewWatcherWithClient(log *logp.Logger, client Client, cleanupTimeout time.D cleanupTimeout: cleanupTimeout, bus: bus.New(log, "docker"), shortID: storeShortID, + clock: &systemClock{}, }, nil } @@ -177,7 +185,7 @@ func (w *watcher) Container(ID string) *Container { // Update last access time if it's deleted if ok { w.Lock() - w.deleted[container.ID] = time.Now() + w.deleted[container.ID] = w.clock.Now() w.Unlock() } @@ -201,7 +209,6 @@ func (w *watcher) Containers() map[string]*Container { func (w *watcher) Start() error { // Do initial scan of existing containers w.log.Debug("Start docker containers scanner") - w.lastValidTimestamp = time.Now().Unix() w.Lock() defer w.Unlock() @@ -236,108 +243,124 @@ func (w *watcher) Start() error { func (w *watcher) Stop() { w.stop() + w.stopped.Wait() } func (w *watcher) watch() { - log := w.log + defer w.stopped.Done() filter := filters.NewArgs() filter.Add("type", "container") - for { + // Ticker to restart the watcher when no events are received after some time. 
+ tickChan := time.NewTicker(dockerEventsWatchPityTimerInterval) + defer tickChan.Stop() + + lastValidTimestamp := w.clock.Now() + + watch := func() bool { + lastReceivedEventTime := w.clock.Now() + + w.log.Debugf("Fetching events since %s", lastValidTimestamp) + options := types.EventsOptions{ - Since: fmt.Sprintf("%d", w.lastValidTimestamp), + Since: lastValidTimestamp.Format(time.RFC3339Nano), Filters: filter, } - log.Debugf("Fetching events since %s", options.Since) - ctx, cancel := context.WithTimeout(w.ctx, dockerWatchRequestTimeout) + ctx, cancel := context.WithCancel(w.ctx) defer cancel() events, errors := w.client.Events(ctx, options) - - //ticker for timeout to restart watcher when no events are received - w.lastWatchReceivedEventTime = time.Now() - tickChan := time.NewTicker(dockerEventsWatchPityTimerInterval) - defer tickChan.Stop() - - WATCH: for { select { case event := <-events: - log.Debugf("Got a new docker event: %v", event) - w.lastValidTimestamp = event.Time - w.lastWatchReceivedEventTime = time.Now() - - // Add / update - if event.Action == "start" || event.Action == "update" { - filter := filters.NewArgs() - filter.Add("id", event.Actor.ID) - - containers, err := w.listContainers(types.ContainerListOptions{ - Filters: filter, - }) - if err != nil || len(containers) != 1 { - log.Errorf("Error getting container info: %v", err) - continue - } - container := containers[0] - - w.Lock() - w.containers[event.Actor.ID] = container - if w.shortID { - w.containers[event.Actor.ID[:shortIDLen]] = container - } - // un-delete if it's flagged (in case of update or recreation) - delete(w.deleted, event.Actor.ID) - w.Unlock() - - w.bus.Publish(bus.Event{ - "start": true, - "container": container, - }) - } - - // Delete - if event.Action == "die" { - container := w.Container(event.Actor.ID) - if container != nil { - w.bus.Publish(bus.Event{ - "stop": true, - "container": container, - }) - } - - w.Lock() - w.deleted[event.Actor.ID] = time.Now() - w.Unlock() + w.log.Debugf("Got a new docker event: %v", event) + lastValidTimestamp = time.Unix(event.Time, event.TimeNano) + lastReceivedEventTime = w.clock.Now() + + switch event.Action { + case "start", "update": + w.containerUpdate(event) + case "die": + w.containerDelete(event) } - case err := <-errors: - // Restart watch call - if err == context.DeadlineExceeded { - log.Info("Context deadline exceeded for docker request, restarting watch call") - } else { - log.Errorf("Error watching for docker events: %+v", err) + switch err { + case io.EOF: + // Client disconnected, watch is not done, reconnect + w.log.Debug("EOF received in events stream, restarting watch call") + case context.DeadlineExceeded: + w.log.Debug("Context deadline exceeded for docker request, restarting watch call") + case context.Canceled: + // Parent context has been canceled, watch is done. 
+ return true + default: + w.log.Errorf("Error watching for docker events: %+v", err) } - - time.Sleep(1 * time.Second) - break WATCH - + return false case <-tickChan.C: - if time.Since(w.lastWatchReceivedEventTime) > dockerEventsWatchPityTimerTimeout { - log.Infof("No events received within %s, restarting watch call", dockerEventsWatchPityTimerTimeout) - time.Sleep(1 * time.Second) - break WATCH + if time.Since(lastReceivedEventTime) > dockerEventsWatchPityTimerTimeout { + w.log.Infof("No events received within %s, restarting watch call", dockerEventsWatchPityTimerTimeout) + return false } - case <-w.ctx.Done(): - log.Debug("Watcher stopped") - w.stopped.Done() - return + w.log.Debug("Watcher stopped") + return true } } + } + for { + done := watch() + if done { + return + } + // Wait before trying to reconnect + time.Sleep(1 * time.Second) + } +} + +func (w *watcher) containerUpdate(event events.Message) { + filter := filters.NewArgs() + filter.Add("id", event.Actor.ID) + + containers, err := w.listContainers(types.ContainerListOptions{ + Filters: filter, + }) + if err != nil || len(containers) != 1 { + w.log.Errorf("Error getting container info: %v", err) + return + } + container := containers[0] + + w.Lock() + w.containers[event.Actor.ID] = container + if w.shortID { + w.containers[event.Actor.ID[:shortIDLen]] = container + } + // un-delete if it's flagged (in case of update or recreation) + delete(w.deleted, event.Actor.ID) + w.Unlock() + + w.bus.Publish(bus.Event{ + "start": true, + "container": container, + }) +} + +func (w *watcher) containerDelete(event events.Message) { + container := w.Container(event.Actor.ID) + + w.Lock() + w.deleted[event.Actor.ID] = w.clock.Now() + w.Unlock() + + if container != nil { + w.bus.Publish(bus.Event{ + "stop": true, + "container": container, + }) } } @@ -393,49 +416,52 @@ func (w *watcher) listContainers(options types.ContainerListOptions) ([]*Contain // Clean up deleted containers after they are not used anymore func (w *watcher) cleanupWorker() { - log := w.log + defer w.stopped.Done() for { select { case <-w.ctx.Done(): - w.stopped.Done() return // Wait a full period case <-time.After(w.cleanupTimeout): - // Check entries for timeout - var toDelete []string - timeout := time.Now().Add(-w.cleanupTimeout) - w.RLock() - for key, lastSeen := range w.deleted { - if lastSeen.Before(timeout) { - log.Debugf("Removing container %s after cool down timeout", key) - toDelete = append(toDelete, key) - } - } - w.RUnlock() - - // Delete timed out entries: - for _, key := range toDelete { - container := w.Container(key) - if container != nil { - w.bus.Publish(bus.Event{ - "delete": true, - "container": container, - }) - } - } + w.runCleanup() + } + } +} - w.Lock() - for _, key := range toDelete { - delete(w.deleted, key) - delete(w.containers, key) - if w.shortID { - delete(w.containers, key[:shortIDLen]) - } - } - w.Unlock() +func (w *watcher) runCleanup() { + // Check entries for timeout + var toDelete []string + timeout := w.clock.Now().Add(-w.cleanupTimeout) + w.RLock() + for key, lastSeen := range w.deleted { + if lastSeen.Before(timeout) { + w.log.Debugf("Removing container %s after cool down timeout", key) + toDelete = append(toDelete, key) + } + } + w.RUnlock() + + // Delete timed out entries: + for _, key := range toDelete { + container := w.Container(key) + if container != nil { + w.bus.Publish(bus.Event{ + "delete": true, + "container": container, + }) + } + } + + w.Lock() + for _, key := range toDelete { + delete(w.deleted, key) + 
delete(w.containers, key) + if w.shortID { + delete(w.containers, key[:shortIDLen]) } } + w.Unlock() } // ListenStart returns a bus listener to receive container started events, with a `container` key holding it diff --git a/libbeat/common/docker/watcher_test.go b/libbeat/common/docker/watcher_test.go index ec53fbdeb73..a0de0567af4 100644 --- a/libbeat/common/docker/watcher_test.go +++ b/libbeat/common/docker/watcher_test.go @@ -21,6 +21,7 @@ package docker import ( "errors" + "sync" "testing" "time" @@ -37,7 +38,7 @@ type MockClient struct { containers [][]types.Container // event list to send on Events call events []interface{} - + // done channel is closed when the client has sent all events done chan interface{} } @@ -71,7 +72,7 @@ func (m *MockClient) ContainerInspect(ctx context.Context, container string) (ty } func TestWatcherInitialization(t *testing.T) { - watcher := runWatcher(t, true, + watcher := runAndWait(testWatcher(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -90,7 +91,8 @@ func TestWatcherInitialization(t *testing.T) { }, }, }, - nil) + nil, + )) assert.Equal(t, map[string]*Container{ "0332dbd79e20": &Container{ @@ -109,7 +111,7 @@ func TestWatcherInitialization(t *testing.T) { } func TestWatcherInitializationShortID(t *testing.T) { - watcher := runWatcherShortID(t, true, + watcher := runAndWait(testWatcherShortID(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -128,7 +130,9 @@ func TestWatcherInitializationShortID(t *testing.T) { }, }, }, - nil, true) + nil, + true, + )) assert.Equal(t, map[string]*Container{ "1234567890123": &Container{ @@ -154,7 +158,7 @@ func TestWatcherInitializationShortID(t *testing.T) { } func TestWatcherAddEvents(t *testing.T) { - watcher := runWatcher(t, true, + watcher := runAndWait(testWatcher(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -188,7 +192,7 @@ func TestWatcherAddEvents(t *testing.T) { }, }, }, - ) + )) assert.Equal(t, map[string]*Container{ "0332dbd79e20": &Container{ @@ -207,7 +211,7 @@ func TestWatcherAddEvents(t *testing.T) { } func TestWatcherAddEventsShortID(t *testing.T) { - watcher := runWatcherShortID(t, true, + watcher := runAndWait(testWatcherShortID(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -242,7 +246,7 @@ func TestWatcherAddEventsShortID(t *testing.T) { }, }, true, - ) + )) assert.Equal(t, map[string]*Container{ "1234567890123": &Container{ @@ -261,7 +265,7 @@ func TestWatcherAddEventsShortID(t *testing.T) { } func TestWatcherUpdateEvent(t *testing.T) { - watcher := runWatcher(t, true, + watcher := runAndWait(testWatcher(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -295,7 +299,7 @@ func TestWatcherUpdateEvent(t *testing.T) { }, }, }, - ) + )) assert.Equal(t, map[string]*Container{ "0332dbd79e20": &Container{ @@ -309,7 +313,7 @@ func TestWatcherUpdateEvent(t *testing.T) { } func TestWatcherUpdateEventShortID(t *testing.T) { - watcher := runWatcherShortID(t, true, + watcher := runAndWait(testWatcherShortID(t, true, [][]types.Container{ []types.Container{ types.Container{ @@ -344,7 +348,7 @@ func TestWatcherUpdateEventShortID(t *testing.T) { }, }, true, - ) + )) assert.Equal(t, map[string]*Container{ "1234567890123": &Container{ @@ -358,9 +362,7 @@ func TestWatcherUpdateEventShortID(t *testing.T) { } func TestWatcherDie(t *testing.T) { - t.Skip("flaky test: https://github.com/elastic/beats/issues/7906") - - watcher := runWatcher(t, false, + watcher, clientDone := testWatcher(t, false, [][]types.Container{ 
[]types.Container{ types.Container{ @@ -381,32 +383,37 @@ func TestWatcherDie(t *testing.T) { }, }, ) + + clock := newTestClock() + watcher.clock = clock + + stopListener := watcher.ListenStop() + + watcher.Start() defer watcher.Stop() // Check it doesn't get removed while we request meta for the container for i := 0; i < 18; i++ { watcher.Container("0332dbd79e20") - assert.Equal(t, 1, len(watcher.Containers())) - time.Sleep(50 * time.Millisecond) - } - - // Checks a max of 10s for the watcher containers to be updated - for i := 0; i < 100; i++ { - // Now it should get removed - time.Sleep(100 * time.Millisecond) - - if len(watcher.Containers()) == 0 { + clock.Sleep(watcher.cleanupTimeout / 2) + watcher.runCleanup() + if !assert.Equal(t, 1, len(watcher.Containers())) { break } } + // Wait to be sure that the delete event has been processed + <-clientDone + <-stopListener.Events() + + // Check that after the cleanup period the container is removed + clock.Sleep(watcher.cleanupTimeout + 1*time.Second) + watcher.runCleanup() assert.Equal(t, 0, len(watcher.Containers())) } func TestWatcherDieShortID(t *testing.T) { - t.Skip("flaky test: https://github.com/elastic/beats/issues/7906") - - watcher := runWatcherShortID(t, false, + watcher, clientDone := testWatcherShortID(t, false, [][]types.Container{ []types.Container{ types.Container{ @@ -428,33 +435,40 @@ func TestWatcherDieShortID(t *testing.T) { }, true, ) + + clock := newTestClock() + watcher.clock = clock + + stopListener := watcher.ListenStop() + + watcher.Start() defer watcher.Stop() // Check it doesn't get removed while we request meta for the container for i := 0; i < 18; i++ { watcher.Container("0332dbd79e20") - assert.Equal(t, 1, len(watcher.Containers())) - time.Sleep(50 * time.Millisecond) - } - - // Checks a max of 10s for the watcher containers to be updated - for i := 0; i < 100; i++ { - // Now it should get removed - time.Sleep(100 * time.Millisecond) - - if len(watcher.Containers()) == 0 { + clock.Sleep(watcher.cleanupTimeout / 2) + watcher.runCleanup() + if !assert.Equal(t, 1, len(watcher.Containers())) { break } } + // Wait to be sure that the delete event has been processed + <-clientDone + <-stopListener.Events() + + // Check that after the cleanup period the container is removed + clock.Sleep(watcher.cleanupTimeout + 1*time.Second) + watcher.runCleanup() assert.Equal(t, 0, len(watcher.Containers())) } -func runWatcher(t *testing.T, kill bool, containers [][]types.Container, events []interface{}) *watcher { - return runWatcherShortID(t, kill, containers, events, false) +func testWatcher(t *testing.T, kill bool, containers [][]types.Container, events []interface{}) (*watcher, chan interface{}) { + return testWatcherShortID(t, kill, containers, events, false) } -func runWatcherShortID(t *testing.T, kill bool, containers [][]types.Container, events []interface{}, enable bool) *watcher { +func testWatcherShortID(t *testing.T, kill bool, containers [][]types.Container, events []interface{}, enable bool) (*watcher, chan interface{}) { logp.TestingSetup() client := &MockClient{ @@ -472,16 +486,37 @@ func runWatcherShortID(t *testing.T, kill bool, containers [][]types.Container, t.Fatal("'watcher' was supposed to be pointer to the watcher structure") } - err = watcher.Start() - if err != nil { - t.Fatal(err) - } + return watcher, client.done +} - <-client.done - if kill { - watcher.Stop() - watcher.stopped.Wait() - } +func runAndWait(w *watcher, done chan interface{}) *watcher { + w.Start() + <-done + w.Stop() + return w +} + 
+type testClock struct { + sync.Mutex + + now time.Time +} + +func newTestClock() *testClock { + return &testClock{now: time.Time{}} +} + +func (c *testClock) Now() time.Time { + c.Lock() + defer c.Unlock() + + c.now = c.now.Add(1) + return c.now +} + +func (c *testClock) Sleep(d time.Duration) { + c.Lock() + defer c.Unlock() - return watcher + c.now = c.now.Add(d) } diff --git a/libbeat/common/kubernetes/util.go b/libbeat/common/kubernetes/util.go index ff60a7fa591..a92c81e6d21 100644 --- a/libbeat/common/kubernetes/util.go +++ b/libbeat/common/kubernetes/util.go @@ -101,7 +101,7 @@ func DiscoverKubernetesNode(log *logp.Logger, host string, inCluster bool, clien } ctx := context.TODO() if inCluster { - ns, err := inClusterNamespace() + ns, err := InClusterNamespace() if err != nil { log.Errorf("kubernetes: Couldn't get namespace when beat is in cluster with error: %+v", err.Error()) return defaultNode @@ -158,9 +158,9 @@ func machineID() string { return "" } -// inClusterNamespace gets namespace from serviceaccount when beat is in cluster. +// InClusterNamespace gets namespace from serviceaccount when beat is in cluster. // code borrowed from client-go with some changes. -func inClusterNamespace() (string, error) { +func InClusterNamespace() (string, error) { // get namespace associated with the service account token, if available data, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace") if err != nil { diff --git a/libbeat/docs/release.asciidoc b/libbeat/docs/release.asciidoc index 24e0ee43651..90dd214787a 100644 --- a/libbeat/docs/release.asciidoc +++ b/libbeat/docs/release.asciidoc @@ -13,6 +13,7 @@ upgrade. * <> * <> * <> +* <> * <> * <> * <> diff --git a/libbeat/docs/shared-autodiscover.asciidoc b/libbeat/docs/shared-autodiscover.asciidoc index c7993c29bef..df0ea4d2e02 100644 --- a/libbeat/docs/shared-autodiscover.asciidoc +++ b/libbeat/docs/shared-autodiscover.asciidoc @@ -24,7 +24,7 @@ start/stop events. This ensures you don't need to worry about state, but only de The Docker autodiscover provider watches for Docker containers to start and stop. -These are the available fields during within config templating. The `docker.*` fields will be available on each emitted event. +These are the fields available within config templating. The `docker.*` fields will be available on each emitted event. event: * host @@ -130,7 +130,7 @@ endif::[] The Kubernetes autodiscover provider watches for Kubernetes nodes, pods, services to start, update, and stop. -These are the available fields during within config templating. The `kubernetes.*` fields will be available on each emitted event. +These are the fields available within config templating. The `kubernetes.*` fields will be available on each emitted event. [float] ====== Generic fields: diff --git a/libbeat/publisher/queue/diskqueue/core_loop.go b/libbeat/publisher/queue/diskqueue/core_loop.go index 638d9da2f40..77f4aadb47f 100644 --- a/libbeat/publisher/queue/diskqueue/core_loop.go +++ b/libbeat/publisher/queue/diskqueue/core_loop.go @@ -169,8 +169,16 @@ func (dq *diskQueue) handleReaderLoopResponse(response readerLoopResponse) { // A segment in the writing list can't be finished writing, // so we don't check the endOffset. segment = dq.segments.writing[0] + if response.err != nil { + // Errors reading a writing segment are awkward since we can't discard + // them until the writer loop is done with them. Instead we just seek + // to the end of the current data region. 
If we're lucky this lets us + // skip the intervening errors; if not, the segment will be cleaned up + // after the writer loop is done with it. + dq.segments.nextReadOffset = segment.endOffset + } } - segment.framesRead = uint64(dq.segments.nextReadFrameID - segment.firstFrameID) + segment.framesRead += response.frameCount // If there was an error, report it. if response.err != nil { @@ -346,6 +354,16 @@ func (dq *diskQueue) maybeReadPending() { // A read request is already pending return } + // Check if the next reading segment has already been completely read. (This + // can happen if it was being written and read simultaneously.) In this case + // we should move it to the acking list and proceed to the next segment. + if len(dq.segments.reading) > 0 && + dq.segments.nextReadOffset >= dq.segments.reading[0].endOffset { + dq.segments.acking = append(dq.segments.acking, dq.segments.reading[0]) + dq.segments.reading = dq.segments.reading[1:] + dq.segments.nextReadOffset = 0 + } + // Get the next available segment from the reading or writing lists. segment := dq.segments.readingSegment() if segment == nil || dq.segments.nextReadOffset >= segmentOffset(segment.endOffset) { @@ -353,7 +371,12 @@ func (dq *diskQueue) maybeReadPending() { return } if dq.segments.nextReadOffset == 0 { - // If we're reading the beginning of this segment, assign its firstFrameID. + // If we're reading the beginning of this segment, assign its firstFrameID + // so we can recognize its acked frames later. + // The first segment we read might not have its initial nextReadOffset + // set to 0 if the segment was already partially read on a previous run. + // However that can only happen when nextReadFrameID == 0, so we don't + // need to do anything in that case. segment.firstFrameID = dq.segments.nextReadFrameID } request := readerLoopRequest{ diff --git a/libbeat/publisher/queue/diskqueue/core_loop_test.go b/libbeat/publisher/queue/diskqueue/core_loop_test.go index b5f0d301d15..309a145968d 100644 --- a/libbeat/publisher/queue/diskqueue/core_loop_test.go +++ b/libbeat/publisher/queue/diskqueue/core_loop_test.go @@ -17,7 +17,12 @@ package diskqueue -import "testing" +import ( + "fmt" + "testing" + + "github.com/elastic/beats/v7/libbeat/logp" +) func TestProducerWriteRequest(t *testing.T) { dq := &diskQueue{settings: DefaultSettings()} @@ -92,3 +97,366 @@ func TestHandleWriterLoopResponse(t *testing.T) { dq.segments.writing[0].endOffset) } } + +func TestHandleReaderLoopResponse(t *testing.T) { + // handleReaderLoopResponse should: + // - advance segments.{nextReadFrameID, nextReadOffset} by the values in + // response.{frameCount, byteCount} + // - advance the target segment's framesRead field by response.frameCount + // - if reading[0] encountered an error or was completely read, move it from + // the reading list to the acking list and reset nextReadOffset to zero + // - if writing[0] encountered an error, advance nextReadOffset to the + // segment's current endOffset (we can't discard the active writing + // segment like we do for errors in the reading list, but we can still + // mark the remaining data as processed) + + testCases := map[string]struct { + // The segment structure to start with before calling maybeReadPending + segments diskQueueSegments + response readerLoopResponse + + expectedFrameID frameID + expectedOffset segmentOffset + expectedACKingSegment *segmentID + }{ + "completely read first reading segment": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + 
nextReadFrameID: 5, + }, + response: readerLoopResponse{ + frameCount: 10, + byteCount: 1000, + }, + expectedFrameID: 15, + expectedOffset: 0, + expectedACKingSegment: segmentIDRef(1), + }, + "read first half of first reading segment": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadFrameID: 5, + }, + response: readerLoopResponse{ + frameCount: 5, + byteCount: 500, + }, + expectedFrameID: 10, + expectedOffset: 500, + }, + "read second half of first reading segment": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadFrameID: 5, + nextReadOffset: 500, + }, + response: readerLoopResponse{ + frameCount: 5, + byteCount: 500, + }, + expectedFrameID: 10, + expectedOffset: 0, + expectedACKingSegment: segmentIDRef(1), + }, + "read of first reading segment aborted by error": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadFrameID: 5, + }, + response: readerLoopResponse{ + frameCount: 1, + byteCount: 100, + err: fmt.Errorf("something bad happened"), + }, + expectedFrameID: 6, + expectedOffset: 0, + expectedACKingSegment: segmentIDRef(1), + }, + "completely read first writing segment": { + segments: diskQueueSegments{ + writing: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadFrameID: 5, + }, + response: readerLoopResponse{ + frameCount: 10, + byteCount: 1000, + }, + expectedFrameID: 15, + expectedOffset: 1000, + }, + "read first half of first writing segment": { + segments: diskQueueSegments{ + writing: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadFrameID: 5, + }, + response: readerLoopResponse{ + frameCount: 5, + byteCount: 500, + }, + expectedFrameID: 10, + expectedOffset: 500, + }, + "read second half of first writing segment": { + segments: diskQueueSegments{ + writing: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadOffset: 500, + nextReadFrameID: 5, + }, + response: readerLoopResponse{ + frameCount: 5, + byteCount: 500, + }, + expectedFrameID: 10, + expectedOffset: 1000, + }, + "error reading a writing segments skips remaining data": { + segments: diskQueueSegments{ + writing: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadFrameID: 5, + }, + response: readerLoopResponse{ + frameCount: 1, + byteCount: 100, + err: fmt.Errorf("something bad happened"), + }, + expectedFrameID: 6, + expectedOffset: 1000, + }, + } + + for description, test := range testCases { + dq := &diskQueue{ + logger: logp.L(), + settings: DefaultSettings(), + segments: test.segments, + } + dq.handleReaderLoopResponse(test.response) + + if dq.segments.nextReadFrameID != test.expectedFrameID { + t.Errorf("%s: expected nextReadFrameID = %d, got %d", + description, test.expectedFrameID, dq.segments.nextReadFrameID) + } + if dq.segments.nextReadOffset != test.expectedOffset { + t.Errorf("%s: expected nextReadOffset = %d, got %d", + description, test.expectedOffset, dq.segments.nextReadOffset) + } + if test.expectedACKingSegment != nil { + if len(dq.segments.acking) == 0 { + t.Errorf("%s: expected acking segment %d, got none", + description, *test.expectedACKingSegment) + } else if dq.segments.acking[0].id != *test.expectedACKingSegment { + t.Errorf("%s: expected acking segment %d, got %d", + description, *test.expectedACKingSegment, dq.segments.acking[0].id) + } + } else if len(dq.segments.acking) != 0 { + t.Errorf("%s: expected no acking segment, got %v", + description, *dq.segments.acking[0]) + } + } +} + +func 
TestMaybeReadPending(t *testing.T) { + // maybeReadPending should: + // - If any unread data is available in a reading or writing segment, + // send a readerLoopRequest for the full amount available in the + // first such segment. + // - When creating a readerLoopRequest that includes the beginning of + // a segment (startOffset == 0), set that segment's firstFrameID + // to segments.nextReadFrameID (so ACKs based on frame ID can be linked + // back to the segment that generated them). + // - If the first reading segment has already been completely read (which + // can happen if it was read while still in the writing list), move it to + // the acking list and set segments.nextReadOffset to 0. + + testCases := map[string]struct { + // The segment structure to start with before calling maybeReadPending + segments diskQueueSegments + // The request we expect to see on the reader loop's request channel, + // or nil if there should be none. + expectedRequest *readerLoopRequest + // The segment ID we expect to see in the acking list, or nil for none. + expectedACKingSegment *segmentID + }{ + "read one full segment": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + // The next read request should start with frame 5 + nextReadFrameID: 5, + }, + expectedRequest: &readerLoopRequest{ + segment: &queueSegment{id: 1}, + startFrameID: 5, + startOffset: 0, + endOffset: 1000, + }, + }, + "read the end of a segment": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + // The next read request should start with frame 5 + nextReadFrameID: 5, + // Start reading at position 500 + nextReadOffset: 500, + }, + expectedRequest: &readerLoopRequest{ + segment: &queueSegment{id: 1}, + startFrameID: 5, + // Should be reading from nextReadOffset (500) to the end of + // the segment (1000). 
+ startOffset: 500, + endOffset: 1000, + }, + }, + "ignore writing segments if reading is available": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + writing: []*queueSegment{ + {id: 2, endOffset: 1000}, + }, + }, + expectedRequest: &readerLoopRequest{ + segment: &queueSegment{id: 1}, + startOffset: 0, + endOffset: 1000, + }, + }, + "do nothing if no segments are available": { + segments: diskQueueSegments{}, + expectedRequest: nil, + }, + "read the writing segment if no reading segments are available": { + segments: diskQueueSegments{ + writing: []*queueSegment{ + {id: 2, endOffset: 1000}, + }, + nextReadOffset: 500, + }, + expectedRequest: &readerLoopRequest{ + segment: &queueSegment{id: 2}, + startOffset: 500, + endOffset: 1000, + }, + }, + "do nothing if the writing segment has already been fully read": { + segments: diskQueueSegments{ + writing: []*queueSegment{ + {id: 2, endOffset: 1000}, + }, + nextReadOffset: 1000, + }, + expectedRequest: nil, + }, + "skip the first reading segment if it's already been fully read": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + {id: 2, endOffset: 500}, + }, + nextReadOffset: 1000, + }, + expectedRequest: &readerLoopRequest{ + segment: &queueSegment{id: 2}, + startOffset: 0, + endOffset: 500, + }, + expectedACKingSegment: segmentIDRef(1), + }, + "move empty reading segment to the acking list if it's the only one": { + segments: diskQueueSegments{ + reading: []*queueSegment{ + {id: 1, endOffset: 1000}, + }, + nextReadOffset: 1000, + }, + expectedRequest: nil, + expectedACKingSegment: segmentIDRef(1), + }, + } + + for description, test := range testCases { + dq := &diskQueue{ + settings: DefaultSettings(), + segments: test.segments, + readerLoop: &readerLoop{ + requestChan: make(chan readerLoopRequest, 1), + }, + } + firstFrameID := test.segments.nextReadFrameID + dq.maybeReadPending() + select { + case request := <-dq.readerLoop.requestChan: + if test.expectedRequest == nil { + t.Errorf("%s: expected no read request, got %v", + description, request) + break + } + if !equalReaderLoopRequests(request, *test.expectedRequest) { + t.Errorf("%s: expected request %v, got %v", + description, *test.expectedRequest, request) + } + if request.startOffset == 0 && + request.segment.firstFrameID != firstFrameID { + t.Errorf( + "%s: maybeReadPending should update firstFrameID", description) + } + default: + if test.expectedRequest != nil { + t.Errorf("%s: expected read request %v, got none", + description, test.expectedRequest) + } + } + if test.expectedACKingSegment != nil { + if len(dq.segments.acking) != 1 { + t.Errorf("%s: expected acking segment %v, got none", + description, *test.expectedACKingSegment) + } else if dq.segments.acking[0].id != *test.expectedACKingSegment { + t.Errorf("%s: expected acking segment %v, got %v", + description, *test.expectedACKingSegment, dq.segments.acking[0].id) + } + if dq.segments.nextReadOffset != 0 { + t.Errorf("%s: expected read offset 0 after acking segment, got %v", + description, dq.segments.nextReadOffset) + } + } else if len(dq.segments.acking) != 0 { + t.Errorf("%s: expected no acking segment, got %v", + description, *dq.segments.acking[0]) + } + } +} + +func segmentIDRef(id segmentID) *segmentID { + return &id +} + +func equalReaderLoopRequests( + r0 readerLoopRequest, r1 readerLoopRequest, +) bool { + // We compare segment ids rather than segment pointers because it's + // awkward to include the same pointer repeatedly in the 
test definition. + return r0.startOffset == r1.startOffset && + r0.endOffset == r1.endOffset && + r0.segment.id == r1.segment.id && + r0.startFrameID == r1.startFrameID +} diff --git a/libbeat/publisher/queue/diskqueue/reader_loop.go b/libbeat/publisher/queue/diskqueue/reader_loop.go index dc2bb95777f..5b30f03e81d 100644 --- a/libbeat/publisher/queue/diskqueue/reader_loop.go +++ b/libbeat/publisher/queue/diskqueue/reader_loop.go @@ -35,6 +35,8 @@ type readerLoopResponse struct { frameCount uint64 // The number of bytes successfully read from the requested segment file. + // If this is less than (endOffset - startOffset) from the original request, + // then err is guaranteed to be non-nil. byteCount uint64 // If there was an error in the segment file (i.e. inconsistent data), the @@ -100,7 +102,8 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon return readerLoopResponse{err: err} } defer handle.Close() - _, err = handle.Seek(segmentHeaderSize+int64(request.startOffset), 0) + _, err = handle.Seek( + segmentHeaderSize+int64(request.startOffset), os.SEEK_SET) if err != nil { return readerLoopResponse{err: err} } @@ -137,7 +140,7 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon } // We are done with this request if: - // - there was an error reading the frame, + // - there was an error reading the frame // - there are no more frames to read, or // - we have reached the end of the requested region if err != nil || frame == nil || byteCount >= targetLength { @@ -166,6 +169,7 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon // nextFrame reads and decodes one frame from the given file handle, as long // it does not exceed the given length bound. The returned frame leaves the // segment and frame IDs unset. +// The returned error will be set if and only if the returned frame is nil. func (rl *readerLoop) nextFrame( handle *os.File, maxLength uint64, ) (*readFrame, error) { diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index deae2522773..64d1a3b589b 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -15,7 +15,9 @@ - Copy Action store on upgrade {pull}21298[21298] - Include inputs in action store actions {pull}21298[21298] - Fix issue where inputs without processors defined would panic {pull}21628[21628] +- Prevent reporting ecs version twice {pull}21616[21616] - Partial extracted beat result in failure to spawn beat {issue}21718[21718] +- Use local temp instead of system one {pull}21883[21883] ==== New features diff --git a/x-pack/elastic-agent/docs/elastic-agent-command-line.asciidoc b/x-pack/elastic-agent/docs/elastic-agent-command-line.asciidoc index e102d5b4787..49ddbdce466 100644 --- a/x-pack/elastic-agent/docs/elastic-agent-command-line.asciidoc +++ b/x-pack/elastic-agent/docs/elastic-agent-command-line.asciidoc @@ -46,10 +46,10 @@ elastic-agent enroll [--ca-sha256 ] === Options `kibana_url`:: -Required. URL of the {kib} endpoint where {ingest-manager} is running. +Required. URL of the {kib} endpoint where {fleet} is running. `enrollment_token`:: -Required. Enrollment token generated by {ingest-manager}. You can use the same +Required. Enrollment token generated by {fleet}. You can use the same enrollment token for multiple agents. `--ca-sha256 `:: @@ -60,7 +60,7 @@ verification. Comma-separated list of root certificates used for server verification. 
`--force`:: -Force overwrite of current configuration without prompting for confirmation. +Force overwrite of current policy without prompting for confirmation. This flag is helpful when using automation software or scripted deployments. `--help`:: @@ -125,9 +125,9 @@ elastic-agent help enroll [[elastic-agent-inspect-command]] == elastic-agent inspect -Show the current {agent} configuration. +Show the current {agent} policy. -If no parameters are specified, shows the full {agent} configuration. +If no parameters are specified, shows the full {agent} policy. [discrete] === Synopsis @@ -145,7 +145,7 @@ elastic-agent inspect output [--output ] [--program ] [discrete] === Options -`output`:: Display the current configuration for the output. This command +`output`:: Display the current policy for the output. This command accepts additional flags: + -- @@ -197,7 +197,7 @@ elastic-agent run [global-flags] These flags are valid whenever you run `elastic-agent` on the command line. `-c `:: -The configuration file to use. If not specified, {agent} uses +The policy file to use. If not specified, {agent} uses `{path.home}/elastic-agent.yml`. `--e`:: @@ -209,7 +209,7 @@ The environment in which the agent will run. //TODO: Clarify what we mean by environment by showing an example. `--path.config `:: -The directory where {agent} looks for its configuration file. The default +The directory where {agent} looks for its policy file. The default varies by platform. `--path.data `:: @@ -220,7 +220,7 @@ If not specified, {agent} uses `{path.home}/data`. `--path.home `:: The home directory of {agent}. `path.home` determines the location of the -configuration files and data directory. +policy files and data directory. + If not specified, {agent} uses the current working directory. diff --git a/x-pack/elastic-agent/docs/elastic-agent-configuration-example.asciidoc b/x-pack/elastic-agent/docs/elastic-agent-configuration-example.asciidoc index b5f0ed0aef6..cd4747b268e 100644 --- a/x-pack/elastic-agent/docs/elastic-agent-configuration-example.asciidoc +++ b/x-pack/elastic-agent/docs/elastic-agent-configuration-example.asciidoc @@ -1,10 +1,10 @@ -[[elastic-agent-configuration-example]] +[[elastic-agent-policy-example]] [role="xpack"] -= Configuration example += Policy example beta[] -The following example shows a full list of configuration options: +The following example shows a full list of policy options: [source,yaml] ---- diff --git a/x-pack/elastic-agent/docs/elastic-agent-configuration.asciidoc b/x-pack/elastic-agent/docs/elastic-agent-configuration.asciidoc index d72c572370c..98ba4a9b424 100644 --- a/x-pack/elastic-agent/docs/elastic-agent-configuration.asciidoc +++ b/x-pack/elastic-agent/docs/elastic-agent-configuration.asciidoc @@ -18,7 +18,7 @@ and send the logs and metrics to the same {es} instance. To alter this behavior, configure the output and other configuration settings. When running the agent standalone, specify configuration settings in the `elastic-agent.yml` file. When using {fleet}, do not modify settings in -the `elastic-agent.yml` file. Instead, use {ingest-manager} in {kib} to change +the `elastic-agent.yml` file. Instead, use {fleet} in {kib} to change settings. 
TIP: To get started quickly, you can use {fleet} to generate a standalone diff --git a/x-pack/elastic-agent/docs/run-elastic-agent.asciidoc b/x-pack/elastic-agent/docs/run-elastic-agent.asciidoc index 7c48084b8fb..34bb2481f7f 100644 --- a/x-pack/elastic-agent/docs/run-elastic-agent.asciidoc +++ b/x-pack/elastic-agent/docs/run-elastic-agent.asciidoc @@ -12,8 +12,8 @@ configure and manage the agent. == Run in {fleet} mode With _fleet mode_, you manage {agent} remotely. The agent uses a trusted {kib} -instance to retrieve configurations and report agent events. This trusted {kib} -instance must have {ingest-manager} and {fleet} enabled. +instance to retrieve policies and report agent events. This trusted {kib} +instance must have {fleet} enabled. To create a trusted communication channel between {agent} and {kib}, enroll the agent to {fleet}. @@ -22,14 +22,14 @@ To enroll an {agent} to {fleet}: . Stop {agent}, if it's already running. -. In {ingest-manager}, click **Settings** and change the defaults, if necessary. +. In {fleet}, click **Settings** and change the defaults, if necessary. For self-managed installations, set the URLs for {es} and {kib}, including the http ports, then save your changes. + [role="screenshot"] -image::images/kibana-ingest-manager-settings.png[{ingest-manager} settings] +//image::images/kibana-fleet-settings.png[{fleet} settings] -. Select **{fleet}**, then click **Add agent** to get an enrollment token. See +. Select **Agents**, then click **Add agent** to get an enrollment token. See <> for detailed steps. . Change to the directory where {agent} is installed, and enroll the agent to @@ -60,8 +60,8 @@ To start {agent} manually, run: include::{beats-repo-dir}/x-pack/elastic-agent/docs/tab-widgets/run-standalone-widget.asciidoc[] -Use the `-c` flag to specify the configuration file. If no configuration file is -specified, {agent} uses the default configuration, `elastic-agent.yml`, which is +Use the `-c` flag to specify the policy file. If no policy file is +specified, {agent} uses the default policy, `elastic-agent.yml`, which is located in the same directory as {agent}. For configuration options, see <>. diff --git a/x-pack/elastic-agent/docs/running-on-kubernetes.asciidoc b/x-pack/elastic-agent/docs/running-on-kubernetes.asciidoc index 19b4628fde9..fc211baabac 100644 --- a/x-pack/elastic-agent/docs/running-on-kubernetes.asciidoc +++ b/x-pack/elastic-agent/docs/running-on-kubernetes.asciidoc @@ -44,7 +44,7 @@ curl -L -O https://raw.githubusercontent.com/elastic/beats/{branch}/deploy/kuber By default, {agent} is enrolled to an existing Kibana deployment, if present using the specified credentials. FLEET_ENROLLMENT_TOKEN parameter is used to connect Agent to the -corresponding Ingest Management configuration. It is suggested to connect Daemonset Agents to a node scope configuration +corresponding {agent} policy. It is suggested to connect Daemonset Agents to a node scope configuration and Deployment Agent to a cluster scope configuration. Then Kubernetes package will be deployed enabling cluster scope datasets using cluster scope configuration while node scope datasets will be enabled under node scope configuration. 
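For reference, the node-scope enrollment described in the docs change above is typically wired through environment variables on the {agent} DaemonSet container. The following is a minimal sketch, not part of this patch: only `FLEET_ENROLLMENT_TOKEN` is named in the text above, while the `KIBANA_HOST` variable name and all values are assumptions.

[source,yaml]
----
# Sketch of the DaemonSet container env block used for enrollment.
# FLEET_ENROLLMENT_TOKEN would come from a node-scope agent policy;
# the KIBANA_HOST name and both values are illustrative placeholders.
env:
  - name: KIBANA_HOST
    value: "https://kibana.example.com:5601"
  - name: FLEET_ENROLLMENT_TOKEN
    value: "<node-scope-enrollment-token>"
----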
diff --git a/x-pack/elastic-agent/docs/unenroll-elastic-agent.asciidoc b/x-pack/elastic-agent/docs/unenroll-elastic-agent.asciidoc index cd77fc3dde3..78c7fab9cf9 100644 --- a/x-pack/elastic-agent/docs/unenroll-elastic-agent.asciidoc +++ b/x-pack/elastic-agent/docs/unenroll-elastic-agent.asciidoc @@ -4,7 +4,7 @@ You can unenroll an agent to invalidate the API key used to connect to {es}. -. In {ingest-manager}, select **{fleet}**. +. In {fleet}, select **Agents**. . Under Agents, choose **Unenroll** from the **Actions** menu next to the agent you want to unenroll. diff --git a/x-pack/elastic-agent/pkg/agent/application/paths/paths.go b/x-pack/elastic-agent/pkg/agent/application/paths/paths.go index b646f3796ba..fca3dbd8828 100644 --- a/x-pack/elastic-agent/pkg/agent/application/paths/paths.go +++ b/x-pack/elastic-agent/pkg/agent/application/paths/paths.go @@ -10,14 +10,20 @@ import ( "os" "path/filepath" "strings" + "sync" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" ) +const ( + tempSubdir = "tmp" +) + var ( topPath string configPath string logsPath string + tmpCreator sync.Once ) func init() { @@ -37,6 +43,16 @@ func Top() string { return topPath } +// TempDir returns agent temp dir located within data dir. +func TempDir() string { + tmpDir := filepath.Join(Data(), tempSubdir) + tmpCreator.Do(func() { + // create tempdir as it probably don't exists + os.MkdirAll(tmpDir, 0750) + }) + return tmpDir +} + // Home returns a directory where binary lives func Home() string { return versionedHome(topPath) diff --git a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go index 5e26436bfc4..3dc0dbe232a 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go +++ b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer.go @@ -9,6 +9,8 @@ import ( "io/ioutil" "os" "path/filepath" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" ) type embeddedInstaller interface { @@ -31,7 +33,7 @@ func NewInstaller(i embeddedInstaller) (*Installer, error) { // Install performs installation of program in a specific version. 
func (i *Installer) Install(ctx context.Context, programName, version, installDir string) error { // tar installer uses Dir of installDir to determine location of unpack - tempDir, err := ioutil.TempDir(os.TempDir(), "elastic-agent-install") + tempDir, err := ioutil.TempDir(paths.TempDir(), "elastic-agent-install") if err != nil { return err } diff --git a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer_test.go b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer_test.go index d6266659b7d..a0bfa213ca7 100644 --- a/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer_test.go +++ b/x-pack/elastic-agent/pkg/artifact/install/atomic/atomic_installer_test.go @@ -14,6 +14,8 @@ import ( "testing" "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" ) func TestOKInstall(t *testing.T) { @@ -25,7 +27,7 @@ func TestOKInstall(t *testing.T) { assert.NoError(t, err) ctx := context.Background() - installDir := filepath.Join(os.TempDir(), "install_dir") + installDir := filepath.Join(paths.TempDir(), "install_dir") wg.Add(1) go func() { @@ -59,7 +61,7 @@ func TestContextCancelledInstall(t *testing.T) { assert.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) - installDir := filepath.Join(os.TempDir(), "install_dir") + installDir := filepath.Join(paths.TempDir(), "install_dir") wg.Add(1) go func() { diff --git a/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go b/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go index 53b9f377fcd..2b4617bc2bd 100644 --- a/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go +++ b/x-pack/elastic-agent/pkg/core/monitoring/beats/beats_monitor.go @@ -113,7 +113,6 @@ func (b *Monitor) EnrichArgs(process, pipelineID string, args []string, isSideca logFile = fmt.Sprintf("%s-json.log", logFile) appendix = append(appendix, "-E", "logging.json=true", - "-E", "logging.ecs=true", "-E", "logging.files.path="+loggingPath, "-E", "logging.files.name="+logFile, "-E", "logging.files.keepfiles=7", diff --git a/x-pack/metricbeat/module/istio/proxy/_meta/docs.asciidoc b/x-pack/metricbeat/module/istio/proxy/_meta/docs.asciidoc index 4f7aa03a9ef..87a15d72a94 100644 --- a/x-pack/metricbeat/module/istio/proxy/_meta/docs.asciidoc +++ b/x-pack/metricbeat/module/istio/proxy/_meta/docs.asciidoc @@ -21,7 +21,7 @@ them. Here is an example configuration that can be used for that purpose: metricbeat.autodiscover: providers: - type: kubernetes - include_annotations: ["prometheus.io.scrape"] + node: ${NODE_NAME} templates: - condition: contains: