Skip to content

Commit

Permalink
[Ingest Manager] Send datastreams fields (elastic#20416)
Browse files Browse the repository at this point in the history
[Ingest Manager] Send datastreams fields (elastic#20416)
  • Loading branch information
michalpristas authored Aug 4, 2020
1 parent 41836d9 commit de06917
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 0 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,4 @@
- Will retry to enroll if the server return a 429. {pull}19918[19811]
- Add --staging option to enroll command {pull}20026[20026]
- Add `event.dataset` to all events {pull}20076[20076]
- Send datastreams fields {pull}20416[20416]
30 changes: 30 additions & 0 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,16 @@ func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]i
},
},
},
{
"add_fields": map[string]interface{}{
"target": "datastream",
"fields": map[string]interface{}{
"type": "logs",
"dataset": "elastic.agent",
"namespace": "default",
},
},
},
{
"add_fields": map[string]interface{}{
"target": "event",
Expand Down Expand Up @@ -232,6 +242,16 @@ func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]i
},
},
},
{
"add_fields": map[string]interface{}{
"target": "datastream",
"fields": map[string]interface{}{
"type": "logs",
"dataset": fmt.Sprintf("elastic.agent.%s", name),
"namespace": "default",
},
},
},
{
"add_fields": map[string]interface{}{
"target": "event",
Expand Down Expand Up @@ -282,6 +302,16 @@ func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string
},
},
},
{
"add_fields": map[string]interface{}{
"target": "datastream",
"fields": map[string]interface{}{
"type": "metrics",
"dataset": fmt.Sprintf("elastic.agent.%s", name),
"namespace": "default",
},
},
},
{
"add_fields": map[string]interface{}{
"target": "event",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ filebeat:
type: logs
name: generic
namespace: default
- add_fields:
target: "datastream"
fields:
type: logs
dataset: generic
namespace: default
- add_fields:
target: "event"
fields:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ filebeat:
type: logs
name: generic
namespace: default
- add_fields:
target: "datastream"
fields:
type: logs
dataset: generic
namespace: default
- add_fields:
target: "event"
fields:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ filebeat:
type: logs
name: generic
namespace: default
- add_fields:
target: "datastream"
fields:
type: logs
dataset: generic
namespace: default
- add_fields:
target: "event"
fields:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ filebeat:
type: logs
name: generic
namespace: default
- add_fields:
target: "datastream"
fields:
type: logs
dataset: generic
namespace: default
- add_fields:
target: "event"
fields:
Expand All @@ -32,6 +38,12 @@ filebeat:
type: testtype
name: generic
namespace: default
- add_fields:
target: "datastream"
fields:
type: testtype
dataset: generic
namespace: default
- add_fields:
target: "event"
fields:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ metricbeat:
type: metrics
name: docker.status
namespace: default
- add_fields:
target: "datastream"
fields:
type: metrics
dataset: docker.status
namespace: default
- add_fields:
target: "event"
fields:
Expand All @@ -26,6 +32,12 @@ metricbeat:
type: metrics
name: generic
namespace: default
- add_fields:
target: "datastream"
fields:
type: metrics
dataset: generic
namespace: default
- add_fields:
target: "event"
fields:
Expand All @@ -44,6 +56,12 @@ metricbeat:
type: metrics
name: generic
namespace: testing
- add_fields:
target: "datastream"
fields:
type: metrics
dataset: generic
namespace: testing
- add_fields:
target: "event"
fields:
Expand Down
13 changes: 13 additions & 0 deletions x-pack/elastic-agent/pkg/agent/transpiler/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ func (r *InjectStreamProcessorRule) Apply(ast *AST) error {
return errors.New("InjectStreamProcessorRule: processors is not a list")
}

// dataset
processorMap := &Dict{value: make([]Node, 0)}
processorMap.value = append(processorMap.value, &Key{name: "target", value: &StrVal{value: "dataset"}})
processorMap.value = append(processorMap.value, &Key{name: "fields", value: &Dict{value: []Node{
Expand All @@ -642,6 +643,18 @@ func (r *InjectStreamProcessorRule) Apply(ast *AST) error {
addFieldsMap := &Dict{value: []Node{&Key{"add_fields", processorMap}}}
processorsList.value = mergeStrategy(r.OnConflict).InjectItem(processorsList.value, addFieldsMap)

// datastream
processorMap = &Dict{value: make([]Node, 0)}
processorMap.value = append(processorMap.value, &Key{name: "target", value: &StrVal{value: "datastream"}})
processorMap.value = append(processorMap.value, &Key{name: "fields", value: &Dict{value: []Node{
&Key{name: "type", value: &StrVal{value: datasetType}},
&Key{name: "namespace", value: &StrVal{value: namespace}},
&Key{name: "dataset", value: &StrVal{value: dataset}},
}}})
addFieldsMap = &Dict{value: []Node{&Key{"add_fields", processorMap}}}
processorsList.value = mergeStrategy(r.OnConflict).InjectItem(processorsList.value, addFieldsMap)

// event
processorMap = &Dict{value: make([]Node, 0)}
processorMap.value = append(processorMap.value, &Key{name: "target", value: &StrVal{value: "event"}})
processorMap.value = append(processorMap.value, &Key{name: "fields", value: &Dict{value: []Node{
Expand Down

0 comments on commit de06917

Please sign in to comment.