Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into hostfs_cgroups_li…
Browse files Browse the repository at this point in the history
…bbeat
  • Loading branch information
fearful-symmetry committed Mar 11, 2021
2 parents fd7d246 + 416bc00 commit c9acdb9
Show file tree
Hide file tree
Showing 29 changed files with 1,013 additions and 202 deletions.
2 changes: 1 addition & 1 deletion .go-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.15.8
1.15.9
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,4 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Add support for customized monitoring API. {pull}22605[22605]
- Update Go version to 1.15.7. {pull}22495[22495]
- Update Go version to 1.15.8. {pull}23955[23955]
- Update Go version to 1.15.9. {pull}24442[24442]
2 changes: 1 addition & 1 deletion auditbeat/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.15.8
FROM golang:1.15.9

RUN \
apt-get update \
Expand Down
2 changes: 1 addition & 1 deletion filebeat/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.15.8
FROM golang:1.15.9

RUN \
apt-get update \
Expand Down
4 changes: 2 additions & 2 deletions filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

// Config stores the options of a file stream.
type config struct {
readerConfig
Reader readerConfig `config:",inline"`

Paths []string `config:"paths"`
Close closerConfig `config:"close"`
Expand Down Expand Up @@ -79,7 +79,7 @@ type backoffConfig struct {

func defaultConfig() config {
return config{
readerConfig: defaultReaderConfig(),
Reader: defaultReaderConfig(),
Paths: []string{},
Close: defaultCloserConfig(),
CleanInactive: 0,
Expand Down
37 changes: 13 additions & 24 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,8 @@ type fileMeta struct {
// are actively written by other applications.
type filestream struct {
readerConfig readerConfig
bufferSize int
tailFile bool // TODO
encodingFactory encoding.EncodingFactory
encoding encoding.Encoding
lineTerminator readfile.LineTerminator
excludeLines []match.Matcher
includeLines []match.Matcher
maxBytes int
closerConfig closerConfig
}

Expand Down Expand Up @@ -97,9 +91,9 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error)
return nil, nil, fmt.Errorf("error while creating file identifier: %v", err)
}

encodingFactory, ok := encoding.FindEncoding(config.Encoding)
encodingFactory, ok := encoding.FindEncoding(config.Reader.Encoding)
if !ok || encodingFactory == nil {
return nil, nil, fmt.Errorf("unknown encoding('%v')", config.Encoding)
return nil, nil, fmt.Errorf("unknown encoding('%v')", config.Reader.Encoding)
}

prospector := &fileProspector{
Expand All @@ -111,13 +105,8 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error)
}

filestream := &filestream{
readerConfig: config.readerConfig,
bufferSize: config.BufferSize,
readerConfig: config.Reader,
encodingFactory: encodingFactory,
lineTerminator: config.LineTerminator,
excludeLines: config.ExcludeLines,
includeLines: config.IncludeLines,
maxBytes: config.MaxBytes,
closerConfig: config.Close,
}

Expand Down Expand Up @@ -191,7 +180,7 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path stri
return nil, err
}

log.Debug("newLogFileReader with config.MaxBytes:", inp.maxBytes)
log.Debug("newLogFileReader with config.MaxBytes:", inp.readerConfig.MaxBytes)

// TODO: NewLineReader uses additional buffering to deal with encoding and testing
// for new lines in input stream. Simple 8-bit based encodings, or plain
Expand All @@ -211,22 +200,22 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path stri
// for the worst case scenario where incoming UTF32 charchers are decoded to the single byte UTF-8 characters.
// This limit serves primarily to avoid memory bload or potential OOM with expectedly long lines in the file.
// The further size limiting is performed by LimitReader at the end of the readers pipeline as needed.
encReaderMaxBytes := inp.maxBytes * 4
encReaderMaxBytes := inp.readerConfig.MaxBytes * 4

var r reader.Reader
r, err = readfile.NewEncodeReader(dbgReader, readfile.Config{
Codec: inp.encoding,
BufferSize: inp.bufferSize,
Terminator: inp.lineTerminator,
BufferSize: inp.readerConfig.BufferSize,
Terminator: inp.readerConfig.LineTerminator,
MaxBytes: encReaderMaxBytes,
})
if err != nil {
f.Close()
return nil, err
}

r = readfile.NewStripNewline(r, inp.lineTerminator)
r = readfile.NewLimitReader(r, inp.maxBytes)
r = readfile.NewStripNewline(r, inp.readerConfig.LineTerminator)
r = readfile.NewLimitReader(r, inp.readerConfig.MaxBytes)

return r, nil
}
Expand Down Expand Up @@ -335,14 +324,14 @@ func (inp *filestream) readFromSource(
// isDroppedLine decides if the line is exported or not based on
// the include_lines and exclude_lines options.
func (inp *filestream) isDroppedLine(log *logp.Logger, line string) bool {
if len(inp.includeLines) > 0 {
if !matchAny(inp.includeLines, line) {
if len(inp.readerConfig.IncludeLines) > 0 {
if !matchAny(inp.readerConfig.IncludeLines, line) {
log.Debug("Drop line as it does not match any of the include patterns %s", line)
return true
}
}
if len(inp.excludeLines) > 0 {
if matchAny(inp.excludeLines, line) {
if len(inp.readerConfig.ExcludeLines) > 0 {
if matchAny(inp.readerConfig.ExcludeLines, line) {
log.Debug("Drop line as it does match one of the exclude patterns%s", line)
return true
}
Expand Down
187 changes: 183 additions & 4 deletions filebeat/input/filestream/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,18 @@
package filestream

import (
"bytes"
"context"
"os"
"runtime"
"testing"

"github.com/stretchr/testify/require"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/unicode"
"golang.org/x/text/transform"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
)

// test_close_renamed from test_harvester.py
Expand Down Expand Up @@ -57,7 +66,6 @@ func TestFilestreamCloseRenamed(t *testing.T) {
newerTestlines := []byte("new first log line\nnew second log line\n")
env.mustWriteLinesToFile(testlogName, newerTestlines)

// new two events arrived
env.waitUntilEventCount(3)

cancelInput()
Expand All @@ -67,6 +75,46 @@ func TestFilestreamCloseRenamed(t *testing.T) {
env.requireOffsetInRegistry(testlogName, len(newerTestlines))
}

// test_close_removed from test_harvester.py
func TestFilestreamCloseRemoved(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName) + "*"},
"prospector.scanner.check_interval": "24h",
"close.on_state_change.check_interval": "1ms",
"close.on_state_change.removed": "true",
})

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

testlines := []byte("first log line\n")
env.mustWriteLinesToFile(testlogName, testlines)

// first event has made it successfully
env.waitUntilEventCount(1)

env.requireOffsetInRegistry(testlogName, len(testlines))

fi, err := os.Stat(env.abspath(testlogName))
if err != nil {
t.Fatalf("cannot stat file: %+v", err)
}

env.mustRemoveFile(testlogName)

env.waitUntilHarvesterIsDone()

cancelInput()
env.waitUntilInputStops()

identifier, _ := newINodeDeviceIdentifier(nil)
src := identifier.GetSource(loginp.FSEvent{Info: fi, Op: loginp.OpCreate, NewPath: env.abspath(testlogName)})
env.requireOffsetInRegistryByID(src.Name(), len(testlines))
}

// test_close_eof from test_harvester.py
func TestFilestreamCloseEOF(t *testing.T) {
env := newInputTestingEnvironment(t)
Expand All @@ -78,13 +126,13 @@ func TestFilestreamCloseEOF(t *testing.T) {
"close.reader.on_eof": "true",
})

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

testlines := []byte("first log line\n")
expectedOffset := len(testlines)
env.mustWriteLinesToFile(testlogName, testlines)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

// first event has made it successfully
env.waitUntilEventCount(1)
env.requireOffsetInRegistry(testlogName, expectedOffset)
Expand All @@ -100,3 +148,134 @@ func TestFilestreamCloseEOF(t *testing.T) {

env.requireOffsetInRegistry(testlogName, expectedOffset)
}

// test_empty_lines from test_harvester.py
func TestFilestreamEmptyLine(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "1ms",
})

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

testlines := []byte("first log line\nnext is an empty line\n")
env.mustWriteLinesToFile(testlogName, testlines)

env.waitUntilEventCount(2)
env.requireOffsetInRegistry(testlogName, len(testlines))

moreTestlines := []byte("\nafter an empty line\n")
env.mustAppendLinesToFile(testlogName, moreTestlines)

env.waitUntilEventCount(3)
env.requireEventsReceived([]string{
"first log line",
"next is an empty line",
"after an empty line",
})

cancelInput()
env.waitUntilInputStops()

env.requireOffsetInRegistry(testlogName, len(testlines)+len(moreTestlines))
}

// test_empty_lines_only from test_harvester.py
// This test differs from the original because in filestream
// input offset is no longer persisted when the line is empty.
func TestFilestreamEmptyLinesOnly(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "1ms",
})

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

testlines := []byte("\n\n\n")
env.mustWriteLinesToFile(testlogName, testlines)

cancelInput()
env.waitUntilInputStops()

env.requireNoEntryInRegistry(testlogName)
}

// test_bom_utf8 from test_harvester.py
func TestFilestreamBOMUTF8(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
})

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

// BOM: 0xEF,0xBB,0xBF
lines := append([]byte{0xEF, 0xBB, 0xBF}, []byte(`#Software: Microsoft Exchange Server
#Version: 14.0.0.0
#Log-type: Message Tracking Log
#Date: 2016-04-05T00:00:02.052Z
#Fields: date-time,client-ip,client-hostname,server-ip,server-hostname,source-context,connector-id,source,event-id,internal-message-id,message-id,recipient-address,recipient-status,total-bytes,recipient-count,related-recipient-address,reference,message-subject,sender-address,return-path,message-info,directionality,tenant-id,original-client-ip,original-server-ip,custom-data
2016-04-05T00:00:02.052Z,,,,,"MDB:61914740-3f1b-4ddb-94e0-557196870cfa, Mailbox:279f077c-216f-4323-a9ee-48e50ffd3cad, Event:269492708, MessageClass:IPM.Note.StorageQuotaWarning.Warning, CreationTime:2016-04-05T00:00:01.022Z, ClientType:System",,STOREDRIVER,NOTIFYMAPI,,,,,,,,,,,,,,,,,S:ItemEntryId=00-00-00-00-37-DB-F9-F9-B5-F2-42-4F-86-62-E6-5D-FC-0C-A1-41-07-00-0E-D6-03-16-80-DC-8C-44-9D-30-07-23-ED-71-B7-F7-00-00-1F-D4-B5-0E-00-00-2E-EF-F2-59-0E-E8-2D-46-BC-31-02-85-0D-67-98-43-00-00-37-4A-A3-B3-00-00
2016-04-05T00:00:02.145Z,,,,,"MDB:61914740-3f1b-4ddb-94e0-557196870cfa, Mailbox:49cb09c6-5b76-415d-a085-da0ad9079682, Event:269492711, MessageClass:IPM.Note.StorageQuotaWarning.Warning, CreationTime:2016-04-05T00:00:01.038Z, ClientType:System",,STOREDRIVER,NOTIFYMAPI,,,,,,,,,,,,,,,,,S:ItemEntryId=00-00-00-00-97-8F-07-43-51-44-61-4A-AD-BD-29-D4-97-4E-20-A0-07-00-0E-D6-03-16-80-DC-8C-44-9D-30-07-23-ED-71-B7-F7-00-8E-8F-BD-EB-57-00-00-3D-FB-CE-26-A4-8D-46-4C-A4-35-0F-A7-9B-FA-D7-B9-00-00-37-44-2F-CA-00-00
`)...)
env.mustWriteLinesToFile(testlogName, lines)

env.waitUntilEventCount(7)

cancelInput()
env.waitUntilInputStops()

messages := env.getOutputMessages()
require.Equal(t, messages[0], "#Software: Microsoft Exchange Server")
}

// test_boms from test_harvester.py
func TestFilestreamUTF16BOMs(t *testing.T) {
encodings := map[string]encoding.Encoding{
"utf-16be-bom": unicode.UTF16(unicode.BigEndian, unicode.UseBOM),
"utf-16le-bom": unicode.UTF16(unicode.LittleEndian, unicode.UseBOM),
}

for name, enc := range encodings {
name := name
encoder := enc.NewEncoder()
t.Run(name, func(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"encoding": name,
})

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

line := []byte("first line\n")
buf := bytes.NewBuffer(nil)
writer := transform.NewWriter(buf, encoder)
writer.Write(line)
writer.Close()

env.mustWriteLinesToFile(testlogName, buf.Bytes())

env.waitUntilEventCount(1)

env.requireEventsReceived([]string{"first line"})

cancelInput()
env.waitUntilInputStops()
})
}
}
Loading

0 comments on commit c9acdb9

Please sign in to comment.