From caf40c0c9041b62dd7e7ca9e02e6fe0bec6e55f6 Mon Sep 17 00:00:00 2001 From: Monica Sarbu Date: Wed, 5 Oct 2016 21:58:39 +0200 Subject: [PATCH] Send float fields as float values to Elasticsearch, to remove defining the mapping in advance (#2627) (#2687) --- CHANGELOG.asciidoc | 2 ++ libbeat/common/event.go | 10 +++++++++- libbeat/common/event_test.go | 18 ++++++++++++++++++ libbeat/processors/condition.go | 4 ++-- metricbeat/tests/system/test_processors.py | 2 +- 5 files changed, 32 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 0efdb4b24c8c..a2556b83b0f7 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -28,6 +28,8 @@ https://github.com/elastic/beats/compare/v5.0.0-beta1...master[Check the HEAD di ==== Bugfixes *Affecting all Beats* +- Make sure Beats sent always float values when they are defined as float by sending 5.00000 instead of 5. + {pull}2627[2627] *Metricbeat* diff --git a/libbeat/common/event.go b/libbeat/common/event.go index 1ffd4c8a8198..28b36209add1 100644 --- a/libbeat/common/event.go +++ b/libbeat/common/event.go @@ -19,6 +19,8 @@ var eventDebugf = logp.MakeDebug(eventDebugSelector) var textMarshalerType = reflect.TypeOf((*encoding.TextMarshaler)(nil)).Elem() +type Float float64 + // ConvertToGenericEvent normalizes the types contained in the given MapStr. // // Nil values in maps are dropped during the conversion. Any unsupported types @@ -130,6 +132,7 @@ func normalizeValue(value interface{}, keys ...string) (interface{}, []error) { case uint, uint8, uint16, uint32, uint64: case []uint, []uint8, []uint16, []uint32, []uint64: case float32, float64: + return Float(value.(float64)), nil case []float32, []float64: case complex64, complex128: case []complex64, []complex128: @@ -156,7 +159,7 @@ func normalizeValue(value interface{}, keys ...string) (interface{}, []error) { case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64: return v.Uint(), nil case reflect.Float32, reflect.Float64: - return v.Float(), nil + return Float(v.Float()), nil case reflect.Complex64, reflect.Complex128: return v.Complex(), nil case reflect.String: @@ -221,3 +224,8 @@ func joinKeys(keys ...string) string { } return strings.Join(keys, ".") } + +// Defines the marshal of the Float type +func (f Float) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf("%.6f", f)), nil +} diff --git a/libbeat/common/event_test.go b/libbeat/common/event_test.go index 7cb6b2ca9d6c..6a8a7fa6a324 100644 --- a/libbeat/common/event_test.go +++ b/libbeat/common/event_test.go @@ -1,6 +1,7 @@ package common import ( + "encoding/json" "testing" "github.com/elastic/beats/libbeat/logp" @@ -297,6 +298,23 @@ func TestMarshalUnmarshalArray(t *testing.T) { } } +func TestMarshalFloatValues(t *testing.T) { + + assert := assert.New(t) + + var f float64 + + f = 5 + + a := MapStr{ + "f": Float(f), + } + + b, err := json.Marshal(a) + assert.Nil(err) + assert.Equal(string(b), "{\"f\":5.000000}") +} + // Uses TextMarshaler interface. func BenchmarkConvertToGenericEventNetString(b *testing.B) { for i := 0; i < b.N; i++ { diff --git a/libbeat/processors/condition.go b/libbeat/processors/condition.go index 152cbf258a18..944fc82de2e5 100644 --- a/libbeat/processors/condition.go +++ b/libbeat/processors/condition.go @@ -369,7 +369,7 @@ func (c *Condition) checkRange(event common.MapStr) bool { return false } - case float64, float32: + case float64, float32, common.Float: floatValue := reflect.ValueOf(value).Float() if !checkValue(floatValue, rangeValue) { @@ -377,7 +377,7 @@ func (c *Condition) checkRange(event common.MapStr) bool { } default: - logp.Warn("unexpected type %T in range condition as it accepts only strings. ", value) + logp.Warn("unexpected type %T in range condition. ", value) return false } diff --git a/metricbeat/tests/system/test_processors.py b/metricbeat/tests/system/test_processors.py index e01b2ff27f7f..e5b0202d6d2c 100644 --- a/metricbeat/tests/system/test_processors.py +++ b/metricbeat/tests/system/test_processors.py @@ -134,7 +134,7 @@ def test_dropevent_with_complex_condition(self): output = self.read_output( required_fields=["@timestamp", "type"], ) - assert len(output) == 1 + assert len(output) >= 1 def test_include_fields(self):