From eb5307ab0b2804921a49c41177b0e751e5fd2b5b Mon Sep 17 00:00:00 2001 From: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com> Date: Wed, 24 Jul 2024 15:04:12 +0530 Subject: [PATCH] [metricbeat] - Allow metricsets to report their status via v2 protocol (#40025) * fix: initial commit * tests: add integration test cases * fix: expand testing scenarios * fix: add comments * fix: move integration tests to system/process * cleanup * fix: ci * fix: ci and typos * chore: update changelog * fix: add helper * fix: remove extra space * fix: ci * fix: move integration tests to x-pack * fix: add null check * fix: ci * fix: remove unused code * fix: lint * fix: lint and imports * fix: ci windows * inting for windows * fix lint linux * fix: go imports * fix: switch to the generic way * chore: make error descriptive * fix: move status report after fetch * fix: typo * fix: remove nolint --------- Co-authored-by: Pierre HILBERT (cherry picked from commit c86330a88d6aba199863ad250fcbea74671d01bc) --- CHANGELOG.next.asciidoc | 1 + metricbeat/helper/http_test.go | 13 +- metricbeat/mb/mb.go | 28 ++- metricbeat/mb/module/runner.go | 5 + metricbeat/mb/module/runner_group.go | 9 + metricbeat/mb/module/wrapper.go | 8 + metricbeat/mb/testing/modules.go | 10 +- .../elasticsearch/node_stats/data_test.go | 4 + .../mbtest/system/process_integration_test.go | 220 ++++++++++++++++++ 9 files changed, 284 insertions(+), 14 deletions(-) create mode 100644 x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index cd81cd32fff..ba19d526d4f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -53,6 +53,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Setting period for counter cache for Prometheus remote_write at least to 60sec {pull}38553[38553] - Add support of Graphite series 1.1.0+ tagging extension for statsd module. {pull}39619[39619] +- Allow metricsets to report their status via control v2 protocol. {pull}40025[40025] - Remove fallback to the node limit for the `kubernetes.pod.cpu.usage.limit.pct` and `kubernetes.pod.memory.usage.limit.pct` metrics calculation *Osquerybeat* diff --git a/metricbeat/helper/http_test.go b/metricbeat/helper/http_test.go index 2fbfea0d1ad..2f5a43d2dc4 100644 --- a/metricbeat/helper/http_test.go +++ b/metricbeat/helper/http_test.go @@ -19,7 +19,7 @@ package helper import ( "fmt" - "io/ioutil" + "io" "net" "net/http" "net/http/httptest" @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/helper/dialer" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/mb/parse" @@ -55,7 +56,7 @@ func TestGetAuthHeaderFromToken(t *testing.T) { for _, test := range tests { t.Run(test.Name, func(t *testing.T) { content := []byte(test.Content) - tmpfile, err := ioutil.TempFile("", "token") + tmpfile, err := os.CreateTemp("", "token") if err != nil { t.Fatal(err) } @@ -236,14 +237,14 @@ func TestOverUnixSocket(t *testing.T) { fmt.Fprintf(w, "ehlo!") }) - go http.Serve(l, mux) + go http.Serve(l, mux) //nolint:all // Ignore the error return l } for title, c := range cases { t.Run(title, func(t *testing.T) { - tmpDir, err := ioutil.TempDir("", "testsocket") + tmpDir, err := os.MkdirTemp("", "testsocket") require.NoError(t, err) defer os.RemoveAll(tmpDir) @@ -262,7 +263,7 @@ func TestOverUnixSocket(t *testing.T) { r, err := h.FetchResponse() require.NoError(t, err) defer r.Body.Close() - content, err := ioutil.ReadAll(r.Body) + content, err := io.ReadAll(r.Body) require.NoError(t, err) assert.Equal(t, []byte("ehlo!"), content) }) @@ -327,3 +328,5 @@ func (*dummyModule) Config() mb.ModuleConfig { func (*dummyModule) UnpackConfig(interface{}) error { return nil } +func (dummyModule) UpdateStatus(_ status.Status, _ string) {} +func (dummyModule) SetStatusReporter(_ status.StatusReporter) {} diff --git a/metricbeat/mb/mb.go b/metricbeat/mb/mb.go index 7e18dc9029d..0be1db7cef3 100644 --- a/metricbeat/mb/mb.go +++ b/metricbeat/mb/mb.go @@ -27,6 +27,7 @@ import ( "net/url" "time" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/helper/dialer" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -62,9 +63,11 @@ const ( // Module is the common interface for all Module implementations. type Module interface { - Name() string // Name returns the name of the Module. - Config() ModuleConfig // Config returns the ModuleConfig used to create the Module. - UnpackConfig(to interface{}) error // UnpackConfig unpacks the raw module config to the given object. + Name() string // Name returns the name of the Module. + Config() ModuleConfig // Config returns the ModuleConfig used to create the Module. + UnpackConfig(to interface{}) error // UnpackConfig unpacks the raw module config to the given object. + UpdateStatus(status status.Status, msg string) // UpdateStatus updates the status of the module. Reflected on elastic-agent. + SetStatusReporter(statusReporter status.StatusReporter) // SetStatusReporter updates the status reporter for the given module. } // BaseModule implements the Module interface. @@ -73,9 +76,10 @@ type Module interface { // MetricSets, it can embed this type into another struct to satisfy the // Module interface requirements. type BaseModule struct { - name string - config ModuleConfig - rawConfig *conf.C + name string + config ModuleConfig + rawConfig *conf.C + statusReporter status.StatusReporter } func (m *BaseModule) String() string { @@ -95,6 +99,18 @@ func (m *BaseModule) UnpackConfig(to interface{}) error { return m.rawConfig.Unpack(to) } +// UpdateStatus updates the status of the module. Reflected on elastic-agent. +func (m *BaseModule) UpdateStatus(status status.Status, msg string) { + if m.statusReporter != nil { + m.statusReporter.UpdateStatus(status, msg) + } +} + +// SetStatusReporter sets the status repoter of the module. +func (m *BaseModule) SetStatusReporter(statusReporter status.StatusReporter) { + m.statusReporter = statusReporter +} + // WithConfig re-configures the module with the given raw configuration and returns a // copy of the module. // Intended to be called from module factories. Note that if metricsets are specified diff --git a/metricbeat/mb/module/runner.go b/metricbeat/mb/module/runner.go index 1b0a621d705..aedb443e9a8 100644 --- a/metricbeat/mb/module/runner.go +++ b/metricbeat/mb/module/runner.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common/diagnostics" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/elastic-agent-libs/monitoring" ) @@ -123,3 +124,7 @@ func (mr *runner) Diagnostics() []diagnostics.DiagnosticSetup { func (mr *runner) String() string { return fmt.Sprintf("%s [metricsets=%d]", mr.mod.Name(), len(mr.mod.metricSets)) } + +func (mr *runner) SetStatusReporter(reporter status.StatusReporter) { + mr.mod.SetStatusReporter(reporter) +} diff --git a/metricbeat/mb/module/runner_group.go b/metricbeat/mb/module/runner_group.go index e020cd87d55..b4d92d29f56 100644 --- a/metricbeat/mb/module/runner_group.go +++ b/metricbeat/mb/module/runner_group.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/cfgfile" "github.com/elastic/beats/v7/libbeat/common/diagnostics" + "github.com/elastic/beats/v7/libbeat/management/status" ) type runnerGroup struct { @@ -40,6 +41,14 @@ func newRunnerGroup(runners []cfgfile.Runner) cfgfile.Runner { } } +func (rg *runnerGroup) SetStatusReporter(reporter status.StatusReporter) { + for _, runner := range rg.runners { + if runnerWithStatus, ok := runner.(status.WithStatusReporter); ok { + runnerWithStatus.SetStatusReporter(reporter) + } + } +} + func (rg *runnerGroup) Start() { rg.startOnce.Do(func() { for _, runner := range rg.runners { diff --git a/metricbeat/mb/module/wrapper.go b/metricbeat/mb/module/wrapper.go index d41bdf01497..feaf11363fa 100644 --- a/metricbeat/mb/module/wrapper.go +++ b/metricbeat/mb/module/wrapper.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/mb" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -146,6 +147,7 @@ func (mw *Wrapper) Start(done <-chan struct{}) <-chan beat.Event { registry.Add(metricsPath, msw.Metrics(), monitoring.Full) monitoring.NewString(msw.Metrics(), "starttime").Set(common.Time(time.Now()).String()) + msw.module.UpdateStatus(status.Starting, fmt.Sprintf("%s/%s is starting", msw.module.Name(), msw.Name())) msw.run(done, out) }(msw) } @@ -253,14 +255,20 @@ func (msw *metricSetWrapper) fetch(ctx context.Context, reporter reporter) { err := fetcher.Fetch(reporter.V2()) if err != nil { reporter.V2().Error(err) + msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.MetricSet.Name(), err)) logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) + } else { + msw.module.UpdateStatus(status.Running, "") } case mb.ReportingMetricSetV2WithContext: reporter.StartFetchTimer() err := fetcher.Fetch(ctx, reporter.V2()) if err != nil { reporter.V2().Error(err) + msw.module.UpdateStatus(status.Degraded, fmt.Sprintf("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.MetricSet.Name(), err)) logp.Err("Error fetching data for metricset %s.%s: %s", msw.module.Name(), msw.Name(), err) + } else { + msw.module.UpdateStatus(status.Running, "") } default: panic(fmt.Sprintf("unexpected fetcher type for %v", msw)) diff --git a/metricbeat/mb/testing/modules.go b/metricbeat/mb/testing/modules.go index 8c6e09df537..736bb1f40e6 100644 --- a/metricbeat/mb/testing/modules.go +++ b/metricbeat/mb/testing/modules.go @@ -53,6 +53,7 @@ that Metricbeat does it and with the same validations. } } */ + package testing import ( @@ -60,6 +61,7 @@ import ( "testing" "time" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/go-concert/timed" "github.com/elastic/beats/v7/metricbeat/mb" @@ -72,9 +74,11 @@ type TestModule struct { RawConfig *conf.C } -func (m *TestModule) Name() string { return m.ModName } -func (m *TestModule) Config() mb.ModuleConfig { return m.ModConfig } -func (m *TestModule) UnpackConfig(to interface{}) error { return m.RawConfig.Unpack(to) } +func (m *TestModule) Name() string { return m.ModName } +func (m *TestModule) Config() mb.ModuleConfig { return m.ModConfig } +func (m *TestModule) UnpackConfig(to interface{}) error { return m.RawConfig.Unpack(to) } +func (m *TestModule) UpdateStatus(_ status.Status, _ string) {} +func (m *TestModule) SetStatusReporter(_ status.StatusReporter) {} func NewTestModule(t testing.TB, config interface{}) *TestModule { c, err := conf.NewConfigFrom(config) diff --git a/metricbeat/module/elasticsearch/node_stats/data_test.go b/metricbeat/module/elasticsearch/node_stats/data_test.go index e6151555701..2317418eeaf 100644 --- a/metricbeat/module/elasticsearch/node_stats/data_test.go +++ b/metricbeat/module/elasticsearch/node_stats/data_test.go @@ -22,6 +22,7 @@ package node_stats import ( "testing" + "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/metricbeat/module/elasticsearch" ) @@ -60,3 +61,6 @@ func (m mockModule) Config() mb.ModuleConfig { func (m mockModule) UnpackConfig(to interface{}) error { return nil } + +func (m mockModule) UpdateStatus(_ status.Status, _ string) {} +func (m mockModule) SetStatusReporter(_ status.StatusReporter) {} diff --git a/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go b/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go new file mode 100644 index 00000000000..5e22332f2e4 --- /dev/null +++ b/x-pack/libbeat/management/tests/mbtest/system/process_integration_test.go @@ -0,0 +1,220 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package tests + +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/elastic/beats/v7/libbeat/common/reload" + lbmanagement "github.com/elastic/beats/v7/libbeat/management" + "github.com/elastic/beats/v7/x-pack/libbeat/management" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + + "github.com/elastic/beats/v7/x-pack/libbeat/management/tests" + "github.com/elastic/beats/v7/x-pack/metricbeat/cmd" + + conf "github.com/elastic/elastic-agent-libs/config" +) + +func TestProcessStatusReporter(t *testing.T) { + unitOneID := mock.NewID() + unitOutID := mock.NewID() + token := mock.NewID() + + tests.InitBeatsForTest(t, cmd.RootCmd) + + filename := fmt.Sprintf("test-%d", time.Now().Unix()) + outPath := filepath.Join(os.TempDir(), filename) + t.Logf("writing output to file %s", outPath) + err := os.Mkdir(outPath, 0775) + require.NoError(t, err) + defer func() { + err := os.RemoveAll(outPath) + require.NoError(t, err) + }() + + /* + * process with pid=-1 doesn't exist. This should degrade the input for a while */ + inputStreamIncorrectPid := getInputStream(unitOneID, -1, 1) + /* + * process with valid pid. This should change state to healthy */ + inputStreamCorrectPid := getInputStream(unitOneID, os.Getpid(), 2) + outputExpectedStream := proto.UnitExpected{ + Id: unitOutID, + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + Config: &proto.UnitExpectedConfig{ + DataStream: &proto.DataStream{ + Namespace: "default", + }, + Type: "file", + Revision: 1, + Meta: &proto.Meta{ + Package: &proto.Package{ + Name: "system", + Version: "1.17.0", + }, + }, + Source: tests.RequireNewStruct(map[string]interface{}{ + "type": "file", + "enabled": true, + "path": outPath, + "filename": "beat-out", + "number_of_files": 7, + }), + }, + } + + observedStates := make(chan *proto.CheckinObserved) + expectedUnits := make(chan []*proto.UnitExpected) + done := make(chan struct{}) + // V2 mock server + server := &mock.StubServerV2{ + CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { + select { + case observedStates <- observed: + return &proto.CheckinExpected{ + Units: <-expectedUnits, + } + case <-done: + return nil + } + }, + ActionImpl: func(response *proto.ActionResponse) error { + return nil + }, + } + require.NoError(t, server.Start()) + defer server.Stop() + + // start the client + client := client.NewV2(fmt.Sprintf(":%d", server.Port), token, client.VersionInfo{ + Name: "program", + Meta: map[string]string{ + "key": "value", + }, + }, client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials()))) + + lbmanagement.SetManagerFactory(func(cfg *conf.C, registry *reload.Registry) (lbmanagement.Manager, error) { + c := management.DefaultConfig() + if err := cfg.Unpack(&c); err != nil { + return nil, err + } + return management.NewV2AgentManagerWithClient(c, registry, client, management.WithStopOnEmptyUnits) + }) + + go func() { + t.Logf("Running beats...") + err := cmd.RootCmd.Execute() + require.NoError(t, err) + }() + + scenarios := []struct { + expectedStatus proto.State + nextInputunit *proto.UnitExpected + }{ + { + proto.State_HEALTHY, + &inputStreamIncorrectPid, + }, + { + proto.State_DEGRADED, + &inputStreamCorrectPid, + }, + { + proto.State_HEALTHY, + &inputStreamCorrectPid, + }, + // wait for one more checkin, just to be sure it's healthy + { + proto.State_HEALTHY, + &inputStreamCorrectPid, + }, + } + + timer := time.NewTimer(2 * time.Minute) + id := 0 + for id < len(scenarios) { + select { + case observed := <-observedStates: + state := extractState(observed.GetUnits(), unitOneID) + expectedUnits <- []*proto.UnitExpected{ + scenarios[id].nextInputunit, + &outputExpectedStream, + } + if state != scenarios[id].expectedStatus { + continue + } + // always ensure that output is healthy + outputState := extractState(observed.GetUnits(), unitOutID) + require.Equal(t, outputState, proto.State_HEALTHY) + + timer.Reset(2 * time.Minute) + id++ + case <-timer.C: + t.Fatal("timeout waiting for checkin") + default: + } + } +} + +func extractState(units []*proto.UnitObserved, idx string) proto.State { + for _, unit := range units { + if unit.Id == idx { + return unit.GetState() + } + } + return -1 +} + +func getInputStream(id string, pid int, stateIdx int) proto.UnitExpected { + return proto.UnitExpected{ + Id: id, + Type: proto.UnitType_INPUT, + ConfigStateIdx: uint64(stateIdx), + State: proto.State_HEALTHY, + Config: &proto.UnitExpectedConfig{ + DataStream: &proto.DataStream{ + Namespace: "default", + }, + Streams: []*proto.Stream{{ + Id: "system/metrics-system.process-default-system", + DataStream: &proto.DataStream{ + Dataset: "system.process", + Type: "metrics", + }, + Source: tests.RequireNewStruct(map[string]interface{}{ + "metricsets": []interface{}{"process"}, + "process.pid": pid, + }), + }}, + Type: "system/metrics", + Id: "system/metrics-system-default-system", + Name: "system-1", + Revision: 1, + Meta: &proto.Meta{ + Package: &proto.Package{ + Name: "system", + Version: "1.17.0", + }, + }, + }, + } +}