KAFKA-9620: Do not throw in the middle of consumer user callbacks #8187

Merged 6 commits on Feb 28, 2020
@@ -333,7 +333,7 @@ protected void onJoinComplete(int generation,
                                ByteBuffer assignmentBuffer) {
     log.debug("Executing onJoinComplete with generation {} and memberId {}", generation, memberId);

-    // only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
+    // Only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
     if (!isLeader)
         assignmentSnapshot = null;

@@ -377,12 +377,12 @@ protected void onJoinComplete(int generation,
     );

     if (!revokedPartitions.isEmpty()) {
-        // revoke partitions that were previously owned but no longer assigned;
+        // Revoke partitions that were previously owned but no longer assigned;
         // note that we should only change the assignment (or update the assignor's state)
         // AFTER we've triggered the revoke callback
         firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));

-        // if revoked any partitions, need to re-join the group afterwards
+        // If revoked any partitions, need to re-join the group afterwards
         log.debug("Need to revoke partitions {} and re-join the group", revokedPartitions);
         requestRejoin();
     }
@@ -392,21 +392,32 @@ protected void onJoinComplete(int generation,
     // were not explicitly requested, so we update the joined subscription here.
     maybeUpdateJoinedSubscription(assignedPartitions);

-    // give the assignor a chance to update internal state based on the received assignment
+    // Give the assignor a chance to update internal state based on the received assignment
     groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, generation, memberId, rebalanceConfig.groupInstanceId);
-    assignor.onAssignment(assignment, groupMetadata);

-    // reschedule the auto commit starting from now
+    // Catch any exception here to make sure we could complete the user callback.
+    try {
+        assignor.onAssignment(assignment, groupMetadata);
+    } catch (Exception e) {
+        firstException.compareAndSet(null, e);
+    }
+
+    // Reschedule the auto commit starting from now
     if (autoCommitEnabled)
         this.nextAutoCommitTimer.updateAndReset(autoCommitIntervalMs);

     subscriptions.assignFromSubscribed(assignedPartitions);

-    // add partitions that were not previously owned but are now assigned
+    // Add partitions that were not previously owned but are now assigned
     firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));

-    if (firstException.get() != null)
-        throw new KafkaException("User rebalance callback throws an error", firstException.get());
+    if (firstException.get() != null) {
+        if (firstException.get() instanceof KafkaException) {
+            throw (KafkaException) firstException.get();
+        } else {
+            throw new KafkaException("User rebalance callback throws an error", firstException.get());
+        }
+    }
 }

void maybeUpdateSubscriptionMetadata() {
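The change above completes a recurring pattern in this PR: every user-facing callback and assignor hook is invoked inside a try/catch, the first failure is recorded in `firstException`, and only after all of the rebalance bookkeeping has run is that failure rethrown. A minimal standalone sketch of the pattern (`CallbackRunner` and its methods are illustrative names, not part of the Kafka client API):

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the "record first, throw last" pattern used in onJoinComplete.
public class CallbackRunner {
    private final AtomicReference<RuntimeException> firstException = new AtomicReference<>();

    // Run a user callback; remember only the first failure so that
    // later callbacks (and internal bookkeeping) still execute.
    public void runSafely(Runnable callback) {
        try {
            callback.run();
        } catch (RuntimeException e) {
            firstException.compareAndSet(null, e);
        }
    }

    // After all callbacks have run, surface the first recorded failure as-is.
    public void throwIfFailed() {
        RuntimeException e = firstException.get();
        if (e != null) {
            throw e;
        }
    }

    // Tiny self-check: both callbacks run, and the first failure is surfaced.
    static boolean demo() {
        CallbackRunner runner = new CallbackRunner();
        final boolean[] ran = {false, false};
        runner.runSafely(() -> { ran[0] = true; throw new IllegalStateException("first"); });
        runner.runSafely(() -> ran[1] = true);
        try {
            runner.throwIfFailed();
            return false; // should have thrown
        } catch (RuntimeException e) {
            return ran[0] && ran[1] && e.getMessage().equals("first");
        }
    }
}
```

The point of `compareAndSet(null, e)` is that a later failure never overwrites an earlier one, so the exception the caller sees is the one from the first callback that broke.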
@@ -84,7 +84,7 @@ static void wipeStateStores(final Logger log, final ProcessorStateManager stateMgr
         Utils.delete(stateMgr.baseDir());
     } catch (final IOException fatalException) {
         // since it is only called under dirty close, we always swallow the exception
-        log.warn("Failed to wipe state stores for task {}", stateMgr.taskId());
+        log.warn("Failed to wipe state stores for task {} due to {}", stateMgr.taskId(), fatalException);
@abbccdda (Contributor, Author) commented Feb 27, 2020:

    Add logging to record the IO exception.
}
}

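The hunk above illustrates the swallow-and-log idiom: during a dirty close the cleanup failure must not propagate, so the `IOException` is caught, its cause is logged, and execution continues. A sketch of the idiom under simplified assumptions (the static `lastWarning` field stands in for a real logger, and deletion is a plain `Files` call rather than Kafka's `Utils.delete`):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of the swallow-and-log pattern in wipeStateStores.
public class WipeSketch {
    static String lastWarning = null;

    static void wipe(Path baseDir) {
        try {
            Files.delete(baseDir);
        } catch (IOException fatalException) {
            // Only called under dirty close, so always swallow, but keep the cause visible.
            lastWarning = "Failed to wipe state stores at " + baseDir + " due to " + fatalException;
        }
    }

    // Tiny self-check: deleting a non-empty directory throws, and the
    // warning (including the cause) is recorded instead of propagating.
    static boolean demo() {
        try {
            Path dir = Files.createTempDirectory("wipe-sketch");
            Files.createFile(dir.resolve("segment"));
            wipe(dir); // Files.delete throws DirectoryNotEmptyException here
            return lastWarning != null && lastWarning.contains("due to");
        } catch (IOException unexpected) {
            return false;
        }
    }
}
```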
@@ -198,12 +198,19 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
     }

     if (!taskCloseExceptions.isEmpty()) {
+        for (final Map.Entry<TaskId, RuntimeException> entry : taskCloseExceptions.entrySet()) {
+            if (!(entry.getValue() instanceof TaskMigratedException)) {

Contributor: I think we should use instanceof KafkaException as well here: if it is any subclass of KafkaException, we should not wrap it.

@abbccdda (Contributor, Author): Sounds good

+                throw new RuntimeException(
+                    "Unexpected failure to close " + taskCloseExceptions.size() +
+                        " task(s) [" + taskCloseExceptions.keySet() + "]. " +
+                        "First unexpected exception (for task " + entry.getKey() + ") follows.", entry.getValue()
+                );
+            }
+        }
+
         final Map.Entry<TaskId, RuntimeException> first = taskCloseExceptions.entrySet().iterator().next();
-        throw new RuntimeException(
-            "Unexpected failure to close " + taskCloseExceptions.size() +
-                " task(s) [" + taskCloseExceptions.keySet() + "]. " +
-                "First exception (for task " + first.getKey() + ") follows.", first.getValue()
-        );
+        // If all exceptions are task-migrated, we would just throw the first one.
+        throw first.getValue();
     }

if (!activeTasksToCreate.isEmpty()) {
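The selection logic in the hunk above can be sketched in isolation: prefer the first "unexpected" close failure (wrapped in a `RuntimeException` that adds context), and only when every failure is migration-related rethrow the first one unchanged. In this sketch `MigratedException` and the `String` task ids are stand-ins for Kafka's `TaskMigratedException` and `TaskId`:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the exception-selection logic in handleAssignment.
public class CloseFailures {
    static class MigratedException extends RuntimeException {
        MigratedException(String msg) { super(msg); }
    }

    static RuntimeException select(Map<String, RuntimeException> taskCloseExceptions) {
        for (Map.Entry<String, RuntimeException> entry : taskCloseExceptions.entrySet()) {
            if (!(entry.getValue() instanceof MigratedException)) {
                // An unexpected failure: wrap it with context about all failed tasks.
                return new RuntimeException(
                    "Unexpected failure to close " + taskCloseExceptions.size() +
                        " task(s) [" + taskCloseExceptions.keySet() + "]. " +
                        "First unexpected exception (for task " + entry.getKey() + ") follows.",
                    entry.getValue());
            }
        }
        // All exceptions are migration-related: rethrow the first one unchanged.
        return taskCloseExceptions.values().iterator().next();
    }

    // Tiny self-check exercising both branches.
    static boolean demo() {
        Map<String, RuntimeException> allMigrated = new LinkedHashMap<>();
        allMigrated.put("0_0", new MigratedException("m1"));
        allMigrated.put("0_1", new MigratedException("m2"));
        boolean firstRethrown = select(allMigrated).getMessage().equals("m1");

        Map<String, RuntimeException> mixed = new LinkedHashMap<>();
        mixed.put("0_0", new MigratedException("m1"));
        mixed.put("0_1", new IllegalStateException("boom"));
        boolean unexpectedWrapped = select(mixed).getCause() instanceof IllegalStateException;

        return firstRethrown && unexpectedWrapped;
    }
}
```

A `LinkedHashMap` is used so that "first" means insertion order, mirroring the deterministic iteration the real code relies on.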
@@ -293,12 +300,11 @@ boolean tryToCompleteRestoration() {
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
     void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
-        final Set<TaskId> revokedTasks = new HashSet<>();
         final Set<TopicPartition> remainingPartitions = new HashSet<>(revokedPartitions);

         for (final Task task : tasks.values()) {
             if (remainingPartitions.containsAll(task.inputPartitions())) {
-                revokedTasks.add(task.id());
+                task.suspend();

@abbccdda (Contributor, Author): Simplified the logic here, as we no longer throw before suspending the task.

Contributor: Not sure I follow: we can still throw from inside suspend, right?

@abbccdda (Contributor, Author): What I mean is that we no longer throw when the remaining partitions are not empty; we can directly suspend a task here.

Contributor: I see.

             }
             remainingPartitions.removeAll(task.inputPartitions());
         }
@@ -308,11 +314,6 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
                 "due to race condition of consumer detecting the heartbeat failure, or the tasks " +
                 "have been cleaned up by the handleAssignment callback.", remainingPartitions);
         }
-
-        for (final TaskId taskId : revokedTasks) {
-            final Task task = tasks.get(taskId);
-            task.suspend();
-        }
     }
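The simplification above can be sketched as a single pass: a task is suspended directly once all of its input partitions appear in the revoked set, instead of collecting task ids and suspending them in a second loop. In this sketch the nested `Task` class and `Integer` partitions are illustrative stand-ins for Kafka's `Task` and `TopicPartition` types:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the single-pass revocation logic in handleRevocation.
public class RevocationSketch {
    static class Task {
        final Set<Integer> inputPartitions;
        boolean suspended = false;
        Task(Set<Integer> inputPartitions) { this.inputPartitions = inputPartitions; }
        void suspend() { suspended = true; }
    }

    // Returns the revoked partitions that no live task owned.
    static Set<Integer> handleRevocation(Collection<Task> tasks, Collection<Integer> revoked) {
        Set<Integer> remainingPartitions = new HashSet<>(revoked);
        for (Task task : tasks) {
            if (remainingPartitions.containsAll(task.inputPartitions)) {
                task.suspend(); // suspend in place; no second pass over collected task ids
            }
            remainingPartitions.removeAll(task.inputPartitions);
        }
        return remainingPartitions;
    }

    // Tiny self-check: only the fully revoked task is suspended, and the
    // unowned partition is reported back.
    static boolean demo() {
        Task t1 = new Task(new HashSet<>(Set.of(1, 2)));
        Task t2 = new Task(new HashSet<>(Set.of(3, 4)));
        Set<Integer> leftover = handleRevocation(List.of(t1, t2), Set.of(1, 2, 9));
        return t1.suspended && !t2.suspended && leftover.equals(Set.of(9));
    }
}
```

Suspending in place is safe precisely because of the reordering discussed in the thread above: the non-empty-remaining case now only logs instead of throwing, so nothing can interrupt the loop between deciding to revoke a task and suspending it.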

/**