From ae5d3798dfcfeaa9c73bbd08d6f97e27805ae7a2 Mon Sep 17 00:00:00 2001 From: ruflin Date: Wed, 27 Dec 2017 10:16:48 +1100 Subject: [PATCH] Dedot keys for jolokia metricsets This should prevent name collisions and allow filtering on these fields. The feature is enabled by default and cannot be disable as otherwise ingestion could stop as Elasticsearch is returning an error. Currently not config option is provided as this should not be disabled. * Introduces DeDot function in common package * Adds tests to jmx for new functionality * Use new function also for docker labels Related to https://github.com/elastic/beats/issues/5942 Closes https://github.com/elastic/beats/pull/5916 --- CHANGELOG.asciidoc | 1 + libbeat/common/event.go | 6 +++ metricbeat/module/docker/helper.go | 2 +- .../jmx/_meta/test/jolokia_response.json | 19 ++++++++ metricbeat/module/jolokia/jmx/data.go | 17 +++++++- metricbeat/module/jolokia/jmx/data_test.go | 43 +++++++++++++------ 6 files changed, 73 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 07168f4aba50..9c8a36ad5655 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -38,6 +38,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di - Rename `http.response.status_code` field to `http.response.code`. {pull}5521[5521] - Rename `golang.heap.system.optained` field to `golang.heap.system.obtained`. {issue}5703[5703] - Support haproxy stats gathering using http (additionaly to tcp socket). {pull}5819[5819] +- De dot keys in jolokia/jmx metricset to prevent collisions. {pull}5957[5957] *Packetbeat* diff --git a/libbeat/common/event.go b/libbeat/common/event.go index 24addc3b19c5..d81ff8c59885 100644 --- a/libbeat/common/event.go +++ b/libbeat/common/event.go @@ -250,3 +250,9 @@ func joinKeys(keys ...string) string { func (f Float) MarshalJSON() ([]byte, error) { return []byte(fmt.Sprintf("%.6f", f)), nil } + +// DeDot a string by replacing all . with _ +// This helps when sending data to Elasticsearch to prevent object and key collisions. +func DeDot(s string) string { + return strings.Replace(s, ".", "_", -1) +} diff --git a/metricbeat/module/docker/helper.go b/metricbeat/module/docker/helper.go index 3c950c95bd18..285d8e2c4165 100644 --- a/metricbeat/module/docker/helper.go +++ b/metricbeat/module/docker/helper.go @@ -54,7 +54,7 @@ func DeDotLabels(labels map[string]string) common.MapStr { for k, v := range labels { // This is necessary so that ES does not interpret '.' fields as new // nested JSON objects, and also makes this compatible with ES 2.x. - label := strings.Replace(k, ".", "_", -1) + label := common.DeDot(k) outputLabels.Put(label, v) } diff --git a/metricbeat/module/jolokia/jmx/_meta/test/jolokia_response.json b/metricbeat/module/jolokia/jmx/_meta/test/jolokia_response.json index effa8f9fc510..3d3082c02487 100644 --- a/metricbeat/module/jolokia/jmx/_meta/test/jolokia_response.json +++ b/metricbeat/module/jolokia/jmx/_meta/test/jolokia_response.json @@ -52,5 +52,24 @@ }, "timestamp": 1472298687, "status": 200 + }, + { + "request": { + "mbean": "org.springframework.boot:type=Endpoint,name=metricsEndpoint", + "attribute": [ + "Metrics" + ], + "type": "read" + }, + "value": { + "Metrics": { + "atomikos.nbTransactions": 0.000000, + "classes": 18857.000000, + "classes.loaded": 19127.000000, + "classes.unloaded": 270.000000 + } + }, + "timestamp": 1472298687, + "status": 200 } ] diff --git a/metricbeat/module/jolokia/jmx/data.go b/metricbeat/module/jolokia/jmx/data.go index b9a0c6c9e9be..294c426004b6 100644 --- a/metricbeat/module/jolokia/jmx/data.go +++ b/metricbeat/module/jolokia/jmx/data.go @@ -81,9 +81,22 @@ func parseResponseEntry( key, exists := mapping[metricName] if !exists { - return errors.Errorf("metric key '%v' not found in response", key) + return errors.Errorf("metric key '%v' not found in response", metricName) + } + + var err error + + // In case the attributeValue is a map the keys are dedotted + c, ok := attibuteValue.(map[string]interface{}) + if ok { + newData := map[string]interface{}{} + for k, v := range c { + newData[common.DeDot(k)] = v + } + _, err = event.Put(key, newData) + } else { + _, err = event.Put(key, attibuteValue) } - _, err := event.Put(key, attibuteValue) return err } diff --git a/metricbeat/module/jolokia/jmx/data_test.go b/metricbeat/module/jolokia/jmx/data_test.go index 83d85325c228..864fbc412254 100644 --- a/metricbeat/module/jolokia/jmx/data_test.go +++ b/metricbeat/module/jolokia/jmx/data_test.go @@ -26,20 +26,39 @@ func TestEventMapper(t *testing.T) { "java.lang:type=GarbageCollector,name=ConcurrentMarkSweep_CollectionCount": "gc.cms_collection_count", "java.lang:type=Memory_HeapMemoryUsage": "memory.heap_usage", "java.lang:type=Memory_NonHeapMemoryUsage": "memory.non_heap_usage", + "org.springframework.boot:type=Endpoint,name=metricsEndpoint_Metrics": "metrics", } event, err := eventMapping(jolokiaResponse, mapping) - assert.Nil(t, err) - assert.EqualValues(t, 47283, event["uptime"]) - assert.EqualValues(t, 53, event["gc"].(common.MapStr)["cms_collection_time"]) - assert.EqualValues(t, 1, event["gc"].(common.MapStr)["cms_collection_count"]) - assert.EqualValues(t, 1073741824, event["memory"].(common.MapStr)["heap_usage"].(map[string]interface{})["init"]) - assert.EqualValues(t, 1037959168, event["memory"].(common.MapStr)["heap_usage"].(map[string]interface{})["committed"]) - assert.EqualValues(t, 1037959168, event["memory"].(common.MapStr)["heap_usage"].(map[string]interface{})["max"]) - assert.EqualValues(t, 227420472, event["memory"].(common.MapStr)["heap_usage"].(map[string]interface{})["used"]) - assert.EqualValues(t, 2555904, event["memory"].(common.MapStr)["non_heap_usage"].(map[string]interface{})["init"]) - assert.EqualValues(t, 53477376, event["memory"].(common.MapStr)["non_heap_usage"].(map[string]interface{})["committed"]) - assert.EqualValues(t, -1, event["memory"].(common.MapStr)["non_heap_usage"].(map[string]interface{})["max"]) - assert.EqualValues(t, 50519768, event["memory"].(common.MapStr)["non_heap_usage"].(map[string]interface{})["used"]) + + expected := common.MapStr{ + "uptime": float64(47283), + "gc": common.MapStr{ + "cms_collection_time": float64(53), + "cms_collection_count": float64(1), + }, + "memory": common.MapStr{ + "heap_usage": map[string]interface{}{ + "init": float64(1073741824), + "committed": float64(1037959168), + "max": float64(1037959168), + "used": float64(227420472), + }, + "non_heap_usage": map[string]interface{}{ + "init": float64(2555904), + "committed": float64(53477376), + "max": float64(-1), + "used": float64(50519768), + }, + }, + "metrics": map[string]interface{}{ + "atomikos_nbTransactions": float64(0), + "classes": float64(18857), + "classes_loaded": float64(19127), + "classes_unloaded": float64(270), + }, + } + + assert.Equal(t, expected, event) }