From e9284936ad7a369fb6b47935aa74c6af75805106 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Wed, 25 Nov 2020 11:23:18 +0100 Subject: [PATCH] Avoid sending non-numeric floats in cloud foundry integrations (#22634) Cloud Foundry integrations are sending some values as they are received from the Firehose, some of these values can be floats with non-numeric values (NaN/Inf), that are not supported by JSON and Elasticsearch. Add defensive code to avoid sending these values to the outputs. Also, add unit tests using mocked cloud foundry hubs. (cherry picked from commit 0619788ce117d09aa292f2d38654b4ba0d8e31b4) --- CHANGELOG.next.asciidoc | 1 + .../common/cloudfoundry/dopplerconsumer.go | 2 +- x-pack/libbeat/common/cloudfoundry/events.go | 2 +- .../common/cloudfoundry/rlplistener.go | 2 +- .../module/cloudfoundry/cloudfoundry.go | 13 +- .../cloudfoundry/container/container.go | 21 ++- .../cloudfoundry/container/container_test.go | 163 ++++++++++++++++++ .../cloudfoundry/counter/counter_test.go | 96 +++++++++++ x-pack/metricbeat/module/cloudfoundry/hub.go | 43 +++++ .../module/cloudfoundry/mtest/modulemock.go | 94 ++++++++++ x-pack/metricbeat/module/cloudfoundry/util.go | 27 +++ .../module/cloudfoundry/util_test.go | 70 ++++++++ x-pack/metricbeat/module/cloudfoundry/v1.go | 4 +- x-pack/metricbeat/module/cloudfoundry/v2.go | 6 +- .../module/cloudfoundry/value/value.go | 24 ++- .../module/cloudfoundry/value/value_test.go | 131 ++++++++++++++ 16 files changed, 686 insertions(+), 13 deletions(-) create mode 100644 x-pack/metricbeat/module/cloudfoundry/container/container_test.go create mode 100644 x-pack/metricbeat/module/cloudfoundry/counter/counter_test.go create mode 100644 x-pack/metricbeat/module/cloudfoundry/hub.go create mode 100644 x-pack/metricbeat/module/cloudfoundry/mtest/modulemock.go create mode 100644 x-pack/metricbeat/module/cloudfoundry/util.go create mode 100644 x-pack/metricbeat/module/cloudfoundry/util_test.go create mode 100644 x-pack/metricbeat/module/cloudfoundry/value/value_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 93832b8a187..6b7b963f9c8 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -282,6 +282,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - [Kubernetes] Remove redundant dockersock volume mount {pull}22009[22009] - Revert change to report `process.memory.rss` as `process.memory.wss` on Windows. {pull}22055[22055] - Remove io.time from windows {pull}22237[22237] +- Stop generating NaN values from Cloud Foundry module to avoid errors in outputs. {pull}22634[22634] *Packetbeat* diff --git a/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go b/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go index 10ea50dd928..96349eeb7ea 100644 --- a/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go +++ b/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go @@ -111,7 +111,7 @@ func (c *DopplerConsumer) firehose(cb func(evt Event), filter consumer.EnvelopeF if !filterFn(env) { continue } - event := envelopeToEvent(env) + event := EnvelopeToEvent(env) if event == nil { c.log.Debugf("Envelope couldn't be converted to event: %+v", env) continue diff --git a/x-pack/libbeat/common/cloudfoundry/events.go b/x-pack/libbeat/common/cloudfoundry/events.go index adaa944773c..4d1a67e2b1b 100644 --- a/x-pack/libbeat/common/cloudfoundry/events.go +++ b/x-pack/libbeat/common/cloudfoundry/events.go @@ -461,7 +461,7 @@ func newEventError(env *events.Envelope) *EventError { } } -func envelopeToEvent(env *events.Envelope) Event { +func EnvelopeToEvent(env *events.Envelope) Event { switch *env.EventType { case events.Envelope_HttpStartStop: return newEventHttpAccess(env) diff --git a/x-pack/libbeat/common/cloudfoundry/rlplistener.go b/x-pack/libbeat/common/cloudfoundry/rlplistener.go index e80db747c8e..0c08b12a7fd 100644 --- a/x-pack/libbeat/common/cloudfoundry/rlplistener.go +++ b/x-pack/libbeat/common/cloudfoundry/rlplistener.go @@ -79,7 +79,7 @@ func (c *RlpListener) Start(ctx context.Context) { for i := range envelopes { v1s := conversion.ToV1(envelopes[i]) for _, v := range v1s { - evt := envelopeToEvent(v) + evt := EnvelopeToEvent(v) if evt.EventType() == EventTypeHttpAccess && c.callbacks.HttpAccess != nil { c.callbacks.HttpAccess(evt.(*EventHttpAccess)) } else if evt.EventType() == EventTypeLog && c.callbacks.Log != nil { diff --git a/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go b/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go index 961827469dd..1486c9b14c0 100644 --- a/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go +++ b/x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go @@ -22,19 +22,30 @@ func init() { } type Module interface { + mb.Module RunCounterReporter(mb.PushReporterV2) RunContainerReporter(mb.PushReporterV2) RunValueReporter(mb.PushReporterV2) } func newModule(base mb.BaseModule) (mb.Module, error) { + factory := func(cfg *cfcommon.Config, name string, log *logp.Logger) CloudfoundryHub { + return &HubAdapter{cfcommon.NewHub(cfg, name, log)} + } + return NewModuleWithHubFactory(base, factory) +} + +type hubFactory func(cfg *cfcommon.Config, name string, log *logp.Logger) CloudfoundryHub + +// NewModuleWithHubFactory initializes a module with a hub created with a hub factory +func NewModuleWithHubFactory(base mb.BaseModule, hubFactory hubFactory) (mb.Module, error) { var cfg cfcommon.Config if err := base.UnpackConfig(&cfg); err != nil { return nil, err } log := logp.NewLogger("cloudfoundry") - hub := cfcommon.NewHub(&cfg, "metricbeat", log) + hub := hubFactory(&cfg, "metricbeat", log) switch cfg.Version { case cfcommon.ConsumerVersionV1: diff --git a/x-pack/metricbeat/module/cloudfoundry/container/container.go b/x-pack/metricbeat/module/cloudfoundry/container/container.go index 4f8c6227103..50287ef8c49 100644 --- a/x-pack/metricbeat/module/cloudfoundry/container/container.go +++ b/x-pack/metricbeat/module/cloudfoundry/container/container.go @@ -9,6 +9,7 @@ import ( "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/mb" ) @@ -41,5 +42,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Run method provides the module with a reporter with which events can be reported. func (m *MetricSet) Run(reporter mb.PushReporterV2) { - m.mod.RunContainerReporter(reporter) + m.mod.RunContainerReporter(&containerReporter{reporter, m.Logger()}) +} + +type containerReporter struct { + mb.PushReporterV2 + + logger *logp.Logger +} + +func (r *containerReporter) Event(event mb.Event) bool { + cpuPctKey := "cloudfoundry.container.cpu.pct" + found, err := cloudfoundry.HasNonNumericFloat(event.RootFields, cpuPctKey) + if err != nil { + r.logger.Debugf("Unexpected failure while checking for non-numeric values: %v", err) + } + if found { + event.RootFields.Delete(cpuPctKey) + } + return r.PushReporterV2.Event(event) } diff --git a/x-pack/metricbeat/module/cloudfoundry/container/container_test.go b/x-pack/metricbeat/module/cloudfoundry/container/container_test.go new file mode 100644 index 00000000000..46b1206f1b3 --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/container/container_test.go @@ -0,0 +1,163 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build !integration + +package container + +import ( + "math" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cloudfoundry/sonde-go/events" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/metricbeat/mb/parse" + mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" + "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest" +) + +func init() { + if err := mb.Registry.AddModule("cloudfoundrytest", mtest.NewModuleMock); err != nil { + panic(err) + } + mb.Registry.MustAddMetricSet("cloudfoundrytest", "test", newTestMetricSet, + mb.WithHostParser(parse.EmptyHostParser), + mb.DefaultMetricSet(), + ) +} + +func newTestMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { + return New(base) +} + +func TestMetricSet(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + + config := map[string]interface{}{ + "module": "cloudfoundrytest", + "client_id": "dummy", + "client_secret": "dummy", + "api_address": "dummy", + "shard_id": "dummy", + } + + ms := mbtest.NewPushMetricSetV2(t, config) + hub := ms.Module().(*mtest.ModuleMock).Hub + + go func() { + hub.SendEnvelope(containerMetricsEnvelope(containerMetrics{app: "1234", memory: 1024, cpupct: 12.34})) + }() + + events := mbtest.RunPushMetricSetV2(10*time.Second, 1, ms) + require.NotEmpty(t, events) + + expectedFields := common.MapStr{ + "cloudfoundry.app.id": "1234", + "cloudfoundry.container.cpu.pct": float64(12.34), + "cloudfoundry.container.disk.bytes": uint64(0), + "cloudfoundry.container.disk.quota.bytes": uint64(0), + "cloudfoundry.container.instance_index": int32(0), + "cloudfoundry.container.memory.bytes": uint64(1024), + "cloudfoundry.container.memory.quota.bytes": uint64(0), + "cloudfoundry.envelope.deployment": "test", + "cloudfoundry.envelope.index": "index", + "cloudfoundry.envelope.ip": "127.0.0.1", + "cloudfoundry.envelope.job": "test", + "cloudfoundry.envelope.origin": "test", + "cloudfoundry.type": "container", + } + require.Equal(t, expectedFields, events[0].RootFields.Flatten()) +} + +func TestMetricValuesAreNumbers(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + + config := map[string]interface{}{ + "module": "cloudfoundrytest", + "client_id": "dummy", + "client_secret": "dummy", + "api_address": "dummy", + "shard_id": "dummy", + } + + ms := mbtest.NewPushMetricSetV2(t, config) + hub := ms.Module().(*mtest.ModuleMock).Hub + + go func() { + hub.SendEnvelope(containerMetricsEnvelope(containerMetrics{app: "0000", memory: 1024, cpupct: math.NaN()})) + hub.SendEnvelope(containerMetricsEnvelope(containerMetrics{app: "1234", memory: 1024, cpupct: 12.34})) + }() + + events := mbtest.RunPushMetricSetV2(10*time.Second, 2, ms) + require.NotEmpty(t, events) + + for _, e := range events { + memory, err := e.RootFields.GetValue("cloudfoundry.container.memory.bytes") + if assert.NoError(t, err, "checking memory") { + assert.Equal(t, uint64(1024), memory.(uint64)) + } + + app, err := e.RootFields.GetValue("cloudfoundry.app.id") + require.NoError(t, err, "getting app id") + + cpuPctKey := "cloudfoundry.container.cpu.pct" + switch app { + case "0000": + _, err := e.RootFields.GetValue(cpuPctKey) + require.Error(t, err, "non-numeric metric shouldn't be there") + case "1234": + v, err := e.RootFields.GetValue(cpuPctKey) + if assert.NoError(t, err, "checking cpu pct") { + assert.Equal(t, 12.34, v.(float64)) + } + default: + t.Errorf("unexpected app: %s", app) + } + } +} + +type containerMetrics struct { + app string + instance int32 + cpupct float64 + memory uint64 + disk uint64 + memoryQuota uint64 + diskQuota uint64 +} + +func containerMetricsEnvelope(metrics containerMetrics) *events.Envelope { + eventType := events.Envelope_ContainerMetric + origin := "test" + deployment := "test" + job := "test" + ip := "127.0.0.1" + index := "index" + timestamp := time.Now().Unix() + return &events.Envelope{ + EventType: &eventType, + Timestamp: ×tamp, + Origin: &origin, + Deployment: &deployment, + Job: &job, + Ip: &ip, + Index: &index, + ContainerMetric: &events.ContainerMetric{ + ApplicationId: &metrics.app, + InstanceIndex: &metrics.instance, + CpuPercentage: &metrics.cpupct, + MemoryBytes: &metrics.memory, + DiskBytes: &metrics.disk, + MemoryBytesQuota: &metrics.memoryQuota, + DiskBytesQuota: &metrics.diskQuota, + }, + } +} diff --git a/x-pack/metricbeat/module/cloudfoundry/counter/counter_test.go b/x-pack/metricbeat/module/cloudfoundry/counter/counter_test.go new file mode 100644 index 00000000000..a315709878e --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/counter/counter_test.go @@ -0,0 +1,96 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build !integration + +package counter + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/cloudfoundry/sonde-go/events" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/metricbeat/mb/parse" + mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" + "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest" +) + +func init() { + if err := mb.Registry.AddModule("cloudfoundrytest", mtest.NewModuleMock); err != nil { + panic(err) + } + mb.Registry.MustAddMetricSet("cloudfoundrytest", "test", newTestMetricSet, + mb.WithHostParser(parse.EmptyHostParser), + mb.DefaultMetricSet(), + ) +} + +func newTestMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { + return New(base) +} + +func TestMetricSet(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + + config := map[string]interface{}{ + "module": "cloudfoundrytest", + "client_id": "dummy", + "client_secret": "dummy", + "api_address": "dummy", + "shard_id": "dummy", + } + + ms := mbtest.NewPushMetricSetV2(t, config) + hub := ms.Module().(*mtest.ModuleMock).Hub + + go func() { + hub.SendEnvelope(counterMetricEnvelope("requests", 1234, 123)) + }() + + events := mbtest.RunPushMetricSetV2(10*time.Second, 1, ms) + require.NotEmpty(t, events) + + expectedFields := common.MapStr{ + "cloudfoundry.counter.delta": uint64(123), + "cloudfoundry.counter.name": "requests", + "cloudfoundry.counter.total": uint64(1234), + "cloudfoundry.envelope.deployment": "test", + "cloudfoundry.envelope.index": "index", + "cloudfoundry.envelope.ip": "127.0.0.1", + "cloudfoundry.envelope.job": "test", + "cloudfoundry.envelope.origin": "test", + "cloudfoundry.type": "counter", + } + require.Equal(t, expectedFields, events[0].RootFields.Flatten()) +} + +func counterMetricEnvelope(name string, total uint64, delta uint64) *events.Envelope { + eventType := events.Envelope_CounterEvent + origin := "test" + deployment := "test" + job := "test" + ip := "127.0.0.1" + index := "index" + timestamp := time.Now().Unix() + return &events.Envelope{ + EventType: &eventType, + Timestamp: ×tamp, + Origin: &origin, + Deployment: &deployment, + Job: &job, + Ip: &ip, + Index: &index, + CounterEvent: &events.CounterEvent{ + Name: &name, + Total: &total, + Delta: &delta, + }, + } +} diff --git a/x-pack/metricbeat/module/cloudfoundry/hub.go b/x-pack/metricbeat/module/cloudfoundry/hub.go new file mode 100644 index 00000000000..5057a7bdbc5 --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/hub.go @@ -0,0 +1,43 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cloudfoundry + +import ( + "context" + + cfcommon "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" +) + +// DopplerConsumer is the interface that a Doppler Consumer must implement for the Cloud Foundry module. +type DopplerConsumer interface { + Run() + Stop() +} + +// RlpListener is the interface that a RLP listener must implement for the Cloud Foundry module. +type RlpListener interface { + Start(context.Context) + Stop() +} + +// CloudfoundryHub is the interface that a Hub must implement for the Cloud Foundry module. +type CloudfoundryHub interface { + DopplerConsumer(cfcommon.DopplerCallbacks) (DopplerConsumer, error) + RlpListener(cfcommon.RlpListenerCallbacks) (RlpListener, error) +} + +// HubAdapter adapt a cloudfoundry Hub to the hub expected by the metricbeat module. +// This adaptation is needed to return different but compatible types, so the Hub can be mocked. +type HubAdapter struct { + hub *cfcommon.Hub +} + +func (h *HubAdapter) DopplerConsumer(cbs cfcommon.DopplerCallbacks) (DopplerConsumer, error) { + return h.hub.DopplerConsumer(cbs) +} + +func (h *HubAdapter) RlpListener(cbs cfcommon.RlpListenerCallbacks) (RlpListener, error) { + return h.hub.RlpListener(cbs) +} diff --git a/x-pack/metricbeat/module/cloudfoundry/mtest/modulemock.go b/x-pack/metricbeat/module/cloudfoundry/mtest/modulemock.go new file mode 100644 index 00000000000..22b5260fd66 --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/mtest/modulemock.go @@ -0,0 +1,94 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package mtest + +import ( + "fmt" + + "github.com/cloudfoundry/sonde-go/events" + + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/mb" + cfcommon "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry" + "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry" +) + +// ModuleMock is a Module with a mocked hub +type ModuleMock struct { + cloudfoundry.Module + + Hub *HubMock +} + +// NewModuleMock creates a mocked module. It contains a mocked hub that can be used to +// send envelopes for testing. +func NewModuleMock(base mb.BaseModule) (mb.Module, error) { + module := ModuleMock{} + factory := func(*cfcommon.Config, string, *logp.Logger) cloudfoundry.CloudfoundryHub { + if module.Hub == nil { + module.Hub = NewHubMock() + } + return module.Hub + } + m, err := cloudfoundry.NewModuleWithHubFactory(base, factory) + if err != nil { + return nil, err + } + + module.Module = m.(cloudfoundry.Module) + return &module, nil +} + +// HubMock is a mocked hub, it can be used to send envelopes for testing. +type HubMock struct { + envelopes chan *events.Envelope +} + +// NewHubMock creates a mocked hub, it cannot be shared between metricsets. +func NewHubMock() *HubMock { + return &HubMock{ + envelopes: make(chan *events.Envelope), + } +} + +// SendEnvelope is the main method to be used on testing, it sends an envelope through the hub. +func (h *HubMock) SendEnvelope(envelope *events.Envelope) { + h.envelopes <- envelope +} + +// DopplerConsumer creates a doppler consumer for testing, this consumer receives the events +// sent with `SendEnvelope()`. +func (h *HubMock) DopplerConsumer(cbs cfcommon.DopplerCallbacks) (cloudfoundry.DopplerConsumer, error) { + return &MockedDopplerConsumer{h, cbs}, nil +} + +// RlpListener creates a RLP listener for testing, this consumer receives the events +// sent with `SendEnvelope()`. +func (h *HubMock) RlpListener(cbs cfcommon.RlpListenerCallbacks) (cloudfoundry.RlpListener, error) { + return nil, fmt.Errorf("mocked hub doesn't support RLP yet: not implemented") +} + +// MokedDopplerConsumer is a mocked doppler consumer, it receives events sent through a mocked hub. +// It only supports the "Metrics" callback. +type MockedDopplerConsumer struct { + hub *HubMock + cbs cfcommon.DopplerCallbacks +} + +// Run runs the doppler consumer. +// Only supports the metrics callback, what is enough for Metricbeat. +// To generalize it a dispatching mechanism should be implemented. +func (c *MockedDopplerConsumer) Run() { + go func() { + for envelope := range c.hub.envelopes { + c.cbs.Metric(cfcommon.EnvelopeToEvent(envelope)) + } + }() +} + +// Stop stops the doppler consumer and the hub it uses. +func (c *MockedDopplerConsumer) Stop() { + close(c.hub.envelopes) +} diff --git a/x-pack/metricbeat/module/cloudfoundry/util.go b/x-pack/metricbeat/module/cloudfoundry/util.go new file mode 100644 index 00000000000..4775ac539be --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/util.go @@ -0,0 +1,27 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cloudfoundry + +import ( + "fmt" + "math" + + "github.com/elastic/beats/v7/libbeat/common" +) + +// HasNonNumericFloat checks if an event has a non-numeric float in the specific key. +// It returns false and an error if the key cannot be found in the event +func HasNonNumericFloat(event common.MapStr, key string) (bool, error) { + v, err := event.GetValue(key) + if err != nil { + return false, fmt.Errorf("getting value for key %s: %w", key, err) + } + + if v, ok := v.(float64); ok && (math.IsNaN(v) || math.IsInf(v, 0)) { + return true, nil + } + + return false, nil +} diff --git a/x-pack/metricbeat/module/cloudfoundry/util_test.go b/x-pack/metricbeat/module/cloudfoundry/util_test.go new file mode 100644 index 00000000000..3570607707f --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/util_test.go @@ -0,0 +1,70 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cloudfoundry + +import ( + "math" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestHasNonNumericFloat(t *testing.T) { + type caseKey struct { + key string + expectedFound bool + expectedErr bool + } + cases := []struct { + title string + event common.MapStr + keys []caseKey + }{ + { + title: "Empty event", + event: common.MapStr{}, + keys: []caseKey{ + {"", false, true}, + {"somekey", false, true}, + }, + }, + { + title: "Event with non-numeric values", + event: common.MapStr{ + "someobject": common.MapStr{ + "inf": math.Inf(1), + "nan": math.NaN(), + "number": int64(42), + "float": float64(42), + }, + }, + keys: []caseKey{ + {"", false, true}, + {"someobject", false, false}, + {"someobject.inf", true, false}, + {"someobject.nan", true, false}, + {"someobject.number", false, false}, + {"someobject.float", false, false}, + {"someobject.notexists", false, true}, + }, + }, + } + + for _, c := range cases { + for _, k := range c.keys { + t.Run(c.title+"/"+k.key, func(t *testing.T) { + found, err := HasNonNumericFloat(c.event, k.key) + assert.Equal(t, k.expectedFound, found, "key has numeric float") + if k.expectedErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } + } +} diff --git a/x-pack/metricbeat/module/cloudfoundry/v1.go b/x-pack/metricbeat/module/cloudfoundry/v1.go index 7d9daf24673..db7f6b500fa 100644 --- a/x-pack/metricbeat/module/cloudfoundry/v1.go +++ b/x-pack/metricbeat/module/cloudfoundry/v1.go @@ -17,13 +17,13 @@ type ModuleV1 struct { log *logp.Logger running atomic.Bool - consumer *cfcommon.DopplerConsumer + consumer DopplerConsumer events chan cfcommon.Event subscriptions chan subscription } -func newModuleV1(base mb.BaseModule, hub *cfcommon.Hub, log *logp.Logger) (*ModuleV1, error) { +func newModuleV1(base mb.BaseModule, hub CloudfoundryHub, log *logp.Logger) (*ModuleV1, error) { m := ModuleV1{ BaseModule: base, log: log, diff --git a/x-pack/metricbeat/module/cloudfoundry/v2.go b/x-pack/metricbeat/module/cloudfoundry/v2.go index 5cf7de6c103..d2987f3c401 100644 --- a/x-pack/metricbeat/module/cloudfoundry/v2.go +++ b/x-pack/metricbeat/module/cloudfoundry/v2.go @@ -18,8 +18,8 @@ type ModuleV2 struct { log *logp.Logger - hub *cfcommon.Hub - listener *cfcommon.RlpListener + hub CloudfoundryHub + listener RlpListener listenerLock sync.Mutex counterReporter mb.PushReporterV2 @@ -27,7 +27,7 @@ type ModuleV2 struct { containerReporter mb.PushReporterV2 } -func newModuleV2(base mb.BaseModule, hub *cfcommon.Hub, log *logp.Logger) (mb.Module, error) { +func newModuleV2(base mb.BaseModule, hub CloudfoundryHub, log *logp.Logger) (mb.Module, error) { // early check that listener can be created _, err := hub.RlpListener(cfcommon.RlpListenerCallbacks{}) if err != nil { diff --git a/x-pack/metricbeat/module/cloudfoundry/value/value.go b/x-pack/metricbeat/module/cloudfoundry/value/value.go index 55cb6ca689a..de2b3d60a69 100644 --- a/x-pack/metricbeat/module/cloudfoundry/value/value.go +++ b/x-pack/metricbeat/module/cloudfoundry/value/value.go @@ -7,9 +7,9 @@ package value import ( "fmt" - "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry" - + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry" ) // init registers the MetricSet with the central registry. @@ -41,5 +41,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Run method provides the module with a reporter with which events can be reported. func (m *MetricSet) Run(reporter mb.PushReporterV2) { - m.mod.RunValueReporter(reporter) + m.mod.RunValueReporter(&valueReporter{reporter, m.Logger()}) +} + +type valueReporter struct { + mb.PushReporterV2 + + logger *logp.Logger +} + +func (r *valueReporter) Event(event mb.Event) bool { + found, err := cloudfoundry.HasNonNumericFloat(event.RootFields, "cloudfoundry.value.value") + if err != nil { + r.logger.Debugf("Unexpected failure while checking for non-numeric values: %v", err) + } + if found { + r.logger.Debugf("Ignored event with float value that is not a number: %+v", event) + return true + } + return r.PushReporterV2.Event(event) } diff --git a/x-pack/metricbeat/module/cloudfoundry/value/value_test.go b/x-pack/metricbeat/module/cloudfoundry/value/value_test.go new file mode 100644 index 00000000000..2003388e228 --- /dev/null +++ b/x-pack/metricbeat/module/cloudfoundry/value/value_test.go @@ -0,0 +1,131 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build !integration + +package value + +import ( + "math" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cloudfoundry/sonde-go/events" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/mb" + "github.com/elastic/beats/v7/metricbeat/mb/parse" + mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" + "github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest" +) + +func init() { + if err := mb.Registry.AddModule("cloudfoundrytest", mtest.NewModuleMock); err != nil { + panic(err) + } + mb.Registry.MustAddMetricSet("cloudfoundrytest", "test", newTestMetricSet, + mb.WithHostParser(parse.EmptyHostParser), + mb.DefaultMetricSet(), + ) +} + +func newTestMetricSet(base mb.BaseMetricSet) (mb.MetricSet, error) { + return New(base) +} + +func TestMetricSet(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + + config := map[string]interface{}{ + "module": "cloudfoundrytest", + "client_id": "dummy", + "client_secret": "dummy", + "api_address": "dummy", + "shard_id": "dummy", + } + + ms := mbtest.NewPushMetricSetV2(t, config) + hub := ms.Module().(*mtest.ModuleMock).Hub + + go func() { + hub.SendEnvelope(valueMetricEnvelope("duration", 12.34, "ms")) + }() + + events := mbtest.RunPushMetricSetV2(10*time.Second, 1, ms) + require.NotEmpty(t, events) + + expectedFields := common.MapStr{ + "cloudfoundry.envelope.deployment": "test", + "cloudfoundry.envelope.index": "index", + "cloudfoundry.envelope.ip": "127.0.0.1", + "cloudfoundry.envelope.job": "test", + "cloudfoundry.envelope.origin": "test", + "cloudfoundry.type": "value", + "cloudfoundry.value.name": "duration", + "cloudfoundry.value.unit": "ms", + "cloudfoundry.value.value": float64(12.34), + } + require.Equal(t, expectedFields, events[0].RootFields.Flatten()) +} + +func TestValuesAreNumbers(t *testing.T) { + logp.TestingSetup(logp.WithSelectors("cloudfoundry")) + + config := map[string]interface{}{ + "module": "cloudfoundrytest", + "client_id": "dummy", + "client_secret": "dummy", + "api_address": "dummy", + "shard_id": "dummy", + } + + ms := mbtest.NewPushMetricSetV2(t, config) + hub := ms.Module().(*mtest.ModuleMock).Hub + + go func() { + hub.SendEnvelope(valueMetricEnvelope("duration", math.NaN(), "ms")) + hub.SendEnvelope(valueMetricEnvelope("duration", 12.34, "ms")) + hub.SendEnvelope(valueMetricEnvelope("duration", math.Inf(1), "ms")) + hub.SendEnvelope(valueMetricEnvelope("duration", 34.56, "ms")) + }() + + events := mbtest.RunPushMetricSetV2(10*time.Second, 2, ms) + require.NotEmpty(t, events) + + for _, e := range events { + value, err := e.RootFields.GetValue("cloudfoundry.value.value") + if assert.NoError(t, err) { + assert.False(t, math.IsNaN(value.(float64))) + assert.False(t, math.IsInf(value.(float64), 0)) + } + } +} + +func valueMetricEnvelope(name string, value float64, unit string) *events.Envelope { + eventType := events.Envelope_ValueMetric + origin := "test" + deployment := "test" + job := "test" + ip := "127.0.0.1" + index := "index" + timestamp := time.Now().Unix() + return &events.Envelope{ + EventType: &eventType, + Timestamp: ×tamp, + Origin: &origin, + Deployment: &deployment, + Job: &job, + Ip: &ip, + Index: &index, + ValueMetric: &events.ValueMetric{ + Name: &name, + Value: &value, + Unit: &unit, + }, + } +}