Skip to content

Commit

Permalink
[metricbeat] - Allow metricsets to report their status via v2 protocol (
Browse files Browse the repository at this point in the history
#40025)

* fix: initial commit

* tests: add integration test cases

* fix: expand testing scenarios

* fix: add comments

* fix: move integration tests to system/process

* cleanup

* fix: ci

* fix: ci and typos

* chore: update changelog

* fix: add helper

* fix: remove extra space

* fix: ci

* fix: move integration tests to x-pack

* fix: add null check

* fix: ci

* fix: remove unused code

* fix: lint

* fix: lint and imports

* fix: ci windows

* inting for windows

* fix lint linux

* fix: go imports

* fix: switch to the generic way

* chore: make error descriptive

* fix: move status report after fetch

* fix: typo

* fix: remove nolint

---------

Co-authored-by: Pierre HILBERT <[email protected]>
  • Loading branch information
VihasMakwana and pierrehilbert authored Jul 24, 2024
1 parent 78a1ee9 commit c86330a
Show file tree
Hide file tree
Showing 9 changed files with 284 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]

- Setting period for counter cache for Prometheus remote_write at least to 60sec {pull}38553[38553]
- Add support of Graphite series 1.1.0+ tagging extension for statsd module. {pull}39619[39619]
- Allow metricsets to report their status via control v2 protocol. {pull}40025[40025]
- Remove fallback to the node limit for the `kubernetes.pod.cpu.usage.limit.pct` and `kubernetes.pod.memory.usage.limit.pct` metrics calculation
- Add support for Kibana status metricset in v8 format {pull}40275[40275]

Expand Down
13 changes: 8 additions & 5 deletions metricbeat/helper/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package helper

import (
"fmt"
"io/ioutil"
"io"
"net"
"net/http"
"net/http/httptest"
Expand All @@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/metricbeat/helper/dialer"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"
Expand All @@ -55,7 +56,7 @@ func TestGetAuthHeaderFromToken(t *testing.T) {
for _, test := range tests {
t.Run(test.Name, func(t *testing.T) {
content := []byte(test.Content)
tmpfile, err := ioutil.TempFile("", "token")
tmpfile, err := os.CreateTemp("", "token")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -236,14 +237,14 @@ func TestOverUnixSocket(t *testing.T) {
fmt.Fprintf(w, "ehlo!")
})

go http.Serve(l, mux)
go http.Serve(l, mux) //nolint:all // Ignore the error

return l
}

for title, c := range cases {
t.Run(title, func(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "testsocket")
tmpDir, err := os.MkdirTemp("", "testsocket")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)

Expand All @@ -262,7 +263,7 @@ func TestOverUnixSocket(t *testing.T) {
r, err := h.FetchResponse()
require.NoError(t, err)
defer r.Body.Close()
content, err := ioutil.ReadAll(r.Body)
content, err := io.ReadAll(r.Body)
require.NoError(t, err)
assert.Equal(t, []byte("ehlo!"), content)
})
Expand Down Expand Up @@ -327,3 +328,5 @@ func (*dummyModule) Config() mb.ModuleConfig {
func (*dummyModule) UnpackConfig(interface{}) error {
return nil
}
func (dummyModule) UpdateStatus(_ status.Status, _ string) {}
func (dummyModule) SetStatusReporter(_ status.StatusReporter) {}
28 changes: 22 additions & 6 deletions metricbeat/mb/mb.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"net/url"
"time"

"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/metricbeat/helper/dialer"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -62,9 +63,11 @@ const (

// Module is the common interface for all Module implementations.
type Module interface {
Name() string // Name returns the name of the Module.
Config() ModuleConfig // Config returns the ModuleConfig used to create the Module.
UnpackConfig(to interface{}) error // UnpackConfig unpacks the raw module config to the given object.
Name() string // Name returns the name of the Module.
Config() ModuleConfig // Config returns the ModuleConfig used to create the Module.
UnpackConfig(to interface{}) error // UnpackConfig unpacks the raw module config to the given object.
UpdateStatus(status status.Status, msg string) // UpdateStatus updates the status of the module. Reflected on elastic-agent.
SetStatusReporter(statusReporter status.StatusReporter) // SetStatusReporter updates the status reporter for the given module.
}

// BaseModule implements the Module interface.
Expand All @@ -73,9 +76,10 @@ type Module interface {
// MetricSets, it can embed this type into another struct to satisfy the
// Module interface requirements.
type BaseModule struct {
name string
config ModuleConfig
rawConfig *conf.C
name string
config ModuleConfig
rawConfig *conf.C
statusReporter status.StatusReporter
}

func (m *BaseModule) String() string {
Expand All @@ -95,6 +99,18 @@ func (m *BaseModule) UnpackConfig(to interface{}) error {
return m.rawConfig.Unpack(to)
}

// UpdateStatus updates the status of the module. Reflected on elastic-agent.
func (m *BaseModule) UpdateStatus(status status.Status, msg string) {
if m.statusReporter != nil {
m.statusReporter.UpdateStatus(status, msg)
}
}

// SetStatusReporter sets the status repoter of the module.
func (m *BaseModule) SetStatusReporter(statusReporter status.StatusReporter) {
m.statusReporter = statusReporter
}

// WithConfig re-configures the module with the given raw configuration and returns a
// copy of the module.
// Intended to be called from module factories. Note that if metricsets are specified
Expand Down
5 changes: 5 additions & 0 deletions metricbeat/mb/module/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/diagnostics"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/elastic-agent-libs/monitoring"
)

Expand Down Expand Up @@ -123,3 +124,7 @@ func (mr *runner) Diagnostics() []diagnostics.DiagnosticSetup {
func (mr *runner) String() string {
return fmt.Sprintf("%s [metricsets=%d]", mr.mod.Name(), len(mr.mod.metricSets))
}

func (mr *runner) SetStatusReporter(reporter status.StatusReporter) {
mr.mod.SetStatusReporter(reporter)
}
9 changes: 9 additions & 0 deletions metricbeat/mb/module/runner_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/diagnostics"
"github.com/elastic/beats/v7/libbeat/management/status"
)

type runnerGroup struct {
Expand All @@ -40,6 +41,14 @@ func newRunnerGroup(runners []cfgfile.Runner) cfgfile.Runner {
}
}

func (rg *runnerGroup) SetStatusReporter(reporter status.StatusReporter) {
for _, runner := range rg.runners {
if runnerWithStatus, ok := runner.(status.WithStatusReporter); ok {
runnerWithStatus.SetStatusReporter(reporter)
}
}
}

func (rg *runnerGroup) Start() {
rg.startOnce.Do(func() {
for _, runner := range rg.runners {
Expand Down
8 changes: 8 additions & 0 deletions metricbeat/mb/module/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/metricbeat/mb"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
Expand Down Expand Up @@ -146,6 +147,7 @@ func (mw *Wrapper) Start(done <-chan struct{}) <-chan beat.Event {
registry.Add(metricsPath, msw.Metrics(), monitoring.Full)
monitoring.NewString(msw.Metrics(), "starttime").Set(common.Time(time.Now()).String())

msw.module.UpdateStatus(status.Starting, fmt.Sprintf("%s/%s is starting", msw.module.Name(), msw.Name()))
msw.run(done, out)
}(msw)
}
Expand Down Expand Up @@ -253,14 +255,20 @@ func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) {
err := fetcher.Fetch(reporter.V2())
if err != nil {
reporter.V2().Error(err)
msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.MetricSet.Name(), err))
logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
} else {
msw.module.UpdateStatus(status.Running, "")
}
case mb.ReportingMetricSetV2WithContext:
reporter.StartFetchTimer()
err := fetcher.Fetch(ctx, reporter.V2())
if err != nil {
reporter.V2().Error(err)
msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.MetricSet.Name(), err))
logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err)
} else {
msw.module.UpdateStatus(status.Running, "")
}
default:
panic(fmt.Sprintf("unexpected fetcher type for %v", msw))
Expand Down
10 changes: 7 additions & 3 deletions metricbeat/mb/testing/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,15 @@ that Metricbeat does it and with the same validations.
}
}
*/

package testing

import (
"context"
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/go-concert/timed"

"github.com/elastic/beats/v7/metricbeat/mb"
Expand All @@ -72,9 +74,11 @@ type TestModule struct {
RawConfig *conf.C
}

func (m *TestModule) Name() string { return m.ModName }
func (m *TestModule) Config() mb.ModuleConfig { return m.ModConfig }
func (m *TestModule) UnpackConfig(to interface{}) error { return m.RawConfig.Unpack(to) }
func (m *TestModule) Name() string { return m.ModName }
func (m *TestModule) Config() mb.ModuleConfig { return m.ModConfig }
func (m *TestModule) UnpackConfig(to interface{}) error { return m.RawConfig.Unpack(to) }
func (m *TestModule) UpdateStatus(_ status.Status, _ string) {}
func (m *TestModule) SetStatusReporter(_ status.StatusReporter) {}

func NewTestModule(t testing.TB, config interface{}) *TestModule {
c, err := conf.NewConfigFrom(config)
Expand Down
4 changes: 4 additions & 0 deletions metricbeat/module/elasticsearch/node_stats/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package node_stats
import (
"testing"

"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/module/elasticsearch"
)
Expand Down Expand Up @@ -60,3 +61,6 @@ func (m mockModule) Config() mb.ModuleConfig {
func (m mockModule) UnpackConfig(to interface{}) error {
return nil
}

func (m mockModule) UpdateStatus(_ status.Status, _ string) {}
func (m mockModule) SetStatusReporter(_ status.StatusReporter) {}
Loading

0 comments on commit c86330a

Please sign in to comment.