diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index b8579c45dff20..33702c3144e37 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -66,6 +66,7 @@ public class MockConsumer implements Consumer { private KafkaException offsetsException; private AtomicBoolean wakeup; private boolean closed; + private boolean shouldRebalance; public MockConsumer(OffsetResetStrategy offsetResetStrategy) { this.subscriptions = new SubscriptionState(new LogContext(), offsetResetStrategy); @@ -79,6 +80,7 @@ public MockConsumer(OffsetResetStrategy offsetResetStrategy) { this.pollException = null; this.wakeup = new AtomicBoolean(false); this.committed = new HashMap<>(); + this.shouldRebalance = false; } @Override @@ -356,21 +358,10 @@ public synchronized void seekToEnd(Collection partitions) { subscriptions.requestOffsetReset(partitions, OffsetResetStrategy.LATEST); } - // needed for cases where you make a second call to endOffsets - public synchronized void addEndOffsets(final Map newOffsets) { - innerUpdateEndOffsets(newOffsets, false); - } - public synchronized void updateEndOffsets(final Map newOffsets) { - innerUpdateEndOffsets(newOffsets, true); - } - - private void innerUpdateEndOffsets(final Map newOffsets, - final boolean replace) { - for (final Map.Entry entry : newOffsets.entrySet()) { List offsets = endOffsets.get(entry.getKey()); - if (replace || offsets == null) { + if (offsets == null) { offsets = new ArrayList<>(); } offsets.add(entry.getValue()); @@ -568,6 +559,15 @@ public ConsumerGroupMetadata groupMetadata() { @Override public void enforceRebalance() { + shouldRebalance = true; + } + + public boolean shouldRebalance() { + return shouldRebalance; + } + + public void resetShouldRebalance() { + shouldRebalance = false; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index d67fff8577ca4..7b6ac833f6452 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -508,11 +508,12 @@ private void runLoop() { "Will close out all assigned tasks and rejoin the consumer group."); taskManager.handleLostAll(); - mainConsumer.enforceRebalance(); + mainConsumer.unsubscribe(); + subscribeConsumer(); } } } - + private void subscribeConsumer() { if (builder.usesPatternSubscription()) { mainConsumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 13a669d0112a0..1bb4c9c6c71cd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.MockRebalanceListener; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.Cluster; @@ -807,6 +808,66 @@ public void shouldShutdownTaskManagerOnClose() { EasyMock.verify(taskManager); } + @Test + public void shouldNotReturnDataAfterTaskMigrated() { + final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); + + internalTopologyBuilder = EasyMock.createNiceMock(InternalTopologyBuilder.class); + + EasyMock.expect(internalTopologyBuilder.sourceTopicCollection()).andReturn(Collections.singletonList(topic1)).times(2); + + final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + + consumer.subscribe(Collections.singletonList(topic1), new MockRebalanceListener()); + consumer.rebalance(Collections.singletonList(t1p1)); + consumer.updateEndOffsets(Collections.singletonMap(t1p1, 10L)); + consumer.seekToEnd(Collections.singletonList(t1p1)); + + final ChangelogReader changelogReader = new MockChangelogReader() { + + @Override + public void restore() { + consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new byte[0], new byte[0])); + consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new byte[1], new byte[0])); + + throw new TaskMigratedException( + "Changelog restore found task migrated", new RuntimeException("restore task migrated")); + } + }; + + taskManager.handleLostAll(); + + EasyMock.replay(taskManager, internalTopologyBuilder); + + final StreamsMetricsImpl streamsMetrics = + new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST); + + final StreamThread thread = new StreamThread( + mockTime, + config, + null, + consumer, + consumer, + changelogReader, + null, + taskManager, + streamsMetrics, + internalTopologyBuilder, + CLIENT_ID, + new LogContext(""), + new AtomicInteger() + ).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); + + final IllegalStateException thrown = assertThrows( + IllegalStateException.class, thread::run); + + EasyMock.verify(taskManager); + + // The Mock consumer shall throw as the assignment has been wiped out, but records are assigned. + assertEquals("No current assignment for partition topic1-1", thrown.getMessage()); + assertFalse(consumer.shouldRebalance()); + } + @Test public void shouldShutdownTaskManagerOnCloseWithoutStart() { final Consumer consumer = EasyMock.createNiceMock(Consumer.class);