Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Elastic Agent] Fix missing elastic_agent event data #21994

Merged
merged 6 commits into from
Oct 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
- Use local temp instead of system one {pull}21883[21883]
- Rename monitoring index from `elastic.agent` to `elastic_agent` {pull}21932[21932]
- Fix issue with named pipes on Windows 7 {pull}21931[21931]
- Fix missing elastic_agent event data {pull}21994[21994]

==== New features

Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func newLocal(
return nil, errors.New(err, "failed to initialize monitoring")
}

router, err := newRouter(log, streamFactory(localApplication.bgContext, cfg.Settings, localApplication.srv, reporter, monitor))
router, err := newRouter(log, streamFactory(localApplication.bgContext, agentInfo, cfg.Settings, localApplication.srv, reporter, monitor))
if err != nil {
return nil, errors.New(err, "fail to initialize pipeline router")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func newManaged(
return nil, errors.New(err, "failed to initialize monitoring")
}

router, err := newRouter(log, streamFactory(managedApplication.bgContext, cfg.Settings, managedApplication.srv, combinedReporter, monitor))
router, err := newRouter(log, streamFactory(managedApplication.bgContext, agentInfo, cfg.Settings, managedApplication.srv, combinedReporter, monitor))
if err != nil {
return nil, errors.New(err, "fail to initialize pipeline router")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ func getMonitoringRule(outputName string) *transpiler.RuleList {
return transpiler.NewRuleList(
transpiler.Copy(monitoringOutputSelector, outputKey),
transpiler.Rename(fmt.Sprintf("%s.%s", outputsKey, outputName), elasticsearchKey),
transpiler.InjectAgentInfo(),
transpiler.Filter(monitoringKey, programsKey, outputKey),
)
}
8 changes: 5 additions & 3 deletions x-pack/elastic-agent/pkg/agent/application/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package application
import (
"context"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
Expand Down Expand Up @@ -40,10 +41,10 @@ func (b *operatorStream) Shutdown() {
b.configHandler.Shutdown()
}

func streamFactory(ctx context.Context, cfg *configuration.SettingsConfig, srv *server.Server, r state.Reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) {
func streamFactory(ctx context.Context, agentInfo *info.AgentInfo, cfg *configuration.SettingsConfig, srv *server.Server, r state.Reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) {
return func(log *logger.Logger, id routingKey) (stream, error) {
// new operator per stream to isolate processes without using tags
operator, err := newOperator(ctx, log, id, cfg, srv, r, m)
operator, err := newOperator(ctx, log, agentInfo, id, cfg, srv, r, m)
if err != nil {
return nil, err
}
Expand All @@ -55,7 +56,7 @@ func streamFactory(ctx context.Context, cfg *configuration.SettingsConfig, srv *
}
}

func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config *configuration.SettingsConfig, srv *server.Server, r state.Reporter, m monitoring.Monitor) (*operation.Operator, error) {
func newOperator(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, id routingKey, config *configuration.SettingsConfig, srv *server.Server, r state.Reporter, m monitoring.Monitor) (*operation.Operator, error) {
fetcher := downloader.NewDownloader(log, config.DownloadConfig, false)
allowEmptyPgp, pgp := release.PGP()
verifier, err := downloader.NewVerifier(log, config.DownloadConfig, allowEmptyPgp, pgp, false)
Expand All @@ -81,6 +82,7 @@ func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config
return operation.NewOperator(
ctx,
log,
agentInfo,
id,
config,
fetcher,
Expand Down
4 changes: 3 additions & 1 deletion x-pack/elastic-agent/pkg/agent/operation/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/stateresolver"
Expand Down Expand Up @@ -48,6 +49,7 @@ func getTestOperator(t *testing.T, downloadPath string, installPath string, p *a
}

l := getLogger()
agentInfo, _ := info.NewAgentInfo()

fetcher := &DummyDownloader{}
verifier := &DummyVerifier{}
Expand All @@ -67,7 +69,7 @@ func getTestOperator(t *testing.T, downloadPath string, installPath string, p *a
t.Fatal(err)
}

operator, err := NewOperator(context.Background(), l, "p1", operatorCfg, fetcher, verifier, installer, uninstaller, stateResolver, srv, nil, noop.NewMonitor())
operator, err := NewOperator(context.Background(), l, agentInfo, "p1", operatorCfg, fetcher, verifier, installer, uninstaller, stateResolver, srv, nil, noop.NewMonitor())
if err != nil {
t.Fatal(err)
}
Expand Down
30 changes: 30 additions & 0 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,16 @@ func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]i
},
},
},
{
"add_fields": map[string]interface{}{
"target": "elastic_agent",
"fields": map[string]interface{}{
"id": o.agentInfo.AgentID(),
"version": o.agentInfo.Version(),
"snapshot": o.agentInfo.Snapshot(),
},
},
},
},
},
}
Expand Down Expand Up @@ -240,6 +250,16 @@ func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]i
},
},
},
{
"add_fields": map[string]interface{}{
"target": "elastic_agent",
"fields": map[string]interface{}{
"id": o.agentInfo.AgentID(),
"version": o.agentInfo.Version(),
"snapshot": o.agentInfo.Snapshot(),
},
},
},
},
})
}
Expand Down Expand Up @@ -290,6 +310,16 @@ func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string
},
},
},
{
"add_fields": map[string]interface{}{
"target": "elastic_agent",
"fields": map[string]interface{}{
"id": o.agentInfo.AgentID(),
"version": o.agentInfo.Version(),
"snapshot": o.agentInfo.Snapshot(),
},
},
},
},
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/elastic/elastic-agent-client/v7/pkg/proto"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/stateresolver"
Expand Down Expand Up @@ -112,6 +113,7 @@ func getMonitorableTestOperator(t *testing.T, installPath string, m monitoring.M
}

l := getLogger()
agentInfo, _ := info.NewAgentInfo()

fetcher := &DummyDownloader{}
verifier := &DummyVerifier{}
Expand All @@ -128,7 +130,7 @@ func getMonitorableTestOperator(t *testing.T, installPath string, m monitoring.M
}

ctx := context.Background()
operator, err := NewOperator(ctx, l, "p1", cfg, fetcher, verifier, installer, uninstaller, stateResolver, srv, nil, m)
operator, err := NewOperator(ctx, l, agentInfo, "p1", cfg, fetcher, verifier, installer, uninstaller, stateResolver, srv, nil, m)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 4 additions & 0 deletions x-pack/elastic-agent/pkg/agent/operation/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"sync"
"time"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
Expand Down Expand Up @@ -43,6 +44,7 @@ type Operator struct {
bgContext context.Context
pipelineID string
logger *logger.Logger
agentInfo *info.AgentInfo
config *configuration.SettingsConfig
handlers map[string]handleFunc
stateResolver *stateresolver.StateResolver
Expand All @@ -66,6 +68,7 @@ type Operator struct {
func NewOperator(
ctx context.Context,
logger *logger.Logger,
agentInfo *info.AgentInfo,
pipelineID string,
config *configuration.SettingsConfig,
fetcher download.Downloader,
Expand All @@ -85,6 +88,7 @@ func NewOperator(
config: config,
pipelineID: pipelineID,
logger: logger,
agentInfo: agentInfo,
downloader: fetcher,
verifier: verifier,
installer: installer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ filebeat:
fields:
dataset: generic
- add_fields:
target: "elastic"
target: "elastic_agent"
fields:
agent.id: agent-id
agent.version: 8.0.0
agent.snapshot: false
id: agent-id
version: 8.0.0
snapshot: false
output:
elasticsearch:
enabled: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ filebeat:
fields:
dataset: generic
- add_fields:
target: "elastic"
target: "elastic_agent"
fields:
agent.id: agent-id
agent.version: 8.0.0
agent.snapshot: false
id: agent-id
version: 8.0.0
snapshot: false
output:
elasticsearch:
hosts:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ filebeat:
fields:
dataset: generic
- add_fields:
target: "elastic"
target: "elastic_agent"
fields:
agent.id: agent-id
agent.version: 8.0.0
agent.snapshot: false
id: agent-id
version: 8.0.0
snapshot: false
- type: log
paths:
- /var/log/hello3.log
Expand All @@ -43,11 +43,11 @@ filebeat:
fields:
dataset: generic
- add_fields:
target: "elastic"
target: "elastic_agent"
fields:
agent.id: agent-id
agent.version: 8.0.0
agent.snapshot: false
id: agent-id
version: 8.0.0
snapshot: false
output:
elasticsearch:
hosts:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ metricbeat:
fields:
dataset: docker.status
- add_fields:
target: "elastic"
target: "elastic_agent"
fields:
agent.id: agent-id
agent.version: 8.0.0
agent.snapshot: false
id: agent-id
version: 8.0.0
snapshot: false
- module: docker
metricsets: [info]
index: metrics-generic-default
Expand All @@ -37,11 +37,11 @@ metricbeat:
fields:
dataset: generic
- add_fields:
target: "elastic"
target: "elastic_agent"
fields:
agent.id: agent-id
agent.version: 8.0.0
agent.snapshot: false
id: agent-id
version: 8.0.0
snapshot: false
- module: apache
metricsets: [info]
index: metrics-generic-testing
Expand All @@ -61,11 +61,11 @@ metricbeat:
fields:
dataset: generic
- add_fields:
target: "elastic"
target: "elastic_agent"
fields:
agent.id: agent-id
agent.version: 8.0.0
agent.snapshot: false
id: agent-id
version: 8.0.0
snapshot: false
output:
elasticsearch:
hosts: [127.0.0.1:9200, 127.0.0.1:9300]
Expand Down
8 changes: 4 additions & 4 deletions x-pack/elastic-agent/pkg/agent/transpiler/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,11 +715,11 @@ func (r *InjectAgentInfoRule) Apply(agentInfo AgentInfo, ast *AST) error {

// elastic.agent
processorMap := &Dict{value: make([]Node, 0)}
processorMap.value = append(processorMap.value, &Key{name: "target", value: &StrVal{value: "elastic"}})
processorMap.value = append(processorMap.value, &Key{name: "target", value: &StrVal{value: "elastic_agent"}})
processorMap.value = append(processorMap.value, &Key{name: "fields", value: &Dict{value: []Node{
&Key{name: "agent.id", value: &StrVal{value: agentInfo.AgentID()}},
&Key{name: "agent.version", value: &StrVal{value: agentInfo.Version()}},
&Key{name: "agent.snapshot", value: &BoolVal{value: agentInfo.Snapshot()}},
&Key{name: "id", value: &StrVal{value: agentInfo.AgentID()}},
&Key{name: "version", value: &StrVal{value: agentInfo.Version()}},
&Key{name: "snapshot", value: &BoolVal{value: agentInfo.Snapshot()}},
}}})
addFieldsMap := &Dict{value: []Node{&Key{"add_fields", processorMap}}}
processorsList.value = mergeStrategy("").InjectItem(processorsList.value, addFieldsMap)
Expand Down
16 changes: 8 additions & 8 deletions x-pack/elastic-agent/pkg/agent/transpiler/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,11 @@ inputs:
type: file
processors:
- add_fields:
target: elastic
target: elastic_agent
fields:
agent.id: agent-id
agent.snapshot: false
agent.version: 8.0.0
id: agent-id
snapshot: false
version: 8.0.0
- name: With processors
type: file
processors:
Expand All @@ -197,11 +197,11 @@ inputs:
fields:
data: more
- add_fields:
target: elastic
target: elastic_agent
fields:
agent.id: agent-id
agent.snapshot: false
agent.version: 8.0.0
id: agent-id
snapshot: false
version: 8.0.0
`,
rule: &RuleList{
Rules: []Rule{
Expand Down