diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index d67fff8577ca4..7b6ac833f6452 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -508,11 +508,12 @@ private void runLoop() { "Will close out all assigned tasks and rejoin the consumer group."); taskManager.handleLostAll(); - mainConsumer.enforceRebalance(); + mainConsumer.unsubscribe(); + subscribeConsumer(); } } } - + private void subscribeConsumer() { if (builder.usesPatternSubscription()) { mainConsumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);