diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 93832b8a1876..6b7b963f9c82 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -282,6 +282,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - [Kubernetes] Remove redundant dockersock volume mount {pull}22009[22009] - Revert change to report `process.memory.rss` as `process.memory.wss` on Windows. {pull}22055[22055] - Remove io.time from windows {pull}22237[22237] +- Stop generating NaN values from Cloud Foundry module to avoid errors in outputs. {pull}22634[22634] *Packetbeat* diff --git a/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go b/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go index 10ea50dd9280..96349eeb7ea2 100644 --- a/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go +++ b/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go @@ -111,7 +111,7 @@ func (c *DopplerConsumer) firehose(cb func(evt Event), filter consumer.EnvelopeF if !filterFn(env) { continue } - event := envelopeToEvent(env) + event := EnvelopeToEvent(env) if event == nil { c.log.Debugf("Envelope couldn't be converted to event: %+v", env) continue diff --git a/x-pack/libbeat/common/cloudfoundry/events.go b/x-pack/libbeat/common/cloudfoundry/events.go index adaa944773c6..4d1a67e2b1bd 100644 --- a/x-pack/libbeat/common/cloudfoundry/events.go +++ b/x-pack/libbeat/common/cloudfoundry/events.go @@ -461,7 +461,7 @@ func newEventError(env *events.Envelope) *EventError { } } -func envelopeToEvent(env *events.Envelope) Event { +func EnvelopeToEvent(env *events.Envelope) Event { switch *env.EventType { case events.Envelope_HttpStartStop: return newEventHttpAccess(env) diff --git a/x-pack/libbeat/common/cloudfoundry/rlplistener.go b/x-pack/libbeat/common/cloudfoundry/rlplistener.go index e80db747c8e0..0c08b12a7fdf 100644 --- a/x-pack/libbeat/common/cloudfoundry/rlplistener.go +++ b/x-pack/libbeat/common/cloudfoundry/rlplistener.go @@ -79,7 +79,7 @@ func (c *RlpListener) Start(ctx context.Context) { for i := range envelopes { v1s := conversion.ToV1(envelopes[i]) for _, v := range v1s { - evt := envelopeToEvent(v) + evt := EnvelopeToEvent(v) if evt.EventType() == EventTypeHttpAccess && c.callbacks.HttpAccess != nil { c.callbacks.HttpAccess(evt.(*EventHttpAccess)) } else if evt.EventType() == EventTypeLog && c.callbacks.Log != nil { diff --git a/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go b/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go index 961827469dd0..1486c9b14c0e 100644 --- a/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go +++ b/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go @@ -22,19 +22,30 @@ func init() { } type Module interface { + mb.Module RunCounterReporter(mb.PushReporterV2) RunContainerReporter(mb.PushReporterV2) RunValueReporter(mb.PushReporterV2) } func newModule(base mb.BaseModule) (mb.Module, error) { + factory := func(cfg *cfcommon.Config, name string, log *logp.Logger) CloudfoundryHub { + return &HubAdapter{cfcommon.NewHub(cfg, name, log)} + } + return NewModuleWithHubFactory(base, factory) +} + +type hubFactory func(cfg *cfcommon.Config, name string, log *logp.Logger) CloudfoundryHub + +// NewModuleWithHubFactory initializes a module with a hub created with a hub factory +func NewModuleWithHubFactory(base mb.BaseModule, hubFactory hubFactory) (mb.Module, error) { var cfg cfcommon.Config if err := base.UnpackConfig(&cfg); err != nil { return nil, err } log := logp.NewLogger("cloudfoundry") - hub := cfcommon.NewHub(&cfg, "metricbeat", log) + hub := hubFactory(&cfg, "metricbeat", log) switch cfg.Version { case cfcommon.ConsumerVersionV1: diff --git a/x-pack/metricbeat/module/cloudfoundry/container/container.go b/x-pack/metricbeat/module/cloudfoundry/container/container.go index 4f8c62271036..50287ef8c49e 100644 --- a/x-pack/metricbeat/module/cloudfoundry/container/container.go +++ b/x-pack/metricbeat/module/cloudfoundry/container/container.go @@ -9,6 +9,7 @@ import ( "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/mb" ) @@ -41,5 +42,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Run method provides the module with a reporter with which events can be reported. func (m *MetricSet) Run(reporter mb.PushReporterV2) { - m.mod.RunContainerReporter(reporter) + m.mod.RunContainerReporter(&containerReporter{reporter, m.Logger()}) +} + +type containerReporter struct { + mb.PushReporterV2 + + logger *logp.Logger +} + +func (r *containerReporter) Event(event mb.Event) bool { + cpuPctKey := "cloudfoundry.container.cpu.pct" + found, err := cloudfoundry.HasNonNumericFloat(event.RootFields, cpuPctKey) + if err != nil { + r.logger.Debugf("Unexpected failure while checking for non-numeric values: %v", err) + } + if found { + event.RootFields.Delete(cpuPctKey) + } + return r.PushReporterV2.Event(event) } diff --git a/x-pack/metricbeat/module/cloudfoundry/container/container_test.go b/x-pack/metricbeat/module/cloudfoundry/container/container_test.go new file mode 100644 index 000000000000..46b1206f1b38 --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/container/container_test.go @@ -0,0 +1,163 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build !integration + +package container + +import ( + "math" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cloudfoundry/sonde-go/events" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/metricbeat/mb/parse" + mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" + "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest" +) + +func init() { + if err := mb.Registry.AddModule("cloudfoundrytest", mtest.NewModuleMock); err != nil { + panic(err) + } + mb.Registry.MustAddMetricSet("cloudfoundrytest", "test", newTestMetricSet, + mb.WithHostParser(parse.EmptyHostParser), + mb.DefaultMetricSet(), + ) +} + +func newTestMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { + return New(base) +} + +func TestMetricSet(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + + config := map[string]interface{}{ + "module": "cloudfoundrytest", + "client_id": "dummy", + "client_secret": "dummy", + "api_address": "dummy", + "shard_id": "dummy", + } + + ms := mbtest.NewPushMetricSetV2(t, config) + hub := ms.Module().(*mtest.ModuleMock).Hub + + go func() { + hub.SendEnvelope(containerMetricsEnvelope(containerMetrics{app: "1234", memory: 1024, cpupct: 12.34})) + }() + + events := mbtest.RunPushMetricSetV2(10*time.Second, 1, ms) + require.NotEmpty(t, events) + + expectedFields := common.MapStr{ + "cloudfoundry.app.id": "1234", + "cloudfoundry.container.cpu.pct": float64(12.34), + "cloudfoundry.container.disk.bytes": uint64(0), + "cloudfoundry.container.disk.quota.bytes": uint64(0), + "cloudfoundry.container.instance_index": int32(0), + "cloudfoundry.container.memory.bytes": uint64(1024), + "cloudfoundry.container.memory.quota.bytes": uint64(0), + "cloudfoundry.envelope.deployment": "test", + "cloudfoundry.envelope.index": "index", + "cloudfoundry.envelope.ip": "127.0.0.1", + "cloudfoundry.envelope.job": "test", + "cloudfoundry.envelope.origin": "test", + "cloudfoundry.type": "container", + } + require.Equal(t, expectedFields, events[0].RootFields.Flatten()) +} + +func TestMetricValuesAreNumbers(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + + config := map[string]interface{}{ + "module": "cloudfoundrytest", + "client_id": "dummy", + "client_secret": "dummy", + "api_address": "dummy", + "shard_id": "dummy", + } + + ms := mbtest.NewPushMetricSetV2(t, config) + hub := ms.Module().(*mtest.ModuleMock).Hub + + go func() { + hub.SendEnvelope(containerMetricsEnvelope(containerMetrics{app: "0000", memory: 1024, cpupct: math.NaN()})) + hub.SendEnvelope(containerMetricsEnvelope(containerMetrics{app: "1234", memory: 1024, cpupct: 12.34})) + }() + + events := mbtest.RunPushMetricSetV2(10*time.Second, 2, ms) + require.NotEmpty(t, events) + + for _, e := range events { + memory, err := e.RootFields.GetValue("cloudfoundry.container.memory.bytes") + if assert.NoError(t, err, "checking memory") { + assert.Equal(t, uint64(1024), memory.(uint64)) + } + + app, err := e.RootFields.GetValue("cloudfoundry.app.id") + require.NoError(t, err, "getting app id") + + cpuPctKey := "cloudfoundry.container.cpu.pct" + switch app { + case "0000": + _, err := e.RootFields.GetValue(cpuPctKey) + require.Error(t, err, "non-numeric metric shouldn't be there") + case "1234": + v, err := e.RootFields.GetValue(cpuPctKey) + if assert.NoError(t, err, "checking cpu pct") { + assert.Equal(t, 12.34, v.(float64)) + } + default: + t.Errorf("unexpected app: %s", app) + } + } +} + +type containerMetrics struct { + app string + instance int32 + cpupct float64 + memory uint64 + disk uint64 + memoryQuota uint64 + diskQuota uint64 +} + +func containerMetricsEnvelope(metrics containerMetrics) *events.Envelope { + eventType := events.Envelope_ContainerMetric + origin := "test" + deployment := "test" + job := "test" + ip := "127.0.0.1" + index := "index" + timestamp := time.Now().Unix() + return &events.Envelope{ + EventType: &eventType, + Timestamp: ×tamp, + Origin: &origin, + Deployment: &deployment, + Job: &job, + Ip: &ip, + Index: &index, + ContainerMetric: &events.ContainerMetric{ + ApplicationId: &metrics.app, + InstanceIndex: &metrics.instance, + CpuPercentage: &metrics.cpupct, + MemoryBytes: &metrics.memory, + DiskBytes: &metrics.disk, + MemoryBytesQuota: &metrics.memoryQuota, + DiskBytesQuota: &metrics.diskQuota, + }, + } +} diff --git a/x-pack/metricbeat/module/cloudfoundry/counter/counter_test.go b/x-pack/metricbeat/module/cloudfoundry/counter/counter_test.go new file mode 100644 index 000000000000..a315709878e1 --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/counter/counter_test.go @@ -0,0 +1,96 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build !integration + +package counter + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/cloudfoundry/sonde-go/events" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/metricbeat/mb/parse" + mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" + "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest" +) + +func init() { + if err := mb.Registry.AddModule("cloudfoundrytest", mtest.NewModuleMock); err != nil { + panic(err) + } + mb.Registry.MustAddMetricSet("cloudfoundrytest", "test", newTestMetricSet, + mb.WithHostParser(parse.EmptyHostParser), + mb.DefaultMetricSet(), + ) +} + +func newTestMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { + return New(base) +} + +func TestMetricSet(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + + config := map[string]interface{}{ + "module": "cloudfoundrytest", + "client_id": "dummy", + "client_secret": "dummy", + "api_address": "dummy", + "shard_id": "dummy", + } + + ms := mbtest.NewPushMetricSetV2(t, config) + hub := ms.Module().(*mtest.ModuleMock).Hub + + go func() { + hub.SendEnvelope(counterMetricEnvelope("requests", 1234, 123)) + }() + + events := mbtest.RunPushMetricSetV2(10*time.Second, 1, ms) + require.NotEmpty(t, events) + + expectedFields := common.MapStr{ + "cloudfoundry.counter.delta": uint64(123), + "cloudfoundry.counter.name": "requests", + "cloudfoundry.counter.total": uint64(1234), + "cloudfoundry.envelope.deployment": "test", + "cloudfoundry.envelope.index": "index", + "cloudfoundry.envelope.ip": "127.0.0.1", + "cloudfoundry.envelope.job": "test", + "cloudfoundry.envelope.origin": "test", + "cloudfoundry.type": "counter", + } + require.Equal(t, expectedFields, events[0].RootFields.Flatten()) +} + +func counterMetricEnvelope(name string, total uint64, delta uint64) *events.Envelope { + eventType := events.Envelope_CounterEvent + origin := "test" + deployment := "test" + job := "test" + ip := "127.0.0.1" + index := "index" + timestamp := time.Now().Unix() + return &events.Envelope{ + EventType: &eventType, + Timestamp: ×tamp, + Origin: &origin, + Deployment: &deployment, + Job: &job, + Ip: &ip, + Index: &index, + CounterEvent: &events.CounterEvent{ + Name: &name, + Total: &total, + Delta: &delta, + }, + } +} diff --git a/x-pack/metricbeat/module/cloudfoundry/hub.go b/x-pack/metricbeat/module/cloudfoundry/hub.go new file mode 100644 index 000000000000..5057a7bdbc55 --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/hub.go @@ -0,0 +1,43 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cloudfoundry + +import ( + "context" + + cfcommon "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" +) + +// DopplerConsumer is the interface that a Doppler Consumer must implement for the Cloud Foundry module. +type DopplerConsumer interface { + Run() + Stop() +} + +// RlpListener is the interface that a RLP listener must implement for the Cloud Foundry module. +type RlpListener interface { + Start(context.Context) + Stop() +} + +// CloudfoundryHub is the interface that a Hub must implement for the Cloud Foundry module. +type CloudfoundryHub interface { + DopplerConsumer(cfcommon.DopplerCallbacks) (DopplerConsumer, error) + RlpListener(cfcommon.RlpListenerCallbacks) (RlpListener, error) +} + +// HubAdapter adapt a cloudfoundry Hub to the hub expected by the metricbeat module. +// This adaptation is needed to return different but compatible types, so the Hub can be mocked. +type HubAdapter struct { + hub *cfcommon.Hub +} + +func (h *HubAdapter) DopplerConsumer(cbs cfcommon.DopplerCallbacks) (DopplerConsumer, error) { + return h.hub.DopplerConsumer(cbs) +} + +func (h *HubAdapter) RlpListener(cbs cfcommon.RlpListenerCallbacks) (RlpListener, error) { + return h.hub.RlpListener(cbs) +} diff --git a/x-pack/metricbeat/module/cloudfoundry/mtest/modulemock.go b/x-pack/metricbeat/module/cloudfoundry/mtest/modulemock.go new file mode 100644 index 000000000000..22b5260fd663 --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/mtest/modulemock.go @@ -0,0 +1,94 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package mtest + +import ( + "fmt" + + "github.com/cloudfoundry/sonde-go/events" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/mb" + cfcommon "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" + "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry" +) + +// ModuleMock is a Module with a mocked hub +type ModuleMock struct { + cloudfoundry.Module + + Hub *HubMock +} + +// NewModuleMock creates a mocked module. It contains a mocked hub that can be used to +// send envelopes for testing. +func NewModuleMock(base mb.BaseModule) (mb.Module, error) { + module := ModuleMock{} + factory := func(*cfcommon.Config, string, *logp.Logger) cloudfoundry.CloudfoundryHub { + if module.Hub == nil { + module.Hub = NewHubMock() + } + return module.Hub + } + m, err := cloudfoundry.NewModuleWithHubFactory(base, factory) + if err != nil { + return nil, err + } + + module.Module = m.(cloudfoundry.Module) + return &module, nil +} + +// HubMock is a mocked hub, it can be used to send envelopes for testing. +type HubMock struct { + envelopes chan *events.Envelope +} + +// NewHubMock creates a mocked hub, it cannot be shared between metricsets. +func NewHubMock() *HubMock { + return &HubMock{ + envelopes: make(chan *events.Envelope), + } +} + +// SendEnvelope is the main method to be used on testing, it sends an envelope through the hub. +func (h *HubMock) SendEnvelope(envelope *events.Envelope) { + h.envelopes <- envelope +} + +// DopplerConsumer creates a doppler consumer for testing, this consumer receives the events +// sent with `SendEnvelope()`. +func (h *HubMock) DopplerConsumer(cbs cfcommon.DopplerCallbacks) (cloudfoundry.DopplerConsumer, error) { + return &MockedDopplerConsumer{h, cbs}, nil +} + +// RlpListener creates a RLP listener for testing, this consumer receives the events +// sent with `SendEnvelope()`. +func (h *HubMock) RlpListener(cbs cfcommon.RlpListenerCallbacks) (cloudfoundry.RlpListener, error) { + return nil, fmt.Errorf("mocked hub doesn't support RLP yet: not implemented") +} + +// MokedDopplerConsumer is a mocked doppler consumer, it receives events sent through a mocked hub. +// It only supports the "Metrics" callback. +type MockedDopplerConsumer struct { + hub *HubMock + cbs cfcommon.DopplerCallbacks +} + +// Run runs the doppler consumer. +// Only supports the metrics callback, what is enough for Metricbeat. +// To generalize it a dispatching mechanism should be implemented. +func (c *MockedDopplerConsumer) Run() { + go func() { + for envelope := range c.hub.envelopes { + c.cbs.Metric(cfcommon.EnvelopeToEvent(envelope)) + } + }() +} + +// Stop stops the doppler consumer and the hub it uses. +func (c *MockedDopplerConsumer) Stop() { + close(c.hub.envelopes) +} diff --git a/x-pack/metricbeat/module/cloudfoundry/util.go b/x-pack/metricbeat/module/cloudfoundry/util.go new file mode 100644 index 000000000000..4775ac539beb --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/util.go @@ -0,0 +1,27 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cloudfoundry + +import ( + "fmt" + "math" + + "github.com/elastic/beats/v7/libbeat/common" +) + +// HasNonNumericFloat checks if an event has a non-numeric float in the specific key. +// It returns false and an error if the key cannot be found in the event +func HasNonNumericFloat(event common.MapStr, key string) (bool, error) { + v, err := event.GetValue(key) + if err != nil { + return false, fmt.Errorf("getting value for key %s: %w", key, err) + } + + if v, ok := v.(float64); ok && (math.IsNaN(v) || math.IsInf(v, 0)) { + return true, nil + } + + return false, nil +} diff --git a/x-pack/metricbeat/module/cloudfoundry/util_test.go b/x-pack/metricbeat/module/cloudfoundry/util_test.go new file mode 100644 index 000000000000..3570607707f5 --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/util_test.go @@ -0,0 +1,70 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cloudfoundry + +import ( + "math" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestHasNonNumericFloat(t *testing.T) { + type caseKey struct { + key string + expectedFound bool + expectedErr bool + } + cases := []struct { + title string + event common.MapStr + keys []caseKey + }{ + { + title: "Empty event", + event: common.MapStr{}, + keys: []caseKey{ + {"", false, true}, + {"somekey", false, true}, + }, + }, + { + title: "Event with non-numeric values", + event: common.MapStr{ + "someobject": common.MapStr{ + "inf": math.Inf(1), + "nan": math.NaN(), + "number": int64(42), + "float": float64(42), + }, + }, + keys: []caseKey{ + {"", false, true}, + {"someobject", false, false}, + {"someobject.inf", true, false}, + {"someobject.nan", true, false}, + {"someobject.number", false, false}, + {"someobject.float", false, false}, + {"someobject.notexists", false, true}, + }, + }, + } + + for _, c := range cases { + for _, k := range c.keys { + t.Run(c.title+"/"+k.key, func(t *testing.T) { + found, err := HasNonNumericFloat(c.event, k.key) + assert.Equal(t, k.expectedFound, found, "key has numeric float") + if k.expectedErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } + } +} diff --git a/x-pack/metricbeat/module/cloudfoundry/v1.go b/x-pack/metricbeat/module/cloudfoundry/v1.go index 7d9daf24673f..db7f6b500fa5 100644 --- a/x-pack/metricbeat/module/cloudfoundry/v1.go +++ b/x-pack/metricbeat/module/cloudfoundry/v1.go @@ -17,13 +17,13 @@ type ModuleV1 struct { log *logp.Logger running atomic.Bool - consumer *cfcommon.DopplerConsumer + consumer DopplerConsumer events chan cfcommon.Event subscriptions chan subscription } -func newModuleV1(base mb.BaseModule, hub *cfcommon.Hub, log *logp.Logger) (*ModuleV1, error) { +func newModuleV1(base mb.BaseModule, hub CloudfoundryHub, log *logp.Logger) (*ModuleV1, error) { m := ModuleV1{ BaseModule: base, log: log, diff --git a/x-pack/metricbeat/module/cloudfoundry/v2.go b/x-pack/metricbeat/module/cloudfoundry/v2.go index 5cf7de6c103e..d2987f3c4013 100644 --- a/x-pack/metricbeat/module/cloudfoundry/v2.go +++ b/x-pack/metricbeat/module/cloudfoundry/v2.go @@ -18,8 +18,8 @@ type ModuleV2 struct { log *logp.Logger - hub *cfcommon.Hub - listener *cfcommon.RlpListener + hub CloudfoundryHub + listener RlpListener listenerLock sync.Mutex counterReporter mb.PushReporterV2 @@ -27,7 +27,7 @@ type ModuleV2 struct { containerReporter mb.PushReporterV2 } -func newModuleV2(base mb.BaseModule, hub *cfcommon.Hub, log *logp.Logger) (mb.Module, error) { +func newModuleV2(base mb.BaseModule, hub CloudfoundryHub, log *logp.Logger) (mb.Module, error) { // early check that listener can be created _, err := hub.RlpListener(cfcommon.RlpListenerCallbacks{}) if err != nil { diff --git a/x-pack/metricbeat/module/cloudfoundry/value/value.go b/x-pack/metricbeat/module/cloudfoundry/value/value.go index 55cb6ca689ac..de2b3d60a694 100644 --- a/x-pack/metricbeat/module/cloudfoundry/value/value.go +++ b/x-pack/metricbeat/module/cloudfoundry/value/value.go @@ -7,9 +7,9 @@ package value import ( "fmt" - "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry" - + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry" ) // init registers the MetricSet with the central registry. @@ -41,5 +41,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Run method provides the module with a reporter with which events can be reported. func (m *MetricSet) Run(reporter mb.PushReporterV2) { - m.mod.RunValueReporter(reporter) + m.mod.RunValueReporter(&valueReporter{reporter, m.Logger()}) +} + +type valueReporter struct { + mb.PushReporterV2 + + logger *logp.Logger +} + +func (r *valueReporter) Event(event mb.Event) bool { + found, err := cloudfoundry.HasNonNumericFloat(event.RootFields, "cloudfoundry.value.value") + if err != nil { + r.logger.Debugf("Unexpected failure while checking for non-numeric values: %v", err) + } + if found { + r.logger.Debugf("Ignored event with float value that is not a number: %+v", event) + return true + } + return r.PushReporterV2.Event(event) } diff --git a/x-pack/metricbeat/module/cloudfoundry/value/value_test.go b/x-pack/metricbeat/module/cloudfoundry/value/value_test.go new file mode 100644 index 000000000000..2003388e2283 --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/value/value_test.go @@ -0,0 +1,131 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build !integration + +package value + +import ( + "math" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cloudfoundry/sonde-go/events" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/metricbeat/mb/parse" + mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" + "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest" +) + +func init() { + if err := mb.Registry.AddModule("cloudfoundrytest", mtest.NewModuleMock); err != nil { + panic(err) + } + mb.Registry.MustAddMetricSet("cloudfoundrytest", "test", newTestMetricSet, + mb.WithHostParser(parse.EmptyHostParser), + mb.DefaultMetricSet(), + ) +} + +func newTestMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { + return New(base) +} + +func TestMetricSet(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + + config := map[string]interface{}{ + "module": "cloudfoundrytest", + "client_id": "dummy", + "client_secret": "dummy", + "api_address": "dummy", + "shard_id": "dummy", + } + + ms := mbtest.NewPushMetricSetV2(t, config) + hub := ms.Module().(*mtest.ModuleMock).Hub + + go func() { + hub.SendEnvelope(valueMetricEnvelope("duration", 12.34, "ms")) + }() + + events := mbtest.RunPushMetricSetV2(10*time.Second, 1, ms) + require.NotEmpty(t, events) + + expectedFields := common.MapStr{ + "cloudfoundry.envelope.deployment": "test", + "cloudfoundry.envelope.index": "index", + "cloudfoundry.envelope.ip": "127.0.0.1", + "cloudfoundry.envelope.job": "test", + "cloudfoundry.envelope.origin": "test", + "cloudfoundry.type": "value", + "cloudfoundry.value.name": "duration", + "cloudfoundry.value.unit": "ms", + "cloudfoundry.value.value": float64(12.34), + } + require.Equal(t, expectedFields, events[0].RootFields.Flatten()) +} + +func TestValuesAreNumbers(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + + config := map[string]interface{}{ + "module": "cloudfoundrytest", + "client_id": "dummy", + "client_secret": "dummy", + "api_address": "dummy", + "shard_id": "dummy", + } + + ms := mbtest.NewPushMetricSetV2(t, config) + hub := ms.Module().(*mtest.ModuleMock).Hub + + go func() { + hub.SendEnvelope(valueMetricEnvelope("duration", math.NaN(), "ms")) + hub.SendEnvelope(valueMetricEnvelope("duration", 12.34, "ms")) + hub.SendEnvelope(valueMetricEnvelope("duration", math.Inf(1), "ms")) + hub.SendEnvelope(valueMetricEnvelope("duration", 34.56, "ms")) + }() + + events := mbtest.RunPushMetricSetV2(10*time.Second, 2, ms) + require.NotEmpty(t, events) + + for _, e := range events { + value, err := e.RootFields.GetValue("cloudfoundry.value.value") + if assert.NoError(t, err) { + assert.False(t, math.IsNaN(value.(float64))) + assert.False(t, math.IsInf(value.(float64), 0)) + } + } +} + +func valueMetricEnvelope(name string, value float64, unit string) *events.Envelope { + eventType := events.Envelope_ValueMetric + origin := "test" + deployment := "test" + job := "test" + ip := "127.0.0.1" + index := "index" + timestamp := time.Now().Unix() + return &events.Envelope{ + EventType: &eventType, + Timestamp: ×tamp, + Origin: &origin, + Deployment: &deployment, + Job: &job, + Ip: &ip, + Index: &index, + ValueMetric: &events.ValueMetric{ + Name: &name, + Value: &value, + Unit: &unit, + }, + } +}