From c7bc9faad1349789f705b95aafc14b5c6803c71c Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Thu, 18 Oct 2018 12:34:07 +0200 Subject: [PATCH] Add kafka fields needed for dashboards (#8504) Kafka dashboards calculate cardinalities of composed fields, this is expensive in query time, but can be optimized if the composed fields are precalculated. Continues with #7767, needed for #8457. (cherry picked from commit 78ef1203fbe66826f4f03a6a79473774d6f2e4f7) --- CHANGELOG.asciidoc | 1 + metricbeat/docs/fields.asciidoc | 18 ++++++++++++++++++ metricbeat/module/kafka/_meta/fields.yml | 10 ++++++++++ .../module/kafka/consumergroup/_meta/data.json | 7 ++++--- .../kafka/consumergroup/consumergroup.go | 7 ++++++- metricbeat/module/kafka/fields.go | 2 +- .../module/kafka/partition/_meta/data.json | 9 ++++++--- metricbeat/module/kafka/partition/partition.go | 13 ++++++++++--- 8 files changed, 56 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index e9211ee4aaa..a7ba5fa12e9 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -134,6 +134,7 @@ https://github.com/elastic/beats/compare/v6.4.0...6.x[Check the HEAD diff] - Support for Kafka 2.0.0 {pull}8399[8399] - Added support for query params in configuration {issue}8286[8286] {pull}8292[8292] - Add container image for docker metricsets. {issue}8214[8214] {pull}8438[8438] +- Precalculate composed id fields for kafka dashboards. {pull}8504[8504] *Packetbeat* diff --git a/metricbeat/docs/fields.asciidoc b/metricbeat/docs/fields.asciidoc index 4d844e15862..5ae8a7209aa 100644 --- a/metricbeat/docs/fields.asciidoc +++ b/metricbeat/docs/fields.asciidoc @@ -7108,6 +7108,24 @@ type: long Partition id. +-- + +*`kafka.partition.topic_id`*:: ++ +-- +type: keyword + +Unique id of the partition in the topic. + +-- + +*`kafka.partition.topic_broker_id`*:: ++ +-- +type: keyword + +Unique id of the partition in the topic and the broker. + -- [float] diff --git a/metricbeat/module/kafka/_meta/fields.yml b/metricbeat/module/kafka/_meta/fields.yml index 99e93eafd22..e6b507c1449 100644 --- a/metricbeat/module/kafka/_meta/fields.yml +++ b/metricbeat/module/kafka/_meta/fields.yml @@ -39,3 +39,13 @@ type: long description: > Partition id. + + - name: partition.topic_id + type: keyword + description: + Unique id of the partition in the topic. + + - name: partition.topic_broker_id + type: keyword + description: + Unique id of the partition in the topic and the broker. diff --git a/metricbeat/module/kafka/consumergroup/_meta/data.json b/metricbeat/module/kafka/consumergroup/_meta/data.json index b163e6f7475..55cc9cefda8 100644 --- a/metricbeat/module/kafka/consumergroup/_meta/data.json +++ b/metricbeat/module/kafka/consumergroup/_meta/data.json @@ -17,7 +17,7 @@ "client": { "host": "172.18.0.1", "id": "sarama", - "member_id": "sarama-714cfb8b-39e5-4128-9109-e5056c5e8f56" + "member_id": "sarama-fcb5a5db-0474-4f3a-a5af-29e2f14549c5" }, "error": { "code": 0 @@ -29,7 +29,8 @@ "topic": "metricbeat-test" }, "partition": { - "id": 0 + "id": 0, + "topic_id": "0-metricbeat-test" }, "topic": { "name": "metricbeat-test" @@ -41,4 +42,4 @@ "name": "consumergroup", "rtt": 115 } -} \ No newline at end of file +} diff --git a/metricbeat/module/kafka/consumergroup/consumergroup.go b/metricbeat/module/kafka/consumergroup/consumergroup.go index 713e27e5df9..c79ad06d1ad 100644 --- a/metricbeat/module/kafka/consumergroup/consumergroup.go +++ b/metricbeat/module/kafka/consumergroup/consumergroup.go @@ -19,6 +19,7 @@ package consumergroup import ( "crypto/tls" + "fmt" "github.com/pkg/errors" @@ -111,6 +112,9 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { } emitEvent := func(event common.MapStr) { + // Helpful IDs to avoid scripts on queries + partitionTopicID := fmt.Sprintf("%d-%s", event["partition"], event["topic"]) + // TODO (deprecation): Remove fields from MetricSetFields moved to ModuleFields event["broker"] = brokerInfo r.Event(mb.Event{ @@ -120,7 +124,8 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { "name": event["topic"], }, "partition": common.MapStr{ - "id": event["partition"], + "id": event["partition"], + "topic_id": partitionTopicID, }, }, MetricSetFields: event, diff --git a/metricbeat/module/kafka/fields.go b/metricbeat/module/kafka/fields.go index d303ae9568c..80e88dd2b5b 100644 --- a/metricbeat/module/kafka/fields.go +++ b/metricbeat/module/kafka/fields.go @@ -31,5 +31,5 @@ func init() { // Asset returns asset data func Asset() string { - return "eJy8WE2P2joU3fMrrmbFLCZv9d4iiye1M1WFWrXVtHtk4huwcGxkG6b8+8oOIYkTOx8wzar1kHPO/b7OE+zxnMKe5HuyADDMcEzh4Yv9/8MCgKLOFDsYJkUK/y8AANzfoJD0yHEBoHdSmXUmRc62KeSEa3uqkCPRmMIGjQXOGXKqUwfwBIIUWJPax5wPmMJWyePhctLD3IZpQm2U3KO6HvfhBTHL56NDgGcp9LFABZ/tq7ASuVQFsS/AjpwQNogCFBIKuZIFLC+v7YignIltC9LsELIKz0l5TBo/8G1p2sNo67iyh0uPImpSwyxGF708hFKFWveS7fH8JpUvZBwfoSdUhmmkV4pOzIw8sCyx/+7ErUsdof1lcRxmiAOVkirJJO0yeR4dpHFQYKGSLtuBKMPsu0krflOZflQwwGjSIakyys/tyRkfAvJLt3pi+dqpv7CiUtVBYUYM0hT+S/6dlWL3rFQYrtaQByBWtRCr3BGmQquCq5Mgf381w0BFT9Phc0xpXyM6ih/VlziRq+9JXKMTr9FWohKuRT++Y4/WUHeCIUfIPNdoZg2Na9KXGMCEkbVVsEEmtq6Q4gqKdreo+Q3+9oW1+Y/ayKKWYYGAEkNAG9Ws1F7a3t4+3vhurrut5Krmn9oRzdYflZRxhqI/Fv3NMKrng9ZsK5BeYF0kbETwhMLosrmFcvCGhjWyVzyXolYvsCwdp9EYK69UmzD6ONy3dlL77porpAUVJCyw2KBa324+EwaVINybHBeCZtXG2sXk0d0HMmdsR7rGnEw9EcbJhuMFV4PM3VjdshOKxnY0MUsFvmEkQeaP1m8OuOp7F7FBmQ23cfo+gr474GFBMwbRjHjW08d247+xDMUG4wjBEFyegyo5EtpZXEcoHdTx1QEDo7Asl+PONtlwlQ4rIEqR80wJzOWShQdGdZhf4YGzzJ/e9/DCa4nc74awP4Q+i2w9JGsjJUfip/tIZStBmc01DSyvHABMAxMZP1KkwISrPiaerJjqJwbthIPl6ufrKEv0eiDD3sUIK9zjDUoMLlBwh/h/uu5M5aKSo8l2dj0Y29QiF/iovptumKZz1Q9L875fwP0uIF1dY68kZaUlUz4i3X4f99g8LXM+NN2uKXhZXfwJAAD//8X6Jzc=" + return "eJy8WE1v4zYQvftXDHJyDlFP7UGHAm1SFEaL3UV292zQ4sgmTJFaknbW/35ByrIlSqQ+7ESnmJHeexwO3wz5BHs8pbAn+Z4sAAwzHFN4+M/+flgAUNSZYqVhUqTw5wIAwP0PCkkPHBcAeieVWWdS5GybQk64tqMKORKNKWzQWOCcIac6dQBPIEiBV1L7mFOJKWyVPJTnkR7mNkwTaqPkHtVluA8viFk9fzsEeJZCHwpU8K/9FFYil6og9gPYkSPCBlGAQkIhV7KA5fmzHRGUM7FtQZodQlbjOSmPSeMFfy7N+TDaGq7nw6VHEZ1SY1qMLnp5CKUKte4l2+PpTSpfyDg+Qo+oDNNILxSdNTOyZFli/+6sW5c6QvvN4jjMEAcqJVWSSdpl8iI6SOOgwEIlXbaSKMPst0lr/aYyfalhgNEoi5vduocrHr8W2XfBfhwQGAWZu4wtr+zCDVQxHNZR7cGPkQNEUPerIk064up95zvAZF8IAfkGVz+xXd1xqbCiSlWpMCMGaQp/JL/P2oj39DMY9rRQBCDmbRDztxFThZbP1SNB/n7PgwHfm6bD55hi8iN811/VlziR2zKTuEYnXsN8oxIu23h8XRut4eqXQ4GQea7RzCqtl6SvMIAJIxvmtEEmtm4jxRUUbbe48hv86Qtr8x+0kcVVhgUCSgwBbVRzp/bS9lbA8ZPv5rrr3S5qfrsGolkgo5IyzlD0r0W/GUb1/KU12wqkZ1i3EnZF8IjC6MrcQjl4g2GN9IrnStTqBZZV4DQaY+VVahNGH4d9aye1H665QlpQQcICi41f0GexMmFQCcK9ynEmaO7amF1MLt19IHPKdsQ15mTqkTBONhzPuLrudLbsiKLRVU3MUoFvGEmQ+aX1kwOufc9vy3yZjbBx+j6CPjvgYUEzCtGM9bxWH+vGH9EMxQrjCMHQOWLUo0GVHAntNK4jlA7q+N8B23Z/WTXHnW6yESodVkCUIqeZEpjLJQsPjOowv8KSs8yv3veIwmuF3B+GcDyEPolsPSRrIyVH4qf7SGUrQZnNNQ0srwMATAMTGT9QpPWhjIknK6Z+xaCtcLBcfX0dNRO9Hsiwd5mEFe7xBiUGGyi4w/r/c+mZqkYlR5PtbHsw1tQi1xxRfTedME3nQiQszbvlgfsdQLq6xh5JzlcHU67abj+Pe2yeljnXcbdrCh5WF78CAAD//1a2ff4=" } diff --git a/metricbeat/module/kafka/partition/_meta/data.json b/metricbeat/module/kafka/partition/_meta/data.json index 519feb1bf75..f8a2bbf5085 100644 --- a/metricbeat/module/kafka/partition/_meta/data.json +++ b/metricbeat/module/kafka/partition/_meta/data.json @@ -14,6 +14,7 @@ "address": "172.18.0.2:9092", "id": 0 }, + "id": 0, "offset": { "newest": 0, "oldest": 0 @@ -26,11 +27,13 @@ "replica": 0 }, "topic": { - "name": "foo-1532945633-185372965" - } + "name": "foo-1538389014-739473801" + }, + "topic_broker_id": "0-foo-1538389014-739473801-0", + "topic_id": "0-foo-1538389014-739473801" }, "topic": { - "name": "foo-1532945633-185372965" + "name": "foo-1538389014-739473801" } }, "metricset": { diff --git a/metricbeat/module/kafka/partition/partition.go b/metricbeat/module/kafka/partition/partition.go index e9cbbf9154e..bd68c155415 100644 --- a/metricbeat/module/kafka/partition/partition.go +++ b/metricbeat/module/kafka/partition/partition.go @@ -20,6 +20,7 @@ package partition import ( "crypto/tls" "errors" + "fmt" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" @@ -167,8 +168,17 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { } } + // Helpful IDs to avoid scripts on queries + partitionTopicID := fmt.Sprintf("%d-%s", partition.ID, topic.Name) + partitionTopicBrokerID := fmt.Sprintf("%s-%d", partitionTopicID, id) + // create event event := common.MapStr{ + // Common `kafka.partition` fields + "id": partition.ID, + "topic_id": partitionTopicID, + "topic_broker_id": partitionTopicBrokerID, + "topic": evtTopic, "broker": evtBroker, "partition": partitionEvent, @@ -183,9 +193,6 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) { ModuleFields: common.MapStr{ "broker": evtBroker, "topic": evtTopic, - "partition": common.MapStr{ - "id": partition.ID, - }, }, MetricSetFields: event, })