From ebd275007d48dde25bdcd0952296d4ad8797c08b Mon Sep 17 00:00:00 2001
From: Pier-Hugues Pellerin
Date: Wed, 13 Mar 2019 19:12:01 -0400
Subject: [PATCH] Cherry-pick #10682 to 7.0: Expose the Metadata Refresh
 strategy for the Kafka output. (#11161)

Cherry-pick of PR #10682 to 7.0 branch.

Original message:

By default, the Kafka output periodically refreshes the metadata
information for all the available topics. This is the default strategy
that Beats use. But if you have strict permissions in place for your
Kafka cluster, this can lead to errors in your logs when the Beat tries
to get information for a topic that it doesn't have permission to
access. This commit keeps the default behavior but allows you to change
the strategy from the config file.
---
 CHANGELOG.next.asciidoc                     |  3 +-
 auditbeat/auditbeat.reference.yml           |  7 +++--
 filebeat/filebeat.reference.yml             |  7 +++--
 heartbeat/heartbeat.reference.yml           |  7 +++--
 journalbeat/journalbeat.reference.yml       |  7 +++--
 libbeat/_meta/config.reference.yml          |  7 +++--
 libbeat/docs/outputconfig.asciidoc          | 28 +++++++++++--------
 libbeat/outputs/kafka/config.go             |  3 ++
 metricbeat/metricbeat.reference.yml         |  7 +++--
 packetbeat/packetbeat.reference.yml         |  7 +++--
 winlogbeat/winlogbeat.reference.yml         |  7 +++--
 x-pack/auditbeat/auditbeat.reference.yml    |  7 +++--
 x-pack/filebeat/filebeat.reference.yml      |  7 +++--
 .../functionbeat/functionbeat.reference.yml |  7 +++--
 x-pack/metricbeat/metricbeat.reference.yml  |  7 +++--
 15 files changed, 81 insertions(+), 37 deletions(-)

diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 7d13a487f608..38d844b84bbb 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -54,6 +54,7 @@ https://github.com/elastic/beats/compare/v7.0.0-beta1...master[Check the HEAD di
 - Add missing host.* fields to fields.yml. {pull}11016[11016]
 - Include ip and boolean type when generating index pattern. {pull}10995[10995]
 - Using an environment variable for the password when enrolling a beat will now raise an error if the variable doesn't exist. {pull}10936[10936]
+- Allow configuring the Kafka fetching strategy for the topic metadata. {pull}10682[10682]
 
 *Auditbeat*
 
@@ -66,7 +67,7 @@ https://github.com/elastic/beats/compare/v7.0.0-beta1...master[Check the HEAD di
 - Fix errors in filebeat Zeek dashboard and README files. Add notice.log support. {pull}10916[10916]
 - Fix a bug when converting NetFlow fields to snake_case. {pull}10950[10950]
-- Add on_failure handler for Zeek ingest pipelines. Fix one field name error for notice and add an additional test 
+- Add on_failure handler for Zeek ingest pipelines. Fix one field name error for notice and add an additional test
   case. {issue}11004[11004] {pull}11105[11105]
 
 *Heartbeat*
diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml
index 534778657a13..381b3c04b3dc 100644
--- a/auditbeat/auditbeat.reference.yml
+++ b/auditbeat/auditbeat.reference.yml
@@ -228,7 +228,7 @@ auditbeat.modules:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -319,7 +319,7 @@ auditbeat.modules:
 #  match_pids: ["system.process.ppid"]
 #  target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -621,6 +621,9 @@ output.elasticsearch:
   # Refresh metadata interval. Defaults to every 10 minutes.
   #refresh_frequency: 10m
 
+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
 
diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml
index f854e0b87fb3..ebf72cb51cdd 100644
--- a/filebeat/filebeat.reference.yml
+++ b/filebeat/filebeat.reference.yml
@@ -929,7 +929,7 @@ filebeat.inputs:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -1020,7 +1020,7 @@ filebeat.inputs:
 #  match_pids: ["system.process.ppid"]
 #  target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -1322,6 +1322,9 @@ output.elasticsearch:
   # Refresh metadata interval. Defaults to every 10 minutes.
   #refresh_frequency: 10m
 
+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
 
diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml
index be2ee368934d..2914a427fbd7 100644
--- a/heartbeat/heartbeat.reference.yml
+++ b/heartbeat/heartbeat.reference.yml
@@ -372,7 +372,7 @@ heartbeat.scheduler:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -463,7 +463,7 @@ heartbeat.scheduler:
 #  match_pids: ["system.process.ppid"]
 #  target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -765,6 +765,9 @@ output.elasticsearch:
   # Refresh metadata interval. Defaults to every 10 minutes.
   #refresh_frequency: 10m
 
+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
 
diff --git a/journalbeat/journalbeat.reference.yml b/journalbeat/journalbeat.reference.yml
index 006bfe9324c4..f72f40318a55 100644
--- a/journalbeat/journalbeat.reference.yml
+++ b/journalbeat/journalbeat.reference.yml
@@ -168,7 +168,7 @@ setup.template.settings:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -259,7 +259,7 @@ setup.template.settings:
 #  match_pids: ["system.process.ppid"]
 #  target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -561,6 +561,9 @@ output.elasticsearch:
   # Refresh metadata interval. Defaults to every 10 minutes.
   #refresh_frequency: 10m
 
+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
 
diff --git a/libbeat/_meta/config.reference.yml b/libbeat/_meta/config.reference.yml
index c88d26a22f19..88145350c823 100644
--- a/libbeat/_meta/config.reference.yml
+++ b/libbeat/_meta/config.reference.yml
@@ -116,7 +116,7 @@
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -207,7 +207,7 @@
 #  match_pids: ["system.process.ppid"]
 #  target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -509,6 +509,9 @@ output.elasticsearch:
   # Refresh metadata interval. Defaults to every 10 minutes.
   #refresh_frequency: 10m
 
+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
 
diff --git a/libbeat/docs/outputconfig.asciidoc b/libbeat/docs/outputconfig.asciidoc
index c62546254343..c353e6786ab4 100644
--- a/libbeat/docs/outputconfig.asciidoc
+++ b/libbeat/docs/outputconfig.asciidoc
@@ -249,7 +249,7 @@ endif::no_dashboards[]
 
 You can set the index dynamically by using a format string to access any
 event field. For example, this configuration uses a custom field, `fields.log_type`,
-to set the index: 
+to set the index:
 
 ["source","yaml",subs="attributes"]
 ------------------------------------------------------------------------------
@@ -261,13 +261,13 @@ output.elasticsearch:
 <1> We recommend including +{beat_version_key}+ in the name to avoid mapping
 issues when you upgrade.
 
-With this configuration, all events with `log_type: normal` are sent to an 
+With this configuration, all events with `log_type: normal` are sent to an
 index named +normal-{version}-{localdate}+, and all events with
 `log_type: critical` are sent to an index named
 +critical-{version}-{localdate}+.
 
 TIP: To learn how to add custom fields to events, see the
-<> option. 
+<> option.
 
 See the <> setting for other ways to set the index
 dynamically.
@@ -285,7 +285,7 @@ matches, the <> setting is used.
 Rule settings:
 
 *`index`*:: The index format string to use. If this string contains field
-references, such as `%{[fields.name]}`, the fields must exist, or the rule fails. 
+references, such as `%{[fields.name]}`, the fields must exist, or the rule fails.
 
 *`mappings`*:: A dictionary that takes the value returned by `index` and maps
 it to a new name.
@@ -347,7 +347,7 @@ ifndef::no_ilm[]
 [[ilm-es]]
 ===== `ilm`
 
-Configuration options for index lifecycle management. 
+Configuration options for index lifecycle management.
 
 See <> for more information.
 endif::no_ilm[]
@@ -369,7 +369,7 @@ For more information, see <>.
 
 You can set the ingest node pipeline dynamically by using a format string to
 access any event field. For example, this configuration uses a custom field,
-`fields.log_type`, to set the pipeline for each event: 
+`fields.log_type`, to set the pipeline for each event:
 
 ["source","yaml",subs="attributes"]
 ------------------------------------------------------------------------------
@@ -384,7 +384,7 @@ named `normal_pipeline`, and all events with `log_type: critical` are sent to a
 pipeline named `critical_pipeline`.
 
 TIP: To learn how to add custom fields to events, see the
-<> option. 
+<> option.
 
 See the <> setting for other ways to set the ingest node
 pipeline dynamically.
@@ -403,7 +403,7 @@ Rule settings:
 
 *`pipeline`*:: The pipeline format string to use. If this string contains field
 references, such as `%{[fields.name]}`, the fields must exist, or the rule
-fails. 
+fails.
 
 *`mappings`*:: A dictionary that takes the value returned by `pipeline` and
 maps it to a new name.
@@ -870,7 +870,7 @@ topic: '%{[fields.log_topic]}'
 -----
 
 TIP: To learn how to add custom fields to events, see the
-<> option. 
+<> option.
 
 See the <> setting for other ways to set the topic
 dynamically.
@@ -889,7 +889,7 @@ Rule settings:
 
 *`topic`*:: The topic format string to use. If this string contains field
 references, such as `%{[fields.name]}`, the fields must exist, or the rule
-fails. 
+fails.
 
 *`mappings`*:: A dictionary that takes the value returned by `topic` and maps
 it to a new name.
@@ -901,7 +901,7 @@ match.
 
 ifndef::no-processors[]
 All the <> supported by processors are also supported here.
-endif::no-processors[] 
+endif::no-processors[]
 
 ===== `key`
 
@@ -955,6 +955,10 @@ brokers, topics, partition, and active leaders to use for publishing.
 
 *`refresh_frequency`*:: Metadata refresh interval. Defaults to 10 minutes.
 
+*`full`*:: Strategy to use when fetching metadata. When this option is `true`,
+the client maintains a full set of metadata for all the available topics; when
+it is set to `false`, it refreshes the metadata only for the configured topics.
+
 *`retry.max`*:: Total number of metadata update retries when cluster is in
 middle of leader election. The default is 3.
 
 *`retry.backoff`*:: Waiting time between retries during leader elections.
 Default is 250ms.
@@ -1099,7 +1103,7 @@ output.redis:
 
 TIP: To learn how to add custom fields to events, see the
-<> option. 
+<> option.
 
 See the <> setting for other ways to set the key
 dynamically.
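To make the new setting concrete, here is a minimal usage sketch of a Kafka output that refreshes metadata only for its own topic, assuming a placeholder broker address and topic name:

["source","yaml"]
------------------------------------------------------------------------------
output.kafka:
  # Placeholder broker address and topic name.
  hosts: ["localhost:9092"]
  topic: "beats"
  metadata:
    # Keep the default refresh interval of 10 minutes.
    refresh_frequency: 10m
    # Refresh metadata only for the configured topic rather than for
    # every topic available on the cluster.
    full: false
------------------------------------------------------------------------------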
diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go
index 48a46491f918..b042a3038bea 100644
--- a/libbeat/outputs/kafka/config.go
+++ b/libbeat/outputs/kafka/config.go
@@ -62,6 +62,7 @@ type kafkaConfig struct {
 type metaConfig struct {
 	Retry       metaRetryConfig `config:"retry"`
 	RefreshFreq time.Duration   `config:"refresh_frequency" validate:"min=0"`
+	Full        bool            `config:"full"`
 }
 
 type metaRetryConfig struct {
@@ -90,6 +91,7 @@ func defaultConfig() kafkaConfig {
 				Backoff: 250 * time.Millisecond,
 			},
 			RefreshFreq: 10 * time.Minute,
+			Full:        true,
 		},
 		KeepAlive:       0,
 		MaxMessageBytes: nil, // use library default
@@ -177,6 +179,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) {
 	k.Metadata.Retry.Max = config.Metadata.Retry.Max
 	k.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
 	k.Metadata.RefreshFrequency = config.Metadata.RefreshFreq
+	k.Metadata.Full = config.Metadata.Full
 
 	// configure producer API properties
 	if config.MaxMessageBytes != nil {
diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml
index d6f0d0a79f72..6403c7a680dc 100644
--- a/metricbeat/metricbeat.reference.yml
+++ b/metricbeat/metricbeat.reference.yml
@@ -826,7 +826,7 @@ metricbeat.modules:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -917,7 +917,7 @@ metricbeat.modules:
 #  match_pids: ["system.process.ppid"]
 #  target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -1219,6 +1219,9 @@ output.elasticsearch:
   # Refresh metadata interval. Defaults to every 10 minutes.
   #refresh_frequency: 10m
 
+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
 
diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml
index 09e21d901db9..1eb03dae6014 100644
--- a/packetbeat/packetbeat.reference.yml
+++ b/packetbeat/packetbeat.reference.yml
@@ -596,7 +596,7 @@ packetbeat.ignore_outgoing: false
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -687,7 +687,7 @@ packetbeat.ignore_outgoing: false
 #  match_pids: ["system.process.ppid"]
 #  target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -989,6 +989,9 @@ output.elasticsearch:
   # Refresh metadata interval. Defaults to every 10 minutes.
   #refresh_frequency: 10m
 
+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
 
diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml
index ec534c84fcf0..b13c6253ff8b 100644
--- a/winlogbeat/winlogbeat.reference.yml
+++ b/winlogbeat/winlogbeat.reference.yml
@@ -145,7 +145,7 @@ winlogbeat.event_logs:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -236,7 +236,7 @@ winlogbeat.event_logs:
 #  match_pids: ["system.process.ppid"]
 #  target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -538,6 +538,9 @@ output.elasticsearch:
   # Refresh metadata interval. Defaults to every 10 minutes.
   #refresh_frequency: 10m
 
+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
 
diff --git a/x-pack/auditbeat/auditbeat.reference.yml b/x-pack/auditbeat/auditbeat.reference.yml
index 239da5691a1c..25eb815890f0 100644
--- a/x-pack/auditbeat/auditbeat.reference.yml
+++ b/x-pack/auditbeat/auditbeat.reference.yml
@@ -263,7 +263,7 @@ auditbeat.modules:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -354,7 +354,7 @@ auditbeat.modules:
 #  match_pids: ["system.process.ppid"]
 #  target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -656,6 +656,9 @@ output.elasticsearch:
   # Refresh metadata interval. Defaults to every 10 minutes.
   #refresh_frequency: 10m
 
+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
 
diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml
index 531d2d01bfce..2dc1a6907547 100644
--- a/x-pack/filebeat/filebeat.reference.yml
+++ b/x-pack/filebeat/filebeat.reference.yml
@@ -1000,7 +1000,7 @@ filebeat.inputs:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -1091,7 +1091,7 @@ filebeat.inputs:
 #  match_pids: ["system.process.ppid"]
 #  target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -1393,6 +1393,9 @@ output.elasticsearch:
   # Refresh metadata interval. Defaults to every 10 minutes.
   #refresh_frequency: 10m
 
+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
 
diff --git a/x-pack/functionbeat/functionbeat.reference.yml b/x-pack/functionbeat/functionbeat.reference.yml
index 05feb66beecb..7184c2f40acc 100644
--- a/x-pack/functionbeat/functionbeat.reference.yml
+++ b/x-pack/functionbeat/functionbeat.reference.yml
@@ -261,7 +261,7 @@ functionbeat.provider.aws.functions:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -352,7 +352,7 @@ functionbeat.provider.aws.functions:
 #  match_pids: ["system.process.ppid"]
 #  target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -654,6 +654,9 @@ output.elasticsearch:
   # Refresh metadata interval. Defaults to every 10 minutes.
   #refresh_frequency: 10m
 
+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
 
diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml
index a601828e2093..490d373be130 100644
--- a/x-pack/metricbeat/metricbeat.reference.yml
+++ b/x-pack/metricbeat/metricbeat.reference.yml
@@ -845,7 +845,7 @@ metricbeat.modules:
 #
 # event -> filter1 -> event1 -> filter2 ->event2 ...
 #
-# The supported processors are drop_fields, drop_event, include_fields, 
+# The supported processors are drop_fields, drop_event, include_fields,
 # decode_json_fields, and add_cloud_metadata.
 #
 # For example, you can use the following processors to keep the fields that
@@ -936,7 +936,7 @@ metricbeat.modules:
 #  match_pids: ["system.process.ppid"]
 #  target: system.process.parent
 #
-# The following example decodes fields containing JSON strings 
+# The following example decodes fields containing JSON strings
 # and replaces the strings with valid JSON objects.
 #
 #processors:
@@ -1238,6 +1238,9 @@ output.elasticsearch:
   # Refresh metadata interval. Defaults to every 10 minutes.
   #refresh_frequency: 10m
 
+  # Strategy for fetching the topics metadata from the broker. Default is true.
+  #full: true
+
   # The number of concurrent load-balanced Kafka output workers.
   #worker: 1
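A closing usage sketch for the scenario described in the commit message, assuming hypothetical broker addresses and a single topic the client is permitted to access: with strict cluster permissions, leaving `full: true` (the default) makes the client periodically request metadata for every topic, including inaccessible ones, which surfaces as errors in the logs, while `full: false` limits the refresh to the configured topic.

["source","yaml"]
------------------------------------------------------------------------------
output.kafka:
  hosts: ["kafka1:9092", "kafka2:9092"]  # hypothetical brokers
  topic: "secured-topic"                 # the only topic this client may access
  metadata:
    # Defaults for the metadata update retries, shown for context.
    retry.max: 3
    retry.backoff: 250ms
    # Avoid metadata errors for topics the client cannot access.
    full: false
------------------------------------------------------------------------------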