From 1c160757af215444a8294c540b65b438ebf992a5 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 16 Sep 2024 15:31:49 -0400 Subject: [PATCH] Fix race condition in the `AbstractMessageListenerContainer` lifecycle The `stop()` and `childStopped()` in the `ConcurrentMessageListenerContainer` use the same `lifecycleLock`, however the last one is called from the thread of listener consumer in the child application context. * Fix `AbstractMessageListenerContainer.stop(wait)` logic to release the `lifecycleLock` before going to the `latch.await()`. This still blocks the `stop()` call, but allows the other lifecycle conditions to be fulfilled, even from different threads --- .../AbstractMessageListenerContainer.java | 45 +++++++++++-------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index 2721b18c7e..80c732f46e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -75,7 +75,7 @@ */ public abstract class AbstractMessageListenerContainer implements GenericMessageListenerContainer, BeanNameAware, ApplicationEventPublisherAware, - ApplicationContextAware { + ApplicationContextAware { /** * The default {@link org.springframework.context.SmartLifecycle} phase for listener @@ -143,7 +143,6 @@ public abstract class AbstractMessageListenerContainer @Nullable private KafkaAdmin kafkaAdmin; - /** * Construct an instance with the provided factory and properties. * @param consumerFactory the factory. @@ -609,28 +608,36 @@ public final void stop() { * @since 2.3.8 */ public final void stop(boolean wait) { - this.lifecycleLock.lock(); - try { - if (isRunning()) { - if (wait) { - final CountDownLatch latch = new CountDownLatch(1); + if (isRunning()) { + if (wait) { + final CountDownLatch latch = new CountDownLatch(1); + this.lifecycleLock.lock(); + try { + doStop(latch::countDown); - try { - latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS); // NOSONAR - publishContainerStoppedEvent(); - } - catch (@SuppressWarnings("unused") InterruptedException e) { - Thread.currentThread().interrupt(); - } } - else { + finally { + this.lifecycleLock.unlock(); + } + try { + latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS); // NOSONAR + publishContainerStoppedEvent(); + } + catch (@SuppressWarnings("unused") InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + else { + this.lifecycleLock.lock(); + try { doStop(this::publishContainerStoppedEvent); } + finally { + this.lifecycleLock.unlock(); + } + } } - finally { - this.lifecycleLock.unlock(); - } } @Override @@ -706,7 +713,7 @@ public void onPartitionsAssigned(Collection partitions) { @Override public void onPartitionsLost(Collection partitions) { AbstractMessageListenerContainer.this.logger.info(() -> - getGroupId() + ": partitions lost: " + partitions); + getGroupId() + ": partitions lost: " + partitions); } };