Skip to content

Commit

Permalink
hold exception until the finish of all callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
abbccdda committed Feb 27, 2020
1 parent bbdbb84 commit d6bfdb8
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
Expand Down Expand Up @@ -333,7 +334,7 @@ protected void onJoinComplete(int generation,
ByteBuffer assignmentBuffer) {
log.debug("Executing onJoinComplete with generation {} and memberId {}", generation, memberId);

// only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
// Only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
if (!isLeader)
assignmentSnapshot = null;

Expand Down Expand Up @@ -377,12 +378,12 @@ protected void onJoinComplete(int generation,
);

if (!revokedPartitions.isEmpty()) {
// revoke partitions that were previously owned but no longer assigned;
// Revoke partitions that were previously owned but no longer assigned;
// note that we should only change the assignment (or update the assignor's state)
// AFTER we've triggered the revoke callback
firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));

// if revoked any partitions, need to re-join the group afterwards
// If revoked any partitions, need to re-join the group afterwards
log.debug("Need to revoke partitions {} and re-join the group", revokedPartitions);
requestRejoin();
}
Expand All @@ -392,21 +393,33 @@ protected void onJoinComplete(int generation,
// were not explicitly requested, so we update the joined subscription here.
maybeUpdateJoinedSubscription(assignedPartitions);

// give the assignor a chance to update internal state based on the received assignment
// Give the assignor a chance to update internal state based on the received assignment
groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, generation, memberId, rebalanceConfig.groupInstanceId);
assignor.onAssignment(assignment, groupMetadata);

// reschedule the auto commit starting from now
// Catch any exception here to make sure we could complete the user callback.
try {
assignor.onAssignment(assignment, groupMetadata);
} catch (Exception e) {
firstException.compareAndSet(null, e);
}

// Reschedule the auto commit starting from now
if (autoCommitEnabled)
this.nextAutoCommitTimer.updateAndReset(autoCommitIntervalMs);

subscriptions.assignFromSubscribed(assignedPartitions);

// add partitions that were not previously owned but are now assigned
// Add partitions that were not previously owned but are now assigned
firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));
Exception exceptionCaught = firstException.get();

if (firstException.get() != null)
throw new KafkaException("User rebalance callback throws an error", firstException.get());
if (exceptionCaught != null) {
if (exceptionCaught instanceof KafkaException) {
throw (KafkaException) exceptionCaught;
} else {
throw new KafkaException("User rebalance callback throws an error", exceptionCaught);
}
}
}

void maybeUpdateSubscriptionMetadata() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
Expand Down Expand Up @@ -199,11 +200,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,

if (!taskCloseExceptions.isEmpty()) {
final Map.Entry<TaskId, RuntimeException> first = taskCloseExceptions.entrySet().iterator().next();
throw new RuntimeException(
"Unexpected failure to close " + taskCloseExceptions.size() +
" task(s) [" + taskCloseExceptions.keySet() + "]. " +
"First exception (for task " + first.getKey() + ") follows.", first.getValue()
);
for (Map.Entry<TaskId, RuntimeException> entry : taskCloseExceptions.entrySet()) {
if (!(entry.getValue() instanceof TaskMigratedException)) {
throw new RuntimeException(
"Unexpected failure to close " + taskCloseExceptions.size() +
" task(s) [" + taskCloseExceptions.keySet() + "]. " +
"First unexpected exception (for task " + entry.getKey() + ") follows.", entry.getValue()
);
}
}

// If all exceptions are task-migrated, we would just throw the first one.
throw first.getValue();
}

if (!activeTasksToCreate.isEmpty()) {
Expand Down Expand Up @@ -311,7 +319,13 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {

for (final TaskId taskId : revokedTasks) {
final Task task = tasks.get(taskId);
task.suspend();
try {
task.suspend();
} catch (RuntimeException e) {
log.error("Failed to suspend task {} due to {}, Closing it uncleanly.", e, task.id());

task.closeDirty();
}
}
}

Expand Down

0 comments on commit d6bfdb8

Please sign in to comment.