Skip to content

Commit

Permalink
simplify revocation
Browse files Browse the repository at this point in the history
  • Loading branch information
abbccdda committed Feb 27, 2020
1 parent d6bfdb8 commit 9e51267
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
Expand Down Expand Up @@ -411,13 +410,12 @@ protected void onJoinComplete(int generation,

// Add partitions that were not previously owned but are now assigned
firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));
Exception exceptionCaught = firstException.get();

if (exceptionCaught != null) {
if (exceptionCaught instanceof KafkaException) {
throw (KafkaException) exceptionCaught;
if (firstException.get() != null) {
if (firstException.get() instanceof KafkaException) {
throw (KafkaException) firstException.get();
} else {
throw new KafkaException("User rebalance callback throws an error", exceptionCaught);
throw new KafkaException("User rebalance callback throws an error", firstException.get());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
Expand Down Expand Up @@ -199,8 +198,7 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
}

if (!taskCloseExceptions.isEmpty()) {
final Map.Entry<TaskId, RuntimeException> first = taskCloseExceptions.entrySet().iterator().next();
for (Map.Entry<TaskId, RuntimeException> entry : taskCloseExceptions.entrySet()) {
for (final Map.Entry<TaskId, RuntimeException> entry : taskCloseExceptions.entrySet()) {
if (!(entry.getValue() instanceof TaskMigratedException)) {
throw new RuntimeException(
"Unexpected failure to close " + taskCloseExceptions.size() +
Expand All @@ -210,6 +208,7 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
}
}

final Map.Entry<TaskId, RuntimeException> first = taskCloseExceptions.entrySet().iterator().next();
// If all exceptions are task-migrated, we would just throw the first one.
throw first.getValue();
}
Expand Down Expand Up @@ -301,12 +300,11 @@ boolean tryToCompleteRestoration() {
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
final Set<TaskId> revokedTasks = new HashSet<>();
final Set<TopicPartition> remainingPartitions = new HashSet<>(revokedPartitions);

for (final Task task : tasks.values()) {
if (remainingPartitions.containsAll(task.inputPartitions())) {
revokedTasks.add(task.id());
task.suspend();
}
remainingPartitions.removeAll(task.inputPartitions());
}
Expand All @@ -316,17 +314,6 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
"due to race condition of consumer detecting the heartbeat failure, or the tasks " +
"have been cleaned up by the handleAssignment callback.", remainingPartitions);
}

for (final TaskId taskId : revokedTasks) {
final Task task = tasks.get(taskId);
try {
task.suspend();
} catch (RuntimeException e) {
log.error("Failed to suspend task {} due to {}, Closing it uncleanly.", e, task.id());

task.closeDirty();
}
}
}

/**
Expand Down

0 comments on commit 9e51267

Please sign in to comment.