From fe0b704285ebc916ce5080a5248d91b4dc3c60e0 Mon Sep 17 00:00:00 2001 From: Boyang Chen Date: Sat, 7 Mar 2020 08:08:23 -0800 Subject: [PATCH] KAFKA-9645: Fallback to unsubscribe during Task Migrated (#8220) After #7312, we could still return data during the rebalance phase, which means it could be possible to find records without corresponding tasks. We have to fallback to the unsubscribe mode during task migrated as the assignment should be cleared out to keep sync with task manager state. Reviewers: A. Sophie Blee-Goldman , Guozhang Wang --- .../kafka/clients/consumer/MockConsumer.java | 24 ++++---- .../processor/internals/StreamThread.java | 5 +- .../processor/internals/StreamThreadTest.java | 61 +++++++++++++++++++ 3 files changed, 76 insertions(+), 14 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index b8579c45dff20..33702c3144e37 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -66,6 +66,7 @@ public class MockConsumer implements Consumer { private KafkaException offsetsException; private AtomicBoolean wakeup; private boolean closed; + private boolean shouldRebalance; public MockConsumer(OffsetResetStrategy offsetResetStrategy) { this.subscriptions = new SubscriptionState(new LogContext(), offsetResetStrategy); @@ -79,6 +80,7 @@ public MockConsumer(OffsetResetStrategy offsetResetStrategy) { this.pollException = null; this.wakeup = new AtomicBoolean(false); this.committed = new HashMap<>(); + this.shouldRebalance = false; } @Override @@ -356,21 +358,10 @@ public synchronized void seekToEnd(Collection partitions) { subscriptions.requestOffsetReset(partitions, OffsetResetStrategy.LATEST); } - // needed for cases where you make a second call to endOffsets - public synchronized void addEndOffsets(final Map newOffsets) { - innerUpdateEndOffsets(newOffsets, false); - } - public synchronized void updateEndOffsets(final Map newOffsets) { - innerUpdateEndOffsets(newOffsets, true); - } - - private void innerUpdateEndOffsets(final Map newOffsets, - final boolean replace) { - for (final Map.Entry entry : newOffsets.entrySet()) { List offsets = endOffsets.get(entry.getKey()); - if (replace || offsets == null) { + if (offsets == null) { offsets = new ArrayList<>(); } offsets.add(entry.getValue()); @@ -568,6 +559,15 @@ public ConsumerGroupMetadata groupMetadata() { @Override public void enforceRebalance() { + shouldRebalance = true; + } + + public boolean shouldRebalance() { + return shouldRebalance; + } + + public void resetShouldRebalance() { + shouldRebalance = false; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index d67fff8577ca4..7b6ac833f6452 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -508,11 +508,12 @@ private void runLoop() { "Will close out all assigned tasks and rejoin the consumer group."); taskManager.handleLostAll(); - mainConsumer.enforceRebalance(); + mainConsumer.unsubscribe(); + subscribeConsumer(); } } } - + private void subscribeConsumer() { if (builder.usesPatternSubscription()) { mainConsumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 13a669d0112a0..1bb4c9c6c71cd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.MockRebalanceListener; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.Cluster; @@ -807,6 +808,66 @@ public void shouldShutdownTaskManagerOnClose() { EasyMock.verify(taskManager); } + @Test + public void shouldNotReturnDataAfterTaskMigrated() { + final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); + + internalTopologyBuilder = EasyMock.createNiceMock(InternalTopologyBuilder.class); + + EasyMock.expect(internalTopologyBuilder.sourceTopicCollection()).andReturn(Collections.singletonList(topic1)).times(2); + + final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + + consumer.subscribe(Collections.singletonList(topic1), new MockRebalanceListener()); + consumer.rebalance(Collections.singletonList(t1p1)); + consumer.updateEndOffsets(Collections.singletonMap(t1p1, 10L)); + consumer.seekToEnd(Collections.singletonList(t1p1)); + + final ChangelogReader changelogReader = new MockChangelogReader() { + + @Override + public void restore() { + consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new byte[0], new byte[0])); + consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new byte[1], new byte[0])); + + throw new TaskMigratedException( + "Changelog restore found task migrated", new RuntimeException("restore task migrated")); + } + }; + + taskManager.handleLostAll(); + + EasyMock.replay(taskManager, internalTopologyBuilder); + + final StreamsMetricsImpl streamsMetrics = + new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST); + + final StreamThread thread = new StreamThread( + mockTime, + config, + null, + consumer, + consumer, + changelogReader, + null, + taskManager, + streamsMetrics, + internalTopologyBuilder, + CLIENT_ID, + new LogContext(""), + new AtomicInteger() + ).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID)); + + final IllegalStateException thrown = assertThrows( + IllegalStateException.class, thread::run); + + EasyMock.verify(taskManager); + + // The Mock consumer shall throw as the assignment has been wiped out, but records are assigned. + assertEquals("No current assignment for partition topic1-1", thrown.getMessage()); + assertFalse(consumer.shouldRebalance()); + } + @Test public void shouldShutdownTaskManagerOnCloseWithoutStart() { final Consumer consumer = EasyMock.createNiceMock(Consumer.class);