Skip to content

Commit

Permalink
Fix handling of @timestamp and type from the JSON decoder (#1421)
Browse files Browse the repository at this point in the history
If `json.keys_under_root` and `json.overwrite_keys` are set, and the keys
`@timestamp` or `type` show up in the decoded JSON, they can cause issues because
the Elasticsearch output makes some assumptions about them: @timestamp has to
be of type `common.Time` and `type` has to be of type `string`.

The fix attempts to make use of the `@timestamp` and `type` from the JSON, but if
parsing fails or the resulting values are invalid, the fields are not overwritten
and an error key is added to help with troubleshooting.

Fixes #1378.

Also hardens the `getIndex` call in libbeat, to protect against wrong types there
as well.
  • Loading branch information
tsg authored and andrewkroh committed Apr 20, 2016
1 parent e4b0678 commit dbcf2c7
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 5 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha1...master[Check the HEAD d
* Default location for the registry file was changed to be `data/registry` from the binary directory,
rather than `.filebeat` in the current working directory. This affects installations for zip/tar.gz/source,
the location for DEB and RPM packages stays the same. {pull}1373[1373]
* Fix issue with JSON decoding where `@timestamp` or `type` keys with the wrong type could cause Filebeat
to crash. {issue}1378[1378]

*Winlogbeat*

Expand Down
37 changes: 36 additions & 1 deletion filebeat/input/file.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package input

import (
"fmt"
"os"
"time"

Expand All @@ -9,6 +10,10 @@ import (
"github.com/elastic/beats/libbeat/logp"
)

const (
jsonErrorKey = "json_error"
)

type File struct {
File *os.File
FileInfo os.FileInfo
Expand Down Expand Up @@ -72,7 +77,37 @@ func mergeJSONFields(f *FileEvent, event common.MapStr) {
if f.JSONConfig.KeysUnderRoot {
for k, v := range f.JSONFields {
if f.JSONConfig.OverwriteKeys {
event[k] = v
if k == "@timestamp" {
vstr, ok := v.(string)
if !ok {
logp.Err("JSON: Won't overwrite @timestamp because value is not string")
event[jsonErrorKey] = "@timestamp not overwritten (not string)"
continue
}
// @timestamp must be of type common.Time
ts, err := common.ParseTime(vstr)
if err != nil {
logp.Err("JSON: Won't overwrite @timestamp because of parsing error: %v", err)
event[jsonErrorKey] = fmt.Sprintf("@timestamp not overwritten (parse error on %s)", vstr)
continue
}
event[k] = ts
} else if k == "type" {
vstr, ok := v.(string)
if !ok {
logp.Err("JSON: Won't overwrite type because value is not string")
event[jsonErrorKey] = "type not overwritten (not string)"
continue
}
if len(vstr) == 0 || vstr[0] == '_' {
logp.Err("JSON: Won't overwrite type because value is empty or starts with an underscore")
event[jsonErrorKey] = fmt.Sprintf("type not overwritten (invalid value [%s])", vstr)
continue
}
event[k] = vstr
} else {
event[k] = v
}
} else if _, exists := event[k]; !exists {
event[k] = v
}
Expand Down
90 changes: 90 additions & 0 deletions filebeat/input/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/libbeat/common"
Expand Down Expand Up @@ -124,6 +125,8 @@ func TestFileEventToMapStrJSON(t *testing.T) {

text := "hello"

now := time.Now()

tests := []io{
{
// by default, don't overwrite keys
Expand Down Expand Up @@ -177,10 +180,97 @@ func TestFileEventToMapStrJSON(t *testing.T) {
"type": "test_type",
},
},
{
// when @timestamp is in JSON and overwrite_keys is true, parse it
// into a common.Time
Event: FileEvent{
ReadTime: now,
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.444Z"},
JSONConfig: &config.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"@timestamp": common.MustParseTime("2016-04-05T18:47:18.444Z"),
"type": "test",
},
},
{
// when parsing @timestamp fails, leave the existing value (a common.Time)
// untouched and add an error key
Event: FileEvent{
ReadTime: now,
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": "test", "@timestamp": "2016-04-05T18:47:18.44XX4Z"},
JSONConfig: &config.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"@timestamp": common.Time(now),
"type": "test",
"json_error": "@timestamp not overwritten (parse error on 2016-04-05T18:47:18.44XX4Z)",
},
},
{
// when @timestamp has the wrong type, leave the existing value (a common.Time)
// untouched and add an error key
Event: FileEvent{
ReadTime: now,
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": "test", "@timestamp": 42},
JSONConfig: &config.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"@timestamp": common.Time(now),
"type": "test",
"json_error": "@timestamp not overwritten (not string)",
},
},
{
// if overwrite_keys is true, but the `type` key in json is not a string, ignore it
Event: FileEvent{
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": 42},
JSONConfig: &config.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"type": "test_type",
"json_error": "type not overwritten (not string)",
},
},
{
// if overwrite_keys is true, but the `type` key in json is empty, ignore it
Event: FileEvent{
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": ""},
JSONConfig: &config.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"type": "test_type",
"json_error": "type not overwritten (invalid value [])",
},
},
{
// if overwrite_keys is true, but the `type` key in json starts with _, ignore it
Event: FileEvent{
DocumentType: "test_type",
Text: &text,
JSONFields: common.MapStr{"type": "_type"},
JSONConfig: &config.JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
},
ExpectedItems: common.MapStr{
"type": "test_type",
"json_error": "type not overwritten (invalid value [_type])",
},
},
}

for _, test := range tests {
result := test.Event.ToMapStr()
t.Log("Executing test:", test)
for k, v := range test.ExpectedItems {
assert.Equal(t, v, result[k])
}
Expand Down
3 changes: 3 additions & 0 deletions filebeat/tests/files/logs/json_timestamp.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"@timestamp":"2016-04-05T18:47:18.444Z","level":"INFO","logger":"iapi.logger","thread":"JobCourier4","appInfo":{"appname":"SessionManager","appid":"Pooler","host":"demohost.mydomain.com","ip":"192.168.128.113","pid":13982},"userFields":{"ApplicationId":"PROFAPP_001","RequestTrackingId":"RetrieveTBProfileToken-6066477"},"source":"DataAccess\/FetchActiveSessionToken.process","msg":"FetchActiveSessionToken process ended"}
{"@timestamp":"invalid","level":"INFO","logger":"iapi.logger","thread":"JobCourier4","appInfo":{"appname":"SessionManager","appid":"Pooler","host":"demohost.mydomain.com","ip":"192.168.128.113","pid":13982},"userFields":{"ApplicationId":"PROFAPP_001","RequestTrackingId":"RetrieveTBProfileToken-6066477"},"source":"DataAccess\/FetchActiveSessionToken.process","msg":"FetchActiveSessionToken process ended"}
{"@timestamp":{"hello": "test"},"level":"INFO","logger":"iapi.logger","thread":"JobCourier4","appInfo":{"appname":"SessionManager","appid":"Pooler","host":"demohost.mydomain.com","ip":"192.168.128.113","pid":13982},"userFields":{"ApplicationId":"PROFAPP_001","RequestTrackingId":"RetrieveTBProfileToken-6066477"},"source":"DataAccess\/FetchActiveSessionToken.process","msg":"FetchActiveSessionToken process ended"}
3 changes: 3 additions & 0 deletions filebeat/tests/files/logs/json_type.log
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"timestamp":"2016-04-05T18:47:18.444Z","level":"INFO","logger":"iapi.logger","thread":"JobCourier4","appInfo":{"appname":"SessionManager","appid":"Pooler","host":"demohost.mydomain.com","ip":"192.168.128.113","pid":13982},"userFields":{"ApplicationId":"PROFAPP_001","RequestTrackingId":"RetrieveTBProfileToken-6066477"},"source":"DataAccess\/FetchActiveSessionToken.process","msg":"FetchActiveSessionToken process ended", "type": "test"}
{"timestamp":"2016-04-05T18:47:18.444Z","level":"INFO","logger":"iapi.logger","thread":"JobCourier4","appInfo":{"appname":"SessionManager","appid":"Pooler","host":"demohost.mydomain.com","ip":"192.168.128.113","pid":13982},"userFields":{"ApplicationId":"PROFAPP_001","RequestTrackingId":"RetrieveTBProfileToken-6066477"},"source":"DataAccess\/FetchActiveSessionToken.process","msg":"FetchActiveSessionToken process ended", "type": 5}
{"timestamp":"2016-04-05T18:47:18.444Z","level":"INFO","logger":"iapi.logger","thread":"JobCourier4","appInfo":{"appname":"SessionManager","appid":"Pooler","host":"demohost.mydomain.com","ip":"192.168.128.113","pid":13982},"userFields":{"ApplicationId":"PROFAPP_001","RequestTrackingId":"RetrieveTBProfileToken-6066477"},"source":"DataAccess\/FetchActiveSessionToken.process","msg":"FetchActiveSessionToken process ended", "type": {"hello": "shouldn't work"}}
75 changes: 75 additions & 0 deletions filebeat/tests/system/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,78 @@ def test_config_no_msg_key_multiline(self):
assert self.log_contains("When using the JSON decoder and multiline" +
" together, you need to specify a" +
" message_key value")

def test_timestamp_in_message(self):
"""
Should be able to make use of a `@timestamp` field if it exists in the
message.
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
json=dict(
message_key="msg",
keys_under_root=True,
overwrite_keys=True
),
)
os.mkdir(self.working_dir + "/log/")
self.copy_files(["logs/json_timestamp.log"],
source_dir="../files",
target_dir="log")

proc = self.start_beat()
self.wait_until(
lambda: self.output_has(lines=3),
max_timeout=10)
proc.check_kill_and_wait()

output = self.read_output()
assert len(output) == 3
assert all(isinstance(o["@timestamp"], basestring) for o in output)
assert all(isinstance(o["type"], basestring) for o in output)
assert output[0]["@timestamp"] == "2016-04-05T18:47:18.444Z"

assert output[1]["@timestamp"] != "invalid"
assert output[1]["json_error"] == \
"@timestamp not overwritten (parse error on invalid)"

assert output[2]["json_error"] == \
"@timestamp not overwritten (not string)"

def test_type_in_message(self):
"""
If overwrite_keys is true and type is in the message, we have to
be careful to keep it as a valid type name.
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
json=dict(
message_key="msg",
keys_under_root=True,
overwrite_keys=True
),
)
os.mkdir(self.working_dir + "/log/")
self.copy_files(["logs/json_type.log"],
source_dir="../files",
target_dir="log")

proc = self.start_beat()
self.wait_until(
lambda: self.output_has(lines=3),
max_timeout=10)
proc.check_kill_and_wait()

output = self.read_output()
assert len(output) == 3
assert all(isinstance(o["@timestamp"], basestring) for o in output)
assert all(isinstance(o["type"], basestring) for o in output)
assert output[0]["type"] == "test"

assert output[1]["type"] == "log"
assert output[1]["json_error"] == \
"type not overwritten (not string)"

assert output[2]["type"] == "log"
assert output[2]["json_error"] == \
"type not overwritten (not string)"
13 changes: 9 additions & 4 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,15 @@ func getIndex(event common.MapStr, index string) string {

// Check for dynamic index
if _, ok := event["beat"]; ok {
beatMeta := event["beat"].(common.MapStr)
// Check if index is set dynamically
if dynamicIndex, ok := beatMeta["index"]; ok {
index = dynamicIndex.(string)
beatMeta, ok := event["beat"].(common.MapStr)
if ok {
// Check if index is set dynamically
if dynamicIndex, ok := beatMeta["index"]; ok {
dynamicIndexValue, ok := dynamicIndex.(string)
if ok {
index = dynamicIndexValue
}
}
}
}

Expand Down

0 comments on commit dbcf2c7

Please sign in to comment.