diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index e4f74663193..efe7ac71d78 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -17,3 +17,4 @@ - Add restart CLI cmd {pull}20359[20359] - Add new `synthetics/*` inputs to run Heartbeat {pull}20387[20387] - Users of the Docker image can now pass `FLEET_ENROLL_INSECURE=1` to include the `--insecure` flag with the `elastic-agent enroll` command {issue}20312[20312] {pull}20713[20713] +- Add support for dynamic inputs with providers and `{{variable|"default"}}` substitution. {pull}20839[20839] diff --git a/x-pack/elastic-agent/pkg/agent/application/application.go b/x-pack/elastic-agent/pkg/agent/application/application.go index 08bd0f94b8d..d0b16f11f13 100644 --- a/x-pack/elastic-agent/pkg/agent/application/application.go +++ b/x-pack/elastic-agent/pkg/agent/application/application.go @@ -26,7 +26,7 @@ func New(log *logger.Logger, pathConfigFile string) (Application, error) { // Load configuration from disk to understand in which mode of operation // we must start the elastic-agent, the mode of operation cannot be changed without restarting the // elastic-agent. 
- rawConfig, err := config.LoadYAML(pathConfigFile) + rawConfig, err := LoadConfigFromFile(pathConfigFile) if err != nil { return nil, err } diff --git a/x-pack/elastic-agent/pkg/agent/application/config.go b/x-pack/elastic-agent/pkg/agent/application/config.go index 8dfd093e040..ff15ca44074 100644 --- a/x-pack/elastic-agent/pkg/agent/application/config.go +++ b/x-pack/elastic-agent/pkg/agent/application/config.go @@ -5,8 +5,15 @@ package application import ( + "io/ioutil" + + "github.com/elastic/go-ucfg" + + "gopkg.in/yaml.v2" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/kibana" ) @@ -26,3 +33,50 @@ func createFleetConfigFromEnroll(accessAPIKey string, kbn *kibana.Config) (*conf } return cfg, nil } + +// LoadConfigFromFile loads the Agent configuration from a file. +// +// This must be used to load the Agent configuration, so that variables defined in the inputs are not +// parsed by go-ucfg. Variables from the inputs should be parsed by the transpiler. +func LoadConfigFromFile(path string) (*config.Config, error) { + in, err := ioutil.ReadFile(path) + if err != nil { + return nil, err + } + var m map[string]interface{} + if err := yaml.Unmarshal(in, &m); err != nil { + return nil, err + } + return LoadConfig(m) +} + +// LoadConfig loads the Agent configuration from a map. +// +// This must be used to load the Agent configuration, so that variables defined in the inputs are not +// parsed by go-ucfg. Variables from the inputs should be parsed by the transpiler. 
+func LoadConfig(m map[string]interface{}) (*config.Config, error) { + inputs, ok := m["inputs"] + if ok { + // remove the inputs + delete(m, "inputs") + } + cfg, err := config.NewConfigFrom(m) + if err != nil { + return nil, err + } + if ok { + inputsOnly := map[string]interface{}{ + "inputs": inputs, + } + // convert to config without variable substitution + inputsCfg, err := config.NewConfigFrom(inputsOnly, ucfg.PathSep("."), ucfg.ResolveNOOP) + if err != nil { + return nil, err + } + err = cfg.Merge(inputsCfg, ucfg.PathSep("."), ucfg.ResolveNOOP) + if err != nil { + return nil, err + } + } + return cfg, err +} diff --git a/x-pack/elastic-agent/pkg/agent/application/config_test.go b/x-pack/elastic-agent/pkg/agent/application/config_test.go index fe9453ac8f4..4d4527a1e60 100644 --- a/x-pack/elastic-agent/pkg/agent/application/config_test.go +++ b/x-pack/elastic-agent/pkg/agent/application/config_test.go @@ -5,9 +5,14 @@ package application import ( + "io/ioutil" + "os" + "path/filepath" "testing" "time" + "gopkg.in/yaml.v2" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -15,6 +20,44 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" ) +func TestLoadConfig(t *testing.T) { + contents := map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "hosts": []interface{}{"127.0.0.1:9200"}, + "username": "elastic", + "password": "changeme", + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "logfile", + "streams": []interface{}{ + map[string]interface{}{ + "paths": []interface{}{"/var/log/${host.name}"}, + }, + }, + }, + }, + } + + tmp, err := ioutil.TempDir("", "config") + require.NoError(t, err) + defer os.RemoveAll(tmp) + + cfgPath := filepath.Join(tmp, "config.yml") + dumpToYAML(t, cfgPath, contents) + + cfg, err := LoadConfigFromFile(cfgPath) + require.NoError(t, err) + + cfgData, err := cfg.ToMapStr() + 
require.NoError(t, err) + + assert.Equal(t, contents, cfgData) +} + func TestConfig(t *testing.T) { testMgmtMode(t) testLocalConfig(t) @@ -74,3 +117,9 @@ func mustWithConfigMode(standalone bool) *config.Config { }, ) } + +func dumpToYAML(t *testing.T, out string, in interface{}) { + b, err := yaml.Marshal(in) + require.NoError(t, err) + ioutil.WriteFile(out, b, 0600) +} diff --git a/x-pack/elastic-agent/pkg/agent/application/emitter.go b/x-pack/elastic-agent/pkg/agent/application/emitter.go index 249acdd213f..52391b5eff5 100644 --- a/x-pack/elastic-agent/pkg/agent/application/emitter.go +++ b/x-pack/elastic-agent/pkg/agent/application/emitter.go @@ -5,11 +5,15 @@ package application import ( + "context" + "fmt" "strings" + "sync" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" ) @@ -30,54 +34,163 @@ type programsDispatcher interface { Dispatch(id string, grpProg map[routingKey][]program.Program) error } -func emitter(log *logger.Logger, router programsDispatcher, modifiers *configModifiers, reloadables ...reloadable) emitterFunc { - return func(c *config.Config) error { - if err := InjectAgentConfig(c); err != nil { - return err +type emitterController struct { + logger *logger.Logger + controller composable.Controller + router programsDispatcher + modifiers *configModifiers + reloadables []reloadable + + // state + lock sync.RWMutex + config *config.Config + ast *transpiler.AST + vars []transpiler.Vars +} + +func (e *emitterController) Update(c *config.Config) error { + if err := InjectAgentConfig(c); err != nil { + return err + } + + // perform and verify ast translation + m, err := c.ToMapStr() + if err != nil 
{ + return errors.New(err, "could not create the AST from the configuration", errors.TypeConfig) + } + rawAst, err := transpiler.NewAST(m) + if err != nil { + return errors.New(err, "could not create the AST from the configuration", errors.TypeConfig) + } + for _, filter := range e.modifiers.Filters { + if err := filter(e.logger, rawAst); err != nil { + return errors.New(err, "failed to filter configuration", errors.TypeConfig) } + } - log.Debug("Transforming configuration into a tree") - m, err := c.ToMapStr() + // sanitary check that nothing in the config is wrong when it comes to variable syntax + ast := rawAst.Clone() + inputs, ok := transpiler.Lookup(ast, "inputs") + if ok { + renderedInputs, err := renderInputs(inputs, []transpiler.Vars{ + { + Mapping: map[string]interface{}{}, + }, + }) if err != nil { - return errors.New(err, "could not create the AST from the configuration", errors.TypeConfig) + return err } - - ast, err := transpiler.NewAST(m) + err = transpiler.Insert(ast, renderedInputs, "inputs") if err != nil { - return errors.New(err, "could not create the AST from the configuration", errors.TypeConfig) + return err } + } + + programsToRun, err := program.Programs(ast) + if err != nil { + return err + } - for _, filter := range modifiers.Filters { - if err := filter(log, ast); err != nil { - return errors.New(err, "failed to filter configuration", errors.TypeConfig) + for _, decorator := range e.modifiers.Decorators { + for outputType, ptr := range programsToRun { + programsToRun[outputType], err = decorator(outputType, ast, ptr) + if err != nil { + return err } } + } - log.Debugf("Supported programs: %s", strings.Join(program.KnownProgramNames(), ", ")) - log.Debug("Converting single configuration into specific programs configuration") + e.lock.Lock() + e.config = c + e.ast = rawAst + e.lock.Unlock() + + return e.update() +} - programsToRun, err := program.Programs(ast) +func (e *emitterController) Set(vars []transpiler.Vars) { + e.lock.Lock() + ast 
:= e.ast + e.vars = vars + e.lock.Unlock() + + if ast != nil { + err := e.update() if err != nil { - return err + e.logger.Errorf("Failed to render configuration with latest context from composable controller: %s", err) } + } +} - for _, decorator := range modifiers.Decorators { - for outputType, ptr := range programsToRun { - programsToRun[outputType], err = decorator(outputType, ast, ptr) - if err != nil { - return err - } - } +func (e *emitterController) update() error { + e.lock.RLock() + cfg := e.config + rawAst := e.ast + varsArray := e.vars + e.lock.RUnlock() + + ast := rawAst.Clone() + inputs, ok := transpiler.Lookup(ast, "inputs") + if ok { + renderedInputs, err := renderInputs(inputs, varsArray) + if err != nil { + return err } + err = transpiler.Insert(ast, renderedInputs, "inputs") + if err != nil { + return err + } + } + + e.logger.Debug("Converting single configuration into specific programs configuration") - for _, r := range reloadables { - if err := r.Reload(c); err != nil { + programsToRun, err := program.Programs(ast) + if err != nil { + return err + } + + for _, decorator := range e.modifiers.Decorators { + for outputType, ptr := range programsToRun { + programsToRun[outputType], err = decorator(outputType, ast, ptr) + if err != nil { return err } } + } + + for _, r := range e.reloadables { + if err := r.Reload(cfg); err != nil { + return err + } + } + + return e.router.Dispatch(ast.HashStr(), programsToRun) +} + +func emitter(ctx context.Context, log *logger.Logger, controller composable.Controller, router programsDispatcher, modifiers *configModifiers, reloadables ...reloadable) (emitterFunc, error) { + log.Debugf("Supported programs: %s", strings.Join(program.KnownProgramNames(), ", ")) - return router.Dispatch(ast.HashStr(), programsToRun) + ctrl := &emitterController{ + logger: log, + controller: controller, + router: router, + modifiers: modifiers, + reloadables: reloadables, + vars: []transpiler.Vars{ + { + Mapping: 
map[string]interface{}{}, + }, + }, } + err := controller.Run(ctx, func(vars []transpiler.Vars) { + ctrl.Set(vars) + }) + if err != nil { + return nil, errors.New(err, "failed to start composable controller") + } + return func(c *config.Config) error { + return ctrl.Update(c) + }, nil } func readfiles(files []string, emitter emitterFunc) error { @@ -88,3 +201,80 @@ func readfiles(files []string, emitter emitterFunc) error { return emitter(c) } + +func renderInputs(inputs transpiler.Node, varsArray []transpiler.Vars) (transpiler.Node, error) { + l, ok := inputs.Value().(*transpiler.List) + if !ok { + return nil, fmt.Errorf("inputs must be an array") + } + nodes := []transpiler.Node{} + nodesMap := map[string]*transpiler.Dict{} + for _, vars := range varsArray { + for _, node := range l.Value().([]transpiler.Node) { + dict, ok := node.Clone().(*transpiler.Dict) + if !ok { + continue + } + n, err := dict.Apply(vars) + if err == transpiler.ErrNoMatch { + // has a variable that didn't exist, so we ignore it + continue + } + if err != nil { + // another error that needs to be reported + return nil, err + } + dict = n.(*transpiler.Dict) + dict = promoteProcessors(dict) + hash := string(dict.Hash()) + _, exists := nodesMap[hash] + if !exists { + nodesMap[hash] = dict + nodes = append(nodes, dict) + } + } + } + return transpiler.NewList(nodes), nil +} + +func promoteProcessors(dict *transpiler.Dict) *transpiler.Dict { + p := dict.Processors() + if p == nil { + return dict + } + current, ok := dict.Find("processors") + currentList, isList := current.Value().(*transpiler.List) + if !isList { + return dict + } + ast, _ := transpiler.NewAST(map[string]interface{}{ + "processors": p, + }) + procs, _ := transpiler.Lookup(ast, "processors") + nodes := nodesFromList(procs.Value().(*transpiler.List)) + if ok { + nodes = append(nodes, nodesFromList(currentList)...) 
+ } + dictNodes := dict.Value().([]transpiler.Node) + set := false + for i, node := range dictNodes { + switch n := node.(type) { + case *transpiler.Key: + if n.Name() == "processors" { + dictNodes[i] = transpiler.NewKey("processors", transpiler.NewList(nodes)) + set = true + } + } + if set { + break + } + } + if !set { + dictNodes = append(dictNodes, transpiler.NewKey("processors", transpiler.NewList(nodes))) + } + return transpiler.NewDict(dictNodes) +} + +func nodesFromList(list *transpiler.List) []transpiler.Node { + return list.Value().([]transpiler.Node) +} diff --git a/x-pack/elastic-agent/pkg/agent/application/emitter_test.go b/x-pack/elastic-agent/pkg/agent/application/emitter_test.go new file mode 100644 index 00000000000..0c5ba837328 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/application/emitter_test.go @@ -0,0 +1,537 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package application + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler" +) + +func TestRenderInputs(t *testing.T) { + testcases := map[string]struct { + input transpiler.Node + expected transpiler.Node + varsArray []transpiler.Vars + err bool + }{ + "inputs not list": { + input: transpiler.NewKey("inputs", transpiler.NewStrVal("not list")), + err: true, + varsArray: []transpiler.Vars{ + { + Mapping: map[string]interface{}{}, + }, + }, + }, + "bad variable error": { + input: transpiler.NewKey("inputs", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("key", transpiler.NewStrVal("${var1.name|'missing ending quote}")), + }), + })), + err: true, + varsArray: []transpiler.Vars{ + { + Mapping: map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + }, + }, + }, + }, + }, + "basic single var": { + input: transpiler.NewKey("inputs", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("key", transpiler.NewStrVal("${var1.name}")), + }), + })), + expected: transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("key", transpiler.NewStrVal("value1")), + }), + }), + varsArray: []transpiler.Vars{ + { + Mapping: map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + }, + }, + }, + }, + }, + "duplicate result is removed": { + input: transpiler.NewKey("inputs", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("key", transpiler.NewStrVal("${var1.name}")), + }), + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("key", transpiler.NewStrVal("${var1.diff}")), + }), + })), + expected: transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("key", 
transpiler.NewStrVal("value1")), + }), + }), + varsArray: []transpiler.Vars{ + { + Mapping: map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + "diff": "value1", + }, + }, + }, + }, + }, + "missing var removes input": { + input: transpiler.NewKey("inputs", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("key", transpiler.NewStrVal("${var1.name}")), + }), + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("key", transpiler.NewStrVal("${var1.missing|var1.diff}")), + }), + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("key", transpiler.NewStrVal("${var1.removed}")), + }), + })), + expected: transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("key", transpiler.NewStrVal("value1")), + }), + }), + varsArray: []transpiler.Vars{ + { + Mapping: map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + "diff": "value1", + }, + }, + }, + }, + }, + "duplicate var result but unique input not removed": { + input: transpiler.NewKey("inputs", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("key", transpiler.NewStrVal("${var1.name}")), + transpiler.NewKey("unique", transpiler.NewStrVal("0")), + }), + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("key", transpiler.NewStrVal("${var1.diff}")), + transpiler.NewKey("unique", transpiler.NewStrVal("1")), + }), + })), + expected: transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("key", transpiler.NewStrVal("value1")), + transpiler.NewKey("unique", transpiler.NewStrVal("0")), + }), + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("key", transpiler.NewStrVal("value1")), + transpiler.NewKey("unique", transpiler.NewStrVal("1")), + }), + }), + varsArray: []transpiler.Vars{ + { + Mapping: map[string]interface{}{ + "var1": map[string]interface{}{ + "name": 
"value1", + "diff": "value1", + }, + }, + }, + }, + }, + "duplicates across vars array handled": { + input: transpiler.NewKey("inputs", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("key", transpiler.NewStrVal("${var1.name}")), + }), + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("key", transpiler.NewStrVal("${var1.diff}")), + }), + })), + expected: transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("key", transpiler.NewStrVal("value1")), + }), + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("key", transpiler.NewStrVal("value2")), + }), + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("key", transpiler.NewStrVal("value3")), + }), + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("key", transpiler.NewStrVal("value4")), + }), + }), + varsArray: []transpiler.Vars{ + { + Mapping: map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + "diff": "value1", + }, + }, + }, + { + Mapping: map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + "diff": "value2", + }, + }, + }, + { + Mapping: map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + "diff": "value3", + }, + }, + }, + { + Mapping: map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + "diff": "value2", + }, + }, + }, + { + Mapping: map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + "diff": "value4", + }, + }, + }, + }, + }, + "nested in streams": { + input: transpiler.NewKey("inputs", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("type", transpiler.NewStrVal("logfile")), + transpiler.NewKey("streams", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("paths", transpiler.NewList([]transpiler.Node{ + transpiler.NewStrVal("/var/log/${var1.name}.log"), + 
})), + }), + })), + }), + })), + expected: transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("type", transpiler.NewStrVal("logfile")), + transpiler.NewKey("streams", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("paths", transpiler.NewList([]transpiler.Node{ + transpiler.NewStrVal("/var/log/value1.log"), + })), + }), + })), + }), + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("type", transpiler.NewStrVal("logfile")), + transpiler.NewKey("streams", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("paths", transpiler.NewList([]transpiler.Node{ + transpiler.NewStrVal("/var/log/value2.log"), + })), + }), + })), + }), + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("type", transpiler.NewStrVal("logfile")), + transpiler.NewKey("streams", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("paths", transpiler.NewList([]transpiler.Node{ + transpiler.NewStrVal("/var/log/value3.log"), + })), + }), + })), + }), + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("type", transpiler.NewStrVal("logfile")), + transpiler.NewKey("streams", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("paths", transpiler.NewList([]transpiler.Node{ + transpiler.NewStrVal("/var/log/value4.log"), + })), + }), + })), + }), + }), + varsArray: []transpiler.Vars{ + { + Mapping: map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + }, + }, + }, + { + Mapping: map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value2", + }, + }, + }, + { + Mapping: map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value2", + }, + }, + }, + { + Mapping: map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value3", + }, + }, + }, + { + Mapping: map[string]interface{}{ + 
"var1": map[string]interface{}{ + "name": "value4", + }, + }, + }, + { + Mapping: map[string]interface{}{ + "var1": map[string]interface{}{ + "missing": "other", + }, + }, + }, + }, + }, + "inputs with processors": { + input: transpiler.NewKey("inputs", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("type", transpiler.NewStrVal("logfile")), + transpiler.NewKey("streams", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("paths", transpiler.NewList([]transpiler.Node{ + transpiler.NewStrVal("/var/log/${var1.name}.log"), + })), + }), + })), + transpiler.NewKey("processors", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("add_fields", transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("fields", transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("user", transpiler.NewStrVal("user1")), + })), + transpiler.NewKey("to", transpiler.NewStrVal("user")), + })), + }), + })), + }), + })), + expected: transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("type", transpiler.NewStrVal("logfile")), + transpiler.NewKey("streams", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("paths", transpiler.NewList([]transpiler.Node{ + transpiler.NewStrVal("/var/log/value1.log"), + })), + }), + })), + transpiler.NewKey("processors", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("add_fields", transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("fields", transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("user", transpiler.NewStrVal("user1")), + })), + transpiler.NewKey("to", transpiler.NewStrVal("user")), + })), + }), + })), + }), + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("type", transpiler.NewStrVal("logfile")), + transpiler.NewKey("streams", 
transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("paths", transpiler.NewList([]transpiler.Node{ + transpiler.NewStrVal("/var/log/value2.log"), + })), + }), + })), + transpiler.NewKey("processors", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("add_fields", transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("fields", transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("user", transpiler.NewStrVal("user1")), + })), + transpiler.NewKey("to", transpiler.NewStrVal("user")), + })), + }), + })), + }), + }), + varsArray: []transpiler.Vars{ + { + Mapping: map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + }, + }, + }, + { + Mapping: map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value2", + }, + }, + }, + }, + }, + "vars with processors": { + input: transpiler.NewKey("inputs", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("type", transpiler.NewStrVal("logfile")), + transpiler.NewKey("streams", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("paths", transpiler.NewList([]transpiler.Node{ + transpiler.NewStrVal("/var/log/${var1.name}.log"), + })), + }), + })), + transpiler.NewKey("processors", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("add_fields", transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("fields", transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("user", transpiler.NewStrVal("user1")), + })), + transpiler.NewKey("to", transpiler.NewStrVal("user")), + })), + }), + })), + }), + })), + expected: transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("type", transpiler.NewStrVal("logfile")), + transpiler.NewKey("streams", transpiler.NewList([]transpiler.Node{ + 
transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("paths", transpiler.NewList([]transpiler.Node{ + transpiler.NewStrVal("/var/log/value1.log"), + })), + }), + })), + transpiler.NewKey("processors", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("add_fields", transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("fields", transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("custom", transpiler.NewStrVal("value1")), + })), + transpiler.NewKey("to", transpiler.NewStrVal("dynamic")), + })), + }), + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("add_fields", transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("fields", transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("user", transpiler.NewStrVal("user1")), + })), + transpiler.NewKey("to", transpiler.NewStrVal("user")), + })), + }), + })), + }), + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("type", transpiler.NewStrVal("logfile")), + transpiler.NewKey("streams", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("paths", transpiler.NewList([]transpiler.Node{ + transpiler.NewStrVal("/var/log/value2.log"), + })), + }), + })), + transpiler.NewKey("processors", transpiler.NewList([]transpiler.Node{ + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("add_fields", transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("fields", transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("custom", transpiler.NewStrVal("value2")), + })), + transpiler.NewKey("to", transpiler.NewStrVal("dynamic")), + })), + }), + transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("add_fields", transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("fields", transpiler.NewDict([]transpiler.Node{ + transpiler.NewKey("user", transpiler.NewStrVal("user1")), + })), + transpiler.NewKey("to", transpiler.NewStrVal("user")), + })), + }), + })), + }), + }), + varsArray: 
[]transpiler.Vars{ + { + Mapping: map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value1", + }, + }, + ProcessorsKey: "var1", + Processors: []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "fields": map[string]interface{}{ + "custom": "value1", + }, + "to": "dynamic", + }, + }, + }, + }, + { + Mapping: map[string]interface{}{ + "var1": map[string]interface{}{ + "name": "value2", + }, + }, + ProcessorsKey: "var1", + Processors: []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "fields": map[string]interface{}{ + "custom": "value2", + }, + "to": "dynamic", + }, + }, + }, + }, + }, + }, + } + + for name, test := range testcases { + t.Run(name, func(t *testing.T) { + v, err := renderInputs(test.input, test.varsArray) + if test.err { + require.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, test.expected.String(), v.String()) + } + }) + } +} diff --git a/x-pack/elastic-agent/pkg/agent/application/handler_action_policy_change.go b/x-pack/elastic-agent/pkg/agent/application/handler_action_policy_change.go index 996811260ef..34fd5716980 100644 --- a/x-pack/elastic-agent/pkg/agent/application/handler_action_policy_change.go +++ b/x-pack/elastic-agent/pkg/agent/application/handler_action_policy_change.go @@ -9,7 +9,6 @@ import ( "fmt" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" ) @@ -26,7 +25,7 @@ func (h *handlerConfigChange) Handle(ctx context.Context, a action, acker fleetA return fmt.Errorf("invalid type, expected ActionConfigChange and received %T", a) } - c, err := config.NewConfigFrom(action.Config) + c, err := LoadConfig(action.Config) if err != nil { return errors.New(err, "could not parse the configuration from the policy", errors.TypeConfig) } diff --git 
a/x-pack/elastic-agent/pkg/agent/application/inspect_config_cmd.go b/x-pack/elastic-agent/pkg/agent/application/inspect_config_cmd.go index abf682a3b2a..2c53fc62bf2 100644 --- a/x-pack/elastic-agent/pkg/agent/application/inspect_config_cmd.go +++ b/x-pack/elastic-agent/pkg/agent/application/inspect_config_cmd.go @@ -61,7 +61,7 @@ func (c *InspectConfigCmd) inspectConfig() error { } func loadConfig(configPath string) (*config.Config, error) { - rawConfig, err := config.LoadYAML(configPath) + rawConfig, err := LoadConfigFromFile(configPath) if err != nil { return nil, err } diff --git a/x-pack/elastic-agent/pkg/agent/application/inspect_output_cmd.go b/x-pack/elastic-agent/pkg/agent/application/inspect_output_cmd.go index 65206a69d77..ca9983cca22 100644 --- a/x-pack/elastic-agent/pkg/agent/application/inspect_output_cmd.go +++ b/x-pack/elastic-agent/pkg/agent/application/inspect_output_cmd.go @@ -5,12 +5,16 @@ package application import ( + "context" "fmt" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/filters" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/noop" @@ -173,8 +177,17 @@ func printOutputFromMap(log *logger.Logger, output, programName string, cfg map[ func getProgramsFromConfig(log *logger.Logger, cfg *config.Config) (map[string][]program.Program, error) { monitor := noop.NewMonitor() router := &inmemRouter{} - emit := emitter( + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + composableCtrl, err := composable.New(cfg) + if err != 
nil { + return nil, err + } + composableWaiter := newWaitForCompose(composableCtrl) + emit, err := emitter( + ctx, log, + composableWaiter, router, &configModifiers{ Decorators: []decoratorFunc{injectMonitoring}, @@ -182,10 +195,14 @@ func getProgramsFromConfig(log *logger.Logger, cfg *config.Config) (map[string][ }, monitor, ) + if err != nil { + return nil, err + } if err := emit(cfg); err != nil { return nil, err } + composableWaiter.Wait() return router.programs, nil } @@ -201,3 +218,27 @@ func (r *inmemRouter) Dispatch(id string, grpProg map[routingKey][]program.Progr func newErrorLogger() (*logger.Logger, error) { return logger.NewWithLogpLevel("", logp.ErrorLevel) } + +type waitForCompose struct { + controller composable.Controller + done chan bool +} + +func newWaitForCompose(wrapped composable.Controller) *waitForCompose { + return &waitForCompose{ + controller: wrapped, + done: make(chan bool), + } +} + +func (w *waitForCompose) Run(ctx context.Context, cb composable.VarsCallback) error { + err := w.controller.Run(ctx, func(vars []transpiler.Vars) { + cb(vars) + w.done <- true + }) + return err +} + +func (w *waitForCompose) Wait() { + <-w.done +} diff --git a/x-pack/elastic-agent/pkg/agent/application/local_mode.go b/x-pack/elastic-agent/pkg/agent/application/local_mode.go index df89b97bb96..4b0753af9a8 100644 --- a/x-pack/elastic-agent/pkg/agent/application/local_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/local_mode.go @@ -13,6 +13,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring" @@ -102,9 +103,16 @@ 
func newLocal( } localApplication.router = router + composableCtrl, err := composable.New(rawConfig) + if err != nil { + return nil, errors.New(err, "failed to initialize composable controller") + } + discover := discoverer(pathConfigFile, cfg.Settings.Path) - emit := emitter( + emit, err := emitter( + localApplication.bgContext, log, + composableCtrl, router, &configModifiers{ Decorators: []decoratorFunc{injectMonitoring}, @@ -112,6 +120,9 @@ func newLocal( }, monitor, ) + if err != nil { + return nil, err + } var cfgSource source if !cfg.Settings.Reload.Enabled { diff --git a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go index ea2815913fc..8b9320b53c0 100644 --- a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go @@ -17,6 +17,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring" @@ -151,8 +152,15 @@ func newManaged( } managedApplication.router = router - emit := emitter( + composableCtrl, err := composable.New(rawConfig) + if err != nil { + return nil, errors.New(err, "failed to initialize composable controller") + } + + emit, err := emitter( + managedApplication.bgContext, log, + composableCtrl, router, &configModifiers{ Decorators: []decoratorFunc{injectMonitoring}, @@ -160,6 +168,9 @@ func newManaged( }, monitor, ) + if err != nil { + return nil, err + } acker, err := newActionAcker(log, agentInfo, client) if err != nil { return nil, err diff --git 
a/x-pack/elastic-agent/pkg/agent/application/managed_mode_test.go b/x-pack/elastic-agent/pkg/agent/application/managed_mode_test.go index c14e65c29e7..efc8608c233 100644 --- a/x-pack/elastic-agent/pkg/agent/application/managed_mode_test.go +++ b/x-pack/elastic-agent/pkg/agent/application/managed_mode_test.go @@ -10,9 +10,11 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/filters" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" ) @@ -26,12 +28,17 @@ func TestManagedModeRouting(t *testing.T) { return m, nil } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + log, _ := logger.New("") router, _ := newRouter(log, streamFn) - emit := emitter(log, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}, Filters: []filterFunc{filters.ConstraintFilter}}) + composableCtrl, _ := composable.New(nil) + emit, err := emitter(ctx, log, composableCtrl, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}, Filters: []filterFunc{filters.ConstraintFilter}}) + require.NoError(t, err) - actionDispatcher, err := newActionDispatcher(context.Background(), log, &handlerDefault{log: log}) - assert.NoError(t, err) + actionDispatcher, err := newActionDispatcher(ctx, log, &handlerDefault{log: log}) + require.NoError(t, err) actionDispatcher.MustRegister( &fleetapi.ActionConfigChange{}, @@ -42,10 +49,10 @@ func TestManagedModeRouting(t *testing.T) { ) actions, err := testActions() - assert.NoError(t, err) + require.NoError(t, err) err = actionDispatcher.Dispatch(newNoopAcker(), actions...) - assert.NoError(t, err) + require.NoError(t, err) // has 1 config request for fb, mb and monitoring? 
assert.Equal(t, 1, len(streams)) diff --git a/x-pack/elastic-agent/pkg/agent/cmd/enroll.go b/x-pack/elastic-agent/pkg/agent/cmd/enroll.go index b34e0236782..6749b57b250 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/enroll.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/enroll.go @@ -22,7 +22,6 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/warn" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" ) @@ -56,7 +55,7 @@ func newEnrollCommandWithArgs(flags *globalFlags, _ []string, streams *cli.IOStr func enroll(streams *cli.IOStreams, cmd *cobra.Command, flags *globalFlags, args []string) error { warn.PrintNotGA(streams.Out) pathConfigFile := flags.Config() - rawConfig, err := config.LoadYAML(pathConfigFile) + rawConfig, err := application.LoadConfigFromFile(pathConfigFile) if err != nil { return errors.New(err, fmt.Sprintf("could not read configuration file %s", pathConfigFile), diff --git a/x-pack/elastic-agent/pkg/agent/cmd/include.go b/x-pack/elastic-agent/pkg/agent/cmd/include.go new file mode 100644 index 00000000000..a28d47490d5 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/cmd/include.go @@ -0,0 +1,15 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package cmd + +import ( + // include the composable providers + _ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/agent" + _ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/env" + _ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/host" + _ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/local" + _ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/localdynamic" + _ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/path" +) diff --git a/x-pack/elastic-agent/pkg/agent/cmd/run.go b/x-pack/elastic-agent/pkg/agent/cmd/run.go index 74cf010cc66..f86b7e909a7 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/run.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/run.go @@ -22,7 +22,6 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/server" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" ) @@ -41,7 +40,7 @@ func newRunCommandWithArgs(flags *globalFlags, _ []string, streams *cli.IOStream func run(flags *globalFlags, streams *cli.IOStreams) error { pathConfigFile := flags.Config() - rawConfig, err := config.LoadYAML(pathConfigFile) + rawConfig, err := application.LoadConfigFromFile(pathConfigFile) if err != nil { return errors.New(err, fmt.Sprintf("could not read configuration file %s", pathConfigFile), diff --git a/x-pack/elastic-agent/pkg/agent/transpiler/ast.go b/x-pack/elastic-agent/pkg/agent/transpiler/ast.go index 498c32b15d7..14ebba556eb 100644 --- a/x-pack/elastic-agent/pkg/agent/transpiler/ast.go +++ b/x-pack/elastic-agent/pkg/agent/transpiler/ast.go @@ -14,6 +14,8 @@ import ( "sort" "strconv" "strings" + + "github.com/elastic/go-ucfg" ) const selectorSep = "." 
@@ -28,6 +30,9 @@ var ( falseVal = []byte{0} ) +// Processors represent an attached list of processors. +type Processors []map[string]interface{} + // Node represents a node in the configuration Tree a Node can point to one or multiples children // nodes. type Node interface { @@ -44,6 +49,12 @@ type Node interface { // Hash compute a sha256 hash of the current node and recursively call any children. Hash() []byte + + // Apply apply the current vars, returning the new value for the node. + Apply(Vars) (Node, error) + + // Processors returns any attached processors, because of variable substitution. + Processors() Processors } // AST represents a raw configuration which is purely data, only primitives are currently supported, @@ -61,12 +72,18 @@ func (a *AST) String() string { // Dict represents a dictionary in the Tree, where each key is a entry into an array. The Dict will // keep the ordering. type Dict struct { - value []Node + value []Node + processors []map[string]interface{} } // NewDict creates a new dict with provided nodes. func NewDict(nodes []Node) *Dict { - return &Dict{nodes} + return NewDictWithProcessors(nodes, nil) +} + +// NewDictWithProcessors creates a new dict with provided nodes and attached processors. +func NewDictWithProcessors(nodes []Node, processors Processors) *Dict { + return &Dict{nodes, processors} } // Find takes a string which is a key and try to find the elements in the associated K/V. @@ -115,6 +132,32 @@ func (d *Dict) Hash() []byte { return h.Sum(nil) } +// Apply applies the vars to all the nodes in the dictionary. +func (d *Dict) Apply(vars Vars) (Node, error) { + nodes := make([]Node, len(d.value)) + for i, v := range d.value { + n, err := v.Apply(vars) + if err != nil { + return nil, err + } + nodes[i] = n + } + return &Dict{nodes, nil}, nil +} + +// Processors returns any attached processors, because of variable substitution. 
+func (d *Dict) Processors() Processors { + if d.processors != nil { + return d.processors + } + for _, v := range d.value { + if p := v.Processors(); p != nil { + return p + } + } + return nil +} + // sort sorts the keys in the dictionary func (d *Dict) sort() { sort.Slice(d.value, func(i, j int) bool { @@ -157,6 +200,11 @@ func (k *Key) Find(key string) (Node, bool) { } } +// Name returns the name for the key. +func (k *Key) Name() string { + return k.name +} + // Value returns the raw value. func (k *Key) Value() interface{} { return k.value @@ -181,26 +229,52 @@ func (k *Key) Hash() []byte { return h.Sum(nil) } +// Apply applies the vars to the value. +func (k *Key) Apply(vars Vars) (Node, error) { + if k.value == nil { + return k, nil + } + v, err := k.value.Apply(vars) + if err != nil { + return nil, err + } + return &Key{k.name, v}, nil +} + +// Processors returns any attached processors, because of variable substitution. +func (k *Key) Processors() Processors { + if k.value != nil { + return k.value.Processors() + } + return nil +} + // List represents a slice in our Tree. type List struct { - value []Node + value []Node + processors Processors } // NewList creates a new list with provided nodes. func NewList(nodes []Node) *List { - return &List{nodes} + return NewListWithProcessors(nodes, nil) +} + +// NewListWithProcessors creates a new list with provided nodes with processors attached. +func NewListWithProcessors(nodes []Node, processors Processors) *List { + return &List{nodes, processors} } func (l *List) String() string { var sb strings.Builder + sb.WriteString("[") for i := 0; i < len(l.value); i++ { - sb.WriteString("[") sb.WriteString(l.value[i].String()) - sb.WriteString("]") if i < len(l.value)-1 { sb.WriteString(",") } } + sb.WriteString("]") return sb.String() } @@ -244,14 +318,46 @@ func (l *List) Clone() Node { return &List{value: nodes} } +// Apply applies the vars to all nodes in the list. 
+func (l *List) Apply(vars Vars) (Node, error) { + nodes := make([]Node, len(l.value)) + for i, v := range l.value { + n, err := v.Apply(vars) + if err != nil { + return nil, err + } + nodes[i] = n + } + return NewList(nodes), nil +} + +// Processors returns any attached processors, because of variable substitution. +func (l *List) Processors() Processors { + if l.processors != nil { + return l.processors + } + for _, v := range l.value { + if p := v.Processors(); p != nil { + return p + } + } + return nil +} + // StrVal represents a string. type StrVal struct { - value string + value string + processors Processors } // NewStrVal creates a new string value node with provided value. func NewStrVal(val string) *StrVal { - return &StrVal{val} + return NewStrValWithProcessors(val, nil) +} + +// NewStrValWithProcessors creates a new string value node with provided value and processors. +func NewStrValWithProcessors(val string, processors Processors) *StrVal { + return &StrVal{val, processors} } // Find receive a key and return false since the node is not a List or Dict. @@ -279,14 +385,30 @@ func (s *StrVal) Hash() []byte { return []byte(s.value) } +// Apply applies the vars to the string value. +func (s *StrVal) Apply(vars Vars) (Node, error) { + return vars.Replace(s.value) +} + +// Processors returns any linked processors that are now connected because of Apply. +func (s *StrVal) Processors() Processors { + return s.processors +} + // IntVal represents an int. type IntVal struct { - value int + value int + processors Processors } // NewIntVal creates a new int value node with provided value. func NewIntVal(val int) *IntVal { - return &IntVal{val} + return NewIntValWithProcessors(val, nil) +} + +// NewIntValWithProcessors creates a new int value node with provided value and attached processors. 
+func NewIntValWithProcessors(val int, processors Processors) *IntVal { + return &IntVal{val, processors} } // Find receive a key and return false since the node is not a List or Dict. @@ -309,19 +431,35 @@ func (s *IntVal) Clone() Node { return &k } +// Apply does nothing. +func (s *IntVal) Apply(_ Vars) (Node, error) { + return s, nil +} + // Hash we convert the value into a string and return the byte slice. func (s *IntVal) Hash() []byte { return []byte(s.String()) } +// Processors returns any linked processors that are now connected because of Apply. +func (s *IntVal) Processors() Processors { + return s.processors +} + // UIntVal represents an int. type UIntVal struct { - value uint64 + value uint64 + processors Processors } // NewUIntVal creates a new uint value node with provided value. func NewUIntVal(val uint64) *UIntVal { - return &UIntVal{val} + return NewUIntValWithProcessors(val, nil) +} + +// NewUIntValWithProcessors creates a new uint value node with provided value with processors attached. +func NewUIntValWithProcessors(val uint64, processors Processors) *UIntVal { + return &UIntVal{val, processors} } // Find receive a key and return false since the node is not a List or Dict. @@ -349,15 +487,31 @@ func (s *UIntVal) Hash() []byte { return []byte(s.String()) } +// Apply does nothing. +func (s *UIntVal) Apply(_ Vars) (Node, error) { + return s, nil +} + +// Processors returns any linked processors that are now connected because of Apply. +func (s *UIntVal) Processors() Processors { + return s.processors +} + // FloatVal represents a float. // NOTE: We will convert float32 to a float64. type FloatVal struct { - value float64 + value float64 + processors Processors } // NewFloatVal creates a new float value node with provided value. func NewFloatVal(val float64) *FloatVal { - return &FloatVal{val} + return NewFloatValWithProcessors(val, nil) +} + +// NewFloatValWithProcessors creates a new float value node with provided value with processors attached. 
+func NewFloatValWithProcessors(val float64, processors Processors) *FloatVal { + return &FloatVal{val, processors} } // Find receive a key and return false since the node is not a List or Dict. @@ -385,14 +539,30 @@ func (s *FloatVal) Hash() []byte { return []byte(strconv.FormatFloat(s.value, 'f', -1, 64)) } +// Apply does nothing. +func (s *FloatVal) Apply(_ Vars) (Node, error) { + return s, nil +} + +// Processors returns any linked processors that are now connected because of Apply. +func (s *FloatVal) Processors() Processors { + return s.processors +} + // BoolVal represents a boolean in our Tree. type BoolVal struct { - value bool + value bool + processors Processors } // NewBoolVal creates a new bool value node with provided value. func NewBoolVal(val bool) *BoolVal { - return &BoolVal{val} + return NewBoolValWithProcessors(val, nil) +} + +// NewBoolValWithProcessors creates a new bool value node with provided value with processors attached. +func NewBoolValWithProcessors(val bool, processors Processors) *BoolVal { + return &BoolVal{val, processors} } // Find receive a key and return false since the node is not a List or Dict. @@ -426,13 +596,22 @@ func (s *BoolVal) Hash() []byte { return falseVal } +// Apply does nothing. +func (s *BoolVal) Apply(_ Vars) (Node, error) { + return s, nil +} + +// Processors returns any linked processors that are now connected because of Apply. +func (s *BoolVal) Processors() Processors { + return s.processors +} + // NewAST takes a map and convert it to an internal Tree, allowing us to executes rules on the // data to shape it in a different way or to filter some of the information. 
func NewAST(m map[string]interface{}) (*AST, error) { - val := reflect.ValueOf(m) - root, err := load(val) + root, err := loadForNew(m) if err != nil { - return nil, fmt.Errorf("could not parse configuration into a tree, error: %+v", err) + return nil, err } return &AST{root: root}, nil } @@ -446,6 +625,40 @@ func MustNewAST(m map[string]interface{}) *AST { return v } +// NewASTFromConfig takes a config and converts it to an internal Tree, allowing us to executes rules on the +// data to shape it in a different way or to filter some of the information. +func NewASTFromConfig(cfg *ucfg.Config) (*AST, error) { + var v interface{} + if cfg.IsDict() { + var m map[string]interface{} + if err := cfg.Unpack(&m); err != nil { + return nil, err + } + v = m + } else if cfg.IsArray() { + var l []string + if err := cfg.Unpack(&l); err != nil { + return nil, err + } + v = l + } else { + return nil, fmt.Errorf("cannot create AST from none dict or array type") + } + root, err := loadForNew(v) + if err != nil { + return nil, err + } + return &AST{root: root}, nil +} + +func loadForNew(val interface{}) (Node, error) { + root, err := load(reflect.ValueOf(val)) + if err != nil { + return nil, fmt.Errorf("could not parse configuration into a tree, error: %+v", err) + } + return root, nil +} + func load(val reflect.Value) (Node, error) { val = lookupVal(val) @@ -557,6 +770,16 @@ func (a *AST) MarshalJSON() ([]byte, error) { return b, nil } +// Apply applies the variables to the replacement in the AST. 
+func (a *AST) Apply(vars Vars) error { + n, err := a.root.Apply(vars) + if err != nil { + return err + } + a.root = n + return nil +} + func splitPath(s Selector) []string { if s == "" { return nil @@ -666,6 +889,26 @@ func lookupVal(val reflect.Value) reflect.Value { return val } +func attachProcessors(node Node, processors Processors) Node { + switch n := node.(type) { + case *Dict: + n.processors = processors + case *List: + n.processors = processors + case *StrVal: + n.processors = processors + case *IntVal: + n.processors = processors + case *UIntVal: + n.processors = processors + case *FloatVal: + n.processors = processors + case *BoolVal: + n.processors = processors + } + return node +} + // Select takes an AST and a selector and will return a sub AST based on the selector path, will // return false if the path could not be found. func Select(a *AST, selector Selector) (*AST, bool) { @@ -763,7 +1006,7 @@ func Insert(a *AST, node Node, to Selector) error { case *List: d.value = node default: - d.value = &Dict{[]Node{node}} + d.value = &Dict{[]Node{node}, nil} } return nil } diff --git a/x-pack/elastic-agent/pkg/agent/transpiler/ast_test.go b/x-pack/elastic-agent/pkg/agent/transpiler/ast_test.go index 80c7a1d7401..742850bdeb4 100644 --- a/x-pack/elastic-agent/pkg/agent/transpiler/ast_test.go +++ b/x-pack/elastic-agent/pkg/agent/transpiler/ast_test.go @@ -112,13 +112,13 @@ func TestAST(t *testing.T) { value: []Node{ &Key{ name: "range", - value: &List{ + value: NewList( []Node{ &IntVal{value: 20}, &IntVal{value: 30}, &IntVal{value: 40}, }, - }, + ), }, &Key{name: "timeout", value: &IntVal{value: 12}}, }, @@ -135,13 +135,13 @@ func TestAST(t *testing.T) { value: []Node{ &Key{ name: "range", - value: &List{ + value: NewList( []Node{ &UIntVal{value: uint64(20)}, &UIntVal{value: uint64(30)}, &UIntVal{value: uint64(40)}, }, - }, + ), }, &Key{name: "timeout", value: &IntVal{value: 12}}, }, @@ -159,23 +159,23 @@ func TestAST(t *testing.T) { value: []Node{ &Key{ name: 
"range32", - value: &List{ + value: NewList( []Node{ &FloatVal{value: 20.0}, &FloatVal{value: 30.0}, &FloatVal{value: 40.0}, }, - }, + ), }, &Key{ name: "range64", - value: &List{ + value: NewList( []Node{ &FloatVal{value: 20.0}, &FloatVal{value: 30.0}, &FloatVal{value: 40.0}, }, - }, + ), }, &Key{name: "ratio", value: &FloatVal{value: 0.5}}, }, @@ -195,7 +195,7 @@ func TestAST(t *testing.T) { value: []Node{ &Key{ name: "inputs", - value: &Dict{ + value: NewDict( []Node{ &Key{name: "ignore_older", value: &StrVal{value: "20s"}}, &Key{name: "paths", value: &List{value: []Node{ @@ -203,8 +203,8 @@ func TestAST(t *testing.T) { &StrVal{value: "/var/log/log2"}, }}}, &Key{name: "type", value: &StrVal{value: "log/docker"}}, - }, - }}, + }), + }, }, }, }, @@ -225,11 +225,11 @@ func TestAST(t *testing.T) { }, }, ast: &AST{ - root: &Dict{ - value: []Node{ + root: NewDict( + []Node{ &Key{ name: "inputs", - value: &Dict{ + value: NewDict( []Node{ &Key{name: "ignore_older", value: &StrVal{value: "20s"}}, &Key{name: "paths", value: &List{value: []Node{ @@ -237,42 +237,37 @@ func TestAST(t *testing.T) { &StrVal{value: "/var/log/log2"}, }}}, &Key{name: "type", value: &StrVal{value: "log/docker"}}, - }, - }}, + }), + }, &Key{ name: "outputs", - value: &Dict{ + value: NewDict( []Node{ &Key{ name: "elasticsearch", - value: &Dict{ + value: NewDict( []Node{ &Key{ name: "ssl", - value: &Dict{ + value: NewDict( []Node{ &Key{name: "certificates_authorities", - value: &List{ + value: NewList( []Node{ &StrVal{value: "abc1"}, &StrVal{value: "abc2"}, }, - }, + ), }, - }, - }, + }), }, - }, - }, + }), }, - }, - }, + }), }, - }, - }, + }), }, }, - "Keys with multiple levels of deeps with compact keys": { hashmap: map[string]interface{}{ "inputs": map[string]interface{}{ @@ -305,7 +300,7 @@ func TestAST(t *testing.T) { value: []Node{ &Key{ name: "inputs", - value: &Dict{ + value: NewDict( []Node{ &Key{name: "ignore_older", value: &StrVal{value: "20s"}}, &Key{name: "paths", value: &List{value: 
[]Node{ @@ -313,36 +308,33 @@ func TestAST(t *testing.T) { &StrVal{value: "/var/log/log2"}, }}}, &Key{name: "type", value: &StrVal{value: "log/docker"}}, - }, - }}, + }), + }, &Key{ name: "outputs", - value: &Dict{ + value: NewDict( []Node{ &Key{ name: "elasticsearch", - value: &Dict{ + value: NewDict( []Node{ &Key{ name: "ssl", - value: &Dict{ + value: NewDict( []Node{ &Key{name: "certificates_authorities", - value: &List{ + value: NewList( []Node{ &StrVal{value: "abc1"}, &StrVal{value: "abc2"}, }, - }, + ), }, - }, - }, + }), }, - }, - }, + }), }, - }, - }, + }), }, }, }, @@ -404,11 +396,11 @@ func TestSelector(t *testing.T) { value: []Node{ &Key{ name: "inputs", - value: &Dict{ + value: NewDict( []Node{ &Key{name: "type", value: &StrVal{value: "log/docker"}}, - }, - }}, + }), + }, }, }, }, @@ -431,17 +423,18 @@ func TestSelector(t *testing.T) { value: []Node{ &Key{ name: "inputs", - value: &Dict{ + value: NewDict( []Node{ - &Key{name: "ssl", value: &Dict{ + &Key{name: "ssl", value: NewDict( []Node{ &Key{name: "ca", value: &List{ value: []Node{&StrVal{value: "ca1"}, &StrVal{value: "ca2"}}, }}, &Key{name: "certificate", value: &StrVal{value: "/etc/ssl/my.crt"}}, - }}}, - }, - }}, + }), + }, + }), + }, }, }, }, @@ -471,11 +464,11 @@ func TestSelector(t *testing.T) { value: []Node{ &Key{ name: "inputs", - value: &Dict{ + value: NewDict( []Node{ &Key{name: "1", value: &StrVal{value: "log/docker"}}, - }, - }}, + }), + }, }, }, }, @@ -495,21 +488,20 @@ func TestSelector(t *testing.T) { value: []Node{ &Key{ name: "inputs", - value: &Dict{ + value: NewDict( []Node{ - &Key{name: "x", value: &Dict{ + &Key{name: "x", value: NewDict( []Node{ - &Key{name: "ssl", value: &Dict{ + &Key{name: "ssl", value: NewDict( []Node{ &Key{name: "ca", value: &List{ value: []Node{&StrVal{value: "ca1"}, &StrVal{value: "ca2"}}, }}, &Key{name: "certificate", value: &StrVal{value: "/etc/ssl/my.crt"}}, - }}}, - }, - }}, - }, - }, + })}, + }), + }, + }), }, }, }, @@ -536,32 +528,31 @@ func 
TestSelector(t *testing.T) { value: []Node{ &Key{ name: "inputs", - value: &Dict{ + value: NewDict( []Node{ - &Key{name: "x", value: &Dict{ + &Key{name: "x", value: NewDict( []Node{ - &Key{name: "ssl", value: &Dict{ + &Key{name: "ssl", value: NewDict( []Node{ &Key{name: "ca", value: &List{ value: []Node{&StrVal{value: "ca1"}, &StrVal{value: "ca2"}}, }}, &Key{name: "certificate", value: &StrVal{value: "/etc/ssl/my.crt"}}, - }}}, - }, - }}, - &Key{name: "y", value: &Dict{ + })}, + }), + }, + &Key{name: "y", value: NewDict( []Node{ - &Key{name: "ssl", value: &Dict{ + &Key{name: "ssl", value: NewDict( []Node{ &Key{name: "ca", value: &List{ value: []Node{&StrVal{value: "ca1"}, &StrVal{value: "ca2"}}, }}, &Key{name: "certificate", value: &StrVal{value: "/etc/ssl/my.crt"}}, - }}}, - }, - }}, - }, - }, + })}, + }), + }, + }), }, }, }, @@ -588,21 +579,20 @@ func TestSelector(t *testing.T) { value: []Node{ &Key{ name: "inputs", - value: &Dict{ + value: NewDict( []Node{ - &Key{name: "x", value: &Dict{ + &Key{name: "x", value: NewDict( []Node{ - &Key{name: "ssl", value: &Dict{ + &Key{name: "ssl", value: NewDict( []Node{ &Key{name: "ca", value: &List{ value: []Node{&StrVal{value: "ca1"}, &StrVal{value: "ca2"}}, }}, &Key{name: "certificate", value: &StrVal{value: "/etc/ssl/my.crt"}}, - }}}, - }, - }}, - }, - }, + })}, + }), + }, + }), }, }, }, @@ -623,21 +613,20 @@ func TestSelector(t *testing.T) { value: []Node{ &Key{ name: "inputs", - value: &Dict{ + value: NewDict( []Node{ - &Key{name: "x", value: &Dict{ + &Key{name: "x", value: NewDict( []Node{ - &Key{name: "ssl", value: &Dict{ + &Key{name: "ssl", value: NewDict( []Node{ &Key{name: "ca", value: &List{ value: []Node{&StrVal{value: "ca1"}, &StrVal{value: "ca2"}}, }}, &Key{name: "certificate", value: &StrVal{value: "/etc/ssl/my.crt"}}, - }}}, - }, - }}, - }, - }, + })}, + }), + }, + }), }, }, }, @@ -662,21 +651,20 @@ func TestSelector(t *testing.T) { value: []Node{ &Key{ name: "inputs", - value: &Dict{ + value: NewDict( []Node{ 
- &Key{name: "x", value: &Dict{ + &Key{name: "x", value: NewDict( []Node{ - &Key{name: "ssl", value: &Dict{ + &Key{name: "ssl", value: NewDict( []Node{ &Key{name: "ca", value: &List{ value: []Node{&StrVal{value: "ca1"}, &StrVal{value: "ca2"}}, }}, &Key{name: "certificate", value: &StrVal{value: "/etc/ssl/my.crt"}}, - }}}, - }, - }}, - }, - }, + })}, + }), + }, + }), }, }, }, @@ -704,6 +692,196 @@ func TestSelector(t *testing.T) { } } +func TestAST_Apply(t *testing.T) { + testcases := map[string]struct { + input map[string]interface{} + expected *AST + vars Vars + matchErr bool + }{ + //"2 vars missing with default": { + // input: map[string]interface{}{ + // "inputs": map[string]interface{}{ + // "type": "log/docker", + // "paths": []string{"/var/log/${var1.key1}", "/var/log/${var1.missing|'other'}"}, + // }, + // }, + // expected: &AST{ + // root: &Dict{ + // value: []Node{ + // &Key{ + // name: "inputs", + // value: NewDict( + // []Node{ + // &Key{ + // name: "paths", + // value: &List{ + // value: []Node{ + // &StrVal{value: "/var/log/value1"}, + // &StrVal{value: "/var/log/other"}, + // }, + // }, + // }, + // &Key{name: "type", value: &StrVal{value: "log/docker"}}, + // }), + // }, + // }, + // }, + // }, + // vars: Vars{ + // Mapping: map[string]interface{}{ + // "var1": map[string]interface{}{ + // "key1": "value1", + // }, + // }, + // }, + //}, + //"2 vars missing no default": { + // input: map[string]interface{}{ + // "inputs": map[string]interface{}{ + // "type": "log/docker", + // "paths": []string{"/var/log/${var1.key1}", "/var/log/${var1.missing}"}, + // }, + // }, + // vars: Vars{ + // Mapping: map[string]interface{}{ + // "var1": map[string]interface{}{ + // "key1": "value1", + // }, + // }, + // }, + // matchErr: true, + //}, + //"vars not string": { + // input: map[string]interface{}{ + // "inputs": map[string]interface{}{ + // "type": "log/docker", + // "paths": []string{"/var/log/${var1.key1}"}, + // }, + // }, + // expected: &AST{ + // root: 
&Dict{ + // value: []Node{ + // &Key{ + // name: "inputs", + // value: NewDict( + // []Node{ + // &Key{ + // name: "paths", + // value: &List{ + // value: []Node{ + // &StrVal{value: "/var/log/1"}, + // }, + // }, + // }, + // &Key{name: "type", value: &StrVal{value: "log/docker"}}, + // }), + // }, + // }, + // }, + // }, + // vars: Vars{ + // Mapping: map[string]interface{}{ + // "var1": map[string]interface{}{ + // "key1": 1, + // }, + // }, + // }, + //}, + "vars replace with object": { + input: map[string]interface{}{ + "inputs": map[string]interface{}{ + "type": "logfile", + "paths": []string{"/var/log/syslog"}, + "processors": []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "labels": "${host.labels}", + }, + }, + }, + }, + }, + expected: &AST{ + root: &Dict{ + value: []Node{ + &Key{ + name: "inputs", + value: NewDict( + []Node{ + &Key{ + name: "paths", + value: &List{ + value: []Node{ + &StrVal{value: "/var/log/syslog"}, + }, + }, + }, + &Key{ + name: "processors", + value: &List{ + value: []Node{ + NewDict( + []Node{ + &Key{ + name: "add_fields", + value: NewDict( + []Node{ + &Key{ + name: "labels", + value: &List{ + value: []Node{ + &StrVal{value: "label1"}, + &StrVal{value: "label2"}, + }, + }, + }, + }, + ), + }, + }, + ), + }, + }, + }, + &Key{name: "type", value: &StrVal{value: "logfile"}}, + }), + }, + }, + }, + }, + vars: Vars{ + Mapping: map[string]interface{}{ + "host": map[string]interface{}{ + "labels": []string{ + "label1", + "label2", + }, + }, + }, + }, + }, + } + + for name, test := range testcases { + t.Run(name, func(t *testing.T) { + v, err := NewAST(test.input) + require.NoError(t, err) + err = v.Apply(test.vars) + if test.matchErr { + require.Equal(t, ErrNoMatch, err) + } else { + require.NoError(t, err) + if !assert.True(t, reflect.DeepEqual(test.expected, v)) { + t.Logf( + `received: %+v + expected: %+v`, v, test.expected) + } + } + }) + } +} + func TestCount(t *testing.T) { ast := &AST{ root: &Dict{ diff --git 
a/x-pack/elastic-agent/pkg/agent/transpiler/vars.go b/x-pack/elastic-agent/pkg/agent/transpiler/vars.go new file mode 100644 index 00000000000..f5b7b9922d3 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/transpiler/vars.go @@ -0,0 +1,199 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package transpiler + +import ( + "fmt" + "regexp" + "strings" + "unicode" + + "github.com/elastic/go-ucfg" +) + +var varsRegex = regexp.MustCompile(`\${([\p{L}\d\s\\\-_|.'"]*)}`) + +// ErrNoMatch is return when the replace didn't fail, just that no vars match to perform the replace. +var ErrNoMatch = fmt.Errorf("no matching vars") + +// Vars is a context of variables that also contain a list of processors that go with the mapping. +type Vars struct { + Mapping map[string]interface{} + + ProcessorsKey string + Processors Processors +} + +// Replace returns a new value based on variable replacement. 
+func (v *Vars) Replace(value string) (Node, error) { + var processors []map[string]interface{} + c, err := ucfg.NewFrom(v.Mapping, ucfg.PathSep(".")) + if err != nil { + return nil, err + } + matchIdxs := varsRegex.FindAllSubmatchIndex([]byte(value), -1) + if !validBrackets(value, matchIdxs) { + return nil, fmt.Errorf("starting ${ is missing ending }") + } + + result := "" + lastIndex := 0 + for _, r := range matchIdxs { + for i := 0; i < len(r); i += 4 { + vars, err := extractVars(value[r[i+2]:r[i+3]]) + if err != nil { + return nil, fmt.Errorf(`error parsing variable "%s": %s`, value[r[i]:r[i+1]], err) + } + set := false + for _, val := range vars { + switch val.(type) { + case *constString: + result += value[lastIndex:r[0]] + val.Value() + set = true + case *varString: + if r[i] == 0 && r[i+1] == len(value) { + // possible for complete replacement of object, because the variable + // is not inside of a string + child, err := c.Child(val.Value(), -1, ucfg.PathSep(".")) + if err == nil { + ast, err := NewASTFromConfig(child) + if err == nil { + if v.ProcessorsKey != "" && varPrefixMatched(val.Value(), v.ProcessorsKey) { + processors = v.Processors + } + return attachProcessors(ast.root, processors), nil + } + } + } + replace, err := c.String(val.Value(), -1, ucfg.PathSep(".")) + if err == nil { + result += value[lastIndex:r[0]] + replace + set = true + if v.ProcessorsKey != "" && varPrefixMatched(val.Value(), v.ProcessorsKey) { + processors = v.Processors + } + } + } + if set { + break + } + } + if !set { + return NewStrVal(""), ErrNoMatch + } + lastIndex = r[1] + } + } + return NewStrValWithProcessors(result+value[lastIndex:], processors), nil +} + +// validBrackets returns true when all starting {$ have a matching ending }. 
+func validBrackets(s string, matchIdxs [][]int) bool { + result := "" + lastIndex := 0 + match := false + for _, r := range matchIdxs { + match = true + for i := 0; i < len(r); i += 4 { + result += s[lastIndex:r[0]] + lastIndex = r[1] + } + } + if !match { + return !strings.Contains(s, "${") + } + return !strings.Contains(result, "${") +} + +type varI interface { + Value() string +} + +type varString struct { + value string +} + +func (v *varString) Value() string { + return v.value +} + +type constString struct { + value string +} + +func (v *constString) Value() string { + return v.value +} + +func extractVars(i string) ([]varI, error) { + const out = rune(0) + + quote := out + constant := false + escape := false + is := make([]rune, 0, len(i)) + res := make([]varI, 0) + for _, r := range i { + if r == '|' { + if escape { + return nil, fmt.Errorf(`variable pipe cannot be escaped; remove \ before |`) + } + if quote == out { + if constant { + res = append(res, &constString{string(is)}) + } else if len(is) > 0 { + if is[len(is)-1] == '.' 
{ + return nil, fmt.Errorf("variable cannot end with '.'") + } + res = append(res, &varString{string(is)}) + } + is = is[:0] // slice to zero length; to keep allocated memory + constant = false + } else { + is = append(is, r) + } + continue + } + if !escape && (r == '"' || r == '\'') { + if quote == out { + // start of unescaped quote + quote = r + constant = true + } else if quote == r { + // end of unescaped quote + quote = out + } else { + is = append(is, r) + } + continue + } + // escape because of backslash (\); except when it is the second backslash of a pair + escape = !escape && r == '\\' + if r == '\\' { + if !escape { + is = append(is, r) + } + } else if quote != out || !unicode.IsSpace(r) { + is = append(is, r) + } + } + if quote != out { + return nil, fmt.Errorf(`starting %s is missing ending %s`, string(quote), string(quote)) + } + if constant { + res = append(res, &constString{string(is)}) + } else if len(is) > 0 { + if is[len(is)-1] == '.' { + return nil, fmt.Errorf("variable cannot end with '.'") + } + res = append(res, &varString{string(is)}) + } + return res, nil +} + +func varPrefixMatched(val string, key string) bool { + s := strings.SplitN(val, ".", 2) + return s[0] == key +} diff --git a/x-pack/elastic-agent/pkg/agent/transpiler/vars_test.go b/x-pack/elastic-agent/pkg/agent/transpiler/vars_test.go new file mode 100644 index 00000000000..31249316099 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/transpiler/vars_test.go @@ -0,0 +1,250 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package transpiler + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestVars_Replace(t *testing.T) { + vars := &Vars{ + Mapping: map[string]interface{}{ + "un-der_score": map[string]interface{}{ + "key1": "data1", + "key2": "data2", + "list": []string{ + "array1", + "array2", + }, + "dict": map[string]interface{}{ + "key1": "value1", + "key2": "value2", + }, + }, + "other": map[string]interface{}{ + "data": "info", + }, + }, + } + tests := []struct { + Input string + Result Node + Error bool + NoMatch bool + }{ + { + "${un-der_score.key1}", + NewStrVal("data1"), + false, + false, + }, + { + "${un-der_score.missing}", + NewStrVal(""), + false, + true, + }, + { + "${un-der_score.missing|un-der_score.key2}", + NewStrVal("data2"), + false, + false, + }, + { + "${un-der_score.missing|un-der_score.missing2|other.data}", + NewStrVal("info"), + false, + false, + }, + { + "${un-der_score.missing|'fallback'}", + NewStrVal("fallback"), + false, + false, + }, + { + `${un-der_score.missing|||||||||"fallback"}`, + NewStrVal("fallback"), + false, + false, + }, + { + `${"direct"}`, + NewStrVal("direct"), + false, + false, + }, + { + `${un-der_score.}`, + NewStrVal(""), + true, + false, + }, + { + `${un-der_score.missing|"oth}`, + NewStrVal(""), + true, + false, + }, + { + `${un-der_score.missing`, + NewStrVal(""), + true, + false, + }, + { + `${un-der_score.missing ${other}`, + NewStrVal(""), + true, + false, + }, + { + `${}`, + NewStrVal(""), + true, + false, + }, + { + "around ${un-der_score.key1} the var", + NewStrVal("around data1 the var"), + false, + false, + }, + { + "multi ${un-der_score.key1} var ${ un-der_score.missing | un-der_score.key2 } around", + NewStrVal("multi data1 var data2 around"), + false, + false, + }, + { + `multi ${un-der_score.key1} var ${ un-der_score.missing| 'other"s with space' } around`, + NewStrVal(`multi data1 var other"s with space around`), + false, + false, + }, + { + `start 
${ un-der_score.missing| 'others | with space' } end`,
+			NewStrVal(`start others | with space end`),
+			false,
+			false,
+		},
+		{
+			`start ${ un-der_score.missing| 'other\'s with space' } end`,
+			NewStrVal(`start other's with space end`),
+			false,
+			false,
+		},
+		{
+			`${un-der_score.list}`,
+			NewList([]Node{
+				NewStrVal("array1"),
+				NewStrVal("array2"),
+			}),
+			false,
+			false,
+		},
+		{
+			`list inside string ${un-der_score.list} causes no match`,
+			NewList([]Node{
+				NewStrVal("array1"),
+				NewStrVal("array2"),
+			}),
+			false,
+			true,
+		},
+		{
+			`${un-der_score.dict}`,
+			NewDict([]Node{
+				NewKey("key1", NewStrVal("value1")),
+				NewKey("key2", NewStrVal("value2")),
+			}),
+			false,
+			false,
+		},
+		{
+			`dict inside string ${un-der_score.dict} causes no match`,
+			NewDict([]Node{
+				NewKey("key1", NewStrVal("value1")),
+				NewKey("key2", NewStrVal("value2")),
+			}),
+			false,
+			true,
+		},
+	}
+	for _, test := range tests {
+		t.Run(test.Input, func(t *testing.T) {
+			res, err := vars.Replace(test.Input)
+			if test.Error {
+				assert.Error(t, err)
+			} else if test.NoMatch {
+				assert.Equal(t, ErrNoMatch, err)
+			} else {
+				require.NoError(t, err)
+				assert.Equal(t, test.Result, res)
+			}
+		})
+	}
+}
+
+func TestVars_ReplaceWithProcessors(t *testing.T) {
+	processers := Processors{
+		{
+			"add_fields": map[string]interface{}{
+				"dynamic": "added",
+			},
+		},
+	}
+	vars := &Vars{
+		Mapping: map[string]interface{}{
+			"testing": map[string]interface{}{
+				"key1": "data1",
+			},
+			"dynamic": map[string]interface{}{
+				"key1": "dynamic1",
+				"list": []string{
+					"array1",
+					"array2",
+				},
+				"dict": map[string]string{
+					"key1": "value1",
+					"key2": "value2",
+				},
+			},
+		},
+		ProcessorsKey: "dynamic",
+		Processors:    processers,
+	}
+
+	res, err := vars.Replace("${testing.key1}")
+	require.NoError(t, err)
+	assert.Equal(t, NewStrVal("data1"), res)
+
+	res, err = vars.Replace("${dynamic.key1}")
+	require.NoError(t, err)
+	assert.Equal(t, NewStrValWithProcessors("dynamic1", processers), res)
+
+	res, err = 
vars.Replace("${other.key1|dynamic.key1}") + require.NoError(t, err) + assert.Equal(t, NewStrValWithProcessors("dynamic1", processers), res) + + res, err = vars.Replace("${dynamic.list}") + require.NoError(t, err) + assert.Equal(t, processers, res.Processors()) + assert.Equal(t, NewListWithProcessors([]Node{ + NewStrVal("array1"), + NewStrVal("array2"), + }, processers), res) + + res, err = vars.Replace("${dynamic.dict}") + require.NoError(t, err) + assert.Equal(t, processers, res.Processors()) + assert.Equal(t, NewDictWithProcessors([]Node{ + NewKey("key1", NewStrVal("value1")), + NewKey("key2", NewStrVal("value2")), + }, processers), res) +} diff --git a/x-pack/elastic-agent/pkg/composable/controller.go b/x-pack/elastic-agent/pkg/composable/controller.go index e11f416cbba..9ff0013e82d 100644 --- a/x-pack/elastic-agent/pkg/composable/controller.go +++ b/x-pack/elastic-agent/pkg/composable/controller.go @@ -8,38 +8,49 @@ import ( "context" "encoding/json" "fmt" + "reflect" "sort" "sync" "time" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" ) -// Vars is a context of variables that also contain a list of processors that go with the mapping. -type Vars struct { - Mapping map[string]interface{} - - ProcessorsKey string - Processors []map[string]interface{} -} - // VarsCallback is callback called when the current vars state changes. -type VarsCallback func([]Vars) +type VarsCallback func([]transpiler.Vars) // Controller manages the state of the providers current context. -type Controller struct { +type Controller interface { + // Run runs the controller. + // + // Cancelling the context stops the controller. + Run(ctx context.Context, cb VarsCallback) error +} + +// controller manages the state of the providers current context. 
+type controller struct { contextProviders map[string]*contextProviderState dynamicProviders map[string]*dynamicProviderState } // New creates a new controller. -func New(c *config.Config) (*Controller, error) { - var providersCfg Config - err := c.Unpack(&providersCfg) +func New(c *config.Config) (Controller, error) { + l, err := logger.New("composable") if err != nil { - return nil, errors.New(err, "failed to unpack providers config", errors.TypeConfig) + return nil, err + } + l.Info("EXPERIMENTAL - Inputs with variables are currently experimental and should not be used in production") + + var providersCfg Config + if c != nil { + err := c.Unpack(&providersCfg) + if err != nil { + return nil, errors.New(err, "failed to unpack providers config", errors.TypeConfig) + } } // build all the context providers @@ -73,18 +84,18 @@ func New(c *config.Config) (*Controller, error) { } dynamicProviders[name] = &dynamicProviderState{ provider: provider, - mappings: map[string]Vars{}, + mappings: map[string]transpiler.Vars{}, } } - return &Controller{ + return &controller{ contextProviders: contextProviders, dynamicProviders: dynamicProviders, }, nil } // Run runs the controller. 
-func (c *Controller) Run(ctx context.Context, cb VarsCallback) error { +func (c *controller) Run(ctx context.Context, cb VarsCallback) error { // large number not to block performing Run on the provided providers notify := make(chan bool, 5000) localCtx, cancel := context.WithCancel(ctx) @@ -136,12 +147,12 @@ func (c *Controller) Run(ctx context.Context, cb VarsCallback) error { } // build the vars list of mappings - vars := make([]Vars, 1) + vars := make([]transpiler.Vars, 1) mapping := map[string]interface{}{} for name, state := range c.contextProviders { mapping[name] = state.Current() } - vars[0] = Vars{ + vars[0] = transpiler.Vars{ Mapping: mapping, } @@ -150,7 +161,7 @@ func (c *Controller) Run(ctx context.Context, cb VarsCallback) error { for _, mappings := range state.Mappings() { local, _ := cloneMap(mapping) // will not fail; already been successfully cloned once local[name] = mappings.Mapping - vars = append(vars, Vars{ + vars = append(vars, transpiler.Vars{ Mapping: local, ProcessorsKey: name, Processors: mappings.Processors, @@ -207,7 +218,7 @@ type dynamicProviderState struct { provider DynamicProvider lock sync.RWMutex - mappings map[string]Vars + mappings map[string]transpiler.Vars signal chan bool } @@ -230,7 +241,7 @@ func (c *dynamicProviderState) AddOrUpdate(id string, mapping map[string]interfa // same mapping; no need to update and signal return nil } - c.mappings[id] = Vars{ + c.mappings[id] = transpiler.Vars{ Mapping: mapping, Processors: processors, } @@ -251,11 +262,11 @@ func (c *dynamicProviderState) Remove(id string) { } // Mappings returns the current mappings. 
-func (c *dynamicProviderState) Mappings() []Vars { +func (c *dynamicProviderState) Mappings() []transpiler.Vars { c.lock.RLock() defer c.lock.RUnlock() - mappings := make([]Vars, 0) + mappings := make([]transpiler.Vars, 0) ids := make([]string, 0) for name := range c.mappings { ids = append(ids, name) diff --git a/x-pack/elastic-agent/pkg/composable/controller_test.go b/x-pack/elastic-agent/pkg/composable/controller_test.go index 43c2889a92a..2c1e2e15f3c 100644 --- a/x-pack/elastic-agent/pkg/composable/controller_test.go +++ b/x-pack/elastic-agent/pkg/composable/controller_test.go @@ -9,6 +9,8 @@ import ( "sync" "testing" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/transpiler" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -34,12 +36,36 @@ func TestController(t *testing.T) { }, }, "local_dynamic": map[string]interface{}{ - "vars": []map[string]interface{}{ + "items": []map[string]interface{}{ { - "key1": "value1", + "vars": map[string]interface{}{ + "key1": "value1", + }, + "processors": []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "fields": map[string]interface{}{ + "add": "value1", + }, + "to": "dynamic", + }, + }, + }, }, { - "key1": "value2", + "vars": map[string]interface{}{ + "key1": "value2", + }, + "processors": []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "fields": map[string]interface{}{ + "add": "value2", + }, + "to": "dynamic", + }, + }, + }, }, }, }, @@ -54,8 +80,8 @@ func TestController(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() wg.Add(1) - var setVars []composable.Vars - err = c.Run(ctx, func(vars []composable.Vars) { + var setVars []transpiler.Vars + err = c.Run(ctx, func(vars []transpiler.Vars) { setVars = vars wg.Done() }) @@ -76,10 +102,10 @@ func TestController(t *testing.T) { localMap = setVars[1].Mapping["local_dynamic"].(map[string]interface{}) assert.Equal(t, "value1", localMap["key1"]) 
assert.Equal(t, "local_dynamic", setVars[1].ProcessorsKey) - assert.Nil(t, setVars[1].Processors) + assert.Len(t, setVars[1].Processors, 1) localMap = setVars[2].Mapping["local_dynamic"].(map[string]interface{}) assert.Equal(t, "value2", localMap["key1"]) assert.Equal(t, "local_dynamic", setVars[2].ProcessorsKey) - assert.Nil(t, setVars[2].Processors) + assert.Len(t, setVars[2].Processors, 1) } diff --git a/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic.go b/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic.go index f622b13410e..0e5297a9e10 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic.go +++ b/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic.go @@ -17,14 +17,19 @@ func init() { composable.Providers.AddDynamicProvider("local_dynamic", DynamicProviderBuilder) } +type dynamicItem struct { + Mapping map[string]interface{} `config:"vars"` + Processors []map[string]interface{} `config:"processors"` +} + type dynamicProvider struct { - Mappings []map[string]interface{} `config:"vars"` + Items []dynamicItem `config:"items"` } // Run runs the environment context provider. 
func (c *dynamicProvider) Run(comm composable.DynamicProviderComm) error { - for i, mapping := range c.Mappings { - if err := comm.AddOrUpdate(strconv.Itoa(i), mapping, nil); err != nil { + for i, item := range c.Items { + if err := comm.AddOrUpdate(strconv.Itoa(i), item.Mapping, item.Processors); err != nil { return errors.New(err, fmt.Sprintf("failed to add mapping for index %d", i), errors.TypeUnexpected) } } @@ -40,8 +45,8 @@ func DynamicProviderBuilder(c *config.Config) (composable.DynamicProvider, error return nil, fmt.Errorf("failed to unpack vars: %s", err) } } - if p.Mappings == nil { - p.Mappings = []map[string]interface{}{} + if p.Items == nil { + p.Items = []dynamicItem{} } return p, nil } diff --git a/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic_test.go b/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic_test.go index 68dc676dc7c..79c107372e3 100644 --- a/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic_test.go +++ b/x-pack/elastic-agent/pkg/composable/providers/localdynamic/localdynamic_test.go @@ -21,13 +21,42 @@ func TestContextProvider(t *testing.T) { "key1": "value1", "key2": "value2", } + processors1 := []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "fields": map[string]interface{}{ + "add": "value1", + }, + "to": "dynamic", + }, + }, + } mapping2 := map[string]interface{}{ "key1": "value12", "key2": "value22", } - mapping := []map[string]interface{}{mapping1, mapping2} + processors2 := []map[string]interface{}{ + { + "add_fields": map[string]interface{}{ + "fields": map[string]interface{}{ + "add": "value12", + }, + "to": "dynamic", + }, + }, + } + mapping := []map[string]interface{}{ + { + "vars": mapping1, + "processors": processors1, + }, + { + "vars": mapping2, + "processors": processors2, + }, + } cfg, err := config.NewConfigFrom(map[string]interface{}{ - "vars": mapping, + "items": mapping, }) require.NoError(t, err) builder, _ := 
composable.Providers.GetDynamicProvider("local_dynamic") @@ -41,8 +70,10 @@ func TestContextProvider(t *testing.T) { curr1, ok1 := comm.Current("0") assert.True(t, ok1) assert.Equal(t, mapping1, curr1.Mapping) + assert.Equal(t, processors1, curr1.Processors) curr2, ok2 := comm.Current("1") assert.True(t, ok2) assert.Equal(t, mapping2, curr2.Mapping) + assert.Equal(t, processors2, curr2.Processors) } diff --git a/x-pack/elastic-agent/pkg/composable/providers/path/path.go b/x-pack/elastic-agent/pkg/composable/providers/path/path.go new file mode 100644 index 00000000000..41f18b91deb --- /dev/null +++ b/x-pack/elastic-agent/pkg/composable/providers/path/path.go @@ -0,0 +1,37 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package path + +import ( + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" +) + +func init() { + composable.Providers.AddContextProvider("path", ContextProviderBuilder) +} + +type contextProvider struct{} + +// Run runs the Agent context provider. +func (*contextProvider) Run(comm composable.ContextProviderComm) error { + err := comm.Set(map[string]interface{}{ + "home": paths.Home(), + "data": paths.Data(), + "config": paths.Config(), + "logs": paths.Logs(), + }) + if err != nil { + return errors.New(err, "failed to set mapping", errors.TypeUnexpected) + } + return nil +} + +// ContextProviderBuilder builds the context provider. 
+func ContextProviderBuilder(_ *config.Config) (composable.ContextProvider, error) { + return &contextProvider{}, nil +} diff --git a/x-pack/elastic-agent/pkg/composable/providers/path/path_test.go b/x-pack/elastic-agent/pkg/composable/providers/path/path_test.go new file mode 100644 index 00000000000..46d5006cebd --- /dev/null +++ b/x-pack/elastic-agent/pkg/composable/providers/path/path_test.go @@ -0,0 +1,33 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package path + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/paths" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" + ctesting "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/testing" +) + +func TestContextProvider(t *testing.T) { + builder, _ := composable.Providers.GetContextProvider("path") + provider, err := builder(nil) + require.NoError(t, err) + + comm := ctesting.NewContextComm(context.Background()) + err = provider.Run(comm) + require.NoError(t, err) + + current := comm.Current() + assert.Equal(t, paths.Home(), current["home"]) + assert.Equal(t, paths.Data(), current["data"]) + assert.Equal(t, paths.Config(), current["config"]) + assert.Equal(t, paths.Logs(), current["logs"]) +} diff --git a/x-pack/elastic-agent/pkg/config/config.go b/x-pack/elastic-agent/pkg/config/config.go index 2968dd618e0..35bdac74aae 100644 --- a/x-pack/elastic-agent/pkg/config/config.go +++ b/x-pack/elastic-agent/pkg/config/config.go @@ -24,11 +24,6 @@ var DefaultOptions = []ucfg.Option{ // Config custom type over a ucfg.Config to add new methods on the object. type Config ucfg.Config -// ReadFile reads a configuration from disk. 
-func ReadFile(file string) (*Config, error) { - return nil, nil -} - // LoadYAML takes YAML configuration and return a concrete Config or any errors. func LoadYAML(path string, opts ...ucfg.Option) (*Config, error) { if len(opts) == 0 { @@ -42,9 +37,13 @@ func LoadYAML(path string, opts ...ucfg.Option) (*Config, error) { } // NewConfigFrom takes a interface and read the configuration like it was YAML. -func NewConfigFrom(from interface{}) (*Config, error) { +func NewConfigFrom(from interface{}, opts ...ucfg.Option) (*Config, error) { + if len(opts) == 0 { + opts = DefaultOptions + } + if str, ok := from.(string); ok { - c, err := yaml.NewConfig([]byte(str), DefaultOptions...) + c, err := yaml.NewConfig([]byte(str), opts...) return newConfigFrom(c), err } @@ -57,11 +56,11 @@ func NewConfigFrom(from interface{}) (*Config, error) { if err != nil { return nil, err } - c, err := yaml.NewConfig(content, DefaultOptions...) + c, err := yaml.NewConfig(content, opts...) return newConfigFrom(c), err } - c, err := ucfg.NewFrom(from, DefaultOptions...) + c, err := ucfg.NewFrom(from, opts...) return newConfigFrom(c), err } @@ -89,8 +88,11 @@ func (c *Config) access() *ucfg.Config { } // Merge merges two configuration together. -func (c *Config) Merge(from interface{}) error { - return c.access().Merge(from, DefaultOptions...) +func (c *Config) Merge(from interface{}, opts ...ucfg.Option) error { + if len(opts) == 0 { + opts = DefaultOptions + } + return c.access().Merge(from, opts...) } // ToMapStr takes the config and transform it into a map[string]interface{}