Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.15](backport #40025) [metricbeat] - Allow metricsets to report their status via v2 protocol #40327

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]

- Setting period for counter cache for Prometheus remote_write at least to 60sec {pull}38553[38553]
- Add support of Graphite series 1.1.0+ tagging extension for statsd module. {pull}39619[39619]
- Allow metricsets to report their status via control v2 protocol. {pull}40025[40025]
- Remove fallback to the node limit for the `kubernetes.pod.cpu.usage.limit.pct` and `kubernetes.pod.memory.usage.limit.pct` metrics calculation

*Osquerybeat*
Expand Down
13 changes: 8 additions & 5 deletions metricbeat/helper/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package helper

import (
"fmt"
"io/ioutil"
"io"
"net"
"net/http"
"net/http/httptest"
Expand All @@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/metricbeat/helper/dialer"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
Expand All @@ -55,7 +56,7 @@ func TestGetAuthHeaderFromToken(t *testing.T) {
for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
content := []byte(test.Content)
tmpfile, err := ioutil.TempFile("", "token")
tmpfile, err := os.CreateTemp("", "token")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -236,14 +237,14 @@ func TestOverUnixSocket(t *testing.T) {
fmt.Fprintf(w, "ehlo!")
})

go http.Serve(l, mux)
go http.Serve(l, mux) //nolint:all // Ignore the error

return l
}

for title, c := range cases {
t.Run(title, func(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "testsocket")
tmpDir, err := os.MkdirTemp("", "testsocket")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

Expand All @@ -262,7 +263,7 @@ func TestOverUnixSocket(t *testing.T) {
r, err := h.FetchResponse()
require.NoError(t, err)
defer r.Body.Close()
content, err := ioutil.ReadAll(r.Body)
content, err := io.ReadAll(r.Body)
require.NoError(t, err)
assert.Equal(t, []byte("ehlo!"), content)
})
Expand Down Expand Up @@ -327,3 +328,5 @@ func (*dummyModule) Config() mb.ModuleConfig {
func (*dummyModule) UnpackConfig(interface{}) error {
return nil
}
func (dummyModule) UpdateStatus(_ status.Status, _ string) {}
func (dummyModule) SetStatusReporter(_ status.StatusReporter) {}
28 changes: 22 additions & 6 deletions metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"net/url"
"time"

"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/metricbeat/helper/dialer"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -62,9 +63,11 @@ const (

// Module is the common interface for all Module implementations.
type Module interface {
Name() string // Name returns the name of the Module.
Config() ModuleConfig // Config returns the ModuleConfig used to create the Module.
UnpackConfig(to interface{}) error // UnpackConfig unpacks the raw module config to the given object.
Name() string // Name returns the name of the Module.
Config() ModuleConfig // Config returns the ModuleConfig used to create the Module.
UnpackConfig(to interface{}) error // UnpackConfig unpacks the raw module config to the given object.
UpdateStatus(status status.Status, msg string) // UpdateStatus updates the status of the module. Reflected on elastic-agent.
SetStatusReporter(statusReporter status.StatusReporter) // SetStatusReporter updates the status reporter for the given module.
}

// BaseModule implements the Module interface.
Expand All @@ -73,9 +76,10 @@ type Module interface {
// MetricSets, it can embed this type into another struct to satisfy the
// Module interface requirements.
type BaseModule struct {
name string
config ModuleConfig
rawConfig *conf.C
name string
config ModuleConfig
rawConfig *conf.C
statusReporter status.StatusReporter
}

func (m *BaseModule) String() string {
Expand All @@ -95,6 +99,18 @@ func (m *BaseModule) UnpackConfig(to interface{}) error {
return m.rawConfig.Unpack(to)
}

// UpdateStatus updates the status of the module. Reflected on elastic-agent.
func (m *BaseModule) UpdateStatus(status status.Status, msg string) {
if m.statusReporter != nil {
m.statusReporter.UpdateStatus(status, msg)
}
}

// SetStatusReporter sets the status repoter of the module.
func (m *BaseModule) SetStatusReporter(statusReporter status.StatusReporter) {
m.statusReporter = statusReporter
}

// WithConfig re-configures the module with the given raw configuration and returns a
// copy of the module.
// Intended to be called from module factories. Note that if metricsets are specified
Expand Down
5 changes: 5 additions & 0 deletions metricbeat/mb/module/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/diagnostics"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/elastic-agent-libs/monitoring"
)

Expand Down Expand Up @@ -123,3 +124,7 @@ func (mr *runner) Diagnostics() []diagnostics.DiagnosticSetup {
func (mr *runner) String() string {
return fmt.Sprintf("%s [metricsets=%d]", mr.mod.Name(), len(mr.mod.metricSets))
}

func (mr *runner) SetStatusReporter(reporter status.StatusReporter) {
mr.mod.SetStatusReporter(reporter)
}
9 changes: 9 additions & 0 deletions metricbeat/mb/module/runner_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/diagnostics"
"github.com/elastic/beats/v7/libbeat/management/status"
)

type runnerGroup struct {
Expand All @@ -40,6 +41,14 @@ func newRunnerGroup(runners []cfgfile.Runner) cfgfile.Runner {
}
}

func (rg *runnerGroup) SetStatusReporter(reporter status.StatusReporter) {
for _, runner := range rg.runners {
if runnerWithStatus, ok := runner.(status.WithStatusReporter); ok {
runnerWithStatus.SetStatusReporter(reporter)
}
}
}

func (rg *runnerGroup) Start() {
rg.startOnce.Do(func() {
for _, runner := range rg.runners {
Expand Down
8 changes: 8 additions & 0 deletions metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/metricbeat/mb"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -146,6 +147,7 @@ func (mw *Wrapper) Start(done <-chan struct{}) <-chan beat.Event {
registry.Add(metricsPath, msw.Metrics(), monitoring.Full)
monitoring.NewString(msw.Metrics(), "starttime").Set(common.Time(time.Now()).String())

msw.module.UpdateStatus(status.Starting, fmt.Sprintf("%s/%s is starting", msw.module.Name(), msw.Name()))
msw.run(done, out)
}(msw)
}
Expand Down Expand Up @@ -253,14 +255,20 @@ func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) {
err := fetcher.Fetch(reporter.V2())
if err != nil {
reporter.V2().Error(err)
msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.MetricSet.Name(), err))
logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
} else {
msw.module.UpdateStatus(status.Running, "")
}
case mb.ReportingMetricSetV2WithContext:
reporter.StartFetchTimer()
err := fetcher.Fetch(ctx, reporter.V2())
if err != nil {
reporter.V2().Error(err)
msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.MetricSet.Name(), err))
logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
} else {
msw.module.UpdateStatus(status.Running, "")
}
default:
panic(fmt.Sprintf("unexpected fetcher type for %v", msw))
Expand Down
10 changes: 7 additions & 3 deletions metricbeat/mb/testing/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,15 @@ that Metricbeat does it and with the same validations.
}
}
*/

package testing

import (
"context"
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/go-concert/timed"

"github.com/elastic/beats/v7/metricbeat/mb"
Expand All @@ -72,9 +74,11 @@ type TestModule struct {
RawConfig *conf.C
}

func (m *TestModule) Name() string { return m.ModName }
func (m *TestModule) Config() mb.ModuleConfig { return m.ModConfig }
func (m *TestModule) UnpackConfig(to interface{}) error { return m.RawConfig.Unpack(to) }
func (m *TestModule) Name() string { return m.ModName }
func (m *TestModule) Config() mb.ModuleConfig { return m.ModConfig }
func (m *TestModule) UnpackConfig(to interface{}) error { return m.RawConfig.Unpack(to) }
func (m *TestModule) UpdateStatus(_ status.Status, _ string) {}
func (m *TestModule) SetStatusReporter(_ status.StatusReporter) {}

func NewTestModule(t testing.TB, config interface{}) *TestModule {
c, err := conf.NewConfigFrom(config)
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/module/elasticsearch/node_stats/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package node_stats
import (
"testing"

"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/module/elasticsearch"
)
Expand Down Expand Up @@ -60,3 +61,6 @@ func (m mockModule) Config() mb.ModuleConfig {
func (m mockModule) UnpackConfig(to interface{}) error {
return nil
}

func (m mockModule) UpdateStatus(_ status.Status, _ string) {}
func (m mockModule) SetStatusReporter(_ status.StatusReporter) {}
Loading
Loading