Skip to content

Commit

Permalink
Fix double @timestamp key when using JSON decoding
Browse files Browse the repository at this point in the history
The MergeJSONFields was adding the parsed @timestamp key to the fields,
instead of modifying it into the Event structure. This change makes it
return the new timestamp (or the empty Timestamp if no change required),
and the caller sets it into the event.
  • Loading branch information
tsg committed Oct 25, 2017
1 parent b6e4fed commit 6386f6f
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 24 deletions.
9 changes: 4 additions & 5 deletions filebeat/harvester/reader/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func createJSONError(message string) common.MapStr {
// respecting the KeysUnderRoot and OverwriteKeys configuration options.
// If MessageKey is defined, the Text value from the event always
// takes precedence.
func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string, config JSONConfig) {
func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string, config JSONConfig) time.Time {
// The message key might have been modified by multiline
if len(config.MessageKey) > 0 && text != nil {
jsonFields[config.MessageKey] = *text
Expand All @@ -111,16 +111,15 @@ func MergeJSONFields(data common.MapStr, jsonFields common.MapStr, text *string,
case common.Time:
ts = time.Time(ts)
}
delete(data, "@timestamp")
}
event := &beat.Event{
Timestamp: ts,
Fields: data,
}
jsontransform.WriteJSONKeys(event, jsonFields, config.OverwriteKeys)

// if timestamp has been set -> add to data
if !event.Timestamp.IsZero() {
data["@timestamp"] = common.Time(event.Timestamp)
}
return event.Timestamp
}
return time.Time{}
}
37 changes: 23 additions & 14 deletions filebeat/harvester/reader/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,12 @@ func TestAddJSONFields(t *testing.T) {
now := time.Now().UTC()

tests := []struct {
Name string
Data common.MapStr
Text *string
JSONConfig JSONConfig
ExpectedItems common.MapStr
Name string
Data common.MapStr
Text *string
JSONConfig JSONConfig
ExpectedItems common.MapStr
ExpectedTimestamp time.Time
}{
{
// by default, don't overwrite keys
Expand All @@ -192,6 +193,7 @@ func TestAddJSONFields(t *testing.T) {
"type": "test_type",
"text": "hello",
},
ExpectedTimestamp: time.Time{},
},
{
// overwrite keys if asked
Expand All @@ -203,6 +205,7 @@ func TestAddJSONFields(t *testing.T) {
"type": "test",
"text": "hello",
},
ExpectedTimestamp: time.Time{},
},
{
// without keys_under_root, put everything in a json key
Expand All @@ -213,6 +216,7 @@ func TestAddJSONFields(t *testing.T) {
ExpectedItems: common.MapStr{
"json": common.MapStr{"type": "test", "text": "hello"},
},
ExpectedTimestamp: time.Time{},
},
{
// when MessageKey is defined, the Text overwrites the value of that key
Expand All @@ -224,6 +228,7 @@ func TestAddJSONFields(t *testing.T) {
"json": common.MapStr{"type": "test", "text": "hello"},
"type": "test_type",
},
ExpectedTimestamp: time.Time{},
},
{
// when @timestamp is in JSON and overwrite_keys is true, parse it
Expand All @@ -233,9 +238,9 @@ func TestAddJSONFields(t *testing.T) {
Text: &text,
JSONConfig: JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
ExpectedItems: common.MapStr{
"@timestamp": common.MustParseTime("2016-04-05T18:47:18.444Z"),
"type": "test",
"type": "test",
},
ExpectedTimestamp: time.Time(common.MustParseTime("2016-04-05T18:47:18.444Z")),
},
{
// when the parsing on @timestamp fails, leave the existing value and add an error key
Expand All @@ -245,10 +250,10 @@ func TestAddJSONFields(t *testing.T) {
Text: &text,
JSONConfig: JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
ExpectedItems: common.MapStr{
"@timestamp": common.Time(now),
"type": "test",
"error": common.MapStr{"type": "json", "message": "@timestamp not overwritten (parse error on 2016-04-05T18:47:18.44XX4Z)"},
"type": "test",
"error": common.MapStr{"type": "json", "message": "@timestamp not overwritten (parse error on 2016-04-05T18:47:18.44XX4Z)"},
},
ExpectedTimestamp: time.Time{},
},
{
// when the @timestamp has the wrong type, leave the existing value and add an error key
Expand All @@ -258,10 +263,10 @@ func TestAddJSONFields(t *testing.T) {
Text: &text,
JSONConfig: JSONConfig{KeysUnderRoot: true, OverwriteKeys: true},
ExpectedItems: common.MapStr{
"@timestamp": common.Time(now),
"type": "test",
"error": common.MapStr{"type": "json", "message": "@timestamp not overwritten (not string)"},
"type": "test",
"error": common.MapStr{"type": "json", "message": "@timestamp not overwritten (not string)"},
},
ExpectedTimestamp: time.Time{},
},
{
// if overwrite_keys is true, but the `type` key in json is not a string, ignore it
Expand All @@ -273,6 +278,7 @@ func TestAddJSONFields(t *testing.T) {
"type": "test_type",
"error": common.MapStr{"type": "json", "message": "type not overwritten (not string)"},
},
ExpectedTimestamp: time.Time{},
},
{
// if overwrite_keys is true, but the `type` key in json is empty, ignore it
Expand All @@ -284,6 +290,7 @@ func TestAddJSONFields(t *testing.T) {
"type": "test_type",
"error": common.MapStr{"type": "json", "message": "type not overwritten (invalid value [])"},
},
ExpectedTimestamp: time.Time{},
},
{
// if overwrite_keys is true, but the `type` key in json starts with _, ignore it
Expand All @@ -295,6 +302,7 @@ func TestAddJSONFields(t *testing.T) {
"type": "test_type",
"error": common.MapStr{"type": "json", "message": "type not overwritten (invalid value [_type])"},
},
ExpectedTimestamp: time.Time{},
},
}

Expand All @@ -305,12 +313,13 @@ func TestAddJSONFields(t *testing.T) {
jsonFields = fields.(common.MapStr)
}

MergeJSONFields(test.Data, jsonFields, test.Text, test.JSONConfig)
ts := MergeJSONFields(test.Data, jsonFields, test.Text, test.JSONConfig)

t.Log("Executing test:", test)
for k, v := range test.ExpectedItems {
assert.Equal(t, v, test.Data[k])
}
assert.Equal(t, test.ExpectedTimestamp, ts)
})
}
}
16 changes: 11 additions & 5 deletions filebeat/prospector/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,19 +270,25 @@ func (h *Harvester) Run() error {
jsonFields = f.(common.MapStr)
}

data.Event = beat.Event{
Timestamp: message.Ts,
}

if h.config.JSON != nil && len(jsonFields) > 0 {
reader.MergeJSONFields(fields, jsonFields, &text, *h.config.JSON)
ts := reader.MergeJSONFields(fields, jsonFields, &text, *h.config.JSON)
if !ts.IsZero() {
// there was a `@timestamp` key in the event, so overwrite
// the resulting timestamp
data.Event.Timestamp = ts
}
} else if &text != nil {
if fields == nil {
fields = common.MapStr{}
}
fields["message"] = text
}

data.Event = beat.Event{
Timestamp: message.Ts,
Fields: fields,
}
data.Event.Fields = fields
}

// Always send event to update state, also if lines was skipped
Expand Down

0 comments on commit 6386f6f

Please sign in to comment.