Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x](backport #5999) Add failureThreshold to elastic-agent self-monitoring config #6090

Merged
merged 1 commit into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ agent:
metrics_period: ""
namespace: ""
pprof: null
failure_threshold: null
traces: true
apm:
hosts:
Expand Down
55 changes: 54 additions & 1 deletion internal/pkg/agent/application/monitoring/v1_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"
"unicode"
Expand Down Expand Up @@ -48,6 +49,7 @@ const (
monitoringKey = "monitoring"
useOutputKey = "use_output"
monitoringMetricsPeriodKey = "metrics_period"
failureThresholdKey = "failure_threshold"
monitoringOutput = "monitoring"
defaultMonitoringNamespace = "default"
agentName = "elastic-agent"
Expand All @@ -60,6 +62,10 @@ const (
// metricset execution period used for the monitoring metrics inputs
// we set this to 60s to reduce the load/data volume on the monitoring cluster
defaultMetricsCollectionInterval = 60 * time.Second

// metricset stream failure threshold before the stream is marked as DEGRADED
// to avoid marking the agent degraded for transient errors, we set the default threshold to 2
defaultMetricsStreamFailureThreshold = uint(2)
)

var (
Expand Down Expand Up @@ -131,6 +137,7 @@ func (b *BeatsMonitor) MonitoringConfig(

monitoringOutputName := defaultOutputName
metricsCollectionIntervalString := b.config.C.MetricsPeriod
failureThreshold := b.config.C.FailureThreshold
if agentCfg, found := policy[agentKey]; found {
// The agent section is required for feature flags
cfg[agentKey] = agentCfg
Expand All @@ -151,6 +158,25 @@ func (b *BeatsMonitor) MonitoringConfig(
metricsCollectionIntervalString = metricsPeriodStr
}
}

if policyFailureThresholdRaw, found := monitoringMap[failureThresholdKey]; found {
switch policyValue := policyFailureThresholdRaw.(type) {
case uint:
failureThreshold = &policyValue
case int:
unsignedValue := uint(policyValue)
failureThreshold = &unsignedValue
case string:
parsedPolicyValue, err := strconv.Atoi(policyValue)
if err != nil {
return nil, fmt.Errorf("failed to convert policy failure threshold string to int: %w", err)
}
uintPolicyValue := uint(parsedPolicyValue)
failureThreshold = &uintPolicyValue
default:
return nil, fmt.Errorf("unsupported type for policy failure threshold: %T", policyFailureThresholdRaw)
}
}
}
}
}
Expand All @@ -173,7 +199,7 @@ func (b *BeatsMonitor) MonitoringConfig(
}

if b.config.C.MonitorMetrics {
if err := b.injectMetricsInput(cfg, componentIDToBinary, components, componentIDPidMap, metricsCollectionIntervalString); err != nil {
if err := b.injectMetricsInput(cfg, componentIDToBinary, components, componentIDPidMap, metricsCollectionIntervalString, failureThreshold); err != nil {
return nil, errors.New(err, "failed to inject monitoring output")
}
}
Expand Down Expand Up @@ -556,12 +582,20 @@ func (b *BeatsMonitor) injectMetricsInput(
componentList []component.Component,
existingStateServicePids map[string]uint64,
metricsCollectionIntervalString string,
failureThreshold *uint,
) error {
if metricsCollectionIntervalString == "" {
metricsCollectionIntervalString = defaultMetricsCollectionInterval.String()
}

if failureThreshold == nil {
defaultValue := defaultMetricsStreamFailureThreshold
failureThreshold = &defaultValue
}
monitoringNamespace := b.monitoringNamespace()
fixedAgentName := strings.ReplaceAll(agentName, "-", "_")
// beatStreams and streams MUST be []interface{} even if in reality they are []map[string]interface{}:
// if those are declared as slices of maps the message "proto: invalid type: []map[string]interface{}" will pop up
beatsStreams := make([]interface{}, 0, len(componentIDToBinary))
streams := []interface{}{
map[string]interface{}{
Expand Down Expand Up @@ -866,6 +900,25 @@ func (b *BeatsMonitor) injectMetricsInput(

}

if failureThreshold != nil {
// add failure threshold to all streams and beatStreams
for _, s := range streams {
if streamMap, ok := s.(map[string]interface{}); ok {
streamMap[failureThresholdKey] = *failureThreshold
} else {
return fmt.Errorf("unable to set %s: %d in monitoring stream %q: unexpected type %T", failureThresholdKey, *failureThreshold, s, s)
}

}
for _, s := range beatsStreams {
if streamMap, ok := s.(map[string]interface{}); ok {
streamMap[failureThresholdKey] = *failureThreshold
} else {
return fmt.Errorf("unable to set %s: %d in monitoring stream %q: unexpected type %T", failureThresholdKey, *failureThreshold, s, s)
}
}
}

inputs := []interface{}{
map[string]interface{}{
idKey: fmt.Sprintf("%s-beats", monitoringMetricsUnitID),
Expand Down
206 changes: 205 additions & 1 deletion internal/pkg/agent/application/monitoring/v1_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func TestMonitoringConfigMetricsInterval(t *testing.T) {
// check the streams created for the input, should be a list of objects
if assert.Contains(t, input, "streams", "input %q does not contain any stream", inputID) &&
assert.IsTypef(t, []any{}, input["streams"], "streams for input %q are not a list of objects", inputID) {
// loop over streams and cast to map[string]any to access keys
// loop over streams and access keys
for _, rawStream := range input["streams"].([]any) {
if assert.IsTypef(t, map[string]any{}, rawStream, "stream %v for input %q is not a map", rawStream, inputID) {
stream := rawStream.(map[string]any)
Expand All @@ -258,6 +258,210 @@ func TestMonitoringConfigMetricsInterval(t *testing.T) {
}
}

// TestMonitoringConfigMetricsFailureThreshold checks that every stream of every
// generated monitoring input carries a failure_threshold entry of type uint, and
// that its value is resolved in this order of precedence: the value found in the
// policy (given as uint, int or numeric string), then the value from the agent
// monitoring configuration, then defaultMetricsStreamFailureThreshold.
func TestMonitoringConfigMetricsFailureThreshold(t *testing.T) {
	agentInfo, err := info.NewAgentInfo(context.Background(), false)
	require.NoError(t, err, "Error creating agent info")

	sampleFiveErrorsStreamThreshold := uint(5)
	sampleTenErrorsStreamThreshold := uint(10)

	// newMonitoringCfg returns a monitoring configuration with metrics enabled,
	// the HTTP endpoint disabled and the given failure threshold (nil leaves it unset).
	newMonitoringCfg := func(threshold *uint) *monitoringConfig {
		return &monitoringConfig{
			C: &monitoringcfg.MonitoringConfig{
				Enabled:          true,
				MonitorMetrics:   true,
				HTTP:             &monitoringcfg.MonitoringHTTPConfig{Enabled: false},
				FailureThreshold: threshold,
			},
		}
	}

	// newPolicy returns a minimal policy with a default output and metrics
	// monitoring enabled. When policyThreshold is non-nil it is stored as-is under
	// failureThresholdKey, so that each test case controls the dynamic type
	// (uint, int, string) seen by MonitoringConfig.
	newPolicy := func(policyThreshold any) map[string]any {
		monitoring := map[string]any{
			"metrics": true,
			"http":    map[string]any{"enabled": false},
		}
		if policyThreshold != nil {
			monitoring[failureThresholdKey] = policyThreshold
		}
		return map[string]any{
			"agent": map[string]any{
				"monitoring": monitoring,
			},
			"outputs": map[string]any{
				"default": map[string]any{},
			},
		}
	}

	tcs := []struct {
		name              string
		monitoringCfg     *monitoringConfig
		policy            map[string]any
		expectedThreshold uint
	}{
		{
			name:              "default failure threshold",
			monitoringCfg:     newMonitoringCfg(nil),
			policy:            newPolicy(nil),
			expectedThreshold: defaultMetricsStreamFailureThreshold,
		},
		{
			name:              "agent config failure threshold",
			monitoringCfg:     newMonitoringCfg(&sampleFiveErrorsStreamThreshold),
			policy:            newPolicy(nil),
			expectedThreshold: sampleFiveErrorsStreamThreshold,
		},
		{
			name:              "policy failure threshold uint",
			monitoringCfg:     newMonitoringCfg(&sampleFiveErrorsStreamThreshold),
			policy:            newPolicy(sampleTenErrorsStreamThreshold),
			expectedThreshold: sampleTenErrorsStreamThreshold,
		},
		{
			name:              "policy failure threshold int",
			monitoringCfg:     newMonitoringCfg(&sampleFiveErrorsStreamThreshold),
			policy:            newPolicy(10),
			expectedThreshold: sampleTenErrorsStreamThreshold,
		},
		{
			name:              "policy failure threshold string",
			monitoringCfg:     newMonitoringCfg(&sampleFiveErrorsStreamThreshold),
			policy:            newPolicy("10"),
			expectedThreshold: sampleTenErrorsStreamThreshold,
		},
	}

	for _, tc := range tcs {
		t.Run(tc.name, func(t *testing.T) {
			b := &BeatsMonitor{
				enabled:         true,
				config:          tc.monitoringCfg,
				operatingSystem: runtime.GOOS,
				agentInfo:       agentInfo,
			}
			// put a componentID/binary mapping to have something in the beats monitoring input
			got, err := b.MonitoringConfig(tc.policy, nil, map[string]string{"foobeat": "filebeat"}, map[string]uint64{})
			// stop here on error: inspecting the returned config would only add misleading follow-up failures
			require.NoError(t, err, "error generating monitoring config")

			rawInputs, ok := got["inputs"]
			require.True(t, ok, "monitoring config contains no input")
			inputs, ok := rawInputs.([]any)
			require.True(t, ok, "monitoring inputs are not a list")
			marshaledInputs, err := yaml.Marshal(inputs)
			if assert.NoError(t, err, "error marshaling monitoring inputs") {
				t.Logf("marshaled monitoring inputs:\n%s\n", marshaledInputs)
			}

			// loop over the created inputs
			for _, i := range inputs {
				input, ok := i.(map[string]any)
				if !assert.Truef(t, ok, "input is not represented as a map: %v", i) {
					continue
				}
				inputID := input["id"]
				t.Logf("input %q", inputID)

				// check the streams created for the input, should be a list of objects
				if !assert.Contains(t, input, "streams", "input %q does not contain any stream", inputID) ||
					!assert.IsTypef(t, []any{}, input["streams"], "streams for input %q are not a list of objects", inputID) {
					continue
				}

				// loop over streams and cast to map[string]any to access keys
				for _, rawStream := range input["streams"].([]any) {
					if !assert.IsTypef(t, map[string]any{}, rawStream, "stream %v for input %q is not a map", rawStream, inputID) {
						continue
					}
					stream := rawStream.(map[string]any)
					streamID := stream["id"]

					// check that the failure threshold is present, is a uint, and has the expected value
					if !assert.Containsf(t, stream, failureThresholdKey, "stream %q for input %q does not contain a failureThreshold", streamID, inputID) ||
						!assert.IsTypef(t, uint(0), stream[failureThresholdKey], "failure threshold for stream %q of input %q is not represented as a uint", streamID, inputID) {
						continue
					}
					actualFailureThreshold := stream[failureThresholdKey].(uint)
					assert.Equalf(t, tc.expectedThreshold, actualFailureThreshold, "unexpected failure threshold for stream %q of input %q", streamID, inputID)
				}
			}
		})
	}
}

func TestMonitoringConfigComponentFields(t *testing.T) {
agentInfo, err := info.NewAgentInfo(context.Background(), false)
require.NoError(t, err, "Error creating agent info")
Expand Down
23 changes: 12 additions & 11 deletions internal/pkg/core/monitoring/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@ const (

// MonitoringConfig describes a configuration of a monitoring
type MonitoringConfig struct {
Enabled bool `yaml:"enabled" config:"enabled"`
MonitorLogs bool `yaml:"logs" config:"logs"`
MonitorMetrics bool `yaml:"metrics" config:"metrics"`
MetricsPeriod string `yaml:"metrics_period" config:"metrics_period"`
LogMetrics bool `yaml:"-" config:"-"`
HTTP *MonitoringHTTPConfig `yaml:"http" config:"http"`
Namespace string `yaml:"namespace" config:"namespace"`
Pprof *PprofConfig `yaml:"pprof" config:"pprof"`
MonitorTraces bool `yaml:"traces" config:"traces"`
APM APMConfig `yaml:"apm,omitempty" config:"apm,omitempty" json:"apm,omitempty"`
Diagnostics Diagnostics `yaml:"diagnostics,omitempty" json:"diagnostics,omitempty"`
Enabled bool `yaml:"enabled" config:"enabled"`
MonitorLogs bool `yaml:"logs" config:"logs"`
MonitorMetrics bool `yaml:"metrics" config:"metrics"`
MetricsPeriod string `yaml:"metrics_period" config:"metrics_period"`
FailureThreshold *uint `yaml:"failure_threshold" config:"failure_threshold"`
LogMetrics bool `yaml:"-" config:"-"`
HTTP *MonitoringHTTPConfig `yaml:"http" config:"http"`
Namespace string `yaml:"namespace" config:"namespace"`
Pprof *PprofConfig `yaml:"pprof" config:"pprof"`
MonitorTraces bool `yaml:"traces" config:"traces"`
APM APMConfig `yaml:"apm,omitempty" config:"apm,omitempty" json:"apm,omitempty"`
Diagnostics Diagnostics `yaml:"diagnostics,omitempty" json:"diagnostics,omitempty"`
}

// MonitoringHTTPConfig is a config defining HTTP endpoint published by agent
Expand Down