Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load inputs from external configuration files in Elastic Agent in standalone mode #30087

Merged
merged 5 commits into from
Feb 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ inputs:
# port: 6791

# # Allow fleet to reload its configuration locally on disk.
# # Notes: Only specific process configuration will be reloaded.
# # Notes: Only specific process configuration and external input configurations will be reloaded.
# agent.reload:
# # enabled configure the Elastic Agent to reload or not the local configuration.
# #
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/elastic-agent.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ inputs:
# port: 6791

# # Allow fleet to reload its configuration locally on disk.
# # Notes: Only specific process configuration will be reloaded.
# # Notes: Only specific process configuration and external input configurations will be reloaded.
# agent.reload:
# # enabled configure the Elastic Agent to reload or not the local configuration.
# #
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ func newFleetServerBootstrap(
return nil, err
}

loader := config.NewLoader(log, "")
discover := discoverer(pathConfigFile, cfg.Settings.Path)
bootstrapApp.source = newOnce(log, discover, emit)
bootstrapApp.source = newOnce(log, discover, loader, emit)
return bootstrapApp, nil
}

Expand Down
13 changes: 10 additions & 3 deletions x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package application

import (
"context"
"path/filepath"

"go.elastic.co/apm"

Expand Down Expand Up @@ -117,7 +118,7 @@ func newLocal(
return nil, errors.New(err, "failed to initialize composable controller")
}

discover := discoverer(pathConfigFile, cfg.Settings.Path)
discover := discoverer(pathConfigFile, cfg.Settings.Path, configuration.ExternalInputsPattern)
emit, err := emitter.New(
localApplication.bgContext,
log,
Expand All @@ -135,13 +136,15 @@ func newLocal(
return nil, err
}

loader := config.NewLoader(log, externalConfigsGlob())

var cfgSource source
if !cfg.Settings.Reload.Enabled {
log.Debug("Reloading of configuration is off")
cfgSource = newOnce(log, discover, emit)
cfgSource = newOnce(log, discover, loader, emit)
} else {
log.Debugf("Reloading of configuration is on, frequency is set to %s", cfg.Settings.Reload.Period)
cfgSource = newPeriodic(log, cfg.Settings.Reload.Period, discover, emit)
cfgSource = newPeriodic(log, cfg.Settings.Reload.Period, discover, loader, emit)
}

localApplication.source = cfgSource
Expand All @@ -161,6 +164,10 @@ func newLocal(
return localApplication, nil
}

func externalConfigsGlob() string {
return filepath.Join(paths.Config(), configuration.ExternalInputsPattern)
}

// Routes returns a list of routes handled by agent.
func (l *Local) Routes() *sorted.Set {
return l.router.Routes()
Expand Down
11 changes: 6 additions & 5 deletions x-pack/elastic-agent/pkg/agent/application/once.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ import (
type once struct {
log *logger.Logger
discover discoverFunc
loader *config.Loader
emitter pipeline.EmitterFunc
}

func newOnce(log *logger.Logger, discover discoverFunc, emitter pipeline.EmitterFunc) *once {
return &once{log: log, discover: discover, emitter: emitter}
func newOnce(log *logger.Logger, discover discoverFunc, loader *config.Loader, emitter pipeline.EmitterFunc) *once {
return &once{log: log, discover: discover, loader: loader, emitter: emitter}
}

func (o *once) Start() error {
Expand All @@ -33,15 +34,15 @@ func (o *once) Start() error {
return ErrNoConfiguration
}

return readfiles(context.Background(), files, o.emitter)
return readfiles(context.Background(), files, o.loader, o.emitter)
}

func (o *once) Stop() error {
return nil
}

func readfiles(ctx context.Context, files []string, emitter pipeline.EmitterFunc) error {
c, err := config.LoadFiles(files...)
func readfiles(ctx context.Context, files []string, loader *config.Loader, emitter pipeline.EmitterFunc) error {
c, err := loader.Load(files)
if err != nil {
return errors.New(err, "could not load or merge configuration", errors.TypeConfig)
}
Expand Down
6 changes: 5 additions & 1 deletion x-pack/elastic-agent/pkg/agent/application/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/pipeline"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/filewatcher"
)
Expand All @@ -20,6 +21,7 @@ type periodic struct {
period time.Duration
done chan struct{}
watcher *filewatcher.Watch
loader *config.Loader
emitter pipeline.EmitterFunc
discover discoverFunc
}
Expand Down Expand Up @@ -90,7 +92,7 @@ func (p *periodic) work() error {
p.log.Debugf("Unchanged %d files: %s", len(s.Unchanged), strings.Join(s.Updated, ", "))
}

err := readfiles(context.Background(), files, p.emitter)
err := readfiles(context.Background(), files, p.loader, p.emitter)
if err != nil {
// assume something when really wrong and invalidate any cache
// so we get a full new config on next tick.
Expand All @@ -112,6 +114,7 @@ func newPeriodic(
log *logger.Logger,
period time.Duration,
discover discoverFunc,
loader *config.Loader,
emitter pipeline.EmitterFunc,
) *periodic {
w, err := filewatcher.New(log, filewatcher.DefaultComparer)
Expand All @@ -127,6 +130,7 @@ func newPeriodic(
done: make(chan struct{}),
watcher: w,
discover: discover,
loader: loader,
emitter: emitter,
}
}
5 changes: 5 additions & 0 deletions x-pack/elastic-agent/pkg/agent/configuration/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package configuration

import (
"path/filepath"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
monitoringCfg "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/config"
Expand All @@ -13,6 +15,9 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
)

// ExternalInputsPattern is a glob that matches the paths of external configuration files.
var ExternalInputsPattern = filepath.Join("inputs.d", "*.yml")

// SettingsConfig is an collection of agent settings configuration.
type SettingsConfig struct {
DownloadConfig *artifact.Config `yaml:"download" config:"download" json:"download"`
Expand Down
104 changes: 104 additions & 0 deletions x-pack/elastic-agent/pkg/config/loader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package config

import (
"fmt"
"path/filepath"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/go-ucfg"
"github.com/elastic/go-ucfg/cfgutil"
)

// Loader is used to load configuration from the paths
// including appending multiple input configurations.
type Loader struct {
logger *logger.Logger
inputsFolder string
}

// NewLoader creates a new Loader instance to load configuration
// files from different paths.
func NewLoader(logger *logger.Logger, inputsFolder string) *Loader {
return &Loader{logger: logger, inputsFolder: inputsFolder}
}

// Load iterates over the list of files and loads the confguration from them.
// If a configuration file is under the folder set in `agent.config.inputs.path`
// it is appended to a list. If it is a regular config file, it is merged into
// the result config. The list of input configurations is merged into the result
// last.
func (l *Loader) Load(files []string) (*Config, error) {
inputsList := make([]*ucfg.Config, 0)
merger := cfgutil.NewCollector(nil)
for _, f := range files {
cfg, err := LoadFile(f)
if err != nil {
if l.isFileUnderInputsFolder(f) {
return nil, fmt.Errorf("failed to load external configuration file '%s': %w. Are you sure it contains an inputs section?", f, err)
}
return nil, fmt.Errorf("failed to load configuration file '%s': %w", f, err)
}
l.logger.Debugf("Loaded configuration from %s", f)
if l.isFileUnderInputsFolder(f) {
inp, err := getInput(cfg)
if err != nil {
return nil, fmt.Errorf("cannot get configuration from '%s': %w", f, err)
}
inputsList = append(inputsList, inp...)
l.logger.Debugf("Loaded %s input(s) from configuration from %s", len(inp), f)
} else {
if err := merger.Add(cfg.access(), err); err != nil {
return nil, fmt.Errorf("failed to merge configuration file '%s' to existing one: %w", f, err)
}
l.logger.Debugf("Merged configuration from %s into result", f)
}
}
config := merger.Config()

// if there is no input configuration, return what we have collected.
if len(inputsList) == 0 {
l.logger.Debugf("Merged all configuration files from %v, no external input files", files)
return newConfigFrom(config), nil
}

// merge inputs sections from the last standalone configuration
// file and all files from the inputs folder
start := 0
if config.HasField("inputs") {
var err error
start, err = config.CountField("inputs")
if err != nil {
return nil, fmt.Errorf("failed to count the number of inputs in the configuration: %w", err)
}
}
for i, ll := range inputsList {
if err := config.SetChild("inputs", start+i, ll); err != nil {
return nil, fmt.Errorf("failed to add inputs to result configuration: %w", err)
}
}

l.logger.Debugf("Merged all configuration files from %v, with external input files", files)
return newConfigFrom(config), nil
}

func getInput(c *Config) ([]*ucfg.Config, error) {
tmpConfig := struct {
Inputs []*ucfg.Config `config:"inputs"`
}{make([]*ucfg.Config, 0)}

if err := c.Unpack(&tmpConfig); err != nil {
return nil, fmt.Errorf("failed to parse inputs section from configuration: %w", err)
}
return tmpConfig.Inputs, nil
}

func (l *Loader) isFileUnderInputsFolder(f string) bool {
if matches, err := filepath.Match(l.inputsFolder, f); !matches || err != nil {
return false
}
return true
}
Loading