Skip to content

Commit

Permalink
Fix missing support for setting document id in decoder_json pr… (#15859)
Browse files Browse the repository at this point in the history
* Change to metadata._id

Update processors, output, and json parser to store the document ID in
`@metadata._id`. This ensures better compatibility with Logstash
inputs/filters setting `@metadata._id`.

Also add missing `document_id` to decode_json_fields processor, given
users the chance to set the document id if the JSON document was
embedded in another JSON document.
  • Loading branch information
Steffen Siering authored Jan 28, 2020
1 parent c3f0276 commit d60b04a
Show file tree
Hide file tree
Showing 14 changed files with 85 additions and 16 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Affecting all Beats*

- The document id fields has been renamed from @metadata.id to @metadata._id {pull}15859[15859]


*Auditbeat*

Expand Down Expand Up @@ -84,6 +86,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Affecting all Beats*

- Add document_id setting to decode_json_fields processor. {pull}15859[15859]


*Auditbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ occur.

*`document_id`*:: Option configuration setting that specifies the JSON key to
set the document id. If configured, the field will be removed from the original
json document and stored in `@metadata.id`
json document and stored in `@metadata._id`

*`ignore_decoding_error`*:: An optional configuration setting that specifies if
JSON decoding errors should be logged or not. If set to true, errors will not
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ func (h *Harvester) onMessage(

if id != "" {
meta = common.MapStr{
"id": id,
"_id": id,
}
}
} else if &text != nil {
Expand Down
4 changes: 2 additions & 2 deletions filebeat/tests/system/test_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ def test_id_in_message(self):

assert len(output) == 3
for i in xrange(len(output)):
assert("@metadata.id" in output[i])
assert(output[i]["@metadata.id"] == str(i))
assert("@metadata._id" in output[i])
assert(output[i]["@metadata._id"] == str(i))
assert("json.id" not in output[i])

def test_with_generic_filtering(self):
Expand Down
2 changes: 1 addition & 1 deletion libbeat/beat/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (e *Event) SetID(id string) {
if e.Meta == nil {
e.Meta = common.MapStr{}
}
e.Meta["id"] = id
e.Meta["_id"] = id
}

func (e *Event) GetValue(key string) (interface{}, error) {
Expand Down
8 changes: 4 additions & 4 deletions libbeat/beat/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestEventPutGetTimestamp(t *testing.T) {

func TestEventMetadata(t *testing.T) {
const id = "123"
newMeta := func() common.MapStr { return common.MapStr{"id": id} }
newMeta := func() common.MapStr { return common.MapStr{"_id": id} }

t.Run("put", func(t *testing.T) {
evt := newEmptyEvent()
Expand All @@ -75,7 +75,7 @@ func TestEventMetadata(t *testing.T) {
t.Run("put sub-key", func(t *testing.T) {
evt := newEmptyEvent()

evt.PutValue("@metadata.id", id)
evt.PutValue("@metadata._id", id)

assert.Equal(t, newMeta(), evt.Meta)
assert.Empty(t, evt.Fields)
Expand All @@ -85,7 +85,7 @@ func TestEventMetadata(t *testing.T) {
evt := newEmptyEvent()
evt.Meta = newMeta()

v, err := evt.GetValue("@metadata.id")
v, err := evt.GetValue("@metadata._id")

assert.NoError(t, err)
assert.Equal(t, id, v)
Expand All @@ -105,7 +105,7 @@ func TestEventMetadata(t *testing.T) {
evt := newEmptyEvent()
evt.Meta = newMeta()

err := evt.Delete("@metadata.id")
err := evt.Delete("@metadata._id")

assert.NoError(t, err)
assert.Empty(t, evt.Meta)
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func createEventBulkMeta(

var id string
if m := event.Meta; m != nil {
if tmp := m["id"]; tmp != nil {
if tmp := m["_id"]; tmp != nil {
if s, ok := tmp.(string); ok {
id = s
} else {
Expand Down
31 changes: 30 additions & 1 deletion libbeat/processors/actions/decode_json_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type decodeJSONFields struct {
overwriteKeys bool
addErrorKey bool
processArray bool
documentID string
target *string
}

Expand All @@ -50,6 +51,7 @@ type config struct {
AddErrorKey bool `config:"add_error_key"`
ProcessArray bool `config:"process_array"`
Target *string `config:"target"`
DocumentID string `config:"document_id"`
}

var (
Expand Down Expand Up @@ -81,7 +83,15 @@ func NewDecodeJSONFields(c *common.Config) (processors.Processor, error) {
return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %s", err)
}

f := &decodeJSONFields{fields: config.Fields, maxDepth: config.MaxDepth, overwriteKeys: config.OverwriteKeys, addErrorKey: config.AddErrorKey, processArray: config.ProcessArray, target: config.Target}
f := &decodeJSONFields{
fields: config.Fields,
maxDepth: config.MaxDepth,
overwriteKeys: config.OverwriteKeys,
addErrorKey: config.AddErrorKey,
processArray: config.ProcessArray,
documentID: config.DocumentID,
target: config.Target,
}
return f, nil
}

Expand Down Expand Up @@ -115,6 +125,18 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) {
target = *f.target
}

var id string
if key := f.documentID; key != "" {
if dict, ok := output.(map[string]interface{}); ok {
if tmp, err := common.MapStr(dict).GetValue(key); err == nil {
if v, ok := tmp.(string); ok {
id = v
common.MapStr(dict).Delete(key)
}
}
}
}

if target != "" {
_, err = event.PutValue(target, output)
} else {
Expand All @@ -131,6 +153,13 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) {
errs = append(errs, err.Error())
continue
}

if id != "" {
if event.Meta == nil {
event.Meta = common.MapStr{}
}
event.Meta["_id"] = id
}
}

if len(errs) > 0 {
Expand Down
33 changes: 33 additions & 0 deletions libbeat/processors/actions/decode_json_fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
Expand Down Expand Up @@ -94,6 +95,38 @@ func TestInvalidJSONMultiple(t *testing.T) {
assert.Equal(t, expected.String(), actual.String())
}

func TestDocumentID(t *testing.T) {
logp.TestingSetup()

input := common.MapStr{
"msg": `{"log": "message", "myid": "myDocumentID"}`,
}

config := common.MustNewConfigFrom(map[string]interface{}{
"fields": []string{"msg"},
"document_id": "myid",
})

p, err := NewDecodeJSONFields(config)
if err != nil {
logp.Err("Error initializing decode_json_fields")
t.Fatal(err)
}

actual, err := p.Run(&beat.Event{Fields: input})
require.NoError(t, err)

wantFields := common.MapStr{
"msg": map[string]interface{}{"log": "message"},
}
wantMeta := common.MapStr{
"_id": "myDocumentID",
}

assert.Equal(t, wantFields, actual.Fields)
assert.Equal(t, wantMeta, actual.Meta)
}

func TestValidJSONDepthOne(t *testing.T) {
input := common.MapStr{
"msg": "{\"log\":\"{\\\"level\\\":\\\"info\\\"}\",\"stream\":\"stderr\",\"count\":3}",
Expand Down
3 changes: 3 additions & 0 deletions libbeat/processors/actions/docs/decode_json_fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,6 @@ default value is false.
`error` field is going to be part of event with error message. If it set to false, there
will not be any error in event's field. Even error occurs while decoding json keys. The
default value is false
`document_id`:: (Optional) JSON key to use as the document id. If configured,
the field will be removed from the original json document and stored in
`@metadata._id`
4 changes: 2 additions & 2 deletions libbeat/processors/add_id/add_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestDefaultTargetField(t *testing.T) {
newEvent, err := p.Run(testEvent)
assert.NoError(t, err)

v, err := newEvent.GetValue("@metadata.id")
v, err := newEvent.GetValue("@metadata._id")
assert.NoError(t, err)
assert.NotEmpty(t, v)
}
Expand All @@ -59,7 +59,7 @@ func TestNonDefaultTargetField(t *testing.T) {
assert.NoError(t, err)
assert.NotEmpty(t, v)

v, err = newEvent.GetValue("@metadata.id")
v, err = newEvent.GetValue("@metadata._id")
assert.NoError(t, err)
assert.Empty(t, v)
}
2 changes: 1 addition & 1 deletion libbeat/processors/add_id/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type config struct {

func defaultConfig() config {
return config{
TargetField: "@metadata.id",
TargetField: "@metadata._id",
Type: "elasticsearch",
}
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/processors/add_id/docs/add_id.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ processors:

The following settings are supported:

`target_field`:: (Optional) Field where the generated ID will be stored. Default is `@metadata.id`.
`target_field`:: (Optional) Field where the generated ID will be stored. Default is `@metadata._id`.

`type`:: (Optional) Type of ID to generate. Currently only `elasticsearch` is supported and is the default.
The `elasticsearch` type generates IDs using the same algorithm that Elasticsearch uses for auto-generating
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/mb/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func TestEventConversionToBeatEvent(t *testing.T) {
e := mbEvent.BeatEvent(module, metricSet)
e = mbEvent.BeatEvent(module, metricSet)

assert.Equal(t, "foobar", e.Meta["id"])
assert.Equal(t, "foobar", e.Meta["_id"])
assert.Equal(t, timestamp, e.Timestamp)
assert.Equal(t, common.MapStr{
"type": "docker",
Expand Down

0 comments on commit d60b04a

Please sign in to comment.