Add a configurable failure_threshold to the agent monitoring metrics streams.

The threshold is resolved in this order: the policy value found under
agent.monitoring.failure_threshold, then the agent's own monitoring
configuration, then defaultMetricsStreamFailureThreshold (2). The resolved
value is written into every stream of the injected metrics inputs.

Review changes relative to the patch as submitted:
 * v1_monitor.go: negative policy values (int or string) are rejected instead
   of being converted to uint, where they would wrap around to a huge threshold.
 * v1_monitor_test.go: the failure threshold type assertion now names the right
   field and type, and assert.Equalf receives (expected, actual) in that order.

The v1_monitor.go hunk headers are adjusted for the added lines. The "index"
lines are carried over unchanged, so the post-image blob ids of v1_monitor.go
and v1_monitor_test.go are informational only.

diff --git a/internal/pkg/agent/application/coordinator/diagnostics_test.go b/internal/pkg/agent/application/coordinator/diagnostics_test.go
index 9c72400e8e8..27149f790b9 100644
--- a/internal/pkg/agent/application/coordinator/diagnostics_test.go
+++ b/internal/pkg/agent/application/coordinator/diagnostics_test.go
@@ -100,6 +100,7 @@ agent:
     metrics_period: ""
     namespace: ""
     pprof: null
+    failure_threshold: null
     traces: true
     apm:
       hosts:
diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go
index 7512b989a98..7efcd155e15 100644
--- a/internal/pkg/agent/application/monitoring/v1_monitor.go
+++ b/internal/pkg/agent/application/monitoring/v1_monitor.go
@@ -11,6 +11,7 @@ import (
 	"os"
 	"path/filepath"
 	"runtime"
+	"strconv"
 	"strings"
 	"time"
 	"unicode"
@@ -48,6 +49,7 @@ const (
 	monitoringKey              = "monitoring"
 	useOutputKey               = "use_output"
 	monitoringMetricsPeriodKey = "metrics_period"
+	failureThresholdKey        = "failure_threshold"
 	monitoringOutput           = "monitoring"
 	defaultMonitoringNamespace = "default"
 	agentName                  = "elastic-agent"
@@ -60,6 +62,10 @@ const (
 	// metricset execution period used for the monitoring metrics inputs
 	// we set this to 60s to reduce the load/data volume on the monitoring cluster
 	defaultMetricsCollectionInterval = 60 * time.Second
+
+	// metricset stream failure threshold before the stream is marked as DEGRADED
+	// to avoid marking the agent degraded for transient errors, we set the default threshold to 2
+	defaultMetricsStreamFailureThreshold = uint(2)
 )
 
 var (
@@ -131,6 +137,7 @@ func (b *BeatsMonitor) MonitoringConfig(
 
 	monitoringOutputName := defaultOutputName
 	metricsCollectionIntervalString := b.config.C.MetricsPeriod
+	failureThreshold := b.config.C.FailureThreshold
 	if agentCfg, found := policy[agentKey]; found {
 		// The agent section is required for feature flags
 		cfg[agentKey] = agentCfg
@@ -151,6 +158,32 @@ func (b *BeatsMonitor) MonitoringConfig(
 							metricsCollectionIntervalString = metricsPeriodStr
 						}
 					}
+
+					if policyFailureThresholdRaw, found := monitoringMap[failureThresholdKey]; found {
+						switch policyValue := policyFailureThresholdRaw.(type) {
+						case uint:
+							failureThreshold = &policyValue
+						case int:
+							// converting a negative int to uint wraps around to a huge value: reject it instead
+							if policyValue < 0 {
+								return nil, fmt.Errorf("policy failure threshold must be non-negative, got %d", policyValue)
+							}
+							unsignedValue := uint(policyValue)
+							failureThreshold = &unsignedValue
+						case string:
+							parsedPolicyValue, err := strconv.Atoi(policyValue)
+							if err != nil {
+								return nil, fmt.Errorf("failed to convert policy failure threshold string to int: %w", err)
+							}
+							if parsedPolicyValue < 0 {
+								return nil, fmt.Errorf("policy failure threshold must be non-negative, got %d", parsedPolicyValue)
+							}
+							uintPolicyValue := uint(parsedPolicyValue)
+							failureThreshold = &uintPolicyValue
+						default:
+							return nil, fmt.Errorf("unsupported type for policy failure threshold: %T", policyFailureThresholdRaw)
+						}
+					}
 				}
 			}
 		}
@@ -173,7 +206,7 @@ func (b *BeatsMonitor) MonitoringConfig(
 	}
 
 	if b.config.C.MonitorMetrics {
-		if err := b.injectMetricsInput(cfg, componentIDToBinary, components, componentIDPidMap, metricsCollectionIntervalString); err != nil {
+		if err := b.injectMetricsInput(cfg, componentIDToBinary, components, componentIDPidMap, metricsCollectionIntervalString, failureThreshold); err != nil {
 			return nil, errors.New(err, "failed to inject monitoring output")
 		}
 	}
@@ -556,12 +589,20 @@ func (b *BeatsMonitor) injectMetricsInput(
 	componentList []component.Component,
 	existingStateServicePids map[string]uint64,
 	metricsCollectionIntervalString string,
+	failureThreshold *uint,
 ) error {
 	if metricsCollectionIntervalString == "" {
 		metricsCollectionIntervalString = defaultMetricsCollectionInterval.String()
 	}
+
+	if failureThreshold == nil {
+		defaultValue := defaultMetricsStreamFailureThreshold
+		failureThreshold = &defaultValue
+	}
 	monitoringNamespace := b.monitoringNamespace()
 	fixedAgentName := strings.ReplaceAll(agentName, "-", "_")
+	// beatStreams and streams MUST be []interface{} even if in reality they are []map[string]interface{}:
+	// if those are declared as slices of maps the message "proto: invalid type: []map[string]interface{}" will pop up
 	beatsStreams := make([]interface{}, 0, len(componentIDToBinary))
 	streams := []interface{}{
 		map[string]interface{}{
@@ -866,6 +907,25 @@ func (b *BeatsMonitor) injectMetricsInput(
 
 	}
 
+	if failureThreshold != nil {
+		// add failure threshold to all streams and beatStreams
+		for _, s := range streams {
+			if streamMap, ok := s.(map[string]interface{}); ok {
+				streamMap[failureThresholdKey] = *failureThreshold
+			} else {
+				return fmt.Errorf("unable to set %s: %d in monitoring stream %q: unexpected type %T", failureThresholdKey, *failureThreshold, s, s)
+			}
+
+		}
+		for _, s := range beatsStreams {
+			if streamMap, ok := s.(map[string]interface{}); ok {
+				streamMap[failureThresholdKey] = *failureThreshold
+			} else {
+				return fmt.Errorf("unable to set %s: %d in monitoring stream %q: unexpected type %T", failureThresholdKey, *failureThreshold, s, s)
+			}
+		}
+	}
+
 	inputs := []interface{}{
 		map[string]interface{}{
 			idKey: fmt.Sprintf("%s-beats", monitoringMetricsUnitID),
diff --git a/internal/pkg/agent/application/monitoring/v1_monitor_test.go b/internal/pkg/agent/application/monitoring/v1_monitor_test.go
index 813d74bbd92..4d3449d244c 100644
--- a/internal/pkg/agent/application/monitoring/v1_monitor_test.go
+++ b/internal/pkg/agent/application/monitoring/v1_monitor_test.go
@@ -235,7 +235,7 @@ func TestMonitoringConfigMetricsInterval(t *testing.T) {
 					// check the streams created for the input, should be a list of objects
 					if assert.Contains(t, input, "streams", "input %q does not contain any stream", inputID) &&
 						assert.IsTypef(t, []any{}, input["streams"], "streams for input %q are not a list of objects", inputID) {
-						// loop over streams and cast to map[string]any to access keys
+						// loop over streams and access keys
 						for _, rawStream := range input["streams"].([]any) {
 							if assert.IsTypef(t, map[string]any{}, rawStream, "stream %v for input %q is not a map", rawStream, inputID) {
 								stream := rawStream.(map[string]any)
@@ -258,6 +258,210 @@ func TestMonitoringConfigMetricsInterval(t *testing.T) {
 	}
 }
 
+func TestMonitoringConfigMetricsFailureThreshold(t *testing.T) {
+
+	agentInfo, err := info.NewAgentInfo(context.Background(), false)
+	require.NoError(t, err, "Error creating agent info")
+
+	sampleFiveErrorsStreamThreshold := uint(5)
+	sampleTenErrorsStreamThreshold := uint(10)
+
+	tcs := []struct {
+		name              string
+		monitoringCfg     *monitoringConfig
+		policy            map[string]any
+		expectedThreshold uint
+	}{
+		{
+			name: "default failure threshold",
+			monitoringCfg: &monitoringConfig{
+				C: &monitoringcfg.MonitoringConfig{
+					Enabled:        true,
+					MonitorMetrics: true,
+					HTTP: &monitoringcfg.MonitoringHTTPConfig{
+						Enabled: false,
+					},
+				},
+			},
+			policy: map[string]any{
+				"agent": map[string]any{
+					"monitoring": map[string]any{
+						"metrics": true,
+						"http": map[string]any{
+							"enabled": false,
+						},
+					},
+				},
+				"outputs": map[string]any{
+					"default": map[string]any{},
+				},
+			},
+			expectedThreshold: defaultMetricsStreamFailureThreshold,
+		},
+		{
+			name: "agent config failure threshold",
+			monitoringCfg: &monitoringConfig{
+				C: &monitoringcfg.MonitoringConfig{
+					Enabled:        true,
+					MonitorMetrics: true,
+					HTTP: &monitoringcfg.MonitoringHTTPConfig{
+						Enabled: false,
+					},
+					FailureThreshold: &sampleFiveErrorsStreamThreshold,
+				},
+			},
+			policy: map[string]any{
+				"agent": map[string]any{
+					"monitoring": map[string]any{
+						"metrics": true,
+						"http": map[string]any{
+							"enabled": false,
+						},
+					},
+				},
+				"outputs": map[string]any{
+					"default": map[string]any{},
+				},
+			},
+			expectedThreshold: sampleFiveErrorsStreamThreshold,
+		},
+		{
+			name: "policy failure threshold uint",
+			monitoringCfg: &monitoringConfig{
+				C: &monitoringcfg.MonitoringConfig{
+					Enabled:        true,
+					MonitorMetrics: true,
+					HTTP: &monitoringcfg.MonitoringHTTPConfig{
+						Enabled: false,
+					},
+					FailureThreshold: &sampleFiveErrorsStreamThreshold,
+				},
+			},
+			policy: map[string]any{
+				"agent": map[string]any{
+					"monitoring": map[string]any{
+						"metrics": true,
+						"http": map[string]any{
+							"enabled": false,
+						},
+						failureThresholdKey: sampleTenErrorsStreamThreshold,
+					},
+				},
+				"outputs": map[string]any{
+					"default": map[string]any{},
+				},
+			},
+			expectedThreshold: sampleTenErrorsStreamThreshold,
+		},
+		{
+			name: "policy failure threshold int",
+			monitoringCfg: &monitoringConfig{
+				C: &monitoringcfg.MonitoringConfig{
+					Enabled:        true,
+					MonitorMetrics: true,
+					HTTP: &monitoringcfg.MonitoringHTTPConfig{
+						Enabled: false,
+					},
+					FailureThreshold: &sampleFiveErrorsStreamThreshold,
+				},
+			},
+			policy: map[string]any{
+				"agent": map[string]any{
+					"monitoring": map[string]any{
+						"metrics": true,
+						"http": map[string]any{
+							"enabled": false,
+						},
+						failureThresholdKey: 10,
+					},
+				},
+				"outputs": map[string]any{
+					"default": map[string]any{},
+				},
+			},
+			expectedThreshold: sampleTenErrorsStreamThreshold,
+		},
+		{
+			name: "policy failure threshold string",
+			monitoringCfg: &monitoringConfig{
+				C: &monitoringcfg.MonitoringConfig{
+					Enabled:        true,
+					MonitorMetrics: true,
+					HTTP: &monitoringcfg.MonitoringHTTPConfig{
+						Enabled: false,
+					},
+					FailureThreshold: &sampleFiveErrorsStreamThreshold,
+				},
+			},
+			policy: map[string]any{
+				"agent": map[string]any{
+					"monitoring": map[string]any{
+						"metrics": true,
+						"http": map[string]any{
+							"enabled": false,
+						},
+						failureThresholdKey: "10",
+					},
+				},
+				"outputs": map[string]any{
+					"default": map[string]any{},
+				},
+			},
+			expectedThreshold: sampleTenErrorsStreamThreshold,
+		},
+	}
+
+	for _, tc := range tcs {
+
+		t.Run(tc.name, func(t *testing.T) {
+			b := &BeatsMonitor{
+				enabled:         true,
+				config:          tc.monitoringCfg,
+				operatingSystem: runtime.GOOS,
+				agentInfo:       agentInfo,
+			}
+			got, err := b.MonitoringConfig(tc.policy, nil, map[string]string{"foobeat": "filebeat"}, map[string]uint64{}) // put a componentID/binary mapping to have something in the beats monitoring input
+			assert.NoError(t, err)
+
+			rawInputs, ok := got["inputs"]
+			require.True(t, ok, "monitoring config contains no input")
+			inputs, ok := rawInputs.([]any)
+			require.True(t, ok, "monitoring inputs are not a list")
+			marshaledInputs, err := yaml.Marshal(inputs)
+			if assert.NoError(t, err, "error marshaling monitoring inputs") {
+				t.Logf("marshaled monitoring inputs:\n%s\n", marshaledInputs)
+			}
+
+			// loop over the created inputs
+			for _, i := range inputs {
+				input, ok := i.(map[string]any)
+				if assert.Truef(t, ok, "input is not represented as a map: %v", i) {
+					inputID := input["id"]
+					t.Logf("input %q", inputID)
+					// check the streams created for the input, should be a list of objects
+					if assert.Contains(t, input, "streams", "input %q does not contain any stream", inputID) &&
+						assert.IsTypef(t, []any{}, input["streams"], "streams for input %q are not a list of objects", inputID) {
+
+						// loop over streams and cast to map[string]any to access keys
+						for _, rawStream := range input["streams"].([]any) {
+							if assert.IsTypef(t, map[string]any{}, rawStream, "stream %v for input %q is not a map", rawStream, inputID) {
+								stream := rawStream.(map[string]any)
+								// check the failure threshold and assert its value
+								streamID := stream["id"]
+								if assert.Containsf(t, stream, failureThresholdKey, "stream %q for input %q does not contain a failureThreshold", streamID, inputID) &&
+									assert.IsTypef(t, uint(0), stream[failureThresholdKey], "failure threshold for stream %q of input %q is not represented as a uint", streamID, inputID) {
+									actualFailureThreshold := stream[failureThresholdKey].(uint)
+									assert.Equalf(t, tc.expectedThreshold, actualFailureThreshold, "unexpected failure threshold for stream %q of input %q", streamID, inputID)
+								}
+							}
+						}
+					}
+				}
+			}
+		})
+	}
+}
+
 func TestMonitoringConfigComponentFields(t *testing.T) {
 	agentInfo, err := info.NewAgentInfo(context.Background(), false)
 	require.NoError(t, err, "Error creating agent info")
diff --git a/internal/pkg/core/monitoring/config/config.go b/internal/pkg/core/monitoring/config/config.go
index 6688301fb1b..263caddc08f 100644
--- a/internal/pkg/core/monitoring/config/config.go
+++ b/internal/pkg/core/monitoring/config/config.go
@@ -21,17 +21,18 @@ const (
 
 // MonitoringConfig describes a configuration of a monitoring
 type MonitoringConfig struct {
-	Enabled        bool                  `yaml:"enabled" config:"enabled"`
-	MonitorLogs    bool                  `yaml:"logs" config:"logs"`
-	MonitorMetrics bool                  `yaml:"metrics" config:"metrics"`
-	MetricsPeriod  string                `yaml:"metrics_period" config:"metrics_period"`
-	LogMetrics     bool                  `yaml:"-" config:"-"`
-	HTTP           *MonitoringHTTPConfig `yaml:"http" config:"http"`
-	Namespace      string                `yaml:"namespace" config:"namespace"`
-	Pprof          *PprofConfig          `yaml:"pprof" config:"pprof"`
-	MonitorTraces  bool                  `yaml:"traces" config:"traces"`
-	APM            APMConfig             `yaml:"apm,omitempty" config:"apm,omitempty" json:"apm,omitempty"`
-	Diagnostics    Diagnostics           `yaml:"diagnostics,omitempty" json:"diagnostics,omitempty"`
+	Enabled          bool                  `yaml:"enabled" config:"enabled"`
+	MonitorLogs      bool                  `yaml:"logs" config:"logs"`
+	MonitorMetrics   bool                  `yaml:"metrics" config:"metrics"`
+	MetricsPeriod    string                `yaml:"metrics_period" config:"metrics_period"`
+	FailureThreshold *uint                 `yaml:"failure_threshold" config:"failure_threshold"`
+	LogMetrics       bool                  `yaml:"-" config:"-"`
+	HTTP             *MonitoringHTTPConfig `yaml:"http" config:"http"`
+	Namespace        string                `yaml:"namespace" config:"namespace"`
+	Pprof            *PprofConfig          `yaml:"pprof" config:"pprof"`
+	MonitorTraces    bool                  `yaml:"traces" config:"traces"`
+	APM              APMConfig             `yaml:"apm,omitempty" config:"apm,omitempty" json:"apm,omitempty"`
+	Diagnostics      Diagnostics           `yaml:"diagnostics,omitempty" json:"diagnostics,omitempty"`
 }
 
 // MonitoringHTTPConfig is a config defining HTTP endpoint published by agent