Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into feature/packaging…
Browse files Browse the repository at this point in the history
…-store-in-another-location-too

* upstream/master:
  [Ingest Manager] Prevent reporting ecs version twice (elastic#21616)
  [CI] Use google storage to keep artifacts (elastic#21910)
  Update docs.asciidoc (elastic#21849)
  Kubernetes leaderelection improvements (elastic#21896)
  Apply name changes to elastic agent docs (elastic#21549)
  Add 7.7.1 relnotes to 7.8 docs (elastic#21937) (elastic#21941)
  [libbeat] Fix potential deadlock in the disk queue + add more unit tests (elastic#21930)
  Refactor docker watcher to fix flaky test and other small issues (elastic#21851)
  [CI] Add stage name in the step (elastic#21887)
  [docs] Remove extra word in autodiscover docs (elastic#21871)
  [CI] lint stage doesn't produce test reports (elastic#21888)
  Add tests of reader of filestream input (elastic#21814)
  [Ingest Manager] Use local temp instead of system one (elastic#21883)
  • Loading branch information
v1v committed Oct 20, 2020
2 parents 4e07f83 + ee7d329 commit 21cd8fa
Show file tree
Hide file tree
Showing 27 changed files with 1,000 additions and 231 deletions.
39 changes: 38 additions & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,38 @@ https://github.com/elastic/beats/compare/v7.7.0...v7.8.0[View commits]
- Add support for event IDs 4673,4674,4697,4698,4699,4700,4701,4702,4768,4769,4770,4771,4776,4778,4779,4964 to the Security module. {pull}17517[17517]
- Add registry and code signature information and ECS categorization fields for sysmon module. {pull}18058[18058]

[[release-notes-7.7.1]]
=== Beats version 7.7.1
https://github.com/elastic/beats/compare/v7.7.0...v7.7.1[View commits]

==== Bugfixes

*Affecting all Beats*

- Fix `keystore add` command hanging on Windows. {issue}18649[18649] {pull}18654[18654]

*Filebeat*

- Unescape filenames in SQS messages to resolve file paths correctly. {pull}18370[18370]
- Improve failure handler for Cisco ASA and FTD pipelines to avoid mapping temporary fields. {issue}18391[18391] {pull}18392[18392]
- Fix `source.address` field not being set for the Nginx `ingress_controller` fileset. {pull}18511[18511]
- Fix Google Cloud `audit` fileset to only take in fields that are explicitly defined by the fileset. {issue}18465[18465] {pull}18472[18472]
- Fix rate limit related issue in the `httpjson` input for the Okta module. {issue}18530[18530] {pull}18534[18534]
- Fix Cisco ASA and FTD parsing errors caused by NAT fields that contain a hostname instead of an IP. {issue}14034[14034] {pull}18376[18376]
- Fix PANW module to use correct mappings for bytes and packets counters. {issue}18522[18522] {pull}18525[18525]
- Fix Office 365 ingest failures caused by IP addresses surrounded by square brackets. {issue}18587[18587] {pull}18591[18591]

*Metricbeat*

- Fix `tags_filter` setting to work correctly for the AWS `cloudwatch` metricset. {pull}18524[18524]

==== Added

*Filebeat*

- Add support for Google Application Default Credentials to the Google Pub/Sub input and Google Cloud modules. {pull}15668[15668]
- Make `decode_cef` processor GA. {pull}17944[17944]

[[release-notes-7.7.0]]
=== Beats version 7.7.0
https://github.com/elastic/beats/compare/v7.6.2...v7.7.0[View commits]
Expand Down Expand Up @@ -729,6 +761,12 @@ https://github.com/elastic/beats/compare/v7.6.0...v7.6.1[View commits]

- Fix timeout option of GCP functions. {issue}16282[16282] {pull}16287[16287]

==== Added

*Winlogbeat*

- Made the event parser more lenient w.r.t. invalid event log definition version numbers. {issue}15838[15838]

[[release-notes-7.6.0]]
=== Beats version 7.6.0
https://github.com/elastic/beats/compare/v7.5.1...v7.6.0[View commits]
Expand Down Expand Up @@ -1101,7 +1139,6 @@ processing events. (CVE-2019-17596) See https://www.elastic.co/community/securit

- Fill `event.provider`. {pull}13937[13937]
- Add support for user management events to the Security module. {pull}13530[13530]
- Made the event parser more lenient w.r.t. invalid event log definition version numbers. {issue}15838[15838]

==== Deprecated

Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix `libbeat.output.write.bytes` and `libbeat.output.read.bytes` metrics of the Elasticsearch output. {issue}20752[20752] {pull}21197[21197]
- The `o365input` and `o365` module now recover from an authentication problem or other fatal errors, instead of terminating. {pull}21259[21258]
- Orderly close processors when processing pipelines are not needed anymore to release their resources. {pull}16349[16349]
- Fix memory leak and events duplication in docker autodiscover and add_docker_metadata. {pull}21851[21851]

*Auditbeat*

Expand Down
21 changes: 17 additions & 4 deletions Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pipeline {
}
steps {
withGithubNotify(context: 'Lint') {
withBeatsEnv(archive: true, id: 'lint') {
withBeatsEnv(archive: false, id: 'lint') {
dumpVariables()
cmd(label: 'make check', script: 'make check')
}
Expand Down Expand Up @@ -220,7 +220,7 @@ def target(Map args = [:]) {
// make commands use -C <folder> while mage commands require the dir(folder)
// let's support this scenario with the location variable.
dir(isMage ? directory : '') {
cmd(label: "${command}", script: "${command}")
cmd(label: "${args.id?.trim() ? args.id : env.STAGE_NAME} - ${command}", script: "${command}")
}
}
}
Expand Down Expand Up @@ -384,7 +384,7 @@ def archiveTestOutput(Map args = [:]) {
script: 'rm -rf ve || true; find . -type d -name vendor -exec rm -r {} \\;')
} else { log(level: 'INFO', text: 'Delete folders that are causing exceptions (See JENKINS-58421) is disabled for Windows.') }
junitAndStore(allowEmptyResults: true, keepLongStdio: true, testResults: args.testResults, stashedTestReports: stashedTestReports, id: args.id)
tar(file: "test-build-artifacts-${args.id}.tgz", dir: '.', archive: true, allowMissing: true)
tarAndUploadArtifacts(file: "test-build-artifacts-${args.id}.tgz", location: '.')
}
catchError(buildResult: 'SUCCESS', message: 'Failed to archive the build test results', stageResult: 'SUCCESS') {
def folder = cmd(label: 'Find system-tests', returnStdout: true, script: 'python .ci/scripts/search_system_tests.py').trim()
Expand All @@ -393,12 +393,25 @@ def archiveTestOutput(Map args = [:]) {
// TODO: nodeOS() should support ARM
def os_suffix = isArm() ? 'linux' : nodeOS()
def name = folder.replaceAll('/', '-').replaceAll('\\\\', '-').replaceAll('build', '').replaceAll('^-', '') + '-' + os_suffix
tar(file: "${name}.tgz", archive: true, dir: folder)
tarAndUploadArtifacts(file: "${name}.tgz", location: folder)
}
}
}
}

/**
* Wrapper to tar and upload artifacts to Google Storage to avoid killing the
* disk space of the jenkins instance
*/
def tarAndUploadArtifacts(Map args = [:]) {
tar(file: args.file, dir: args.location, archive: false, allowMissing: true)
googleStorageUpload(bucket: "gs://${JOB_GCS_BUCKET}/${env.JOB_NAME}-${env.BUILD_ID}",
credentialsId: "${JOB_GCS_CREDENTIALS}",
pattern: "${args.file}",
sharedPublicly: true,
showInline: true)
}

/**
* This method executes a closure with credentials for cloud test
* environments.
Expand Down
10 changes: 3 additions & 7 deletions filebeat/input/filestream/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,20 +138,16 @@ func (f *logFile) Read(buf []byte) (int, error) {
}

func (f *logFile) startFileMonitoringIfNeeded() {
if f.closeInactive == 0 && f.closeAfterInterval == 0 {
return
}

if f.closeInactive > 0 {
if f.closeInactive > 0 || f.closeRemoved || f.closeRenamed {
f.tg.Go(func(ctx unison.Canceler) error {
f.closeIfTimeout(ctx)
f.periodicStateCheck(ctx)
return nil
})
}

if f.closeAfterInterval > 0 {
f.tg.Go(func(ctx unison.Canceler) error {
f.periodicStateCheck(ctx)
f.closeIfTimeout(ctx)
return nil
})
}
Expand Down
136 changes: 136 additions & 0 deletions filebeat/input/filestream/filestream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package filestream

import (
"context"
"io"
"io/ioutil"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/logp"
)

func TestLogFileTimedClosing(t *testing.T) {
testCases := map[string]struct {
inactive time.Duration
closeEOF bool
afterInterval time.Duration
expectedErr error
}{
"read from file and close inactive": {
inactive: 2 * time.Second,
expectedErr: ErrClosed,
},
"read from file and close after interval": {
afterInterval: 3 * time.Second,
expectedErr: ErrClosed,
},
"read from file and close on EOF": {
closeEOF: true,
expectedErr: io.EOF,
},
}

for name, test := range testCases {
test := test

f := createTestLogFile()
defer f.Close()
defer os.Remove(f.Name())

t.Run(name, func(t *testing.T) {
reader, err := newFileReader(
logp.L(),
context.TODO(),
f,
readerConfig{},
closerConfig{
OnStateChange: stateChangeCloserConfig{
CheckInterval: 1 * time.Second,
Inactive: test.inactive,
},
Reader: readerCloserConfig{
OnEOF: test.closeEOF,
AfterInterval: test.afterInterval,
},
},
)
if err != nil {
t.Fatalf("error while creating logReader: %+v", err)
}

err = readUntilError(reader)

assert.Equal(t, test.expectedErr, err)
})
}
}

func TestLogFileTruncated(t *testing.T) {
f := createTestLogFile()
defer f.Close()
defer os.Remove(f.Name())

reader, err := newFileReader(logp.L(), context.TODO(), f, readerConfig{}, closerConfig{})
if err != nil {
t.Fatalf("error while creating logReader: %+v", err)
}

buf := make([]byte, 1024)
_, err = reader.Read(buf)
assert.Nil(t, err)

err = f.Truncate(0)
if err != nil {
t.Fatalf("error while truncating file: %+v", err)
}

err = readUntilError(reader)

assert.Equal(t, ErrFileTruncate, err)
}

func createTestLogFile() *os.File {
f, err := ioutil.TempFile("", "filestream_reader_test")
if err != nil {
panic(err)
}
content := []byte("first log line\nanother interesting line\na third log message\n")
if _, err := f.Write(content); err != nil {
panic(err)
}
if _, err := f.Seek(0, io.SeekStart); err != nil {
panic(err)
}
return f
}

func readUntilError(reader *logFile) error {
buf := make([]byte, 1024)
_, err := reader.Read(buf)
for err == nil {
buf := make([]byte, 1024)
_, err = reader.Read(buf)
}
return err
}
104 changes: 104 additions & 0 deletions filebeat/input/filestream/filestream_test_non_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build !windows

package filestream

import (
"context"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/logp"
)

// these tests are separated as one cannot delete/rename files
// while another process is working with it on Windows
func TestLogFileRenamed(t *testing.T) {
f := createTestLogFile()
defer f.Close()

renamedFile := f.Name() + ".renamed"

reader, err := newFileReader(
logp.L(),
context.TODO(),
f,
readerConfig{},
closerConfig{
OnStateChange: stateChangeCloserConfig{
CheckInterval: 1 * time.Second,
Renamed: true,
},
},
)
if err != nil {
t.Fatalf("error while creating logReader: %+v", err)
}

buf := make([]byte, 1024)
_, err = reader.Read(buf)
assert.Nil(t, err)

err = os.Rename(f.Name(), renamedFile)
if err != nil {
t.Fatalf("error while renaming file: %+v", err)
}

err = readUntilError(reader)
os.Remove(renamedFile)

assert.Equal(t, ErrClosed, err)
}

func TestLogFileRemoved(t *testing.T) {
f := createTestLogFile()
defer f.Close()

reader, err := newFileReader(
logp.L(),
context.TODO(),
f,
readerConfig{},
closerConfig{
OnStateChange: stateChangeCloserConfig{
CheckInterval: 1 * time.Second,
Removed: true,
},
},
)
if err != nil {
t.Fatalf("error while creating logReader: %+v", err)
}

buf := make([]byte, 1024)
_, err = reader.Read(buf)
assert.Nil(t, err)

err = os.Remove(f.Name())
if err != nil {
t.Fatalf("error while remove file: %+v", err)
}

err = readUntilError(reader)

assert.Equal(t, ErrClosed, err)
}
Loading

0 comments on commit 21cd8fa

Please sign in to comment.