From 41f9d7a82b36b3040551efa549202457aa9c5b0a Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Wed, 4 Mar 2020 13:54:07 -0800 Subject: [PATCH] MINOR: Add an extra check in StreamThreadTest (#8214) During the handle-corruption logic, we first remove the partitions from the changelog reader (and hence from the restore consumer), and then add them back during the task.revive() function. During this period the test main thread may happen to call addRecords which could throw IllegalStateException: a race condition. The fix here is to wait for a position() call to return 0, which means the partitions have been added back to the restore consumer, and the seek-to-beginning has been called as well. Reviewers: Matthias J. Sax --- .../kafka/streams/processor/internals/StreamThreadTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 5651c4e4909c9..57cc1ed2073a6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -1498,6 +1498,12 @@ public Set partitions() { } }); + // after handling the exception and reviving the task, the position + // should be reset to the beginning. + TestUtils.waitForCondition( + () -> mockRestoreConsumer.position(changelogPartition) == 0L, + "Never restore first record"); + mockRestoreConsumer.addRecord(new ConsumerRecord<>( "stream-thread-test-count-changelog", 0,