Skip to content

Commit

Permalink
KAFKA-2798: Use prefixedd configurations for Kafka Connect producer a…
Browse files Browse the repository at this point in the history
…nd consumer settings so they do not conflict with the distributed herder's settings.

Author: Ewen Cheslack-Postava <[email protected]>

Reviewers: Gwen Shapira

Closes apache#486 from ewencp/kafka-2798-conflicting-herder-producer-consumer-configs
  • Loading branch information
ewencp authored and gwenshap committed Nov 10, 2015
1 parent ae5a5d7 commit 403d89e
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,6 @@ public Set<String> unused() {
return keys;
}

public Map<String, Object> unusedConfigs() {
Set<String> unusedKeys = this.unused();
Map<String, Object> unusedProps = new HashMap<>();
for (String key : unusedKeys)
unusedProps.put(key, this.originals.get(key));
return unusedProps;
}

public Map<String, Object> originals() {
Map<String, Object> copy = new RecordingMap<>();
copy.putAll(originals);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void start() {
producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");

producerProps.putAll(config.unusedConfigs());
producerProps.putAll(config.originalsWithPrefix("producer."));

producer = new KafkaProducer<>(producerProps);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ public WorkerConfig workerConfig() {
private KafkaConsumer<byte[], byte[]> createConsumer() {
// Include any unknown worker configs so consumer configs can be set globally on the worker
// and through to the task
Map<String, Object> props = workerConfig.unusedConfigs();
Map<String, Object> props = new HashMap<>();

props.put(ConsumerConfig.GROUP_ID_CONFIG, "connect-" + id.connector());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
Expand All @@ -242,6 +243,8 @@ private KafkaConsumer<byte[], byte[]> createConsumer() {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");

props.putAll(workerConfig.originalsWithPrefix("consumer."));

KafkaConsumer<byte[], byte[]> newConsumer;
try {
newConsumer = new KafkaConsumer<>(props);
Expand Down

0 comments on commit 403d89e

Please sign in to comment.