Skip to content

Commit

Permalink
add more complex tests
Browse files Browse the repository at this point in the history
  • Loading branch information
kvch committed Apr 20, 2021
1 parent ab5df90 commit 446cae9
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 16 deletions.
3 changes: 2 additions & 1 deletion filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,13 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, path stri
return nil, err
}

r = readfile.NewStripNewline(r, inp.readerConfig.LineTerminator)

r, err = newParsers(r, parserConfig{maxBytes: inp.readerConfig.MaxBytes, lineTerminator: inp.readerConfig.LineTerminator}, inp.readerConfig.Parsers)
if err != nil {
return nil, err
}

r = readfile.NewStripNewline(r, inp.readerConfig.LineTerminator)
r = readfile.NewLimitReader(r, inp.readerConfig.MaxBytes)

inp.msgPostProc = newPostProcessors(inp.readerConfig.Parsers)
Expand Down
7 changes: 0 additions & 7 deletions filebeat/input/filestream/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace)
for _, ns := range c {
name := ns.Name()
switch name {
case "limit":
p = readfile.NewLimitReader(p, pCfg.maxBytes)
case "multiline":
var config multiline.Config
cfg := ns.Config()
Expand All @@ -77,8 +75,6 @@ func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace)
return nil, fmt.Errorf("error while parsing ndjson parser config: %+v", err)
}
p = readjson.NewJSONReader(p, &config)
case "strip_newline":
p = readfile.NewStripNewline(p, pCfg.lineTerminator)
default:
return nil, fmt.Errorf("%s: %s", ErrNoSuchParser, name)
}
Expand Down Expand Up @@ -109,9 +105,6 @@ func validateParserConfig(pCfg parserConfig, c []common.ConfigNamespace) error {
for _, ns := range c {
name := ns.Name()
switch name {
case "limit":
case "strip_newline":
continue
case "multiline":
var config multiline.Config
cfg := ns.Config()
Expand Down
53 changes: 45 additions & 8 deletions filebeat/input/filestream/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,54 @@ func TestParsersConfigAndReading(t *testing.T) {
},
expectedMessages: []string{"line 1\n", "line 2\n"},
},
"correct strip_newline parser": {
lines: "line 1\nline 2\n",
"correct multiline parser": {
lines: "line 1.1\nline 1.2\nline 1.3\nline 2.1\nline 2.2\nline 2.3\n",
parsers: map[string]interface{}{
"paths": []string{"dummy_path"},
"parsers": []map[string]interface{}{
map[string]interface{}{
"multiline": map[string]interface{}{
"type": "count",
"count_lines": 3,
},
},
},
},
expectedMessages: []string{
"line 1.1\n\nline 1.2\n\nline 1.3\n",
"line 2.1\n\nline 2.2\n\nline 2.3\n",
},
},
"multiline docker logs parser": {
lines: `{"log":"[log] The following are log messages\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
{"log":"[log] This one is\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
{"log":" on multiple\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
{"log":" lines","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
{"log":"[log] In total there should be 3 events\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
`,
parsers: map[string]interface{}{
"paths": []string{"dummy_path"},
"parsers": []map[string]interface{}{
map[string]interface{}{
"strip_newline": nil,
"ndjson": map[string]interface{}{
"keys_under_root": true,
"message_key": "log",
},
},
map[string]interface{}{
"multiline": map[string]interface{}{
"match": "after",
"negate": true,
"pattern": "^\\[log\\]",
},
},
},
},
expectedMessages: []string{"line 1", "line 2"},
expectedMessages: []string{
"[log] The following are log messages\n",
"[log] This one is\n\n on multiple\n\n lines",
"[log] In total there should be 3 events\n",
},
},
"non existent parser configuration": {
parsers: map[string]interface{}{
Expand Down Expand Up @@ -96,12 +133,12 @@ func TestParsersConfigAndReading(t *testing.T) {
return
}

p, err := newParsers(testReader(test.lines), parserConfig{lineTerminator: readfile.AutoLineTerminator}, cfg.Reader.Parsers)
p, err := newParsers(testReader(test.lines), parserConfig{lineTerminator: readfile.AutoLineTerminator, maxBytes: 64}, cfg.Reader.Parsers)

i := 0
msg, err := p.Next()
for err == nil {
require.Contains(t, test.expectedMessages[i], string(msg.Content))
require.Equal(t, test.expectedMessages[i], string(msg.Content))
i++
msg, err = p.Next()
}
Expand Down Expand Up @@ -235,9 +272,9 @@ func testReader(lines string) reader.Reader {
}
r, err := readfile.NewEncodeReader(ioutil.NopCloser(reader), readfile.Config{
Codec: enc,
BufferSize: 64,
BufferSize: 1024,
Terminator: readfile.AutoLineTerminator,
MaxBytes: 128,
MaxBytes: 1024,
})
if err != nil {
panic(err)
Expand Down

0 comments on commit 446cae9

Please sign in to comment.