Skip to content

Commit

Permalink
KAFKA-9623: Keep polling until the task manager is no longer rebalanc…
Browse files Browse the repository at this point in the history
…ing in progress (#8190)

This bug was found via the flaky SmokeTestDriverIntegrationTest. Without this PR the test fails roughly once every 3-4 runs; with the fix applied, we've run the test 20+ times locally without error.

Reviewers: A. Sophie Blee-Goldman <[email protected]>, John Roesler <[email protected]>
  • Loading branch information
guozhangwang authored Feb 28, 2020
1 parent ede0730 commit e0551ac
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,9 @@ public void run() {
private void runLoop() {
subscribeConsumer();

while (isRunning()) {
// if the thread is still in the middle of a rebalance, we should keep polling
// until the rebalance is completed before we close and commit the tasks
while (isRunning() || taskManager.isRebalanceInProgress()) {
try {
runOnce();
if (assignmentErrorCode.get() == AssignorError.VERSION_PROBING.code()) {
Expand Down Expand Up @@ -806,6 +808,10 @@ void runOnce() {
// try to fetch some records with normal poll time
// in order to get long polling
records = pollRequests(pollTime);
} else if (state == State.PENDING_SHUTDOWN) {
// we are only here because there's rebalance in progress,
// just poll with zero to complete it
records = pollRequests(Duration.ZERO);
} else {
// any other state should not happen
log.error("Unexpected state {} during normal iteration", state);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
log.error("Received error code {} - shutdown", streamThread.getAssignmentErrorCode());
streamThread.shutdown();
} else {
taskManager.handleRebalanceComplete();

streamThread.setState(State.PARTITIONS_ASSIGNED);
}

taskManager.handleRebalanceComplete();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ InternalTopologyBuilder builder() {
return builder;
}

/**
 * Reports whether this task manager currently considers a rebalance to be in progress.
 *
 * @return the current value of the {@code rebalanceInProgress} flag
 */
// NOTE(review): presumably the flag is raised in handleRebalanceStart and cleared in
// handleRebalanceComplete; neither assignment is visible here -- confirm.
boolean isRebalanceInProgress() {
return rebalanceInProgress;
}

void handleRebalanceStart(final Set<String> subscribedTopics) {
builder.addSubscribedTopicsFromMetadata(subscribedTopics, logPrefix);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,51 @@ public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable()
assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer);
}

// Verifies that a stream thread which is asked to shut down while a rebalance is still
// in progress stays in PENDING_SHUTDOWN and keeps its assigned tasks until the rebalance
// completes, and only then transitions to DEAD and releases the tasks.
@Test
public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress() throws InterruptedException {
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);

final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true);

// start the real thread and wait (up to 10 seconds) until it reports STARTING
thread.start();
TestUtils.waitForCondition(
() -> thread.state() == StreamThread.State.STARTING,
10 * 1000,
"Thread never started.");

// simulate the beginning of a rebalance: revoke nothing, then tell the task manager
// that a rebalance has started for the subscribed topic
thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
thread.taskManager().handleRebalanceStart(Collections.singleton(topic1));

final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
final List<TopicPartition> assignedPartitions = new ArrayList<>();

// assign two partitions of the topic, one per active task
assignedPartitions.add(t1p1);
assignedPartitions.add(t1p2);
activeTasks.put(task1, Collections.singleton(t1p1));
activeTasks.put(task2, Collections.singleton(t1p2));

// hand the task manager its new assignment (no standby tasks); the rebalance itself
// is not completed until onPartitionsAssigned is invoked further below
thread.taskManager().handleAssignment(activeTasks, Collections.emptyMap());

// request shutdown while the rebalance is still pending
thread.shutdown();

// even if thread is no longer running, it should still be polling
// as long as the rebalance is still ongoing
assertFalse(thread.isRunning());

// NOTE(review): the fixed one-second sleep presumably gives the thread's loop time to
// act on the shutdown request; afterwards the tasks must still be assigned and the
// thread must still be in PENDING_SHUTDOWN rather than DEAD
Thread.sleep(1000);
assertEquals(Utils.mkSet(task1, task2), thread.taskManager().activeTaskIds());
assertEquals(StreamThread.State.PENDING_SHUTDOWN, thread.state());

// deliver the assignment callback, which ends the simulated rebalance
thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);

// now the shutdown is expected to finish: the thread reaches DEAD (within 10 seconds)
// and no active tasks remain
TestUtils.waitForCondition(
() -> thread.state() == StreamThread.State.DEAD,
10 * 1000,
"Thread never shut down.");
assertEquals(Collections.emptySet(), thread.taskManager().activeTaskIds());
}

@Test
public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedException {
internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
Expand Down

0 comments on commit e0551ac

Please sign in to comment.