KAFKA-9645: Fallback to unsubscribe during Task Migrated (#8220)
After #7312, we could still return data during the rebalance phase, which means it is possible to find records without corresponding tasks. We have to fall back to unsubscribing when a task-migrated exception is hit, as the assignment should be cleared out to stay in sync with the task manager state.

Reviewers: A. Sophie Blee-Goldman <[email protected]>, Guozhang Wang <[email protected]>
Boyang Chen authored Mar 7, 2020
1 parent 2775572 commit fe0b704
Showing 3 changed files with 76 additions and 14 deletions.
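The fix, in short: when the stream thread hits a TaskMigratedException, it no longer calls Consumer#enforceRebalance(); it drops all tasks and the consumer's assignment, then re-subscribes so the next poll() rejoins the group. Below is a minimal sketch of that error path, assuming a runOnce() loop body and a log field similar to StreamThread's; only handleLostAll(), unsubscribe(), and subscribeConsumer() are taken from the diff, the rest is illustrative.

// Sketch only — not the verbatim StreamThread code.
try {
    runOnce();
} catch (final TaskMigratedException e) {
    log.warn("Will close out all assigned tasks and rejoin the consumer group.");
    taskManager.handleLostAll();   // release every task owned by this thread
    mainConsumer.unsubscribe();    // clear the assignment so it stays in sync with the task manager state
    subscribeConsumer();           // re-subscribe; the next poll() triggers a fresh rebalance
}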
@@ -66,6 +66,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
private KafkaException offsetsException;
private AtomicBoolean wakeup;
private boolean closed;
+ private boolean shouldRebalance;

public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
this.subscriptions = new SubscriptionState(new LogContext(), offsetResetStrategy);
@@ -79,6 +80,7 @@ public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
this.pollException = null;
this.wakeup = new AtomicBoolean(false);
this.committed = new HashMap<>();
+ this.shouldRebalance = false;
}

@Override
@@ -356,21 +358,10 @@ public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
subscriptions.requestOffsetReset(partitions, OffsetResetStrategy.LATEST);
}

- // needed for cases where you make a second call to endOffsets
- public synchronized void addEndOffsets(final Map<TopicPartition, Long> newOffsets) {
- innerUpdateEndOffsets(newOffsets, false);
- }
-
public synchronized void updateEndOffsets(final Map<TopicPartition, Long> newOffsets) {
- innerUpdateEndOffsets(newOffsets, true);
- }
-
- private void innerUpdateEndOffsets(final Map<TopicPartition, Long> newOffsets,
- final boolean replace) {
-
for (final Map.Entry<TopicPartition, Long> entry : newOffsets.entrySet()) {
List<Long> offsets = endOffsets.get(entry.getKey());
- if (replace || offsets == null) {
+ if (offsets == null) {
offsets = new ArrayList<>();
}
offsets.add(entry.getValue());
@@ -568,6 +559,15 @@ public ConsumerGroupMetadata groupMetadata() {

@Override
public void enforceRebalance() {
+ shouldRebalance = true;
+ }
+
+ public boolean shouldRebalance() {
+ return shouldRebalance;
+ }
+
+ public void resetShouldRebalance() {
+ shouldRebalance = false;
}

@Override
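These additions turn MockConsumer#enforceRebalance() into an observable no-op: it only records that a rebalance was requested, and tests can query or reset that flag. A hypothetical usage fragment (the assertions are illustrative, not part of this commit):

final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
consumer.enforceRebalance();                // now just flips the internal flag
assertTrue(consumer.shouldRebalance());     // a test can observe that a rebalance was requested
consumer.resetShouldRebalance();            // clear the flag between test phases
assertFalse(consumer.shouldRebalance());    // or assert that no rebalance was requested, as the new test below does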
@@ -508,11 +508,12 @@ private void runLoop() {
"Will close out all assigned tasks and rejoin the consumer group.");

taskManager.handleLostAll();
- mainConsumer.enforceRebalance();
+ mainConsumer.unsubscribe();
+ subscribeConsumer();
}
}
}

private void subscribeConsumer() {
if (builder.usesPatternSubscription()) {
mainConsumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
@@ -23,6 +23,7 @@
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+ import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
@@ -807,6 +808,66 @@ public void shouldShutdownTaskManagerOnClose() {
EasyMock.verify(taskManager);
}

+ @Test
+ public void shouldNotReturnDataAfterTaskMigrated() {
+ final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+
+ internalTopologyBuilder = EasyMock.createNiceMock(InternalTopologyBuilder.class);
+
+ EasyMock.expect(internalTopologyBuilder.sourceTopicCollection()).andReturn(Collections.singletonList(topic1)).times(2);
+
+ final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.LATEST);
+
+ consumer.subscribe(Collections.singletonList(topic1), new MockRebalanceListener());
+ consumer.rebalance(Collections.singletonList(t1p1));
+ consumer.updateEndOffsets(Collections.singletonMap(t1p1, 10L));
+ consumer.seekToEnd(Collections.singletonList(t1p1));
+
+ final ChangelogReader changelogReader = new MockChangelogReader() {
+
+ @Override
+ public void restore() {
+ consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new byte[0], new byte[0]));
+ consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new byte[1], new byte[0]));
+
+ throw new TaskMigratedException(
+ "Changelog restore found task migrated", new RuntimeException("restore task migrated"));
+ }
+ };
+
+ taskManager.handleLostAll();
+
+ EasyMock.replay(taskManager, internalTopologyBuilder);
+
+ final StreamsMetricsImpl streamsMetrics =
+ new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST);
+
+ final StreamThread thread = new StreamThread(
+ mockTime,
+ config,
+ null,
+ consumer,
+ consumer,
+ changelogReader,
+ null,
+ taskManager,
+ streamsMetrics,
+ internalTopologyBuilder,
+ CLIENT_ID,
+ new LogContext(""),
+ new AtomicInteger()
+ ).updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+
+ final IllegalStateException thrown = assertThrows(
+ IllegalStateException.class, thread::run);
+
+ EasyMock.verify(taskManager);
+
+ // The Mock consumer shall throw as the assignment has been wiped out, but records are assigned.
+ assertEquals("No current assignment for partition topic1-1", thrown.getMessage());
+ assertFalse(consumer.shouldRebalance());
+ }
+
@Test
public void shouldShutdownTaskManagerOnCloseWithoutStart() {
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);