From 5818b8bf773e98b9664eb3b8e8bbd408afefc92d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Babacar=20Diass=C3=A9?= Date: Wed, 24 Jun 2020 11:21:26 +0200 Subject: [PATCH] Add max.poll.interval.ms kafka producer setting --- src/main/config/secor.common.properties | 1 + src/main/java/com/pinterest/secor/common/SecorConfig.java | 4 ++++ .../com/pinterest/secor/reader/SecorKafkaMessageIterator.java | 1 + 3 files changed, 6 insertions(+) diff --git a/src/main/config/secor.common.properties b/src/main/config/secor.common.properties index 0ce0429bd..80dd0f422 100644 --- a/src/main/config/secor.common.properties +++ b/src/main/config/secor.common.properties @@ -186,6 +186,7 @@ kafka.new.consumer.ssl.keystore.password= kafka.new.consumer.ssl.truststore.location= kafka.new.consumer.ssl.truststore.password= kafka.new.consumer.isolation.level= +kafka.new.consumer.max.poll.interval.ms= kafka.new.consumer.max.poll.records= kafka.new.consumer.sasl.client.callback.handler.class= kafka.new.consumer.sasl.jaas.config= diff --git a/src/main/java/com/pinterest/secor/common/SecorConfig.java b/src/main/java/com/pinterest/secor/common/SecorConfig.java index 6cd23caab..aaae1f122 100644 --- a/src/main/java/com/pinterest/secor/common/SecorConfig.java +++ b/src/main/java/com/pinterest/secor/common/SecorConfig.java @@ -145,6 +145,10 @@ public String getIsolationLevel() { return getString("kafka.new.consumer.isolation.level"); } + public String getMaxPollIntervalMs() { + return getString("kafka.new.consumer.max.poll.interval.ms"); + } + public String getMaxPollRecords() { return getString("kafka.new.consumer.max.poll.records"); } diff --git a/src/main/java/com/pinterest/secor/reader/SecorKafkaMessageIterator.java b/src/main/java/com/pinterest/secor/reader/SecorKafkaMessageIterator.java index b7598ec11..ce4d10cf9 100644 --- a/src/main/java/com/pinterest/secor/reader/SecorKafkaMessageIterator.java +++ b/src/main/java/com/pinterest/secor/reader/SecorKafkaMessageIterator.java @@ -103,6 +103,7 @@ public void init(SecorConfig config) throws UnknownHostException { optionalConfig(config.getSslTruststoreLocation(), conf -> props.put("ssl.truststore.location", conf)); optionalConfig(config.getSslTruststorePassword(), conf -> props.put("ssl.truststore.password", conf)); optionalConfig(config.getIsolationLevel(), conf -> props.put("isolation.level", conf)); + optionalConfig(config.getMaxPollIntervalMs(), conf -> props.put("max.poll.interval.ms", conf)); optionalConfig(config.getMaxPollRecords(), conf -> props.put("max.poll.records", conf)); optionalConfig(config.getSaslClientCallbackHandlerClass(), conf -> props.put("sasl.client.callback.handler.class", conf)); optionalConfig(config.getSaslJaasConfig(), conf -> props.put("sasl.jaas.config", conf));