diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 07b64c044d745..10293561ac851 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -105,14 +105,6 @@ public Set unused() { return keys; } - public Map unusedConfigs() { - Set unusedKeys = this.unused(); - Map unusedProps = new HashMap<>(); - for (String key : unusedKeys) - unusedProps.put(key, this.originals.get(key)); - return unusedProps; - } - public Map originals() { Map copy = new RecordingMap<>(); copy.putAll(originals); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 359a79c8e1e8b..f5b23ecda6b7e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -101,7 +101,7 @@ public void start() { producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); - producerProps.putAll(config.unusedConfigs()); + producerProps.putAll(config.originalsWithPrefix("producer.")); producer = new KafkaProducer<>(producerProps); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 643b10e25acfb..e0a3e04b639f0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -233,7 +233,8 @@ public WorkerConfig workerConfig() { private KafkaConsumer createConsumer() { // Include any unknown worker configs so consumer configs can be set globally on the worker // and through to the task - Map props = workerConfig.unusedConfigs(); + Map props = new HashMap<>(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "connect-" + id.connector()); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ",")); @@ -242,6 +243,8 @@ private KafkaConsumer createConsumer() { props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + props.putAll(workerConfig.originalsWithPrefix("consumer.")); + KafkaConsumer newConsumer; try { newConsumer = new KafkaConsumer<>(props);