Skip to content

Commit

Permalink
Add failureThreshold to elastic-agent self-monitoring config (#5999)
Browse files Browse the repository at this point in the history
* Add failureThreshold to elastic-agent self-monitoring config

(cherry picked from commit 2a46509)
  • Loading branch information
pchila authored and mergify[bot] committed Nov 20, 2024
1 parent 730233b commit 6f2fa1d
Show file tree
Hide file tree
Showing 4 changed files with 272 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ agent:
metrics_period: ""
namespace: ""
pprof: null
failure_threshold: null
traces: true
apm:
hosts:
Expand Down
55 changes: 54 additions & 1 deletion internal/pkg/agent/application/monitoring/v1_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"
"unicode"
Expand Down Expand Up @@ -48,6 +49,7 @@ const (
monitoringKey = "monitoring"
useOutputKey = "use_output"
monitoringMetricsPeriodKey = "metrics_period"
failureThresholdKey = "failure_threshold"
monitoringOutput = "monitoring"
defaultMonitoringNamespace = "default"
agentName = "elastic-agent"
Expand All @@ -60,6 +62,10 @@ const (
// metricset execution period used for the monitoring metrics inputs
// we set this to 60s to reduce the load/data volume on the monitoring cluster
defaultMetricsCollectionInterval = 60 * time.Second

// metricset stream failure threshold before the stream is marked as DEGRADED
// to avoid marking the agent degraded for transient errors, we set the default threshold to 2
defaultMetricsStreamFailureThreshold = uint(2)
)

var (
Expand Down Expand Up @@ -131,6 +137,7 @@ func (b *BeatsMonitor) MonitoringConfig(

monitoringOutputName := defaultOutputName
metricsCollectionIntervalString := b.config.C.MetricsPeriod
failureThreshold := b.config.C.FailureThreshold
if agentCfg, found := policy[agentKey]; found {
// The agent section is required for feature flags
cfg[agentKey] = agentCfg
Expand All @@ -151,6 +158,25 @@ func (b *BeatsMonitor) MonitoringConfig(
metricsCollectionIntervalString = metricsPeriodStr
}
}

if policyFailureThresholdRaw, found := monitoringMap[failureThresholdKey]; found {
switch policyValue := policyFailureThresholdRaw.(type) {
case uint:
failureThreshold = &policyValue
case int:
unsignedValue := uint(policyValue)
failureThreshold = &unsignedValue
case string:
parsedPolicyValue, err := strconv.Atoi(policyValue)
if err != nil {
return nil, fmt.Errorf("failed to convert policy failure threshold string to int: %w", err)
}
uintPolicyValue := uint(parsedPolicyValue)
failureThreshold = &uintPolicyValue
default:
return nil, fmt.Errorf("unsupported type for policy failure threshold: %T", policyFailureThresholdRaw)
}
}
}
}
}
Expand All @@ -173,7 +199,7 @@ func (b *BeatsMonitor) MonitoringConfig(
}

if b.config.C.MonitorMetrics {
if err := b.injectMetricsInput(cfg, componentIDToBinary, components, componentIDPidMap, metricsCollectionIntervalString); err != nil {
if err := b.injectMetricsInput(cfg, componentIDToBinary, components, componentIDPidMap, metricsCollectionIntervalString, failureThreshold); err != nil {
return nil, errors.New(err, "failed to inject monitoring output")
}
}
Expand Down Expand Up @@ -556,12 +582,20 @@ func (b *BeatsMonitor) injectMetricsInput(
componentList []component.Component,
existingStateServicePids map[string]uint64,
metricsCollectionIntervalString string,
failureThreshold *uint,
) error {
if metricsCollectionIntervalString == "" {
metricsCollectionIntervalString = defaultMetricsCollectionInterval.String()
}

if failureThreshold == nil {
defaultValue := defaultMetricsStreamFailureThreshold
failureThreshold = &defaultValue
}
monitoringNamespace := b.monitoringNamespace()
fixedAgentName := strings.ReplaceAll(agentName, "-", "_")
// beatStreams and streams MUST be []interface{} even if in reality they are []map[string]interface{}:
// if those are declared as slices of maps the message "proto: invalid type: []map[string]interface{}" will pop up
beatsStreams := make([]interface{}, 0, len(componentIDToBinary))
streams := []interface{}{
map[string]interface{}{
Expand Down Expand Up @@ -866,6 +900,25 @@ func (b *BeatsMonitor) injectMetricsInput(

}

if failureThreshold != nil {
// add failure threshold to all streams and beatStreams
for _, s := range streams {
if streamMap, ok := s.(map[string]interface{}); ok {
streamMap[failureThresholdKey] = *failureThreshold
} else {
return fmt.Errorf("unable to set %s: %d in monitoring stream %q: unexpected type %T", failureThresholdKey, *failureThreshold, s, s)
}

}
for _, s := range beatsStreams {
if streamMap, ok := s.(map[string]interface{}); ok {
streamMap[failureThresholdKey] = *failureThreshold
} else {
return fmt.Errorf("unable to set %s: %d in monitoring stream %q: unexpected type %T", failureThresholdKey, *failureThreshold, s, s)
}
}
}

inputs := []interface{}{
map[string]interface{}{
idKey: fmt.Sprintf("%s-beats", monitoringMetricsUnitID),
Expand Down
206 changes: 205 additions & 1 deletion internal/pkg/agent/application/monitoring/v1_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func TestMonitoringConfigMetricsInterval(t *testing.T) {
// check the streams created for the input, should be a list of objects
if assert.Contains(t, input, "streams", "input %q does not contain any stream", inputID) &&
assert.IsTypef(t, []any{}, input["streams"], "streams for input %q are not a list of objects", inputID) {
// loop over streams and cast to map[string]any to access keys
// loop over streams and access keys
for _, rawStream := range input["streams"].([]any) {
if assert.IsTypef(t, map[string]any{}, rawStream, "stream %v for input %q is not a map", rawStream, inputID) {
stream := rawStream.(map[string]any)
Expand All @@ -258,6 +258,210 @@ func TestMonitoringConfigMetricsInterval(t *testing.T) {
}
}

func TestMonitoringConfigMetricsFailureThreshold(t *testing.T) {

agentInfo, err := info.NewAgentInfo(context.Background(), false)
require.NoError(t, err, "Error creating agent info")

sampleFiveErrorsStreamThreshold := uint(5)
sampleTenErrorsStreamThreshold := uint(10)

tcs := []struct {
name string
monitoringCfg *monitoringConfig
policy map[string]any
expectedThreshold uint
}{
{
name: "default failure threshold",
monitoringCfg: &monitoringConfig{
C: &monitoringcfg.MonitoringConfig{
Enabled: true,
MonitorMetrics: true,
HTTP: &monitoringcfg.MonitoringHTTPConfig{
Enabled: false,
},
},
},
policy: map[string]any{
"agent": map[string]any{
"monitoring": map[string]any{
"metrics": true,
"http": map[string]any{
"enabled": false,
},
},
},
"outputs": map[string]any{
"default": map[string]any{},
},
},
expectedThreshold: defaultMetricsStreamFailureThreshold,
},
{
name: "agent config failure threshold",
monitoringCfg: &monitoringConfig{
C: &monitoringcfg.MonitoringConfig{
Enabled: true,
MonitorMetrics: true,
HTTP: &monitoringcfg.MonitoringHTTPConfig{
Enabled: false,
},
FailureThreshold: &sampleFiveErrorsStreamThreshold,
},
},
policy: map[string]any{
"agent": map[string]any{
"monitoring": map[string]any{
"metrics": true,
"http": map[string]any{
"enabled": false,
},
},
},
"outputs": map[string]any{
"default": map[string]any{},
},
},
expectedThreshold: sampleFiveErrorsStreamThreshold,
},
{
name: "policy failure threshold uint",
monitoringCfg: &monitoringConfig{
C: &monitoringcfg.MonitoringConfig{
Enabled: true,
MonitorMetrics: true,
HTTP: &monitoringcfg.MonitoringHTTPConfig{
Enabled: false,
},
FailureThreshold: &sampleFiveErrorsStreamThreshold,
},
},
policy: map[string]any{
"agent": map[string]any{
"monitoring": map[string]any{
"metrics": true,
"http": map[string]any{
"enabled": false,
},
failureThresholdKey: sampleTenErrorsStreamThreshold,
},
},
"outputs": map[string]any{
"default": map[string]any{},
},
},
expectedThreshold: sampleTenErrorsStreamThreshold,
},
{
name: "policy failure threshold int",
monitoringCfg: &monitoringConfig{
C: &monitoringcfg.MonitoringConfig{
Enabled: true,
MonitorMetrics: true,
HTTP: &monitoringcfg.MonitoringHTTPConfig{
Enabled: false,
},
FailureThreshold: &sampleFiveErrorsStreamThreshold,
},
},
policy: map[string]any{
"agent": map[string]any{
"monitoring": map[string]any{
"metrics": true,
"http": map[string]any{
"enabled": false,
},
failureThresholdKey: 10,
},
},
"outputs": map[string]any{
"default": map[string]any{},
},
},
expectedThreshold: sampleTenErrorsStreamThreshold,
},
{
name: "policy failure threshold string",
monitoringCfg: &monitoringConfig{
C: &monitoringcfg.MonitoringConfig{
Enabled: true,
MonitorMetrics: true,
HTTP: &monitoringcfg.MonitoringHTTPConfig{
Enabled: false,
},
FailureThreshold: &sampleFiveErrorsStreamThreshold,
},
},
policy: map[string]any{
"agent": map[string]any{
"monitoring": map[string]any{
"metrics": true,
"http": map[string]any{
"enabled": false,
},
failureThresholdKey: "10",
},
},
"outputs": map[string]any{
"default": map[string]any{},
},
},
expectedThreshold: sampleTenErrorsStreamThreshold,
},
}

for _, tc := range tcs {

t.Run(tc.name, func(t *testing.T) {
b := &BeatsMonitor{
enabled: true,
config: tc.monitoringCfg,
operatingSystem: runtime.GOOS,
agentInfo: agentInfo,
}
got, err := b.MonitoringConfig(tc.policy, nil, map[string]string{"foobeat": "filebeat"}, map[string]uint64{}) // put a componentID/binary mapping to have something in the beats monitoring input
assert.NoError(t, err)

rawInputs, ok := got["inputs"]
require.True(t, ok, "monitoring config contains no input")
inputs, ok := rawInputs.([]any)
require.True(t, ok, "monitoring inputs are not a list")
marshaledInputs, err := yaml.Marshal(inputs)
if assert.NoError(t, err, "error marshaling monitoring inputs") {
t.Logf("marshaled monitoring inputs:\n%s\n", marshaledInputs)
}

// loop over the created inputs
for _, i := range inputs {
input, ok := i.(map[string]any)
if assert.Truef(t, ok, "input is not represented as a map: %v", i) {
inputID := input["id"]
t.Logf("input %q", inputID)
// check the streams created for the input, should be a list of objects
if assert.Contains(t, input, "streams", "input %q does not contain any stream", inputID) &&
assert.IsTypef(t, []any{}, input["streams"], "streams for input %q are not a list of objects", inputID) {

// loop over streams and cast to map[string]any to access keys
for _, rawStream := range input["streams"].([]any) {
if assert.IsTypef(t, map[string]any{}, rawStream, "stream %v for input %q is not a map", rawStream, inputID) {
stream := rawStream.(map[string]any)
// check period and assert its value
streamID := stream["id"]
if assert.Containsf(t, stream, failureThresholdKey, "stream %q for input %q does not contain a failureThreshold", streamID, inputID) &&
assert.IsType(t, uint(0), stream[failureThresholdKey], "period for stream %q of input %q is not represented as a string", streamID, inputID) {
actualFailureThreshold := stream[failureThresholdKey].(uint)
assert.Equalf(t, actualFailureThreshold, tc.expectedThreshold, "unexpected failure threshold for stream %q of input %q", streamID, inputID)
}
}
}
}
}
}
})
}
}

func TestMonitoringConfigComponentFields(t *testing.T) {
agentInfo, err := info.NewAgentInfo(context.Background(), false)
require.NoError(t, err, "Error creating agent info")
Expand Down
23 changes: 12 additions & 11 deletions internal/pkg/core/monitoring/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@ const (

// MonitoringConfig describes a configuration of a monitoring
type MonitoringConfig struct {
Enabled bool `yaml:"enabled" config:"enabled"`
MonitorLogs bool `yaml:"logs" config:"logs"`
MonitorMetrics bool `yaml:"metrics" config:"metrics"`
MetricsPeriod string `yaml:"metrics_period" config:"metrics_period"`
LogMetrics bool `yaml:"-" config:"-"`
HTTP *MonitoringHTTPConfig `yaml:"http" config:"http"`
Namespace string `yaml:"namespace" config:"namespace"`
Pprof *PprofConfig `yaml:"pprof" config:"pprof"`
MonitorTraces bool `yaml:"traces" config:"traces"`
APM APMConfig `yaml:"apm,omitempty" config:"apm,omitempty" json:"apm,omitempty"`
Diagnostics Diagnostics `yaml:"diagnostics,omitempty" json:"diagnostics,omitempty"`
Enabled bool `yaml:"enabled" config:"enabled"`
MonitorLogs bool `yaml:"logs" config:"logs"`
MonitorMetrics bool `yaml:"metrics" config:"metrics"`
MetricsPeriod string `yaml:"metrics_period" config:"metrics_period"`
FailureThreshold *uint `yaml:"failure_threshold" config:"failure_threshold"`
LogMetrics bool `yaml:"-" config:"-"`
HTTP *MonitoringHTTPConfig `yaml:"http" config:"http"`
Namespace string `yaml:"namespace" config:"namespace"`
Pprof *PprofConfig `yaml:"pprof" config:"pprof"`
MonitorTraces bool `yaml:"traces" config:"traces"`
APM APMConfig `yaml:"apm,omitempty" config:"apm,omitempty" json:"apm,omitempty"`
Diagnostics Diagnostics `yaml:"diagnostics,omitempty" json:"diagnostics,omitempty"`
}

// MonitoringHTTPConfig is a config defining HTTP endpoint published by agent
Expand Down

0 comments on commit 6f2fa1d

Please sign in to comment.