Skip to content

Commit

Permalink
[metricbeat][system/process, system/process_summary]: mark module as …
Browse files Browse the repository at this point in the history
…healthy if metrics are partially filled (#40565)

* chore: mark module as healthy if metrics are partially filled

* chore: mark module as healthy if metrics are partially filled

* fix: use errors.As

* fix: fix lint

* Update metricbeat/mb/event.go

Co-authored-by: Mauri de Souza Meneguzzo <[email protected]>

* fix: changelog

---------

Co-authored-by: Mauri de Souza Meneguzzo <[email protected]>
  • Loading branch information
VihasMakwana and mauri870 authored Aug 23, 2024
1 parent 94ca509 commit 1f033c9
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add support for Kibana status metricset in v8 format {pull}40275[40275]
- Add new metrics for the vSphere Datastore metricset. {pull}40441[40441]
- Update metrics for the vSphere Host metricset. {pull}40429[40429]
- Mark system process metricsets as running if metrics are partially available {pull}40565[40565]
- Added back `elasticsearch.node.stats.jvm.mem.pools.*` to the `node_stats` metricset {pull}40571[40571]

*Osquerybeat*
Expand Down
15 changes: 15 additions & 0 deletions metricbeat/mb/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,18 @@ func tryToMapStr(v interface{}) (mapstr.M, bool) {
return nil, false
}
}

// PartialMetricsError indicates that metrics are only partially filled.
// This will be removed once we fix the underlying problem.
// See https://github.com/elastic/beats/issues/40542 for more details.
type PartialMetricsError struct {
Err error
}

func (p PartialMetricsError) Error() string {
return p.Err.Error()
}

func (p PartialMetricsError) Unwrap() error {
return p.Err
}
17 changes: 15 additions & 2 deletions metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package module

import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
Expand Down Expand Up @@ -255,7 +256,13 @@ func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) {
err := fetcher.Fetch(reporter.V2())
if err != nil {
reporter.V2().Error(err)
msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
if errors.As(err, &mb.PartialMetricsError{}) {
// mark module as running if metrics are partially available and display the error message
msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
} else {
// mark it as degraded for any other issue encountered
msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
}
logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
} else {
msw.module.UpdateStatus(status.Running, "")
Expand All @@ -265,7 +272,13 @@ func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) {
err := fetcher.Fetch(ctx, reporter.V2())
if err != nil {
reporter.V2().Error(err)
msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
if errors.As(err, &mb.PartialMetricsError{}) {
// mark module as running if metrics are partially available and display the error message
msw.module.UpdateStatus(status.Running, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
} else {
// mark it as degraded for any other issue encountered
msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %v", msw.module.Name(), msw.MetricSet.Name(), err))
}
logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
} else {
msw.module.UpdateStatus(status.Running, "")
Expand Down
2 changes: 2 additions & 0 deletions metricbeat/module/system/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
if err != nil && !errors.Is(err, process.NonFatalErr{}) {
// return only if the error is fatal in nature
return fmt.Errorf("process stats: %w", err)
} else if (err != nil && errors.Is(err, process.NonFatalErr{})) {
err = mb.PartialMetricsError{Err: err}
}

for evtI := range procs {
Expand Down
2 changes: 2 additions & 0 deletions metricbeat/module/system/process_summary/process_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
if degradeErr != nil && !errors.Is(degradeErr, process.NonFatalErr{}) {
// return only if the error is fatal in nature
return fmt.Errorf("error fetching process list: %w", degradeErr)
} else if (degradeErr != nil && errors.Is(degradeErr, process.NonFatalErr{})) {
degradeErr = mb.PartialMetricsError{Err: degradeErr}
}

procStates := map[string]int{}
Expand Down

0 comments on commit 1f033c9

Please sign in to comment.