Skip to content

Commit

Permalink
Load inputs from external configuration files in Elastic Agent in sta…
Browse files Browse the repository at this point in the history
…ndalone mode (#30087)

## What does this PR do?

This PR adds support for loading external configuration files to load inputs when running Agent in standalone mode.
Users have to put their configuration files into `{path.config}/inputs.d` with the extension `yml`.

When it is configured, Agent takes the `inputs` configuration from the last standalone configuration and appends the inputs from the external configuration files to it. When the standalone configurations are reloaded, external input configurations are reloaded as well.

The external configuration file must include `inputs`, otherwise the file is invalid. Example for correct file:

```yaml
inputs:
- data_stream:
    dataset: system.auth
    type: logs
  id: logfile-system.auth-my-id
  paths:
  - /var/log/auth.log*
  use_output: default
```

## Why is it important?

Users can configure external configuration files, similar to Filebeat. So managing configuration can be done by Git or Chef instead of Fleet.
  • Loading branch information
kvch authored Feb 1, 2022
1 parent 6f3f09d commit 3c2072d
Show file tree
Hide file tree
Showing 15 changed files with 409 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ inputs:
# port: 6791

# # Allow fleet to reload its configuration locally on disk.
# # Notes: Only specific process configuration will be reloaded.
# # Notes: Only specific process configuration and external input configurations will be reloaded.
# agent.reload:
# # enabled configure the Elastic Agent to reload or not the local configuration.
# #
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/elastic-agent.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ inputs:
# port: 6791

# # Allow fleet to reload its configuration locally on disk.
# # Notes: Only specific process configuration will be reloaded.
# # Notes: Only specific process configuration and external input configurations will be reloaded.
# agent.reload:
# # enabled configure the Elastic Agent to reload or not the local configuration.
# #
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ func newFleetServerBootstrap(
return nil, err
}

loader := config.NewLoader(log, "")
discover := discoverer(pathConfigFile, cfg.Settings.Path)
bootstrapApp.source = newOnce(log, discover, emit)
bootstrapApp.source = newOnce(log, discover, loader, emit)
return bootstrapApp, nil
}

Expand Down
13 changes: 10 additions & 3 deletions x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package application

import (
"context"
"path/filepath"

"go.elastic.co/apm"

Expand Down Expand Up @@ -117,7 +118,7 @@ func newLocal(
return nil, errors.New(err, "failed to initialize composable controller")
}

discover := discoverer(pathConfigFile, cfg.Settings.Path)
discover := discoverer(pathConfigFile, cfg.Settings.Path, configuration.ExternalInputsPattern)
emit, err := emitter.New(
localApplication.bgContext,
log,
Expand All @@ -135,13 +136,15 @@ func newLocal(
return nil, err
}

loader := config.NewLoader(log, externalConfigsGlob())

var cfgSource source
if !cfg.Settings.Reload.Enabled {
log.Debug("Reloading of configuration is off")
cfgSource = newOnce(log, discover, emit)
cfgSource = newOnce(log, discover, loader, emit)
} else {
log.Debugf("Reloading of configuration is on, frequency is set to %s", cfg.Settings.Reload.Period)
cfgSource = newPeriodic(log, cfg.Settings.Reload.Period, discover, emit)
cfgSource = newPeriodic(log, cfg.Settings.Reload.Period, discover, loader, emit)
}

localApplication.source = cfgSource
Expand All @@ -161,6 +164,10 @@ func newLocal(
return localApplication, nil
}

func externalConfigsGlob() string {
return filepath.Join(paths.Config(), configuration.ExternalInputsPattern)
}

// Routes returns a list of routes handled by agent.
func (l *Local) Routes() *sorted.Set {
return l.router.Routes()
Expand Down
11 changes: 6 additions & 5 deletions x-pack/elastic-agent/pkg/agent/application/once.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ import (
type once struct {
log *logger.Logger
discover discoverFunc
loader *config.Loader
emitter pipeline.EmitterFunc
}

func newOnce(log *logger.Logger, discover discoverFunc, emitter pipeline.EmitterFunc) *once {
return &once{log: log, discover: discover, emitter: emitter}
func newOnce(log *logger.Logger, discover discoverFunc, loader *config.Loader, emitter pipeline.EmitterFunc) *once {
return &once{log: log, discover: discover, loader: loader, emitter: emitter}
}

func (o *once) Start() error {
Expand All @@ -33,15 +34,15 @@ func (o *once) Start() error {
return ErrNoConfiguration
}

return readfiles(context.Background(), files, o.emitter)
return readfiles(context.Background(), files, o.loader, o.emitter)
}

func (o *once) Stop() error {
return nil
}

func readfiles(ctx context.Context, files []string, emitter pipeline.EmitterFunc) error {
c, err := config.LoadFiles(files...)
func readfiles(ctx context.Context, files []string, loader *config.Loader, emitter pipeline.EmitterFunc) error {
c, err := loader.Load(files)
if err != nil {
return errors.New(err, "could not load or merge configuration", errors.TypeConfig)
}
Expand Down
6 changes: 5 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/pipeline"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/filewatcher"
)
Expand All @@ -20,6 +21,7 @@ type periodic struct {
period time.Duration
done chan struct{}
watcher *filewatcher.Watch
loader *config.Loader
emitter pipeline.EmitterFunc
discover discoverFunc
}
Expand Down Expand Up @@ -90,7 +92,7 @@ func (p *periodic) work() error {
p.log.Debugf("Unchanged %d files: %s", len(s.Unchanged), strings.Join(s.Updated, ", "))
}

err := readfiles(context.Background(), files, p.emitter)
err := readfiles(context.Background(), files, p.loader, p.emitter)
if err != nil {
// assume something when really wrong and invalidate any cache
// so we get a full new config on next tick.
Expand All @@ -112,6 +114,7 @@ func newPeriodic(
log *logger.Logger,
period time.Duration,
discover discoverFunc,
loader *config.Loader,
emitter pipeline.EmitterFunc,
) *periodic {
w, err := filewatcher.New(log, filewatcher.DefaultComparer)
Expand All @@ -127,6 +130,7 @@ func newPeriodic(
done: make(chan struct{}),
watcher: w,
discover: discover,
loader: loader,
emitter: emitter,
}
}
5 changes: 5 additions & 0 deletions x-pack/elastic-agent/pkg/agent/configuration/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package configuration

import (
"path/filepath"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
monitoringCfg "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/config"
Expand All @@ -13,6 +15,9 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
)

// ExternalInputsPattern is a glob that matches the paths of external configuration files.
var ExternalInputsPattern = filepath.Join("inputs.d", "*.yml")

// SettingsConfig is an collection of agent settings configuration.
type SettingsConfig struct {
DownloadConfig *artifact.Config `yaml:"download" config:"download" json:"download"`
Expand Down
104 changes: 104 additions & 0 deletions x-pack/elastic-agent/pkg/config/loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package config

import (
"fmt"
"path/filepath"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/go-ucfg"
"github.com/elastic/go-ucfg/cfgutil"
)

// Loader is used to load configuration from the paths
// including appending multiple input configurations.
type Loader struct {
logger *logger.Logger
inputsFolder string
}

// NewLoader creates a new Loader instance to load configuration
// files from different paths.
func NewLoader(logger *logger.Logger, inputsFolder string) *Loader {
return &Loader{logger: logger, inputsFolder: inputsFolder}
}

// Load iterates over the list of files and loads the confguration from them.
// If a configuration file is under the folder set in `agent.config.inputs.path`
// it is appended to a list. If it is a regular config file, it is merged into
// the result config. The list of input configurations is merged into the result
// last.
func (l *Loader) Load(files []string) (*Config, error) {
inputsList := make([]*ucfg.Config, 0)
merger := cfgutil.NewCollector(nil)
for _, f := range files {
cfg, err := LoadFile(f)
if err != nil {
if l.isFileUnderInputsFolder(f) {
return nil, fmt.Errorf("failed to load external configuration file '%s': %w. Are you sure it contains an inputs section?", f, err)
}
return nil, fmt.Errorf("failed to load configuration file '%s': %w", f, err)
}
l.logger.Debugf("Loaded configuration from %s", f)
if l.isFileUnderInputsFolder(f) {
inp, err := getInput(cfg)
if err != nil {
return nil, fmt.Errorf("cannot get configuration from '%s': %w", f, err)
}
inputsList = append(inputsList, inp...)
l.logger.Debugf("Loaded %s input(s) from configuration from %s", len(inp), f)
} else {
if err := merger.Add(cfg.access(), err); err != nil {
return nil, fmt.Errorf("failed to merge configuration file '%s' to existing one: %w", f, err)
}
l.logger.Debugf("Merged configuration from %s into result", f)
}
}
config := merger.Config()

// if there is no input configuration, return what we have collected.
if len(inputsList) == 0 {
l.logger.Debugf("Merged all configuration files from %v, no external input files", files)
return newConfigFrom(config), nil
}

// merge inputs sections from the last standalone configuration
// file and all files from the inputs folder
start := 0
if config.HasField("inputs") {
var err error
start, err = config.CountField("inputs")
if err != nil {
return nil, fmt.Errorf("failed to count the number of inputs in the configuration: %w", err)
}
}
for i, ll := range inputsList {
if err := config.SetChild("inputs", start+i, ll); err != nil {
return nil, fmt.Errorf("failed to add inputs to result configuration: %w", err)
}
}

l.logger.Debugf("Merged all configuration files from %v, with external input files", files)
return newConfigFrom(config), nil
}

func getInput(c *Config) ([]*ucfg.Config, error) {
tmpConfig := struct {
Inputs []*ucfg.Config `config:"inputs"`
}{make([]*ucfg.Config, 0)}

if err := c.Unpack(&tmpConfig); err != nil {
return nil, fmt.Errorf("failed to parse inputs section from configuration: %w", err)
}
return tmpConfig.Inputs, nil
}

func (l *Loader) isFileUnderInputsFolder(f string) bool {
if matches, err := filepath.Match(l.inputsFolder, f); !matches || err != nil {
return false
}
return true
}
Loading

0 comments on commit 3c2072d

Please sign in to comment.