diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 5651c4e4909c9..57cc1ed2073a6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -1498,6 +1498,12 @@ public Set partitions() { } }); + // after handling the exception and reviving the task, the position + // should be reset to the beginning. + TestUtils.waitForCondition( + () -> mockRestoreConsumer.position(changelogPartition) == 0L, + "Never restore first record"); + mockRestoreConsumer.addRecord(new ConsumerRecord<>( "stream-thread-test-count-changelog", 0,