From 2e2c62bd0853add6000bbf919a96f35f45a858d2 Mon Sep 17 00:00:00 2001 From: Gil Raphaelli Date: Wed, 9 Jan 2019 12:34:48 -0500 Subject: [PATCH] move agent metadata to a processor (#9952) --- CHANGELOG-developer.next.asciidoc | 1 + libbeat/beat/pipeline.go | 4 + libbeat/publisher/pipeline/module.go | 11 -- libbeat/publisher/pipeline/processor.go | 21 ++- libbeat/publisher/pipeline/processor_test.go | 159 ++++++++++++++----- 5 files changed, 141 insertions(+), 55 deletions(-) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index b425914e40f..0317f68c503 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -25,3 +25,4 @@ The list below covers the major changes between 7.0.0-alpha2 and master only. ==== Added - Allow multiple object type configurations per field. {pull}9772[9772] +- Move agent metadata addition to a processor. {pull}9952[9952] diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go index 444daae7efa..3db7baae6d2 100644 --- a/libbeat/beat/pipeline.go +++ b/libbeat/beat/pipeline.go @@ -76,6 +76,10 @@ type ClientConfig struct { // if the normalization step should be skipped set this to true. SkipNormalization bool + // By default events are decorated with agent metadata. + // To skip adding that metadata set this to true. + SkipAgentMetadata bool + // ACK handler strategies. // Note: ack handlers are run in another go-routine owned by the publisher pipeline. // They should not block for to long, to not block the internal buffers for diff --git a/libbeat/publisher/pipeline/module.go b/libbeat/publisher/pipeline/module.go index cff8abdc1a6..0bb3710b9da 100644 --- a/libbeat/publisher/pipeline/module.go +++ b/libbeat/publisher/pipeline/module.go @@ -80,13 +80,6 @@ func Load( Annotations: Annotations{ Event: config.EventMetadata, Builtin: common.MapStr{ - "agent": common.MapStr{ - "type": beatInfo.Beat, - "hostname": beatInfo.Hostname, - "version": beatInfo.Version, - "id": beatInfo.ID.String(), - "ephemeral_id": beatInfo.EphemeralID.String(), - }, "host": common.MapStr{ "name": name, }, @@ -97,10 +90,6 @@ func Load( }, } - if name != beatInfo.Hostname { - settings.Annotations.Builtin.Put("agent.name", name) - } - queueBuilder, err := createQueueBuilder(config.Queue, monitors) if err != nil { return nil, err diff --git a/libbeat/publisher/pipeline/processor.go b/libbeat/publisher/pipeline/processor.go index 3eab02ad90d..f85a7c40e34 100644 --- a/libbeat/publisher/pipeline/processor.go +++ b/libbeat/publisher/pipeline/processor.go @@ -121,7 +121,12 @@ func newProcessorPipeline( processors.add(makeAddFieldsProcessor("beatsMeta", meta, needsCopy)) } - // setup 7: pipeline processors list + // setup 7: add agent metadata + if !config.SkipAgentMetadata { + processors.add(makeAddAgentMetadataProcessor(info)) + } + + // setup 8: pipeline processors list processors.add(global.processors) // setup 9: debug print final event (P) @@ -290,6 +295,20 @@ func makeAddDynMetaProcessor( }) } +func makeAddAgentMetadataProcessor(info beat.Info) *processorFn { + metadata := common.MapStr{ + "type": info.Beat, + "ephemeral_id": info.EphemeralID.String(), + "hostname": info.Hostname, + "id": info.ID.String(), + "version": info.Version, + } + if info.Name != info.Hostname { + metadata.Put("name", info.Name) + } + return makeAddFieldsProcessor("add_agent_metadata", common.MapStr{"agent": metadata}, true) +} + func debugPrintProcessor(info beat.Info) *processorFn { // ensure only one go-routine is using the encoder (in case // beat.Client is shared between multiple go-routines by accident) diff --git a/libbeat/publisher/pipeline/processor_test.go b/libbeat/publisher/pipeline/processor_test.go index df5f30724b4..a2e84da4371 100644 --- a/libbeat/publisher/pipeline/processor_test.go +++ b/libbeat/publisher/pipeline/processor_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/gofrs/uuid" "github.com/stretchr/testify/assert" "github.com/elastic/beats/libbeat/beat" @@ -29,26 +30,28 @@ import ( ) func TestProcessors(t *testing.T) { - info := beat.Info{} + defaultInfo := beat.Info{} type local struct { - config beat.ClientConfig - events []common.MapStr - expected []common.MapStr + config beat.ClientConfig + events []common.MapStr + expected []common.MapStr + includeAgentMetadata bool } tests := []struct { name string global pipelineProcessors local []local + info *beat.Info }{ { - "user global fields and tags", - pipelineProcessors{ + name: "user global fields and tags", + global: pipelineProcessors{ fields: common.MapStr{"global": 1}, tags: []string{"tag"}, }, - []local{ + local: []local{ { config: beat.ClientConfig{}, events: []common.MapStr{{"value": "abc", "user": nil}}, @@ -59,12 +62,12 @@ func TestProcessors(t *testing.T) { }, }, { - "no normalization", - pipelineProcessors{ + name: "no normalization", + global: pipelineProcessors{ fields: common.MapStr{"global": 1}, tags: []string{"tag"}, }, - []local{ + local: []local{ { config: beat.ClientConfig{SkipNormalization: true}, events: []common.MapStr{{"value": "abc", "user": nil}}, @@ -75,9 +78,78 @@ func TestProcessors(t *testing.T) { }, }, { - "beat local fields", - pipelineProcessors{}, - []local{ + name: "add agent metadata", + global: pipelineProcessors{ + fields: common.MapStr{"global": 1, "agent": common.MapStr{"foo": "bar"}}, + tags: []string{"tag"}, + }, + info: &beat.Info{ + Beat: "test", + EphemeralID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440000")), + Hostname: "test.host.name", + ID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440001")), + Name: "test.host.name", + Version: "0.1", + }, + local: []local{ + { + config: beat.ClientConfig{}, + events: []common.MapStr{{"value": "abc", "user": nil}}, + expected: []common.MapStr{ + { + "agent": common.MapStr{ + "ephemeral_id": "123e4567-e89b-12d3-a456-426655440000", + "hostname": "test.host.name", + "id": "123e4567-e89b-12d3-a456-426655440001", + "type": "test", + "version": "0.1", + "foo": "bar", + }, + "value": "abc", "global": 1, "tags": []string{"tag"}, + }, + }, + includeAgentMetadata: true, + }, + }, + }, + { + name: "add agent metadata with custom host.name", + global: pipelineProcessors{ + fields: common.MapStr{"global": 1}, + tags: []string{"tag"}, + }, + info: &beat.Info{ + Beat: "test", + EphemeralID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440000")), + Hostname: "test.host.name", + ID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440001")), + Name: "other.test.host.name", + Version: "0.1", + }, + local: []local{ + { + config: beat.ClientConfig{}, + events: []common.MapStr{{"value": "abc", "user": nil}}, + expected: []common.MapStr{ + { + "agent": common.MapStr{ + "ephemeral_id": "123e4567-e89b-12d3-a456-426655440000", + "hostname": "test.host.name", + "id": "123e4567-e89b-12d3-a456-426655440001", + "name": "other.test.host.name", + "type": "test", + "version": "0.1", + }, + "value": "abc", "global": 1, "tags": []string{"tag"}, + }, + }, + includeAgentMetadata: true, + }, + }, + }, + { + name: "beat local fields", + local: []local{ { config: beat.ClientConfig{ Fields: common.MapStr{"local": 1}, @@ -88,12 +160,12 @@ func TestProcessors(t *testing.T) { }, }, { - "beat local and user global fields", - pipelineProcessors{ + name: "beat local and user global fields", + global: pipelineProcessors{ fields: common.MapStr{"global": 1}, tags: []string{"tag"}, }, - []local{ + local: []local{ { config: beat.ClientConfig{ Fields: common.MapStr{"local": 1}, @@ -106,12 +178,12 @@ func TestProcessors(t *testing.T) { }, }, { - "user global fields overwrite beat local fields", - pipelineProcessors{ + name: "user global fields overwrite beat local fields", + global: pipelineProcessors{ fields: common.MapStr{"global": 1, "shared": "global"}, tags: []string{"tag"}, }, - []local{ + local: []local{ { config: beat.ClientConfig{ Fields: common.MapStr{"local": 1, "shared": "local"}, @@ -124,9 +196,8 @@ func TestProcessors(t *testing.T) { }, }, { - "beat local fields isolated", - pipelineProcessors{}, - []local{ + name: "beat local fields isolated", + local: []local{ { config: beat.ClientConfig{ Fields: common.MapStr{"local": 1}, @@ -145,11 +216,11 @@ func TestProcessors(t *testing.T) { }, { - "beat local fields + user global fields isolated", - pipelineProcessors{ + name: "beat local fields + user global fields isolated", + global: pipelineProcessors{ fields: common.MapStr{"global": 0}, }, - []local{ + local: []local{ { config: beat.ClientConfig{ Fields: common.MapStr{"local": 1}, @@ -167,9 +238,8 @@ func TestProcessors(t *testing.T) { }, }, { - "user local fields and tags", - pipelineProcessors{}, - []local{ + name: "user local fields and tags", + local: []local{ { config: beat.ClientConfig{ EventMetadata: common.EventMetadata{ @@ -185,9 +255,8 @@ func TestProcessors(t *testing.T) { }, }, { - "user local fields (under root) and tags", - pipelineProcessors{}, - []local{ + name: "user local fields (under root) and tags", + local: []local{ { config: beat.ClientConfig{ EventMetadata: common.EventMetadata{ @@ -204,12 +273,12 @@ func TestProcessors(t *testing.T) { }, }, { - "user local fields overwrite user global fields", - pipelineProcessors{ + name: "user local fields overwrite user global fields", + global: pipelineProcessors{ fields: common.MapStr{"global": 0, "shared": "global"}, tags: []string{"global"}, }, - []local{ + local: []local{ { config: beat.ClientConfig{ EventMetadata: common.EventMetadata{ @@ -230,9 +299,8 @@ func TestProcessors(t *testing.T) { }, }, { - "user local fields isolated", - pipelineProcessors{}, - []local{ + name: "user local fields isolated", + local: []local{ { config: beat.ClientConfig{ EventMetadata: common.EventMetadata{ @@ -254,11 +322,11 @@ func TestProcessors(t *testing.T) { }, }, { - "user local + global fields isolated", - pipelineProcessors{ + name: "user local + global fields isolated", + global: pipelineProcessors{ fields: common.MapStr{"fields": common.MapStr{"global": 0}}, }, - []local{ + local: []local{ { config: beat.ClientConfig{ EventMetadata: common.EventMetadata{ @@ -280,11 +348,11 @@ func TestProcessors(t *testing.T) { }, }, { - "user local + global fields isolated (fields with root)", - pipelineProcessors{ + name: "user local + global fields isolated (fields with root)", + global: pipelineProcessors{ fields: common.MapStr{"global": 0}, }, - []local{ + local: []local{ { config: beat.ClientConfig{ EventMetadata: common.EventMetadata{ @@ -314,7 +382,12 @@ func TestProcessors(t *testing.T) { t.Run(test.name, func(t *testing.T) { // create processor pipelines programs := make([]beat.Processor, len(test.local)) + info := defaultInfo + if test.info != nil { + info = *test.info + } for i, local := range test.local { + local.config.SkipAgentMetadata = !local.includeAgentMetadata programs[i] = newProcessorPipeline(info, test.global, local.config) }