Skip to content

Commit

Permalink
GH-2943: Fix KT.clusterId for concurrency (#2944)
Browse files Browse the repository at this point in the history
Fixes: #2943

The `if (this.kafkaAdmin != null && this.clusterId == null) {` condition
might be always true for concurrent threads (especially virtual).
Therefore, all of those threads are calling `this.kafkaAdmin.clusterId()`
making unnecessary network chats to Kafka broker

* Surround `this.kafkaAdmin.clusterId()` call with `Lock`

**Cherry-pick to `3.0.x`**
  • Loading branch information
artembilan authored and sobychacko committed Dec 17, 2023
1 parent 654c531 commit e813218
Showing 1 changed file with 13 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -117,6 +119,8 @@ public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationCo

private final Map<String, String> micrometerTags = new HashMap<>();

private final Lock clusterIdLock = new ReentrantLock();

private String beanName = "kafkaTemplate";

private ApplicationContext applicationContext;
Expand Down Expand Up @@ -500,7 +504,15 @@ else if (this.micrometerEnabled) {
@Nullable
private String clusterId() {
if (this.kafkaAdmin != null && this.clusterId == null) {
this.clusterId = this.kafkaAdmin.clusterId();
this.clusterIdLock.lock();
try {
if (this.clusterId == null) {
this.clusterId = this.kafkaAdmin.clusterId();
}
}
finally {
this.clusterIdLock.unlock();
}
}
return this.clusterId;
}
Expand Down

0 comments on commit e813218

Please sign in to comment.