Skip to content

Commit

Permalink
move agent metadata to a processor (#9952)
Browse files Browse the repository at this point in the history
  • Loading branch information
graphaelli authored Jan 9, 2019
1 parent d9a47a8 commit 2e2c62b
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 55 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ The list below covers the major changes between 7.0.0-alpha2 and master only.
==== Added

- Allow multiple object type configurations per field. {pull}9772[9772]
- Move agent metadata addition to a processor. {pull}9952[9952]
4 changes: 4 additions & 0 deletions libbeat/beat/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ type ClientConfig struct {
// if the normalization step should be skipped set this to true.
SkipNormalization bool

// By default events are decorated with agent metadata.
// To skip adding that metadata set this to true.
SkipAgentMetadata bool

// ACK handler strategies.
// Note: ack handlers are run in another go-routine owned by the publisher pipeline.
// They should not block for to long, to not block the internal buffers for
Expand Down
11 changes: 0 additions & 11 deletions libbeat/publisher/pipeline/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,6 @@ func Load(
Annotations: Annotations{
Event: config.EventMetadata,
Builtin: common.MapStr{
"agent": common.MapStr{
"type": beatInfo.Beat,
"hostname": beatInfo.Hostname,
"version": beatInfo.Version,
"id": beatInfo.ID.String(),
"ephemeral_id": beatInfo.EphemeralID.String(),
},
"host": common.MapStr{
"name": name,
},
Expand All @@ -97,10 +90,6 @@ func Load(
},
}

if name != beatInfo.Hostname {
settings.Annotations.Builtin.Put("agent.name", name)
}

queueBuilder, err := createQueueBuilder(config.Queue, monitors)
if err != nil {
return nil, err
Expand Down
21 changes: 20 additions & 1 deletion libbeat/publisher/pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,12 @@ func newProcessorPipeline(
processors.add(makeAddFieldsProcessor("beatsMeta", meta, needsCopy))
}

// setup 7: pipeline processors list
// setup 7: add agent metadata
if !config.SkipAgentMetadata {
processors.add(makeAddAgentMetadataProcessor(info))
}

// setup 8: pipeline processors list
processors.add(global.processors)

// setup 9: debug print final event (P)
Expand Down Expand Up @@ -290,6 +295,20 @@ func makeAddDynMetaProcessor(
})
}

func makeAddAgentMetadataProcessor(info beat.Info) *processorFn {
metadata := common.MapStr{
"type": info.Beat,
"ephemeral_id": info.EphemeralID.String(),
"hostname": info.Hostname,
"id": info.ID.String(),
"version": info.Version,
}
if info.Name != info.Hostname {
metadata.Put("name", info.Name)
}
return makeAddFieldsProcessor("add_agent_metadata", common.MapStr{"agent": metadata}, true)
}

func debugPrintProcessor(info beat.Info) *processorFn {
// ensure only one go-routine is using the encoder (in case
// beat.Client is shared between multiple go-routines by accident)
Expand Down
159 changes: 116 additions & 43 deletions libbeat/publisher/pipeline/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,33 +22,36 @@ import (
"testing"
"time"

"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

func TestProcessors(t *testing.T) {
info := beat.Info{}
defaultInfo := beat.Info{}

type local struct {
config beat.ClientConfig
events []common.MapStr
expected []common.MapStr
config beat.ClientConfig
events []common.MapStr
expected []common.MapStr
includeAgentMetadata bool
}

tests := []struct {
name string
global pipelineProcessors
local []local
info *beat.Info
}{
{
"user global fields and tags",
pipelineProcessors{
name: "user global fields and tags",
global: pipelineProcessors{
fields: common.MapStr{"global": 1},
tags: []string{"tag"},
},
[]local{
local: []local{
{
config: beat.ClientConfig{},
events: []common.MapStr{{"value": "abc", "user": nil}},
Expand All @@ -59,12 +62,12 @@ func TestProcessors(t *testing.T) {
},
},
{
"no normalization",
pipelineProcessors{
name: "no normalization",
global: pipelineProcessors{
fields: common.MapStr{"global": 1},
tags: []string{"tag"},
},
[]local{
local: []local{
{
config: beat.ClientConfig{SkipNormalization: true},
events: []common.MapStr{{"value": "abc", "user": nil}},
Expand All @@ -75,9 +78,78 @@ func TestProcessors(t *testing.T) {
},
},
{
"beat local fields",
pipelineProcessors{},
[]local{
name: "add agent metadata",
global: pipelineProcessors{
fields: common.MapStr{"global": 1, "agent": common.MapStr{"foo": "bar"}},
tags: []string{"tag"},
},
info: &beat.Info{
Beat: "test",
EphemeralID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440000")),
Hostname: "test.host.name",
ID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440001")),
Name: "test.host.name",
Version: "0.1",
},
local: []local{
{
config: beat.ClientConfig{},
events: []common.MapStr{{"value": "abc", "user": nil}},
expected: []common.MapStr{
{
"agent": common.MapStr{
"ephemeral_id": "123e4567-e89b-12d3-a456-426655440000",
"hostname": "test.host.name",
"id": "123e4567-e89b-12d3-a456-426655440001",
"type": "test",
"version": "0.1",
"foo": "bar",
},
"value": "abc", "global": 1, "tags": []string{"tag"},
},
},
includeAgentMetadata: true,
},
},
},
{
name: "add agent metadata with custom host.name",
global: pipelineProcessors{
fields: common.MapStr{"global": 1},
tags: []string{"tag"},
},
info: &beat.Info{
Beat: "test",
EphemeralID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440000")),
Hostname: "test.host.name",
ID: uuid.Must(uuid.FromString("123e4567-e89b-12d3-a456-426655440001")),
Name: "other.test.host.name",
Version: "0.1",
},
local: []local{
{
config: beat.ClientConfig{},
events: []common.MapStr{{"value": "abc", "user": nil}},
expected: []common.MapStr{
{
"agent": common.MapStr{
"ephemeral_id": "123e4567-e89b-12d3-a456-426655440000",
"hostname": "test.host.name",
"id": "123e4567-e89b-12d3-a456-426655440001",
"name": "other.test.host.name",
"type": "test",
"version": "0.1",
},
"value": "abc", "global": 1, "tags": []string{"tag"},
},
},
includeAgentMetadata: true,
},
},
},
{
name: "beat local fields",
local: []local{
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 1},
Expand All @@ -88,12 +160,12 @@ func TestProcessors(t *testing.T) {
},
},
{
"beat local and user global fields",
pipelineProcessors{
name: "beat local and user global fields",
global: pipelineProcessors{
fields: common.MapStr{"global": 1},
tags: []string{"tag"},
},
[]local{
local: []local{
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 1},
Expand All @@ -106,12 +178,12 @@ func TestProcessors(t *testing.T) {
},
},
{
"user global fields overwrite beat local fields",
pipelineProcessors{
name: "user global fields overwrite beat local fields",
global: pipelineProcessors{
fields: common.MapStr{"global": 1, "shared": "global"},
tags: []string{"tag"},
},
[]local{
local: []local{
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 1, "shared": "local"},
Expand All @@ -124,9 +196,8 @@ func TestProcessors(t *testing.T) {
},
},
{
"beat local fields isolated",
pipelineProcessors{},
[]local{
name: "beat local fields isolated",
local: []local{
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 1},
Expand All @@ -145,11 +216,11 @@ func TestProcessors(t *testing.T) {
},

{
"beat local fields + user global fields isolated",
pipelineProcessors{
name: "beat local fields + user global fields isolated",
global: pipelineProcessors{
fields: common.MapStr{"global": 0},
},
[]local{
local: []local{
{
config: beat.ClientConfig{
Fields: common.MapStr{"local": 1},
Expand All @@ -167,9 +238,8 @@ func TestProcessors(t *testing.T) {
},
},
{
"user local fields and tags",
pipelineProcessors{},
[]local{
name: "user local fields and tags",
local: []local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Expand All @@ -185,9 +255,8 @@ func TestProcessors(t *testing.T) {
},
},
{
"user local fields (under root) and tags",
pipelineProcessors{},
[]local{
name: "user local fields (under root) and tags",
local: []local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Expand All @@ -204,12 +273,12 @@ func TestProcessors(t *testing.T) {
},
},
{
"user local fields overwrite user global fields",
pipelineProcessors{
name: "user local fields overwrite user global fields",
global: pipelineProcessors{
fields: common.MapStr{"global": 0, "shared": "global"},
tags: []string{"global"},
},
[]local{
local: []local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Expand All @@ -230,9 +299,8 @@ func TestProcessors(t *testing.T) {
},
},
{
"user local fields isolated",
pipelineProcessors{},
[]local{
name: "user local fields isolated",
local: []local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Expand All @@ -254,11 +322,11 @@ func TestProcessors(t *testing.T) {
},
},
{
"user local + global fields isolated",
pipelineProcessors{
name: "user local + global fields isolated",
global: pipelineProcessors{
fields: common.MapStr{"fields": common.MapStr{"global": 0}},
},
[]local{
local: []local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Expand All @@ -280,11 +348,11 @@ func TestProcessors(t *testing.T) {
},
},
{
"user local + global fields isolated (fields with root)",
pipelineProcessors{
name: "user local + global fields isolated (fields with root)",
global: pipelineProcessors{
fields: common.MapStr{"global": 0},
},
[]local{
local: []local{
{
config: beat.ClientConfig{
EventMetadata: common.EventMetadata{
Expand Down Expand Up @@ -314,7 +382,12 @@ func TestProcessors(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
// create processor pipelines
programs := make([]beat.Processor, len(test.local))
info := defaultInfo
if test.info != nil {
info = *test.info
}
for i, local := range test.local {
local.config.SkipAgentMetadata = !local.includeAgentMetadata
programs[i] = newProcessorPipeline(info, test.global, local.config)
}

Expand Down

0 comments on commit 2e2c62b

Please sign in to comment.