From 446cae90a626452dc60945b75b2eba7b25acedeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Tue, 20 Apr 2021 17:05:53 +0200 Subject: [PATCH] add more complex tests --- filebeat/input/filestream/input.go | 3 +- filebeat/input/filestream/parser.go | 7 ---- filebeat/input/filestream/parser_test.go | 53 ++++++++++++++++++++---- 3 files changed, 47 insertions(+), 16 deletions(-) diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 8d06ebc47fc..1e1298ee1a6 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -217,12 +217,13 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path stri return nil, err } + r = readfile.NewStripNewline(r, inp.readerConfig.LineTerminator) + r, err = newParsers(r, parserConfig{maxBytes: inp.readerConfig.MaxBytes, lineTerminator: inp.readerConfig.LineTerminator}, inp.readerConfig.Parsers) if err != nil { return nil, err } - r = readfile.NewStripNewline(r, inp.readerConfig.LineTerminator) r = readfile.NewLimitReader(r, inp.readerConfig.MaxBytes) inp.msgPostProc = newPostProcessors(inp.readerConfig.Parsers) diff --git a/filebeat/input/filestream/parser.go b/filebeat/input/filestream/parser.go index c99d0853236..6345e6966c3 100644 --- a/filebeat/input/filestream/parser.go +++ b/filebeat/input/filestream/parser.go @@ -56,8 +56,6 @@ func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace) for _, ns := range c { name := ns.Name() switch name { - case "limit": - p = readfile.NewLimitReader(p, pCfg.maxBytes) case "multiline": var config multiline.Config cfg := ns.Config() @@ -77,8 +75,6 @@ func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace) return nil, fmt.Errorf("error while parsing ndjson parser config: %+v", err) } p = readjson.NewJSONReader(p, &config) - case "strip_newline": - p = readfile.NewStripNewline(p, pCfg.lineTerminator) default: return nil, fmt.Errorf("%s: %s", ErrNoSuchParser, name) } @@ -109,9 +105,6 @@ func validateParserConfig(pCfg parserConfig, c []common.ConfigNamespace) error { for _, ns := range c { name := ns.Name() switch name { - case "limit": - case "strip_newline": - continue case "multiline": var config multiline.Config cfg := ns.Config() diff --git a/filebeat/input/filestream/parser_test.go b/filebeat/input/filestream/parser_test.go index 164cbc11395..a312012eaf3 100644 --- a/filebeat/input/filestream/parser_test.go +++ b/filebeat/input/filestream/parser_test.go @@ -45,17 +45,54 @@ func TestParsersConfigAndReading(t *testing.T) { }, expectedMessages: []string{"line 1\n", "line 2\n"}, }, - "correct strip_newline parser": { - lines: "line 1\nline 2\n", + "correct multiline parser": { + lines: "line 1.1\nline 1.2\nline 1.3\nline 2.1\nline 2.2\nline 2.3\n", + parsers: map[string]interface{}{ + "paths": []string{"dummy_path"}, + "parsers": []map[string]interface{}{ + map[string]interface{}{ + "multiline": map[string]interface{}{ + "type": "count", + "count_lines": 3, + }, + }, + }, + }, + expectedMessages: []string{ + "line 1.1\n\nline 1.2\n\nline 1.3\n", + "line 2.1\n\nline 2.2\n\nline 2.3\n", + }, + }, + "multiline docker logs parser": { + lines: `{"log":"[log] The following are log messages\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +{"log":"[log] This one is\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +{"log":" on multiple\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +{"log":" lines","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +{"log":"[log] In total there should be 3 events\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"} +`, parsers: map[string]interface{}{ "paths": []string{"dummy_path"}, "parsers": []map[string]interface{}{ map[string]interface{}{ - "strip_newline": nil, + "ndjson": map[string]interface{}{ + "keys_under_root": true, + "message_key": "log", + }, + }, + map[string]interface{}{ + "multiline": map[string]interface{}{ + "match": "after", + "negate": true, + "pattern": "^\\[log\\]", + }, }, }, }, - expectedMessages: []string{"line 1", "line 2"}, + expectedMessages: []string{ + "[log] The following are log messages\n", + "[log] This one is\n\n on multiple\n\n lines", + "[log] In total there should be 3 events\n", + }, }, "non existent parser configuration": { parsers: map[string]interface{}{ @@ -96,12 +133,12 @@ func TestParsersConfigAndReading(t *testing.T) { return } - p, err := newParsers(testReader(test.lines), parserConfig{lineTerminator: readfile.AutoLineTerminator}, cfg.Reader.Parsers) + p, err := newParsers(testReader(test.lines), parserConfig{lineTerminator: readfile.AutoLineTerminator, maxBytes: 64}, cfg.Reader.Parsers) i := 0 msg, err := p.Next() for err == nil { - require.Contains(t, test.expectedMessages[i], string(msg.Content)) + require.Equal(t, test.expectedMessages[i], string(msg.Content)) i++ msg, err = p.Next() } @@ -235,9 +272,9 @@ func testReader(lines string) reader.Reader { } r, err := readfile.NewEncodeReader(ioutil.NopCloser(reader), readfile.Config{ Codec: enc, - BufferSize: 64, + BufferSize: 1024, Terminator: readfile.AutoLineTerminator, - MaxBytes: 128, + MaxBytes: 1024, }) if err != nil { panic(err)