Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Elastic Agent] Add elastic agent ID and version to events from filebeat and metricbeat. #21543

Merged
merged 3 commits into from
Oct 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@
- Send `fleet.host.id` to Endpoint Security {pull}21042[21042]
- Add `install` and `uninstall` subcommands {pull}21206[21206]
- Send updating state {pull}21461[21461]
- Add `elastic.agent.id` and `elastic.agent.version` to published events from filebeat and metricbeat {pull}21543[21543]
- Add `upgrade` subcommand to perform upgrade of installed Elastic Agent {pull}21425[21425]
11 changes: 7 additions & 4 deletions x-pack/elastic-agent/pkg/agent/application/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"
"sync"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler"
Expand All @@ -18,7 +19,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

type decoratorFunc = func(string, *transpiler.AST, []program.Program) ([]program.Program, error)
type decoratorFunc = func(*info.AgentInfo, string, *transpiler.AST, []program.Program) ([]program.Program, error)
type filterFunc = func(*logger.Logger, *transpiler.AST) error

type reloadable interface {
Expand All @@ -36,6 +37,7 @@ type programsDispatcher interface {

type emitterController struct {
logger *logger.Logger
agentInfo *info.AgentInfo
controller composable.Controller
router programsDispatcher
modifiers *configModifiers
Expand Down Expand Up @@ -112,14 +114,14 @@ func (e *emitterController) update() error {

e.logger.Debug("Converting single configuration into specific programs configuration")

programsToRun, err := program.Programs(ast)
programsToRun, err := program.Programs(e.agentInfo, ast)
if err != nil {
return err
}

for _, decorator := range e.modifiers.Decorators {
for outputType, ptr := range programsToRun {
programsToRun[outputType], err = decorator(outputType, ast, ptr)
programsToRun[outputType], err = decorator(e.agentInfo, outputType, ast, ptr)
if err != nil {
return err
}
Expand All @@ -135,12 +137,13 @@ func (e *emitterController) update() error {
return e.router.Dispatch(ast.HashStr(), programsToRun)
}

func emitter(ctx context.Context, log *logger.Logger, controller composable.Controller, router programsDispatcher, modifiers *configModifiers, reloadables ...reloadable) (emitterFunc, error) {
func emitter(ctx context.Context, log *logger.Logger, agentInfo *info.AgentInfo, controller composable.Controller, router programsDispatcher, modifiers *configModifiers, reloadables ...reloadable) (emitterFunc, error) {
log.Debugf("Supported programs: %s", strings.Join(program.KnownProgramNames(), ", "))

init, _ := transpiler.NewVars(map[string]interface{}{})
ctrl := &emitterController{
logger: log,
agentInfo: agentInfo,
controller: controller,
router: router,
modifiers: modifiers,
Expand Down
12 changes: 12 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/info/agent_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package info

import "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release"

// AgentInfo is a collection of information about agent.
type AgentInfo struct {
agentID string
Expand Down Expand Up @@ -44,3 +46,13 @@ func ForceNewAgentInfo() (*AgentInfo, error) {
func (i *AgentInfo) AgentID() string {
return i.agentID
}

// Version returns the version for this Agent.
func (*AgentInfo) Version() string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we use release.Version and release.Snapshot throughout the code, so maybe we can unify the usage of these where it's possible in a followup

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I would like to. I only had to add this here for testing purposes as I needed a way to mock the version for the unit tests.

return release.Version()
}

// Snapshot returns if this version is a snapshot.
func (*AgentInfo) Snapshot() bool {
return release.Snapshot()
}
45 changes: 26 additions & 19 deletions x-pack/elastic-agent/pkg/agent/application/inspect_output_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
"context"
"fmt"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler"

"github.com/elastic/beats/v7/libbeat/logp"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
Expand All @@ -37,14 +38,19 @@ func NewInspectOutputCmd(configPath, output, program string) (*InspectOutputCmd,

// Execute tries to enroll the agent into Fleet.
func (c *InspectOutputCmd) Execute() error {
agentInfo, err := info.NewAgentInfo()
if err != nil {
return err
}

if c.output == "" {
return c.inspectOutputs()
return c.inspectOutputs(agentInfo)
}

return c.inspectOutput()
return c.inspectOutput(agentInfo)
}

func (c *InspectOutputCmd) inspectOutputs() error {
func (c *InspectOutputCmd) inspectOutputs(agentInfo *info.AgentInfo) error {
rawConfig, err := loadConfig(c.cfgPath)
if err != nil {
return err
Expand All @@ -61,7 +67,7 @@ func (c *InspectOutputCmd) inspectOutputs() error {
}

if isStandalone(cfg.Fleet) {
return listOutputsFromConfig(l, rawConfig)
return listOutputsFromConfig(l, agentInfo, rawConfig)
}

fleetConfig, err := loadFleetConfig(rawConfig)
Expand All @@ -71,11 +77,11 @@ func (c *InspectOutputCmd) inspectOutputs() error {
return fmt.Errorf("no fleet config retrieved yet")
}

return listOutputsFromMap(l, fleetConfig)
return listOutputsFromMap(l, agentInfo, fleetConfig)
}

func listOutputsFromConfig(log *logger.Logger, cfg *config.Config) error {
programsGroup, err := getProgramsFromConfig(log, cfg)
func listOutputsFromConfig(log *logger.Logger, agentInfo *info.AgentInfo, cfg *config.Config) error {
programsGroup, err := getProgramsFromConfig(log, agentInfo, cfg)
if err != nil {
return err

Expand All @@ -88,16 +94,16 @@ func listOutputsFromConfig(log *logger.Logger, cfg *config.Config) error {
return nil
}

func listOutputsFromMap(log *logger.Logger, cfg map[string]interface{}) error {
func listOutputsFromMap(log *logger.Logger, agentInfo *info.AgentInfo, cfg map[string]interface{}) error {
c, err := config.NewConfigFrom(cfg)
if err != nil {
return err
}

return listOutputsFromConfig(log, c)
return listOutputsFromConfig(log, agentInfo, c)
}

func (c *InspectOutputCmd) inspectOutput() error {
func (c *InspectOutputCmd) inspectOutput(agentInfo *info.AgentInfo) error {
rawConfig, err := loadConfig(c.cfgPath)
if err != nil {
return err
Expand All @@ -114,7 +120,7 @@ func (c *InspectOutputCmd) inspectOutput() error {
}

if isStandalone(cfg.Fleet) {
return printOutputFromConfig(l, c.output, c.program, rawConfig)
return printOutputFromConfig(l, agentInfo, c.output, c.program, rawConfig)
}

fleetConfig, err := loadFleetConfig(rawConfig)
Expand All @@ -124,11 +130,11 @@ func (c *InspectOutputCmd) inspectOutput() error {
return fmt.Errorf("no fleet config retrieved yet")
}

return printOutputFromMap(l, c.output, c.program, fleetConfig)
return printOutputFromMap(l, agentInfo, c.output, c.program, fleetConfig)
}

func printOutputFromConfig(log *logger.Logger, output, programName string, cfg *config.Config) error {
programsGroup, err := getProgramsFromConfig(log, cfg)
func printOutputFromConfig(log *logger.Logger, agentInfo *info.AgentInfo, output, programName string, cfg *config.Config) error {
programsGroup, err := getProgramsFromConfig(log, agentInfo, cfg)
if err != nil {
return err

Expand Down Expand Up @@ -164,16 +170,16 @@ func printOutputFromConfig(log *logger.Logger, output, programName string, cfg *

}

func printOutputFromMap(log *logger.Logger, output, programName string, cfg map[string]interface{}) error {
func printOutputFromMap(log *logger.Logger, agentInfo *info.AgentInfo, output, programName string, cfg map[string]interface{}) error {
c, err := config.NewConfigFrom(cfg)
if err != nil {
return err
}

return printOutputFromConfig(log, output, programName, c)
return printOutputFromConfig(log, agentInfo, output, programName, c)
}

func getProgramsFromConfig(log *logger.Logger, cfg *config.Config) (map[string][]program.Program, error) {
func getProgramsFromConfig(log *logger.Logger, agentInfo *info.AgentInfo, cfg *config.Config) (map[string][]program.Program, error) {
monitor := noop.NewMonitor()
router := &inmemRouter{}
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -186,6 +192,7 @@ func getProgramsFromConfig(log *logger.Logger, cfg *config.Config) (map[string][
emit, err := emitter(
ctx,
log,
agentInfo,
composableWaiter,
router,
&configModifiers{
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func newLocal(
emit, err := emitter(
localApplication.bgContext,
log,
agentInfo,
composableCtrl,
router,
&configModifiers{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func newManaged(
emit, err := emitter(
managedApplication.bgContext,
log,
agentInfo,
composableCtrl,
router,
&configModifiers{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
Expand All @@ -32,8 +33,9 @@ func TestManagedModeRouting(t *testing.T) {

log, _ := logger.New("")
router, _ := newRouter(log, streamFn)
agentInfo, _ := info.NewAgentInfo()
composableCtrl, _ := composable.New(log, nil)
emit, err := emitter(ctx, log, composableCtrl, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}})
emit, err := emitter(ctx, log, agentInfo, composableCtrl, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}})
require.NoError(t, err)

actionDispatcher, err := newActionDispatcher(ctx, log, &handlerDefault{log: log})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package application
import (
"fmt"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler"
)
Expand All @@ -28,7 +29,7 @@ const (
defaultOutputName = "default"
)

func injectMonitoring(outputGroup string, rootAst *transpiler.AST, programsToRun []program.Program) ([]program.Program, error) {
func injectMonitoring(agentInfo *info.AgentInfo, outputGroup string, rootAst *transpiler.AST, programsToRun []program.Program) ([]program.Program, error) {
var err error
monitoringProgram := program.Program{
Spec: program.Spec{
Expand Down Expand Up @@ -63,7 +64,7 @@ func injectMonitoring(outputGroup string, rootAst *transpiler.AST, programsToRun
}

ast := rootAst.Clone()
if err := getMonitoringRule(monitoringOutputName).Apply(ast); err != nil {
if err := getMonitoringRule(monitoringOutputName).Apply(agentInfo, ast); err != nil {
return programsToRun, err
}

Expand Down Expand Up @@ -93,6 +94,7 @@ func getMonitoringRule(outputName string) *transpiler.RuleList {
return transpiler.NewRuleList(
transpiler.Copy(monitoringOutputSelector, outputKey),
transpiler.Rename(fmt.Sprintf("%s.%s", outputsKey, outputName), elasticsearchKey),
transpiler.InjectAgentInfo(),
transpiler.Filter(monitoringKey, programsKey, outputKey),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,30 @@ package application
import (
"testing"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler"
)

func TestMonitoringInjection(t *testing.T) {
agentInfo, err := info.NewAgentInfo()
if err != nil {
t.Fatal(err)
}
ast, err := transpiler.NewAST(inputConfigMap)
if err != nil {
t.Fatal(err)
}

programsToRun, err := program.Programs(ast)
programsToRun, err := program.Programs(agentInfo, ast)
if err != nil {
t.Fatal(err)
}

GROUPLOOP:
for group, ptr := range programsToRun {
programsCount := len(ptr)
newPtr, err := injectMonitoring(group, ast, ptr)
newPtr, err := injectMonitoring(agentInfo, group, ast, ptr)
if err != nil {
t.Error(err)
continue GROUPLOOP
Expand Down Expand Up @@ -83,20 +88,24 @@ GROUPLOOP:
}

func TestMonitoringInjectionDefaults(t *testing.T) {
agentInfo, err := info.NewAgentInfo()
if err != nil {
t.Fatal(err)
}
ast, err := transpiler.NewAST(inputConfigMapDefaults)
if err != nil {
t.Fatal(err)
}

programsToRun, err := program.Programs(ast)
programsToRun, err := program.Programs(agentInfo, ast)
if err != nil {
t.Fatal(err)
}

GROUPLOOP:
for group, ptr := range programsToRun {
programsCount := len(ptr)
newPtr, err := injectMonitoring(group, ast, ptr)
newPtr, err := injectMonitoring(agentInfo, group, ast, ptr)
if err != nil {
t.Error(err)
continue GROUPLOOP
Expand Down Expand Up @@ -154,20 +163,24 @@ GROUPLOOP:
}

func TestMonitoringInjectionDisabled(t *testing.T) {
agentInfo, err := info.NewAgentInfo()
if err != nil {
t.Fatal(err)
}
ast, err := transpiler.NewAST(inputConfigMapDisabled)
if err != nil {
t.Fatal(err)
}

programsToRun, err := program.Programs(ast)
programsToRun, err := program.Programs(agentInfo, ast)
if err != nil {
t.Fatal(err)
}

GROUPLOOP:
for group, ptr := range programsToRun {
programsCount := len(ptr)
newPtr, err := injectMonitoring(group, ast, ptr)
newPtr, err := injectMonitoring(agentInfo, group, ast, ptr)
if err != nil {
t.Error(err)
continue GROUPLOOP
Expand Down
Loading