Skip to content

Commit

Permalink
Logging code cleanup related to Nomad auto-discovery (#26498) (#26560)
Browse files Browse the repository at this point in the history
* Logging and code cleanup related to Nomad auto-discover
* Fix a few doc nits
* Update heartbeat log line assertion
* Add complete example with discovery and add_nomad_metadata
* Fix secret_id and document minimal ACL policy

Closes #26456

(cherry picked from commit b061836)

Co-authored-by: Andrew Kroh <[email protected]>
  • Loading branch information
mergify[bot] and andrewkroh authored Jun 29, 2021
1 parent 9c52778 commit 5ded62f
Show file tree
Hide file tree
Showing 14 changed files with 167 additions and 95 deletions.
39 changes: 16 additions & 23 deletions filebeat/autodiscover/builder/hints/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,55 +52,48 @@ var validModuleNames = regexp.MustCompile("[^a-zA-Z0-9\\_\\-]+")
type logHints struct {
config *config
registry *fileset.ModuleRegistry
log *logp.Logger
}

// NewLogHints builds a log hints builder
func NewLogHints(cfg *common.Config) (autodiscover.Builder, error) {
config := defaultConfig()
err := cfg.Unpack(&config)

if err != nil {
return nil, fmt.Errorf("unable to unpack hints config due to error: %v", err)
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("unable to unpack hints config due to error: %w", err)
}

moduleRegistry, err := fileset.NewModuleRegistry(nil, beat.Info{}, false)
if err != nil {
return nil, err
}

return &logHints{&config, moduleRegistry}, nil
return &logHints{&config, moduleRegistry, logp.NewLogger("hints.builder")}, nil
}

// Create config based on input hints in the bus event
func (l *logHints) CreateConfig(event bus.Event, options ...ucfg.Option) []*common.Config {
var hints common.MapStr
hIface, ok := event["hints"]
if ok {
hints, _ = hIface.(common.MapStr)
}

inputConfig := l.getInputsConfigs(hints)

// If default config is disabled return nothing unless it's explicty enabled
if !l.config.DefaultConfig.Enabled() && !builder.IsEnabled(hints, l.config.Key) {
logp.Debug("hints.builder", "default config is disabled: %+v", event)
return []*common.Config{}
if hintsIfc, found := event["hints"]; found {
hints, _ = hintsIfc.(common.MapStr)
}

// If explictly disabled, return nothing
if builder.IsDisabled(hints, l.config.Key) {
logp.Debug("hints.builder", "logs disabled by hint: %+v", event)
return []*common.Config{}
// Hint must be explicitly enabled when default_config sets enabled=false.
if !l.config.DefaultConfig.Enabled() && !builder.IsEnabled(hints, l.config.Key) ||
builder.IsDisabled(hints, l.config.Key) {
l.log.Debugw("Hints config is not enabled.", "autodiscover.event", event)
return nil
}

if inputConfig != nil {
configs := []*common.Config{}
if inputConfig := l.getInputsConfigs(hints); inputConfig != nil {
var configs []*common.Config
for _, cfg := range inputConfig {
if config, err := common.NewConfigFrom(cfg); err == nil {
configs = append(configs, config)
} else {
l.log.Warnw("Failed to create config from input.", "error", err)
}
}
logp.Debug("hints.builder", "generated config %+v", configs)
l.log.Debugf("Generated %d input configs from hint.", len(configs))
// Apply information in event to the template to generate the final config
return template.ApplyConfigTemplate(event, configs)
}
Expand Down
11 changes: 6 additions & 5 deletions filebeat/autodiscover/builder/hints/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/bus"
Expand Down Expand Up @@ -692,9 +693,9 @@ func TestGenerateHints(t *testing.T) {
for _, test := range tests {
// Configure path for modules access
abs, _ := filepath.Abs("../../..")
err := paths.InitPaths(&paths.Path{
require.NoError(t, paths.InitPaths(&paths.Path{
Home: abs,
})
}))

l, err := NewLogHints(test.config)
if err != nil {
Expand Down Expand Up @@ -927,17 +928,17 @@ func TestGenerateHintsWithPaths(t *testing.T) {

// Configure path for modules access
abs, _ := filepath.Abs("../../..")
err := paths.InitPaths(&paths.Path{
require.NoError(t, paths.InitPaths(&paths.Path{
Home: abs,
})
}))

l, err := NewLogHints(cfg)
if err != nil {
t.Fatal(err)
}

cfgs := l.CreateConfig(test.event)
assert.Equal(t, test.len, len(cfgs), test.msg)
require.Equal(t, test.len, len(cfgs), test.msg)
if test.len != 0 {
config := common.MapStr{}
err := cfgs[0].Unpack(&config)
Expand Down
77 changes: 65 additions & 12 deletions filebeat/docs/autodiscover-hints.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,9 @@ You can label Docker containers with useful info to decode logs structured as JS
[float]
==== Nomad

Nomad autodiscover provider supports hints using the https://www.nomadproject.io/docs/job-specification/meta.html[`meta` stanza]. To enable it just set `hints.enabled`:
Nomad autodiscover provider supports hints using the
https://www.nomadproject.io/docs/job-specification/meta.html[`meta` stanza]. To
enable it just set `hints.enabled`:

[source,yaml]
-----
Expand All @@ -269,7 +271,8 @@ filebeat.autodiscover:
hints.enabled: true
-----

You can configure the default config that will be launched when a new job is seen, like this:
You can configure the default config that will be launched when a new job is
seen, like this:

[source,yaml]
-----
Expand All @@ -278,31 +281,81 @@ filebeat.autodiscover:
- type: nomad
hints.enabled: true
hints.default_config:
type: nomad
type: log
paths:
- /var/lib/nomad/alloc/${data.nomad.allocation.id}/alloc/logs/${data.nomad.task.name}.*
- /opt/nomad/alloc/${data.nomad.allocation.id}/alloc/logs/${data.nomad.task.name}.*
-----

You can also disable default settings entirely, so only Jobs annotated like `co.elastic.logs/enabled: true`
will be retrieved:
You can also disable the default config such that only logs from jobs explicitly
annotated with `"co.elastic.logs/enabled" = "true"` will be collected:

[source,yaml]
-----
filebeat.autodiscover:
providers:
- type: nomad
hints.enabled: true
hints.default_config.enabled: false
hints.default_config:
enabled: false
type: log
paths:
- /opt/nomad/alloc/${data.nomad.allocation.id}/alloc/logs/${data.nomad.task.name}.*
-----

You can annotate Nomad Jobs using the `meta` stanza with useful info to spin up {beatname_uc} inputs
or modules:
You can annotate Nomad Jobs using the `meta` stanza with useful info to spin up
{beatname_uc} inputs or modules:

[source,hcl]
-----
meta {
"co.elastic.logs/multiline.pattern" = "^\["
"co.elastic.logs/multiline.negate" = true
"co.elastic.logs/multiline.match" = after
"co.elastic.logs/enabled" = "true"
"co.elastic.logs/multiline.pattern" = "^\\["
"co.elastic.logs/multiline.negate" = "true"
"co.elastic.logs/multiline.match" = "after"
}
-----

If you are using autodiscover then in most cases you will want to use the
<<add-nomad-metadata,`add_nomad_metadata`>> processor to enrich events with
Nomad metadata. This example configures {{beatname_uc}} to connect to the local
Nomad agent over HTTPS and adds the Nomad allocation ID to all events from the
input. Later in the pipeline the `add_nomad_metadata` processor will use that ID
to enrich the event.

[source,yaml]
-----
filebeat.autodiscover:
providers:
- type: nomad
address: https://localhost:4646
hints.enabled: true
hints.default_config:
enabled: false <1>
type: log
paths:
- /opt/nomad/alloc/${data.nomad.allocation.id}/alloc/logs/${data.nomad.task.name}.*
processors:
- add_fields: <2>
target: nomad
fields:
allocation.id: ${data.nomad.allocation.id}
processors:
- add_nomad_metadata: <3>
when.has_fields.fields: [nomad.allocation.id]
address: https://localhost:4646
default_indexers.enabled: false
default_matchers.enabled: false
indexers:
- allocation_uuid:
matchers:
- fields:
lookup_fields:
- 'nomad.allocation.id'
-----
<1> The default config is disabled meaning any task without the
`"co.elastic.logs/enabled" = "true"` metadata will be ignored.
<2> The `add_fields` processor populates the `nomad.allocation.id` field with
the Nomad allocation UUID.
<3> The `add_nomad_metadata` processor is configured at the global level so
that it is only instantiated one time which saves resources.
4 changes: 2 additions & 2 deletions filebeat/docs/autodiscover-nomad-config.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ filebeat.autodiscover:
- /var/lib/nomad/alloc/${data.nomad.allocation.id}/alloc/logs/${data.nomad.task.name}.*
-------------------------------------------------------------------------------------
WARNING: The `docker` input is currently not supported. Nomad doesn't expose the container id
associated with the allocation. Without the container id, there is no way of generating the proper
WARNING: The `docker` input is currently not supported. Nomad doesn't expose the container ID
associated with the allocation. Without the container ID, there is no way of generating the proper
path for reading the container's logs.
2 changes: 1 addition & 1 deletion heartbeat/tests/system/test_autodiscovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def test_docker(self):
proc = self.start_beat()

self.wait_until(lambda: self.log_contains(
re.compile('autodiscover.+Got a start event:', re.I)))
re.compile('autodiscover.+Got a start event', re.I)))

self.wait_until(lambda: self.output_count(lambda x: x >= 1))

Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (a *Autodiscover) worker() {
}

func (a *Autodiscover) handleStart(event bus.Event) bool {
a.logger.Debugf("Got a start event: %v", event)
a.logger.Debugw("Got a start event.", "autodiscover.event", event)

eventID := getID(event)
if eventID == "" {
Expand Down
26 changes: 15 additions & 11 deletions libbeat/autodiscover/builder/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/elastic/beats/v7/libbeat/logp"
)

const logName = "autodiscover.builder"

// GetContainerID returns the id of a container
func GetContainerID(container common.MapStr) string {
id, _ := container["id"].(string)
Expand Down Expand Up @@ -92,7 +94,7 @@ func GetProcessors(hints common.MapStr, key string) []common.MapStr {
if str, ok := value.(string); ok {
cfg := common.MapStr{}
if err := json.Unmarshal([]byte(str), &cfg); err != nil {
logp.Debug("autodiscover.builder", "unable to unmarshal json due to error: %v", err)
logp.NewLogger(logName).Debugw("Unable to unmarshal json due to error", "error", err)
continue
}
proc[key] = cfg
Expand Down Expand Up @@ -124,7 +126,7 @@ func GetConfigs(hints common.MapStr, key, name string) []common.MapStr {

var configs []common.MapStr
for _, key := range nums {
rawCfg, _ := raw[key]
rawCfg := raw[key]
if config, ok := rawCfg.(common.MapStr); ok {
configs = append(configs, config)
}
Expand Down Expand Up @@ -159,23 +161,23 @@ func GetHintAsConfigs(hints common.MapStr, key string) []common.MapStr {
if str[0] != '[' {
cfg := common.MapStr{}
if err := json.Unmarshal([]byte(str), &cfg); err != nil {
logp.Debug("autodiscover.builder", "unable to unmarshal json due to error: %v", err)
logp.NewLogger(logName).Debugw("Unable to unmarshal json due to error", "error", err)
return nil
}
return []common.MapStr{cfg}
}

cfg := []common.MapStr{}
var cfg []common.MapStr
if err := json.Unmarshal([]byte(str), &cfg); err != nil {
logp.Debug("autodiscover.builder", "unable to unmarshal json due to error: %v", err)
logp.NewLogger(logName).Debugw("Unable to unmarshal json due to error", "error", err)
return nil
}
return cfg
}
return nil
}

// IsEnabled will return true when 'enabled' is **explicity** set to true
// IsEnabled will return true when 'enabled' is **explicitly** set to true.
func IsEnabled(hints common.MapStr, key string) bool {
if value, err := hints.GetValue(fmt.Sprintf("%s.enabled", key)); err == nil {
enabled, _ := strconv.ParseBool(value.(string))
Expand All @@ -185,14 +187,16 @@ func IsEnabled(hints common.MapStr, key string) bool {
return false
}

// IsDisabled will return true when 'enabled' key is **explicity** set to false
// IsDisabled will return true when 'enabled' is **explicitly** set to false.
func IsDisabled(hints common.MapStr, key string) bool {
if value, err := hints.GetValue(fmt.Sprintf("%s.enabled", key)); err == nil {
enabled, err := strconv.ParseBool(value.(string))
if err == nil {
logp.Debug("autodiscover.builder", "error parsing 'enabled' hint from: %+v", hints)
return !enabled
if err != nil {
logp.NewLogger(logName).Debugw("Error parsing 'enabled' hint.",
"error", err, "autodiscover.hints", hints)
return false
}
return !enabled
}

// keep reading disable (deprecated) for backwards compatibility
Expand Down Expand Up @@ -271,7 +275,7 @@ func GetHintsAsList(hints common.MapStr, key string) []common.MapStr {

var configs []common.MapStr
for _, key := range nums {
rawCfg, _ := raw[key]
rawCfg := raw[key]
if config, ok := rawCfg.(common.MapStr); ok {
configs = append(configs, config)
}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/autodiscover/builder/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type builderPlugin struct {
builder autodiscover.BuilderConstructor
}

var pluginKey = "libbeat.autodiscover.builder"
const pluginKey = "libbeat.autodiscover.builder"

// Plugin accepts a BuilderConstructor to be registered as a plugin
func Plugin(name string, b autodiscover.BuilderConstructor) map[string][]interface{} {
Expand Down
17 changes: 16 additions & 1 deletion libbeat/docs/shared-autodiscover.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,21 @@ The `nomad` autodiscover provider has the following configuration settings:

`namespace`:: (Optional) Namespace to use. If not provided the `default` namespace is used.

`secret_id`:: (Optional) SecretID to use if ACL is enabled in Nomad.
`secret_id`:: (Optional) SecretID to use if ACL is enabled in Nomad. This is an
example ACL policy to apply to the token.

[source,hcl]
----
namespace "*" {
policy = "read"
}
node {
policy = "read"
}
agent {
policy = "read"
}
----

`node`:: (Optional) Specify the node to scope {beatname_lc} to in case it
cannot be accurately detected when `node` scope is used.
Expand All @@ -495,6 +509,7 @@ The `nomad` autodiscover provider has the following configuration settings:

`allow_stale`:: (Optional) allows any Nomad server (non-leader) to service a read. This normally
means that the local node where filebeat is allocated will service filebeat's requests.
Defaults to `true`.

include::../../{beatname_lc}/docs/autodiscover-nomad-config.asciidoc[]

Expand Down
5 changes: 1 addition & 4 deletions x-pack/libbeat/autodiscover/providers/nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ type Config struct {
func defaultConfig() *Config {
return &Config{
Address: "http://127.0.0.1:4646",
Region: "",
Namespace: "",
SecretID: "",
Scope: ScopeNode,
allowStale: true,
waitTime: 15 * time.Second,
Expand All @@ -53,7 +50,7 @@ func defaultConfig() *Config {
}
}

// Validate ensures correctness of config
// Validate ensures correctness of config.
func (c *Config) Validate() error {
// Make sure that prefix doesn't ends with a '.'
if c.Prefix[len(c.Prefix)-1] == '.' && c.Prefix != "." {
Expand Down
Loading

0 comments on commit 5ded62f

Please sign in to comment.